Mercurial > hg > Members > tatsuki > Alice
changeset 264:b4690114a0cd
refactor API
author | sugi |
---|---|
date | Tue, 13 Aug 2013 06:02:55 +0900 |
parents | 106cc1cfa595 |
children | 41312b857f34 |
files | src/alice/codesegment/InputDataSegment.java src/alice/codesegment/OutputDataSegment.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegmentManager.java |
diffstat | 5 files changed, 99 insertions(+), 141 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/codesegment/InputDataSegment.java Tue Aug 06 02:14:40 2013 +0900 +++ b/src/alice/codesegment/InputDataSegment.java Tue Aug 13 06:02:55 2013 +0900 @@ -26,7 +26,7 @@ public void quickPeek(Receiver receiver) { cs.list.add(receiver); if (receiver.managerKey==null){ - DataSegment.getLocal().quickPeek(receiver, cs); + DataSegment.getLocal().peek(receiver, cs); } else { DataSegment.get(receiver.managerKey).quickPeek(receiver ,cs); }
--- a/src/alice/codesegment/OutputDataSegment.java Tue Aug 06 02:14:40 2013 +0900 +++ b/src/alice/codesegment/OutputDataSegment.java Tue Aug 13 06:02:55 2013 +0900 @@ -1,11 +1,5 @@ package alice.codesegment; -import java.io.IOException; - - -import org.msgpack.type.Value; -import org.msgpack.type.ValueFactory; - import alice.datasegment.DataSegment; import alice.datasegment.Receiver; @@ -17,70 +11,31 @@ public void flip(Receiver receiver) { receiver.managerKey=null; - DataSegment.getLocal().putObject(receiver.key, receiver.getObj()); - } - - public void put(String key, String val) { - DataSegment.getLocal().putObject(key, val); + DataSegment.getLocal().put(receiver.key, receiver.getObj()); } public void put(String key, byte[] val) { - DataSegment.getLocal().putObject(key, val); - } - - public void put(String key, int val) { - DataSegment.getLocal().putObject(key, val); + DataSegment.getLocal().put(key, val); } public <T> void put(String key, T val) { - DataSegment.getLocal().putObject(key, val); - } - - public void update(String key, String val) { - DataSegment.getLocal().updateObject(key, val); + DataSegment.getLocal().put(key, val); } public void update(String key, byte[] val) { - DataSegment.getLocal().updateObject(key, val); - } - - public void update(String key, int val) { - DataSegment.getLocal().updateObject(key, val); + DataSegment.getLocal().update(key, val); } public <T> void update(String key, T val) { - DataSegment.getLocal().updateObject(key, val); + DataSegment.getLocal().update(key, val); } - /** * for remote */ - - public void put(String managerKey, String key, Value val) { - DataSegment.get(managerKey).put(key, val); - } - - public void put(String managerKey, String key, String val) { - if (!managerKey.equals("local")){ - DataSegment.get(managerKey).put(key, ValueFactory.createRawValue(val)); - } else { - put(key, val); - } - - } - public void put(String managerKey, String key, byte[] val) { if (!managerKey.equals("local")){ - DataSegment.get(managerKey).put(key, ValueFactory.createRawValue(val, true)); - } else { - put(key, val); - } - } - - public void put(String managerKey, String key, int val) { - if (!managerKey.equals("local")){ - DataSegment.get(managerKey).put(key, ValueFactory.createIntegerValue(val)); + DataSegment.get(managerKey).put(key,val); } else { put(key, val); } @@ -88,39 +43,15 @@ public <T> void put(String managerKey, String key, T val) { if (!managerKey.equals("local")){ - try { - DataSegment.get(managerKey).put(key, SingletonMessage.getInstance().unconvert(val)); - } catch (IOException e) { - e.printStackTrace(); - } + DataSegment.get(managerKey).put(key, val); } else { put(key, val); } } - public void update(String managerKey, String key, Value val) { - DataSegment.get(managerKey).update(key, val); - } - - public void update(String managerKey, String key, String val) { - if (!managerKey.equals("local")){ - DataSegment.get(managerKey).update(key, ValueFactory.createRawValue(val)); - } else { - update(key, val); - } - } - public void update(String managerKey, String key, byte[] val) { if (!managerKey.equals("local")){ - DataSegment.get(managerKey).update(key, ValueFactory.createRawValue(val, true)); - } else { - update(key, val); - } - } - - public void update(String managerKey, String key, int val) { - if (!managerKey.equals("local")){ - DataSegment.get(managerKey).update(key, ValueFactory.createIntegerValue(val)); + DataSegment.get(managerKey).update(key, val); } else { update(key, val); } @@ -128,11 +59,7 @@ public <T> void update(String managerKey, String key, T val) { if (!managerKey.equals("local")){ - try { - DataSegment.get(managerKey).update(key, SingletonMessage.getInstance().unconvert(val)); - } catch (IOException e) { - e.printStackTrace(); - } + DataSegment.get(managerKey).update(key, val); } else { update(key, val); } @@ -157,9 +84,5 @@ public void close(String managerKey) { DataSegment.get(managerKey).close(); } - - - - - + }
--- a/src/alice/datasegment/DataSegmentManager.java Tue Aug 06 02:14:40 2013 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Tue Aug 13 06:02:55 2013 +0900 @@ -5,7 +5,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; -import org.msgpack.type.Value; import alice.codesegment.CodeSegment; @@ -51,19 +50,25 @@ } } - public abstract void put(String key, Value val); - public abstract void update(String key, Value val); + public abstract void put(String key, byte[] val); + public abstract void put(String key, Object val); + public abstract void update(String key, byte[] val); + public abstract void update(String key, Object val); public abstract void take(Receiver receiver, CodeSegment cs); public abstract void peek(Receiver receiver, CodeSegment cs); + public abstract void remove(String key); public abstract void close(); public abstract void finish(); - public abstract void quickPut(String key, Value val); - public abstract void quickUpdate(String key, Value val); + public abstract void quickPut(String key, byte[] val); + public abstract void quickPut(String key, Object val); + public abstract void quickUpdate(String key, byte[] val); + public abstract void quickUpdate(String key, Object val); public abstract void quickPeek(Receiver receiver, CodeSegment cs); public abstract void quickTake(Receiver receiver, CodeSegment cs); + }
--- a/src/alice/datasegment/LocalDataSegmentManager.java Tue Aug 06 02:14:40 2013 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Tue Aug 13 06:02:55 2013 +0900 @@ -6,8 +6,6 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; -import org.msgpack.type.Value; - import alice.codesegment.CodeSegment; public class LocalDataSegmentManager extends DataSegmentManager { @@ -62,7 +60,12 @@ } @Override - public void put(String key, Value val) { + public void put(String key, byte[] val) { + put(key, val); + } + + @Override + public void put(String key, Object val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey); dataSegmentKey.runCommand(cmd); @@ -70,19 +73,27 @@ logger.debug(cmd.getCommandString()); } - public void putObject(String key, Object obj) { - DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - Command cmd = new Command(CommandType.PUT, null, key, obj, 0, 0, null, null, reverseKey); - dataSegmentKey.runCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + @Override + public void quickPut(String key, byte[] val) { + put(key, val); + } + + @Override + public void quickPut(String key, Object val) { + put(key, val); } /** * Enqueue update command to the queue of each DataSegment key */ + @Override - public void update(String key, Value val) { + public void update(String key, byte[] val) { + update(key, val); + } + + @Override + public void update(String key, Object val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey); dataSegmentKey.runCommand(cmd); @@ -90,14 +101,18 @@ logger.debug(cmd.getCommandString()); } - public void updateObject(String key, Object val) { - DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey); - dataSegmentKey.runCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + @Override + public void quickUpdate(String key, byte[] val) { + update(key, val); + } + + @Override + public void quickUpdate(String key, Object val) { + update(key, val); } + + @Override public void take(Receiver receiver, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); @@ -109,6 +124,11 @@ } @Override + public void quickTake(Receiver receiver, CodeSegment cs) { + take(receiver, cs); + } + + @Override public void peek(Receiver receiver, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); @@ -117,6 +137,12 @@ if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } + + @Override + public void quickPeek(Receiver receiver, CodeSegment cs) { + peek(receiver, cs); + } + @Override public void remove(String key) { @@ -146,26 +172,4 @@ } - @Override - public void quickPeek(Receiver receiver, CodeSegment cs) { - - } - - @Override - public void quickTake(Receiver receiver, CodeSegment cs) { - - } - - @Override - public void quickPut(String key, Value val) { - - } - - @Override - public void quickUpdate(String key, Value val) { - - } - - - }
--- a/src/alice/datasegment/RemoteDataSegmentManager.java Tue Aug 06 02:14:40 2013 +0900 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Tue Aug 13 06:02:55 2013 +0900 @@ -5,8 +5,6 @@ import java.nio.channels.SocketChannel; import org.apache.log4j.Logger; -import org.msgpack.type.Value; - import alice.codesegment.CodeSegment; import alice.daemon.Connection; import alice.daemon.IncomingTcpConnection; @@ -50,7 +48,15 @@ * send put command to target DataSegment */ @Override - public void put(String key, Value val) { + public void put(String key, byte[] val) { + Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); + connection.sendCommand(cmd); // put command on the transmission thread + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void put(String key, Object val) { Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); connection.sendCommand(cmd); // put command on the transmission thread if (logger.isDebugEnabled()) @@ -58,7 +64,15 @@ } @Override - public void quickPut(String key, Value val) { + public void quickPut(String key, byte[] val) { + Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); + connection.write(cmd); // put command is executed right now + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void quickPut(String key, Object val) { Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); connection.write(cmd); // put command is executed right now if (logger.isDebugEnabled()) @@ -66,7 +80,7 @@ } @Override - public void update(String key, Value val) { + public void update(String key, byte[] val) { Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); connection.sendCommand(cmd); if (logger.isDebugEnabled()) @@ -74,12 +88,27 @@ } @Override - public void quickUpdate(String key, Value val) { + public void update(String key, Object val) { + Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); + connection.sendCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void quickUpdate(String key, byte[] val) { Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); connection.write(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); - + } + + @Override + public void quickUpdate(String key, Object val) { + Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); + connection.write(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); } @Override @@ -92,7 +121,6 @@ logger.debug(cmd.getCommandString()); } - @Override public void quickTake(Receiver receiver, CodeSegment cs) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true); @@ -112,8 +140,6 @@ logger.debug(cmd.getCommandString()); } - - @Override public void quickPeek(Receiver receiver, CodeSegment cs) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);