Mercurial > hg > Database > Alice
view src/alice/datasegment/LocalDataSegmentManager.java @ 12:c4d6ff56b9bf
unite Command and Reply
and add Network outline
author | one |
---|---|
date | Fri, 13 Jan 2012 07:04:38 +0900 |
parents | 78b415d019de |
children | 30f97d776a3e |
line wrap: on
line source
package alice.datasegment;

import java.util.concurrent.atomic.AtomicInteger;

import org.msgpack.type.Value;

import alice.codesegment.CodeSegment;
import alice.datasegment.CommandType;

/**
 * Data segment manager for data segments that live in this process.
 *
 * <p>Every operation is wrapped in a {@link Command} and queued on the
 * {@link DataSegmentKey} registered under the requested key. TAKE and PEEK
 * commands are additionally recorded in {@code seqHash} under a fresh sequence
 * number, so the reply thread can hand the answer back to the code segment
 * that issued the request.
 */
public class LocalDataSegmentManager extends DataSegmentManager {

    /** Source of sequence numbers for TAKE/PEEK requests; starts at 1. */
    private final AtomicInteger seq = new AtomicInteger(1);

    /**
     * Loop that drains {@code replyQueue} and forwards each reply to the code
     * segment whose request is stored in {@code seqHash} under the reply's
     * sequence number.
     */
    private final Runnable replyThread = new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    Command reply = replyQueue.take();
                    // Remove (rather than just look up) the pending request so that
                    // answered requests do not accumulate in seqHash forever.
                    // NOTE(review): assumes each request receives exactly one reply
                    // - confirm against DataSegmentKey.
                    Command cmd = seqHash.remove(reply.seq);
                    if (cmd == null) {
                        // An unknown sequence number must not kill the reply thread
                        // with a NullPointerException; report it and keep serving.
                        System.err.println("LocalDataSegmentManager: no pending request for reply seq "
                                + reply.seq);
                        continue;
                    }
                    cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val));
                } catch (InterruptedException e) {
                    // Interruption is a request to stop: restore the flag for any
                    // outer observer and leave the loop instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    };

    /** Creates the manager and starts its reply thread. */
    public LocalDataSegmentManager() {
        new Thread(replyThread).start();
    }

    /**
     * Returns the {@link DataSegmentKey} registered under {@code key},
     * registering a new one and starting its key thread if none exists yet.
     *
     * @param key name of the data segment
     * @return the DataSegmentKey registered under {@code key}
     */
    private DataSegmentKey getDataSegmentKey(String key) {
        // Fast path: avoid allocating a throwaway DataSegmentKey when the key
        // is already registered, which is the common case.
        DataSegmentKey registered = dataSegments.get(key);
        if (registered != null) {
            return registered;
        }
        DataSegmentKey created = new DataSegmentKey();
        registered = dataSegments.putIfAbsent(key, created);
        if (registered != null) {
            // Another caller registered the key first; use theirs.
            return registered;
        }
        // Only the instance that was actually registered gets its thread started.
        created.runKeyThread();
        return created;
    }

    /**
     * Queues a PUT command carrying {@code val} on the data segment {@code key}.
     *
     * @param key name of the data segment
     * @param val value to pass with the command
     */
    @Override
    public void put(String key, Value val) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
        dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, this, null));
    }

    /**
     * Queues an UPDATE command carrying {@code val} on the data segment {@code key}.
     *
     * @param key name of the data segment
     * @param val value to pass with the command
     */
    @Override
    public void update(String key, Value val) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
        dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, this, null));
    }

    /**
     * Queues a TAKE command on the data segment {@code key} and records it
     * under a fresh sequence number so its reply can be delivered to {@code cs}.
     *
     * @param argKey key handed to the command; passed back when the reply is delivered
     * @param key    name of the data segment
     * @param index  index passed with the command
     * @param cs     code segment that issued the request
     */
    @Override
    public void take(String argKey, String key, int index, CodeSegment cs) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
        int requestSeq = seq.getAndIncrement();
        Command cmd = new Command(CommandType.TAKE, argKey, null, index, requestSeq, this, cs);
        // Register before queueing so the reply thread can always find the request.
        seqHash.put(requestSeq, cmd);
        dataSegmentKey.addCommand(cmd);
    }

    /**
     * Queues a PEEK command on the data segment {@code key} and records it
     * under a fresh sequence number so its reply can be delivered to {@code cs}.
     *
     * @param argKey key handed to the command; passed back when the reply is delivered
     * @param key    name of the data segment
     * @param index  index passed with the command
     * @param cs     code segment that issued the request
     */
    @Override
    public void peek(String argKey, String key, int index, CodeSegment cs) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
        int requestSeq = seq.getAndIncrement();
        Command cmd = new Command(CommandType.PEEK, argKey, null, index, requestSeq, this, cs);
        // Register before queueing so the reply thread can always find the request.
        seqHash.put(requestSeq, cmd);
        dataSegmentKey.addCommand(cmd);
    }

    /**
     * Queues a REMOVE command on the data segment {@code key}.
     *
     * @param key name of the data segment
     */
    @Override
    public void remove(String key) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
        dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, this, null));
    }
}