Mercurial > hg > Database > Alice
changeset 443:2f2623484b77 dispose
change protocol
author | sugi |
---|---|
date | Sat, 18 Oct 2014 19:30:13 +0900 |
parents | 2338b1ef29e8 |
children | 8f006f9d1b9c |
files | src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/Connection.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/daemon/IncomingUdpConnection.java src/main/java/alice/daemon/MulticastConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/DataSegmentValue.java src/main/java/alice/datasegment/ReceiveLocalData.java src/main/java/alice/datasegment/ReceiveRemoteData.java src/main/java/alice/datasegment/ReceiverData.java |
diffstat | 10 files changed, 90 insertions(+), 82 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/daemon/CommandMessage.java Sat Oct 04 08:52:34 2014 +0900 +++ b/src/main/java/alice/daemon/CommandMessage.java Sat Oct 18 19:30:13 2014 +0900 @@ -1,7 +1,6 @@ package alice.daemon; import org.msgpack.annotation.Message; -import org.msgpack.type.Value; @Message public class CommandMessage { @@ -9,17 +8,15 @@ public int index; public int seq; public String key; - public Value val; public boolean flag; public CommandMessage() {} - public CommandMessage(int type, int index, int seq, String key, Value val, boolean flag) { + public CommandMessage(int type, int index, int seq, String key, boolean flag) { this.type = type; this.index = index; this.seq = seq; this.key = key; - this.val = val; this.flag = flag; } }
--- a/src/main/java/alice/daemon/Connection.java Sat Oct 04 08:52:34 2014 +0900 +++ b/src/main/java/alice/daemon/Connection.java Sat Oct 18 19:30:13 2014 +0900 @@ -6,7 +6,6 @@ import java.nio.channels.ClosedChannelException; import java.util.concurrent.LinkedBlockingQueue; -import alice.codesegment.SingletonMessage; import alice.datasegment.Command; import alice.datasegment.DataSegment; @@ -35,10 +34,8 @@ } public synchronized void write(Command cmd) { - CommandMessage cmdMsg = cmd.convert(); - ByteBuffer buffer; + ByteBuffer buffer = cmd.convert(); try { - buffer = ByteBuffer.wrap(SingletonMessage.getInstance().write(cmdMsg)); while (buffer.hasRemaining()) { socket.getChannel().write(buffer); }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Sat Oct 04 08:52:34 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Sat Oct 18 19:30:13 2014 +0900 @@ -4,6 +4,7 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; +import org.msgpack.unpacker.MessagePackUnpacker; import org.msgpack.unpacker.Unpacker; import alice.codesegment.SingletonMessage; @@ -41,7 +42,7 @@ public void run() { Unpacker unpacker = null; try { - unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); + unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); } catch (IOException e) { e.printStackTrace(); } @@ -50,16 +51,19 @@ } while (true) { try { + byte[] val = null; CommandMessage msg = unpacker.read(CommandMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: + val = getSerializedByteArray(unpacker); lmanager.getDataSegmentKey(msg.key) - .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); + .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey)); break; case PUT: + val = getSerializedByteArray(unpacker); lmanager.getDataSegmentKey(msg.key) - .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); + .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey)); break; case PEEK: lmanager.getDataSegmentKey(msg.key) @@ -75,7 +79,8 @@ break; case REPLY: Command cmd = manager.getAndRemoveCmd(msg.seq); - cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); + val = getSerializedByteArray(unpacker); + cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, null)); cmd=null; break; case PING: @@ -98,6 +103,18 @@ e.printStackTrace(); } } + } + private byte[] getSerializedByteArray(Unpacker unpacker) { + int len; + byte[] b = null; + try { + len = unpacker.readInt(); + b = ((MessagePackUnpacker) unpacker).getSerializedByteArray(len); + } catch (IOException e) { + e.printStackTrace(); + } + return b; + } }
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java Sat Oct 04 08:52:34 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Sat Oct 18 19:30:13 2014 +0900 @@ -16,7 +16,8 @@ public class IncomingUdpConnection extends IncomingTcpConnection { // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment. - // and this implement has problem. If over 4096 data receive, can not read. + // and this implement has problem. If over 65507 data receive, can not read. + // but Max data length is 65507 because of the max length of UDP payload public MulticastConnection receiver; public MulticastConnection sender; @@ -30,21 +31,23 @@ @Override public void run() { while (true){ - try { - ByteBuffer receive = ByteBuffer.allocate(4096); + try { + // Max data length is 65507 because of the max length of UDP payload + ByteBuffer receive = ByteBuffer.allocate(65507); receiver.receive(receive); Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); receive.flip(); CommandMessage msg = unpacker.read(CommandMessage.class); + byte[] val = unpacker.readByteArray(); CommandType type = CommandType.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: getLocalDataSegmentManager().getDataSegmentKey(msg.key) - .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); + .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey)); break; case PUT: getLocalDataSegmentManager().getDataSegmentKey(msg.key) - .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); + .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey)); break; case PEEK: getLocalDataSegmentManager().getDataSegmentKey(msg.key) @@ -60,7 +63,7 @@ break; case REPLY: Command cmd = manager.getAndRemoveCmd(msg.seq); - cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); + cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, null)); cmd=null; break; case PING:
--- a/src/main/java/alice/daemon/MulticastConnection.java Sat Oct 04 08:52:34 2014 +0900 +++ b/src/main/java/alice/daemon/MulticastConnection.java Sat Oct 18 19:30:13 2014 +0900 @@ -5,7 +5,6 @@ import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; -import alice.codesegment.SingletonMessage; import alice.datasegment.Command; public class MulticastConnection extends Connection { @@ -20,10 +19,8 @@ // may need to add infomation who send on ds. @Override public synchronized void write(Command cmd){ - CommandMessage cmdMsg = cmd.convert(); - ByteBuffer buffer; + ByteBuffer buffer = cmd.convert(); try { - buffer = ByteBuffer.wrap(SingletonMessage.getInstance().write(cmdMsg)); while (buffer.hasRemaining()){ dc.send(buffer, sAddr); }
--- a/src/main/java/alice/datasegment/Command.java Sat Oct 04 08:52:34 2014 +0900 +++ b/src/main/java/alice/datasegment/Command.java Sat Oct 18 19:30:13 2014 +0900 @@ -1,9 +1,10 @@ package alice.datasegment; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.BlockingQueue; -import org.msgpack.type.Value; +import org.msgpack.MessagePack; import alice.codesegment.CodeSegment; import alice.codesegment.SingletonMessage; import alice.daemon.CommandMessage; @@ -13,7 +14,7 @@ public CommandType type; public String key; public Receiver receiver; - public Value val; + public byte[] val; public int index; public int seq; public Connection connection; // for remote @@ -23,7 +24,7 @@ public Object obj; public boolean flag; - public Command(CommandType cmdType, Receiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { + public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { this.type = cmdType; this.receiver = receiver; this.key = key; @@ -36,7 +37,7 @@ this.flag = false; } - public Command(CommandType cmdType, Receiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey, boolean flag) { + public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey, boolean flag) { this.type = cmdType; this.receiver = receiver; this.key = key; @@ -49,7 +50,7 @@ this.flag = flag; } - public Command(CommandType cmdType, Receiver receiver, String key, Value val, int index, int seq, Connection connection, CodeSegment cs, String reverseKey, boolean flag) { + public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, Connection connection, CodeSegment cs, String reverseKey, boolean flag) { this.type = cmdType; this.receiver = receiver; this.key = key; @@ -75,7 +76,7 @@ this.flag = false; } - public Command(CommandType cmdType, Receiver receiver, String key, Value val, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { + public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { this.type = cmdType; this.receiver = receiver; this.key = key; @@ -96,15 +97,42 @@ } return this.type + "\t" + key + "\t" + val + "\tindex=" + index + "\tcs=" + csName; } - public CommandMessage convert() { - if (val==null&&obj!=null){ - try { - this.val = SingletonMessage.getInstance().unconvert(obj); - } catch (IOException e) { - e.printStackTrace(); + public ByteBuffer convert() { + ByteBuffer buf = null; + MessagePack msg = SingletonMessage.getInstance(); + try { + byte[] header = msg.write(new CommandMessage(type.id, index, seq, key, flag)); + + switch (type) { + case UPDATE: + case PUT: + case REPLY: + case RESPONSE: + byte[] data = null; + if (val==null&&obj!=null){ + data = msg.write(obj); + } else if (val!=null) { + data = val; + } + byte[] dataSize = msg.write(data.length); + + buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); + buf.put(header); + buf.put(dataSize); + buf.put(data); + break; + + default: + buf = ByteBuffer.allocate(header.length); + buf.put(header); + break; } + + buf.flip(); + } catch (IOException e) { + e.printStackTrace(); } - return new CommandMessage(type.id, index, seq, key, val, flag); + return buf; } }
--- a/src/main/java/alice/datasegment/DataSegmentValue.java Sat Oct 04 08:52:34 2014 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentValue.java Sat Oct 18 19:30:13 2014 +0900 @@ -1,22 +1,20 @@ package alice.datasegment; -import org.msgpack.type.Value; - public class DataSegmentValue { public int index; - public Value val; + public byte[] val; public String from; public Object obj; - public DataSegmentValue(int index, Value val, Object obj,String reverseKey) { + public DataSegmentValue(int index, byte[] val, Object obj,String reverseKey) { this.index = index; this.val = val; this.from = reverseKey; this.obj = obj; } - public DataSegmentValue(int index, Value val,String reverseKey) { + public DataSegmentValue(int index, byte[] val,String reverseKey) { this.index = index; this.val = val; this.from = reverseKey;
--- a/src/main/java/alice/datasegment/ReceiveLocalData.java Sat Oct 04 08:52:34 2014 +0900 +++ b/src/main/java/alice/datasegment/ReceiveLocalData.java Sat Oct 18 19:30:13 2014 +0900 @@ -2,7 +2,6 @@ import java.io.IOException; -import org.msgpack.type.ArrayValue; import org.msgpack.type.Value; import alice.codesegment.SingletonMessage; @@ -11,8 +10,8 @@ public class ReceiveLocalData implements ReceiverData { private Object obj; - public ReceiveLocalData(Object obj2) { - this.obj = obj2; + public ReceiveLocalData(Object obj) { + this.obj = obj; } public String asString() { @@ -27,17 +26,12 @@ return (Float) obj; } - public ArrayValue asArray(){ - return (ArrayValue) obj; - } - @SuppressWarnings("unchecked") public <T> T asClass(Class<T> clazz) { return (T) obj; } - @Override public Value getVal() { try { return SingletonMessage.getInstance().unconvert(obj); @@ -47,7 +41,6 @@ return null; } - @Override public Object getObj() { return obj; }
--- a/src/main/java/alice/datasegment/ReceiveRemoteData.java Sat Oct 04 08:52:34 2014 +0900 +++ b/src/main/java/alice/datasegment/ReceiveRemoteData.java Sat Oct 18 19:30:13 2014 +0900 @@ -2,53 +2,35 @@ import java.io.IOException; -import org.msgpack.type.ArrayValue; import org.msgpack.type.Value; -import org.msgpack.type.ValueType; - import alice.codesegment.SingletonMessage; public class ReceiveRemoteData implements ReceiverData { - public Value val; + public byte[] val; // serialized with MessagePack. need decode - public ReceiveRemoteData() { - } + public ReceiveRemoteData() {} - public ReceiveRemoteData(Value val2) { - this.val = val2; + public ReceiveRemoteData(byte[] val) { + this.val = val; } public String asString() { - if (val.getType() == ValueType.RAW) { - return val.asRawValue().getString(); - } - return null; + return asClass(String.class); } public int asInteger() { - if (val.getType() == ValueType.INTEGER) { - return val.asIntegerValue().getInt(); - } - return 0; + Integer num = asClass(Integer.class); + return num!=null ? num : 0; } public Float asFloat() { - if (val.getType() == ValueType.FLOAT) { - return val.asFloatValue().getFloat(); - } - return 0.0f; - } - - public ArrayValue asArray(){ - if (val.getType() == ValueType.ARRAY){ - return val.asArrayValue(); - } - return null; + Float num = asClass(Float.class); + return num!=null ? num : 0.0f; } public <T> T asClass(Class<T> clazz) { try { - return SingletonMessage.getInstance().convert(val, clazz); + return SingletonMessage.getInstance().read(val, clazz); } catch (IOException e) { e.printStackTrace(); } @@ -56,12 +38,10 @@ } - @Override public Value getVal() { - return val; + return asClass(Value.class); } - @Override public Object getObj() { return val; }
--- a/src/main/java/alice/datasegment/ReceiverData.java Sat Oct 04 08:52:34 2014 +0900 +++ b/src/main/java/alice/datasegment/ReceiverData.java Sat Oct 18 19:30:13 2014 +0900 @@ -1,6 +1,5 @@ package alice.datasegment; -import org.msgpack.type.ArrayValue; import org.msgpack.type.Value; public interface ReceiverData { @@ -8,7 +7,6 @@ public String asString(); public int asInteger(); public Float asFloat() ; - public ArrayValue asArray(); public <T> T asClass(Class<T> clazz); public Value getVal(); public Object getObj();