Mercurial > hg > Database > Christie
diff src/main/java/christie/topology/manager/keepalive/TaskExecuter.java @ 49:fd944876257b
add node and keepalive
author | akahori |
---|---|
date | Thu, 23 Aug 2018 09:29:05 +0900 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/topology/manager/keepalive/TaskExecuter.java Thu Aug 23 09:29:05 2018 +0900 @@ -0,0 +1,144 @@ +package christie.topology.manager.keepalive; + +import christie.annotation.Take; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; + +public class TaskExecuter extends CodeGear { + + @Take + ListManager _SCHEDULER; + private TaskInfo nowTask; + private boolean sleepFlag = false; + private boolean skipFlag = false; + private long startTime = 0; + private long remainingTime = 0; + private static TaskExecuter instance = new TaskExecuter(); + + private TaskExecuter() { + } + + public static TaskExecuter getInstance() { + return instance; + } + + public void taskExecutorSetKey() { + //ids.init(); + } + + + @Override + protected synchronized void run(CodeGearManager cgm) { + if (_SCHEDULER.getTaskList().size() == 0) { + TaskInfo task = new TaskInfo(TaskType.CREATE); + task.setSleepTime(3000); + _SCHEDULER.addTask(task); + getLocalDGM().put("_SCHEDULER", _SCHEDULER); + remainingTime = 0; + return; + } + + nowTask = _SCHEDULER.getTaskList().poll(); + if (nowTask.getType() != TaskType.PING) + getLocalDGM().put("_SCHEDULER", _SCHEDULER); + if (skipFlag) { + skipFlag = false; + nowTask.setSleepTime(remainingTime + nowTask.getSleepTime()); + remainingTime = 0; + } + startTime = System.currentTimeMillis(); + if (nowTask.getSleepTime() != 0) { + sleepFlag = true; + try { + wait(nowTask.getSleepTime()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + sleepFlag = false; + } + + if (!skipFlag) { + // ping or close + if (nowTask.getType() == TaskType.PING) { + //ods.ping(nowTask.getManagerKey(), nowTask.getReturnKey()); + TaskInfo task = new TaskInfo(TaskType.CLOSE); + task.setInfo(nowTask.getManagerKey(), 10 * 1000); + _SCHEDULER.addTask(task); + getLocalDGM().put("_SCHEDULER", _SCHEDULER); + cgm.setup(new RespondPing()); + } else if (nowTask.getType() == TaskType.CLOSE) { + // TODO: shotdown処理は後で追加する. + /* // no response from the Remote DataSegment. So close this connection. + if (DataSegment.contains(nowTask.getManagerKey())) { + DataSegment.get(nowTask.getManagerKey()).shutdown(); + System.out.println(nowTask.getManagerKey() +" IS SHOTDOWN"); + } else { + System.out.println(nowTask.getManagerKey() +" IS ALREADY SHOTDOWN"); + }*/ + + } else if (nowTask.getType() == TaskType.CREATE) { + cgm.setup(new CreateTask()); + } + } + nowTask = null; + startTime = 0; + + + // taskExecutorSetKey(); + } + + public synchronized void skip() { + if (sleepFlag) { + skipFlag = true; + if (startTime == 0) { + remainingTime = nowTask.getSleepTime(); + } else { + remainingTime = nowTask.getSleepTime() - (System.currentTimeMillis() - startTime); + } + nowTask = null; + notify(); + } + } + + public synchronized void ignore() { + if (sleepFlag) { + skipFlag = true; + remainingTime = 0; + nowTask = null; + notify(); + } + } + + public synchronized TaskInfo getNowTask() { + return nowTask; + } + + // only use in ListManagerTest + public synchronized void setNowTask(TaskInfo info) { + nowTask = info; + } + + public synchronized long getRemainingTime() { + if (sleepFlag) { + if (startTime != 0) { + return nowTask.getSleepTime() - (System.currentTimeMillis() - startTime); + } else { + return nowTask.getSleepTime(); + } + } else { + return remainingTime; + } + } + + public synchronized boolean compareAndSkip(TaskInfo task) { + if (nowTask != null) { + if (nowTask.getType().equals(task.getType()) + && nowTask.getManagerKey().equals(task.getManagerKey())) { + skip(); + return true; + } + } + return false; + } + +} \ No newline at end of file