view src/alice/topology/manager/keeparive/TaskExecuter.java @ 330:d35ff0f588e8

Keep-alive may work, but a bug still exists.
author sugi
date Mon, 31 Mar 2014 22:28:52 +0900
parents 35b4e24e3e02
children 8266d7cfba7e
line wrap: on
line source

package alice.topology.manager.keeparive;

import alice.codesegment.CodeSegment;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.Receiver;

/**
 * Singleton code segment that takes the task list stored under the
 * {@code "_WAITINGLIST"} key, removes the head task, waits for that task's
 * sleep time and then executes it (PING or CLOSE).
 *
 * <p>While {@link #run()} is waiting, another thread may call {@link #skip()}
 * or {@link #ignore()} to cancel the in-flight task. {@code skip()} remembers
 * the unslept time so it can be added to the next task; {@code ignore()}
 * discards it.
 *
 * <p>All access to the mutable state ({@code nowTask}, {@code skipFlag},
 * {@code startTime}, {@code remainingTime}) goes through the monitor of this
 * instance.
 */
public class TaskExecuter extends CodeSegment {
	/** Input receiver, created with {@code CommandType.TAKE}; keyed in {@link #setKey()}. */
	private Receiver info = ids.create(CommandType.TAKE);
	/** Task currently being waited on / executed, or {@code null} when idle. */
	private TaskInfo nowTask;
	/** Set by {@link #skip()} / {@link #ignore()} to tell {@link #run()} not to execute the task. */
	private boolean skipFlag = false;
	/** Wall-clock time (ms) at which the wait for {@code nowTask} started; 0 when not waiting. */
	private long startTime = 0;
	/** Unslept time (ms) carried over from a skipped task to the next one. */
	private long remainingTime = 0;
	private static final TaskExecuter instance = new TaskExecuter();

	private TaskExecuter() {}

	/**
	 * @return the single shared instance
	 */
	public static TaskExecuter getInstance() {
		return instance;
	}

	/**
	 * Re-initialises the input data segments and registers the receiver on
	 * the {@code "_WAITINGLIST"} key.
	 */
	public void setKey() {
		ids.init();
		info.setKey("_WAITINGLIST");
	}

	/**
	 * Processes one step of the waiting list.
	 *
	 * <p>If the list is empty, any carried-over remaining time is re-queued
	 * as a {@code SKIP} task. Otherwise the head task is removed, the list is
	 * written back, and this method waits for the task's sleep time before
	 * executing it, unless {@link #skip()} or {@link #ignore()} cancelled it
	 * in the meantime. Finally the receiver is re-registered via
	 * {@link #setKey()}.
	 */
	@Override
	public synchronized void run()  {
		ListManager list = info.asClass(ListManager.class);
		if (list.getTaskList().size() == 0) {
			if (remainingTime != 0) {
				// Keep the unslept time alive as a placeholder task so it is
				// not lost while the list is empty.
				TaskInfo skipTask = new TaskInfo(TaskType.SKIP);
				skipTask.setSleepTime(remainingTime);
				remainingTime = 0;
				list.addTask(skipTask);
			}
			ods.update("_WAITINGLIST", list);
		} else {
			setNowTask(list.getTaskList().poll());
			ods.update("_WAITINGLIST", list);

			if (skipFlag) {
				// The previous task was cancelled: add its unslept time to
				// this task's sleep time.
				skipFlag = false;
				nowTask.setSleepTime(remainingTime + nowTask.getSleepTime());
				remainingTime = 0;
			}

			startTime = System.currentTimeMillis();
			System.out.println(nowTask.getSleepTime()+" "+nowTask.getType());
			waitForDeadline(nowTask.getSleepTime());

			if (!skipFlag) {
				execTask();
			}
			setNowTask(null);
			startTime = 0;
		}
		setKey();
	}

	/**
	 * Waits until {@code sleepTime} ms have elapsed since {@code startTime}
	 * or until {@code skipFlag} is raised, whichever comes first.
	 *
	 * <p>The wait is re-armed in a loop so that a spurious wakeup does not run
	 * the task early, and a non-positive sleep time never reaches
	 * {@code wait(long)} (which rejects negative timeouts). On interruption
	 * the thread's interrupt flag is restored and the wait ends.
	 */
	private synchronized void waitForDeadline(long sleepTime) {
		long deadline = startTime + sleepTime;
		long left = sleepTime;
		try {
			while (!skipFlag && left > 0) {
				wait(left);
				left = deadline - System.currentTimeMillis();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
			// Do not swallow the interrupt; let the owner of this thread see it.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Executes the current task: a PING sends a ping and queues a follow-up
	 * CLOSE task; a CLOSE prints the task and terminates the JVM.
	 */
	private synchronized void execTask(){
		TaskInfo current = nowTask;
		if (current == null) {
			// Nothing to execute (task was cleared concurrently).
			return;
		}
		// ping or close
		System.out.println("aaaa");
		if (current.getType() == TaskType.PING) {
			System.out.println("bbb");
			ods.ping(current.getManagerKey(), current.getReturnKey());
			// Queue the CLOSE task for the same manager key.
			// NOTE(review): 10 * 1000 is presumably the sleep time in ms - confirm against TaskInfo.setInfo.
			TaskInfo closeTask = new TaskInfo(TaskType.CLOSE);
			closeTask.setInfo(current.getManagerKey(), 10 * 1000);
			ods.put("_TASKINFO", closeTask);
			// NOTE(review): RespondPing presumably registers itself on the return key - confirm.
			new RespondPing(current.getReturnKey());
		} else if (current.getType() == TaskType.CLOSE) {
			// no response from the Remote DataSegment. So close this connection.
			//DataSegment.get(nowTask.getManagerKey()).close();
			System.out.println("CLOSE");

			current.show();
			// The connection close above is disabled; the whole process exits instead.
			System.exit(0);
		}
	}

	/**
	 * Cancels the current task and remembers how much of its sleep time was
	 * left, so that {@link #run()} can add it to the next task.
	 *
	 * <p>Does nothing when no task is current.
	 */
	public synchronized void skip() {
		TaskInfo current = nowTask;
		if (current == null) {
			// No task in flight: nothing to skip (previously a NullPointerException).
			return;
		}
		skipFlag = true;
		remainingTime = remainingOf(current);
		System.out.println(remainingTime);
		nowTask = null;
		notify();
		System.out.println(nowTask);
	}

	/**
	 * Cancels the current task and discards any remaining sleep time.
	 */
	public synchronized void ignore() {
		skipFlag = true;
		remainingTime = 0;
		nowTask = null;
		notify();
	}

	/**
	 * @return the task currently being processed, or {@code null} if none
	 */
	public synchronized TaskInfo getNowTask() {
		return nowTask;
	}

	/**
	 * @param info the task to record as current; {@code null} clears it
	 */
	public synchronized void setNowTask(TaskInfo info) {
		nowTask = info;
	}

	/**
	 * Returns the time (ms) still to wait for the current task, or the
	 * carried-over remaining time when there is no current task.
	 *
	 * @return remaining time in milliseconds; never negative while a task is current
	 */
	public synchronized long getRemainingTime() {
		if (nowTask == null) {
			return remainingTime;
		}
		return remainingOf(nowTask);
	}

	/**
	 * Tells whether the given task has the same type and manager key as the
	 * current task.
	 *
	 * @param task the task to compare with the current one
	 * @return {@code true} if a task is current and both type and manager key are equal
	 */
	public synchronized boolean compareNowTask(TaskInfo task) {
		if (nowTask == null) {
			return false;
		}
		return nowTask.getType().equals(task.getType())
				&& nowTask.getManagerKey().equals(task.getManagerKey());
	}

	/**
	 * Computes the unslept part of {@code task}'s sleep time: the full sleep
	 * time if the wait has not started, otherwise sleep time minus elapsed
	 * time, clamped at zero so an overdue task never yields a negative value.
	 */
	private long remainingOf(TaskInfo task) {
		if (startTime == 0) {
			return task.getSleepTime();
		}
		long elapsed = System.currentTimeMillis() - startTime;
		return Math.max(0, task.getSleepTime() - elapsed);
	}

}