Mercurial > hg > Members > tatsuki > Alice
view src/alice/datasegment/DataSegmentKey.java @ 10:5f7cce38b25c
bug fix for wait command
author | one |
---|---|
date | Thu, 12 Jan 2012 19:51:11 +0900 |
parents | 352eb19d837d |
children | c4d6ff56b9bf |
line wrap: on
line source
package alice.datasegment;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command;

/**
 * Holds the values stored under one data segment key and serves the
 * commands addressed to it.
 *
 * <p>Commands are handed over with {@link #addCommand(Command)} and are
 * processed one at a time by the worker thread started in
 * {@link #runKeyThread()}. Within this class, {@code dataList} and
 * {@code waitList} are only touched from that worker thread.
 */
public class DataSegmentKey {

    /** Incoming commands, consumed by the worker thread. */
    private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>();
    /** Values currently stored, in insertion order. */
    private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
    /** PEEK/TAKE commands that could not be answered yet. */
    private ArrayList<Command> waitList = new ArrayList<Command>();
    /** Index that will be assigned to the next stored value; starts at 1. */
    private AtomicInteger tailIndex = new AtomicInteger(1);
    private Runnable keyThread;

    public DataSegmentKey() {
    }

    /**
     * Queues a command for processing by the worker thread.
     *
     * @param cmd the command to enqueue
     */
    public void addCommand(Command cmd) {
        cmdQueue.add(cmd);
    }

    /**
     * Starts a new thread that takes commands from the queue and executes
     * them in order.
     *
     * <p>The loop runs until the worker thread is interrupted; on
     * interruption the interrupt status is restored and the thread ends.
     */
    public void runKeyThread() {
        keyThread = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Command cmd = cmdQueue.take();
                        switch (cmd.cmdType) {
                        case UPDATE:
                            // UPDATE drops the oldest stored value (if any)
                            // and then stores the new one exactly like PUT.
                            if (dataList.size() != 0) {
                                dataList.remove(0);
                            }
                            // fall through
                        case PUT:
                            storeAndNotify(cmd);
                            break;
                        case PEEK:
                        case TAKE:
                            replyOrWait(cmd);
                            break;
                        case REMOVE:
                            // TODO: implements later
                            break;
                        default:
                        }
                    } catch (InterruptedException e) {
                        // Restore the flag and stop: looping again would make
                        // the next blocking call throw immediately.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        };
        new Thread(keyThread).start();
    }

    /**
     * Stores the value carried by {@code cmd} under the next index and
     * answers the waiting commands that asked for something newer than
     * what they had already seen.
     *
     * <p>Waiting commands are served in the order they were queued. A
     * waiting TAKE consumes the new value, so serving stops there and the
     * remaining waiters keep waiting.
     */
    private void storeAndNotify(Command cmd) throws InterruptedException {
        int index = tailIndex.getAndIncrement();
        DataSegmentValue dsv = new DataSegmentValue(index, cmd.val);
        dataList.add(dsv);

        for (Iterator<Command> it = waitList.iterator(); it.hasNext();) {
            Command waitCmd = it.next();
            if (waitCmd.index < index) {
                waitCmd.manager.replyQueue.put(new Reply(waitCmd.seq, index, cmd.val));
                it.remove();
                if (waitCmd.cmdType == CommandType.TAKE) {
                    // The value has been taken; nobody else may see it.
                    dataList.remove(dsv);
                    break;
                }
            }
        }
    }

    /**
     * Answers a PEEK or TAKE command from the stored values, or parks it in
     * the wait list when no stored value is newer than {@code cmd.index}.
     *
     * <p>A command is added to the wait list only when it was NOT answered;
     * otherwise it would receive a second reply on the next PUT/UPDATE.
     */
    private void replyOrWait(Command cmd) throws InterruptedException {
        DataSegmentValue data = findNewerData(cmd);
        if (data == null) {
            waitList.add(cmd);
            return;
        }
        cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
        if (cmd.cmdType == CommandType.TAKE) {
            dataList.remove(data);
        }
    }

    /**
     * Returns the first stored value whose index is greater than
     * {@code cmd.index}, or {@code null} if there is none.
     */
    private DataSegmentValue findNewerData(Command cmd) {
        if (cmd.index >= tailIndex.get()) {
            // Nothing that new has been stored yet.
            return null;
        }
        for (DataSegmentValue data : dataList) {
            if (data.index > cmd.index) {
                return data;
            }
        }
        return null;
    }
}