Mercurial > hg > Members > tatsuki > Alice
view src/alice/datasegment/DataSegmentKey.java @ 28:98ab26e09a98
Configuration Manager work and implements reverseKey
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 17 Jan 2012 08:41:34 +0900 |
parents | 0bb03861b5cd |
children | 73158dc54c59 |
line wrap: on
line source
package alice.datasegment; import java.util.ArrayList; import java.util.Iterator; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import alice.datasegment.Command; public class DataSegmentKey { private String key; private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>(); private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>(); private ArrayList<Command> waitList = new ArrayList<Command>(); private AtomicInteger tailIndex = new AtomicInteger(1); private Thread keyThread; public DataSegmentKey(String key) { this.key = key; } public void addCommand(Command cmd) { cmdQueue.add(cmd); } public void runKeyThread() { this.keyThread = new Thread() { @Override public void run() { while (true) { try { Command cmd = cmdQueue.take(); switch (cmd.type) { case UPDATE: if (dataList.size() != 0) { dataList.remove(0); } case PUT: int index = tailIndex.getAndIncrement(); DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); dataList.add(dsv); // run waiting peek and take boolean takeFlag = true; for (Iterator<Command> iter = waitList.iterator(); iter.hasNext() && takeFlag; ) { Command waitCmd = iter.next(); if (waitCmd.index < index) { waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey)); iter.remove(); if (waitCmd.type == CommandType.TAKE) { // delete data, if it run take cmd. dataList.remove(dsv); break; } } } break; case PEEK: if (cmd.index >= tailIndex.get()) { waitList.add(cmd); break; } for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); break; } } waitList.add(cmd); break; case TAKE: if (cmd.index >= tailIndex.get()) { waitList.add(cmd); break; } boolean waitFlag = true; for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) { DataSegmentValue data = iter.next(); if (data.index > cmd.index) { cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); iter.remove(); waitFlag = false; break; } } if (waitFlag) waitList.add(cmd); break; case REMOVE: // TODO: implements later break; default: } } catch (InterruptedException e) { e.printStackTrace(); } } } }; keyThread.setName("DataSegmentKey-" + key); keyThread.start(); } }