view src/alice/datasegment/DataSegmentKey.java @ 198:f151dea22b2c working

add flip api
author sugi
date Tue, 19 Mar 2013 01:25:09 +0900
parents b4ca7f75e6b2
children 15b68b65f8a4
line wrap: on
line source

package alice.datasegment;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command; 

/**
 * Holds the state kept for a single DataSegment key: the values currently
 * stored under the key, the PEEK/TAKE commands that could not be answered
 * yet, and the counter used to number newly stored values.
 *
 * <p>NOTE(review): this class does no locking of its own (only the index
 * counter is atomic). Presumably each instance is driven by a single thread;
 * confirm against the caller before sharing an instance between threads.
 *
 * @author kazz
 */
public class DataSegmentKey {

	/** Values currently stored under this key, oldest first. */
	private final ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
	/** PEEK/TAKE commands waiting for a value newer than the index they carry. */
	private final ArrayList<Command> waitList = new ArrayList<Command>();
	/** Next index to hand out; starts at 1. */
	private final AtomicInteger tailIndex = new AtomicInteger(1);

	/**
	 * Reserves the next value index.
	 *
	 * @return the current tail index; the counter is advanced by one as a side effect
	 */
	public int getIndex() {
		return tailIndex.getAndIncrement();
	}

	/**
	 * Exposes the backing list of stored values.
	 *
	 * @return the live internal list (not a copy); changes made by the caller
	 *         are visible to later {@link #runCommand(Command)} calls
	 */
	public ArrayList<DataSegmentValue> getDataList() {
		return dataList;
	}

	/**
	 * Executes one command against this key.
	 *
	 * <ul>
	 * <li>UPDATE: drops the oldest stored value (if any), then behaves like PUT.</li>
	 * <li>PUT: stores a new value under a fresh index and offers it to waiting commands.</li>
	 * <li>PEEK: replies with the first stored value whose index is greater than
	 *     {@code cmd.index}; otherwise the command is queued.</li>
	 * <li>TAKE: like PEEK, but the value replied with is removed from the store.</li>
	 * <li>FLIP: offers {@code cmd.dsv} to waiting commands. It is not added to the
	 *     store here; presumably the caller already stored it via
	 *     {@link #getDataList()} - TODO confirm.</li>
	 * <li>REMOVE: not implemented yet.</li>
	 * </ul>
	 *
	 * <p>If the thread is interrupted while delivering a reply, the stack trace
	 * is printed and processing continues as before, but the thread's interrupt
	 * status is restored before this method returns so the caller can observe it.
	 *
	 * @param cmd the command to run; its {@code type} selects the behavior
	 */
	public void runCommand(Command cmd) {
		boolean interrupted = false;
		switch (cmd.type) {
		case UPDATE:
			if (!dataList.isEmpty()) {
				dataList.remove(0);
			}
			// Intentional fall through: an update is "drop the oldest, then put".
		case PUT: {
			int index = tailIndex.getAndIncrement();
			DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey);
			dataList.add(dsv);
			interrupted = notifyWaiters(dsv, index);
			break;
		}
		case PEEK:
			interrupted = answerOrEnqueue(cmd, false);
			break;
		case TAKE:
			interrupted = answerOrEnqueue(cmd, true);
			break;
		case FLIP:
			interrupted = notifyWaiters(cmd.dsv, cmd.dsv.index);
			break;
		case REMOVE:
			// TODO: implement later
			break;
		default:
		}
		if (interrupted) {
			// Restore the flag only after all replies were attempted; setting it
			// earlier would make every following put() in the same pass fail.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Offers a value to the waiting commands, in arrival order. Every waiter
	 * whose index is below {@code index} is sent a reply and dequeued. The pass
	 * stops at the first TAKE waiter served, which also removes the value from
	 * the store.
	 *
	 * @return true if the thread was interrupted while sending a reply
	 */
	private boolean notifyWaiters(DataSegmentValue dsv, int index) {
		boolean interrupted = false;
		for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
			Command waitCmd = iter.next();
			if (waitCmd.index >= index) {
				continue; // this waiter wants something newer than the offered value
			}
			interrupted |= sendReply(waitCmd, dsv);
			iter.remove();
			if (waitCmd.type == CommandType.TAKE) {
				// The TAKE waiter consumed the value, so nobody else may see it.
				dataList.remove(dsv);
				break;
			}
		}
		return interrupted;
	}

	/**
	 * Serves a PEEK or TAKE command from the store, or queues it when no stored
	 * value has an index greater than {@code cmd.index}.
	 *
	 * @param consume true for TAKE (the matched value is removed), false for PEEK
	 * @return true if the thread was interrupted while sending the reply
	 */
	private boolean answerOrEnqueue(Command cmd, boolean consume) {
		// Nothing newer than cmd.index has been numbered yet: skip the scan.
		if (cmd.index < tailIndex.get()) {
			for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
				DataSegmentValue data = iter.next();
				if (data.index > cmd.index) {
					boolean interrupted = sendReply(cmd, data);
					if (consume) {
						iter.remove();
					}
					return interrupted;
				}
			}
		}
		waitList.add(cmd);
		return false;
	}

	/**
	 * Puts a REPLY command carrying {@code data} on the target's reply queue.
	 *
	 * @return true if the put was interrupted (the reply was not delivered)
	 */
	private static boolean sendReply(Command target, DataSegmentValue data) {
		try {
			target.replyQueue.put(new Command(CommandType.REPLY, target.seq, data));
			return false;
		} catch (InterruptedException e) {
			e.printStackTrace();
			return true;
		}
	}

}