view src/alice/datasegment/DataSegmentKey.java @ 5:80375ae09a1f

add update api
author one
date Wed, 11 Jan 2012 23:28:02 +0900
parents 91057e15065f
children c78a1cc2cd8f
line wrap: on
line source

package alice.datasegment;

import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command; 

public class DataSegmentKey {
	
	private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>();
	private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
	private ArrayList<Command> waitList = new ArrayList<Command>();
	private AtomicInteger tailIndex = new AtomicInteger(1);
	private Runnable keyThread;
	
	public DataSegmentKey() {
		
	}
	
	public void addCommand(Command cmd) {
		cmdQueue.add(cmd);
	}
	
	public void runKeyThread() {
		keyThread = new Runnable() {
			@Override
			public void run() {
				while (true) {
					try {
						Command cmd = cmdQueue.take();
						switch (cmd.cmdType) {
						case UPDATE:
							if (dataList.size() != 0) {
								dataList.remove(0);
							}
						case PUT:
							int index = tailIndex.getAndIncrement();
							dataList.add(new DataSegmentValue(index, cmd.val));
							// run waiting peek and take
							for (Command waitCmd : waitList) {
								if (waitCmd.index < index) {
									// TODO: make and send reply msg
									
								}
							}
							break;
						case PEEK:
							if (cmd.index >= tailIndex.get()) {
								waitList.add(cmd);
								break;
							}
							for (DataSegmentValue data : dataList) {
								if (data.index > cmd.index) {
									// TODO: make and send reply msg
									
									break;
								}
							}
							break;
						case TAKE:
							if (cmd.index >= tailIndex.get()) {
								waitList.add(cmd);
								break;
							}
							boolean waitFlag = true;
							for (DataSegmentValue data : dataList) {
								if (data.index > cmd.index) {
									// TODO: make and send reply msg
									
									
									dataList.remove(data);
									waitFlag = false;
									break;
								}
							}
							if (waitFlag)
								waitList.add(cmd);
							break;
						case REMOVE:
							// TODO: implements later
							break;
						default:
						}
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		};
		keyThread.run();
	};
	
}