Mercurial > hg > Database > Alice
view src/alice/datasegment/DataSegmentKey.java @ 198:f151dea22b2c working
add flip api
author | sugi |
---|---|
date | Tue, 19 Mar 2013 01:25:09 +0900 |
parents | b4ca7f75e6b2 |
children | 15b68b65f8a4 |
line wrap: on
line source
package alice.datasegment;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command;

/**
 * Synchronized DataSegment for each DataSegment key.
 *
 * <p>Holds the values stored under one key ({@code dataList}) together with
 * the PEEK/TAKE commands that could not be answered yet ({@code waitList}).
 * Every stored value carries an index handed out by {@code tailIndex}; a
 * reader asks for the first value whose index is greater than the index it
 * passes in.
 *
 * <p>NOTE(review): this class does no locking of its own; it presumably
 * relies on the caller running commands for a key from a single thread --
 * confirm against the caller.
 *
 * @author kazz
 */
public class DataSegmentKey {

    /** Values currently stored under this key, in insertion order. */
    private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();

    /** PEEK/TAKE commands waiting for a value that has not arrived yet. */
    private ArrayList<Command> waitList = new ArrayList<Command>();

    /** Next index to hand out; the first index issued is 1. */
    private AtomicInteger tailIndex = new AtomicInteger(1);

    /**
     * Reserves the next index of this key.
     *
     * @return the index that was current before the increment
     */
    public int getIndex() {
        return tailIndex.getAndIncrement();
    }

    /**
     * Returns the live, mutable list of stored values (not a copy).
     *
     * @return the internal value list
     */
    public ArrayList<DataSegmentValue> getDataList() {
        return dataList;
    }

    /**
     * Executes one command against this key.
     *
     * <ul>
     * <li>UPDATE: drops the oldest stored value (if any), then behaves as PUT.</li>
     * <li>PUT: stores a new value under a fresh index and answers waiting readers.</li>
     * <li>PEEK: replies with the first stored value newer than {@code cmd.index},
     *     or queues the command when there is none.</li>
     * <li>TAKE: like PEEK, but the replied value is removed from the store.</li>
     * <li>FLIP: answers waiting readers with {@code cmd.dsv}; the value itself
     *     is not added to the store here.</li>
     * <li>REMOVE: not implemented yet.</li>
     * </ul>
     *
     * <p>If delivering a reply is interrupted, the stack trace is printed, that
     * reply is not delivered, and the interrupt status of the calling thread is
     * restored before this method returns so the caller can observe it.
     *
     * @param cmd the command to execute
     */
    public void runCommand(Command cmd) {
        // The flag is restored once at the very end; setting it earlier would
        // make later blocking put() calls in this same invocation fail at once.
        boolean interrupted = false;
        switch (cmd.type) {
        case UPDATE:
            if (dataList.size() != 0) {
                dataList.remove(0);
            }
            // intentional fall through: an update is "drop oldest, then put"
        case PUT: {
            int index = tailIndex.getAndIncrement();
            DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey);
            dataList.add(dsv);
            interrupted = notifyWaiters(dsv, index);
            break;
        }
        case PEEK:
            interrupted = replyOrWait(cmd, false);
            break;
        case TAKE:
            interrupted = replyOrWait(cmd, true);
            break;
        case FLIP:
            interrupted = notifyWaiters(cmd.dsv, cmd.dsv.index);
            break;
        case REMOVE:
            // TODO: implements later
            break;
        default:
            break;
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Answers waiting PEEK/TAKE commands whose requested index is below
     * {@code index} with {@code value}, removing each answered command from
     * the wait list. Stops at the first TAKE, because that waiter consumes the
     * value: it is removed from the store and later waiters keep waiting.
     *
     * @param value the value to hand to the waiters
     * @param index the index of {@code value}
     * @return {@code true} if any reply delivery was interrupted
     */
    private boolean notifyWaiters(DataSegmentValue value, int index) {
        boolean interrupted = false;
        for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
            Command waitCmd = iter.next();
            if (waitCmd.index < index) {
                interrupted |= sendReply(waitCmd, value);
                iter.remove();
                if (waitCmd.type == CommandType.TAKE) {
                    // someone is waiting for this put or update command
                    dataList.remove(value);
                    break;
                }
            }
        }
        return interrupted;
    }

    /**
     * Replies to {@code cmd} with the first stored value whose index is
     * greater than {@code cmd.index}; when no such value exists the command is
     * appended to the wait list instead.
     *
     * @param cmd     the PEEK or TAKE command
     * @param consume {@code true} to remove the replied value from the store (TAKE)
     * @return {@code true} if the reply delivery was interrupted
     */
    private boolean replyOrWait(Command cmd, boolean consume) {
        // Nothing newer than the requested index has been issued yet.
        if (cmd.index >= tailIndex.get()) {
            waitList.add(cmd);
            return false;
        }
        for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
            DataSegmentValue data = iter.next();
            if (data.index > cmd.index) {
                boolean interrupted = sendReply(cmd, data);
                if (consume) {
                    iter.remove();
                }
                return interrupted;
            }
        }
        waitList.add(cmd);
        return false;
    }

    /**
     * Puts a REPLY command carrying {@code value} on the reply queue of
     * {@code target}.
     *
     * @param target the command being answered
     * @param value  the value to send back
     * @return {@code true} if the put was interrupted (the reply was not delivered)
     */
    private boolean sendReply(Command target, DataSegmentValue value) {
        try {
            target.replyQueue.put(new Command(CommandType.REPLY, target.seq, value));
            return false;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return true;
        }
    }
}