view src/alice/datasegment/DataSegmentKey.java @ 28:98ab26e09a98

Configuration Manager work and implements reverseKey
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 17 Jan 2012 08:41:34 +0900
parents 0bb03861b5cd
children 73158dc54c59
line wrap: on
line source

package alice.datasegment;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command; 

public class DataSegmentKey {
	
	private String key;
	private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>();
	private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
	private ArrayList<Command> waitList = new ArrayList<Command>();
	private AtomicInteger tailIndex = new AtomicInteger(1);
	private Thread keyThread;
	
	public DataSegmentKey(String key) {
		this.key = key;
	}
	
	public void addCommand(Command cmd) {
		cmdQueue.add(cmd);
	}
	
	public void runKeyThread() {
		this.keyThread = new Thread() {
			@Override
			public void run() {
				while (true) {
					try {
						Command cmd = cmdQueue.take();
						switch (cmd.type) {
						case UPDATE:
							if (dataList.size() != 0) {
								dataList.remove(0);
							}
						case PUT:
							int index = tailIndex.getAndIncrement();
							DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); 
							dataList.add(dsv);
							// run waiting peek and take
							boolean takeFlag = true;
							for (Iterator<Command> iter = waitList.iterator(); iter.hasNext() && takeFlag; ) {
								Command waitCmd = iter.next();
								if (waitCmd.index < index) {
									waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey));
									iter.remove();
									if (waitCmd.type == CommandType.TAKE) { // delete data, if it run take cmd.
										dataList.remove(dsv);
										break;
									}
								}
							}
							break;
						case PEEK:
							if (cmd.index >= tailIndex.get()) {
								waitList.add(cmd);
								break;
							}
							for (DataSegmentValue data : dataList) {
								if (data.index > cmd.index) {
									cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
									break;
								}
							}
							waitList.add(cmd);
							break;
						case TAKE:
							if (cmd.index >= tailIndex.get()) {
								waitList.add(cmd);
								break;
							}
							boolean waitFlag = true;
							for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
								DataSegmentValue data = iter.next();
								if (data.index > cmd.index) {
									cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
									iter.remove();
									waitFlag = false;
									break;
								}
							}
							if (waitFlag)
								waitList.add(cmd);
							break;
						case REMOVE:
							// TODO: implements later
							break;
						default:
						}
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		};
		keyThread.setName("DataSegmentKey-" + key);
		keyThread.start();
	}
	
}