Mercurial > hg > Members > tatsuki > Alice
diff src/alice/datasegment/DataSegmentKey.java @ 6:c78a1cc2cd8f
implements Reply
author | one |
---|---|
date | Thu, 12 Jan 2012 13:19:04 +0900 |
parents | 80375ae09a1f |
children | 352eb19d837d |
line wrap: on
line diff
--- a/src/alice/datasegment/DataSegmentKey.java Wed Jan 11 23:28:02 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Thu Jan 12 13:19:04 2012 +0900 @@ -4,6 +4,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import alice.codesegment.Reply; import alice.datasegment.Command; public class DataSegmentKey { @@ -36,12 +37,16 @@ } case PUT: int index = tailIndex.getAndIncrement(); - dataList.add(new DataSegmentValue(index, cmd.val)); + DataSegmentValue dsv = new DataSegmentValue(index, cmd.val); + dataList.add(dsv); // run waiting peek and take for (Command waitCmd : waitList) { if (waitCmd.index < index) { - // TODO: make and send reply msg - + waitCmd.manager.replyQueue.put(new Reply(waitCmd.seq, index, cmd.val)); + if (waitCmd.cmdType == CommandType.TAKE) { // delete data, if it run take cmd. + dataList.remove(dsv); + break; + } } } break; @@ -52,30 +57,25 @@ } for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - // TODO: make and send reply msg - + cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val)); break; } } + waitList.add(cmd); break; case TAKE: if (cmd.index >= tailIndex.get()) { waitList.add(cmd); break; } - boolean waitFlag = true; for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - // TODO: make and send reply msg - - + cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val)); dataList.remove(data); - waitFlag = false; break; } } - if (waitFlag) - waitList.add(cmd); + waitList.add(cmd); break; case REMOVE: // TODO: implements later @@ -88,7 +88,7 @@ } } }; - keyThread.run(); + new Thread(keyThread).start(); }; }