Mercurial > hg > Database > Christie
view src/main/java/christie/topology/manager/keepalive/TaskExecuter.java @ 168:c7300be0fff6
fix incomingHosts end message
author | akahori |
---|---|
date | Tue, 22 Jan 2019 16:00:29 +0900 |
parents | fd944876257b |
children |
line wrap: on
line source
package christie.topology.manager.keepalive; import christie.annotation.Take; import christie.codegear.CodeGear; import christie.codegear.CodeGearManager; public class TaskExecuter extends CodeGear { @Take ListManager _SCHEDULER; private TaskInfo nowTask; private boolean sleepFlag = false; private boolean skipFlag = false; private long startTime = 0; private long remainingTime = 0; private static TaskExecuter instance = new TaskExecuter(); private TaskExecuter() { } public static TaskExecuter getInstance() { return instance; } public void taskExecutorSetKey() { //ids.init(); } @Override protected synchronized void run(CodeGearManager cgm) { if (_SCHEDULER.getTaskList().size() == 0) { TaskInfo task = new TaskInfo(TaskType.CREATE); task.setSleepTime(3000); _SCHEDULER.addTask(task); getLocalDGM().put("_SCHEDULER", _SCHEDULER); remainingTime = 0; return; } nowTask = _SCHEDULER.getTaskList().poll(); if (nowTask.getType() != TaskType.PING) getLocalDGM().put("_SCHEDULER", _SCHEDULER); if (skipFlag) { skipFlag = false; nowTask.setSleepTime(remainingTime + nowTask.getSleepTime()); remainingTime = 0; } startTime = System.currentTimeMillis(); if (nowTask.getSleepTime() != 0) { sleepFlag = true; try { wait(nowTask.getSleepTime()); } catch (InterruptedException e) { e.printStackTrace(); } sleepFlag = false; } if (!skipFlag) { // ping or close if (nowTask.getType() == TaskType.PING) { //ods.ping(nowTask.getManagerKey(), nowTask.getReturnKey()); TaskInfo task = new TaskInfo(TaskType.CLOSE); task.setInfo(nowTask.getManagerKey(), 10 * 1000); _SCHEDULER.addTask(task); getLocalDGM().put("_SCHEDULER", _SCHEDULER); cgm.setup(new RespondPing()); } else if (nowTask.getType() == TaskType.CLOSE) { // TODO: shotdown処理は後で追加する. /* // no response from the Remote DataSegment. So close this connection. if (DataSegment.contains(nowTask.getManagerKey())) { DataSegment.get(nowTask.getManagerKey()).shutdown(); System.out.println(nowTask.getManagerKey() +" IS SHOTDOWN"); } else { System.out.println(nowTask.getManagerKey() +" IS ALREADY SHOTDOWN"); }*/ } else if (nowTask.getType() == TaskType.CREATE) { cgm.setup(new CreateTask()); } } nowTask = null; startTime = 0; // taskExecutorSetKey(); } public synchronized void skip() { if (sleepFlag) { skipFlag = true; if (startTime == 0) { remainingTime = nowTask.getSleepTime(); } else { remainingTime = nowTask.getSleepTime() - (System.currentTimeMillis() - startTime); } nowTask = null; notify(); } } public synchronized void ignore() { if (sleepFlag) { skipFlag = true; remainingTime = 0; nowTask = null; notify(); } } public synchronized TaskInfo getNowTask() { return nowTask; } // only use in ListManagerTest public synchronized void setNowTask(TaskInfo info) { nowTask = info; } public synchronized long getRemainingTime() { if (sleepFlag) { if (startTime != 0) { return nowTask.getSleepTime() - (System.currentTimeMillis() - startTime); } else { return nowTask.getSleepTime(); } } else { return remainingTime; } } public synchronized boolean compareAndSkip(TaskInfo task) { if (nowTask != null) { if (nowTask.getType().equals(task.getType()) && nowTask.getManagerKey().equals(task.getManagerKey())) { skip(); return true; } } return false; } }