view src/alice/datasegment/DataSegmentKey.java @ 69:1d4f2b72fb31

delete KeyThread
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 21 Feb 2012 19:44:33 +0900
parents eb1714f41caf
children 4bfd81352cfa 8d3cb7e5fa57
line wrap: on
line source

package alice.datasegment;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command; 

/**
 * Synchronized DataSegment for each DataSegment key
 * @author kazz
 *
 */
public class DataSegmentKey {
	
	private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
	private ArrayList<Command> waitList = new ArrayList<Command>();
	private AtomicInteger tailIndex = new AtomicInteger(1);
	
	public void runCommand(Command cmd) {
		switch (cmd.type) {
		case UPDATE:
			if (dataList.size() != 0) {
				dataList.remove(0);
			}
		case PUT:
			int index = tailIndex.getAndIncrement();
			DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); 
			dataList.add(dsv);
			// Process waiting peek and take commands
			for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
				Command waitCmd = iter.next();
				if (waitCmd.index < index) {
					try {
						waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					iter.remove();
					if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
						dataList.remove(dsv);
						break;
					}
				}
			}
			break;
		case PEEK:
			if (cmd.index >= tailIndex.get()) {
				waitList.add(cmd);
				break;
			}
			boolean waitFlag2 = true;
			for (DataSegmentValue data : dataList) {
				if (data.index > cmd.index) {
					try {
						cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					waitFlag2 = false;
					break;
				}
			}
			if (waitFlag2)
				waitList.add(cmd);
			break;
		case TAKE:
			if (cmd.index >= tailIndex.get()) {
				waitList.add(cmd);
				break;
			}
			boolean waitFlag = true;
			for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
				DataSegmentValue data = iter.next();
				if (data.index > cmd.index) {
					try {
						cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					iter.remove();
					waitFlag = false;
					break;
				}
			}
			if (waitFlag)
				waitList.add(cmd);
			break;
		case REMOVE:
			// TODO: implements later
			break;
		default:
		}

	}
	
}