Mercurial > hg > Database > Alice
diff src/main/java/alice/datasegment/DataSegmentKey.java @ 345:8f71c3e6f11d
Change directory structure Maven standard
author | sugi |
---|---|
date | Wed, 16 Apr 2014 18:26:07 +0900 |
parents | |
children | aefbe41fcf12 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/datasegment/DataSegmentKey.java Wed Apr 16 18:26:07 2014 +0900 @@ -0,0 +1,103 @@ +package alice.datasegment; + +import java.util.ArrayList; +import java.util.Iterator; + +import alice.datasegment.Command; + +/** + * Synchronized DataSegment for each DataSegment key + * @author kazz + * + */ +public class DataSegmentKey { + + private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>(); + private ArrayList<Command> waitList = new ArrayList<Command>(); + private int tailIndex = 1; + + public synchronized void runCommand(Command cmd) { + switch (cmd.type) { + case UPDATE: + if (dataList.size() != 0) { + dataList.remove(0); + } + case PUT: + int index = tailIndex; + tailIndex++; + DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey); + dataList.add(dsv); + // Process waiting peek and take commands + for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) { + Command waitCmd = iter.next(); + if (waitCmd.index < index) { + replyValue(waitCmd ,dsv); + iter.remove(); + if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command + dataList.remove(dsv); + break; + } + } + } + break; + case PEEK: + if (cmd.index >= tailIndex) { + waitList.add(cmd); + break; + } + boolean waitFlag2 = true; + for (DataSegmentValue data : dataList) { + if (data.index > cmd.index) { + replyValue(cmd ,data); + waitFlag2 = false; + break; + } + } + if (waitFlag2) + waitList.add(cmd); + break; + case TAKE: + if (cmd.index >= tailIndex) { + waitList.add(cmd); + break; + } + boolean waitFlag = true; + for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) { + DataSegmentValue data = iter.next(); + if (data.index > cmd.index) { + replyValue(cmd ,data); + iter.remove(); + waitFlag = false; + break; + } + } + if (waitFlag) + waitList.add(cmd); + break; + case REMOVE: + // TODO: implements later + break; + default: + } + + } + + public void replyValue(Command cmd, DataSegmentValue data){ + if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local. + cmd.cs.ids.reply(cmd.receiver, new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); + } else { + try { + if (!cmd.flag){ + cmd.connection.sendQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); + } + else { + cmd.connection.write(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); + } + + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + +}