# HG changeset patch # User Nozomi Teruya # Date 1430471956 -32400 # Node ID cb7c31848d168c5acfffaf591aa45ac3f3bff7e4 # Parent 6ebddfac7ff6bbcd2416ae89e6bf8883c401b9ad add CompressedDSMs diff -r 6ebddfac7ff6 -r cb7c31848d16 src/main/java/alice/daemon/CommandMessage.java --- a/src/main/java/alice/daemon/CommandMessage.java Thu Apr 30 20:52:21 2015 +0900 +++ b/src/main/java/alice/daemon/CommandMessage.java Fri May 01 18:19:16 2015 +0900 @@ -14,6 +14,7 @@ public boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか public boolean serialized = false;//シリアライズされているかどうか public boolean compressed = false;//圧縮されているかどうか + public int datasize; public boolean setTime = false;//? public long time;//? @@ -30,5 +31,6 @@ this.quickFlag = qFlag; this.serialized = sFlag; this.compressed = cFlag; + ///this.datasize = datasize; } } diff -r 6ebddfac7ff6 -r cb7c31848d16 src/main/java/alice/daemon/IncomingTcpConnection.java --- a/src/main/java/alice/daemon/IncomingTcpConnection.java Thu Apr 30 20:52:21 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Fri May 01 18:19:16 2015 +0900 @@ -8,7 +8,6 @@ import org.msgpack.MessagePack; import org.msgpack.unpacker.Unpacker; -import alice.codesegment.SingletonMessage; import alice.topology.manager.keeparive.RespondData; public class IncomingTcpConnection extends Thread { @@ -17,6 +16,7 @@ protected DataSegmentManager manager; protected String reverseKey; private LocalDataSegmentManager lmanager = DataSegment.getLocal(); + private CompressedLocalDataSegmentManager compressedlmanager = DataSegment.getCompressedLocal(); private static final MessagePack packer = new MessagePack(); public IncomingTcpConnection(DataSegmentManager manager) { @@ -33,6 +33,10 @@ return lmanager; } + public CompressedLocalDataSegmentManager getCompressedLocalDataSegmentManager(){ + return compressedlmanager; + } + /** * pipeline thread for receiving */ @@ -86,7 +90,7 @@ System.out.println("in TCP REPLY"); cmd = manager.getAndRemoveCmd(msg.seq); - rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));//ここのコンストラクタでx + rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.cs.ids.reply(cmd.receiver, rCmd); diff -r 6ebddfac7ff6 -r cb7c31848d16 src/main/java/alice/daemon/IncomingUdpConnection.java --- a/src/main/java/alice/daemon/IncomingUdpConnection.java Thu Apr 30 20:52:21 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Fri May 01 18:19:16 2015 +0900 @@ -5,9 +5,9 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; +import org.msgpack.MessagePack; import org.msgpack.unpacker.Unpacker; -import alice.codesegment.SingletonMessage; import alice.datasegment.Command; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; @@ -22,6 +22,7 @@ public MulticastConnection receiver; public MulticastConnection sender; + private static final MessagePack packer = new MessagePack(); public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) { super(manager); @@ -39,7 +40,7 @@ // Max data length is 65507 because of the max length of UDP payload ByteBuffer receive = ByteBuffer.allocate(65507); receiver.receive(receive); - Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); + Unpacker unpacker = packer.createBufferUnpacker(receive); receive.flip(); CommandMessage msg = unpacker.read(CommandMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); @@ -47,7 +48,6 @@ case UPDATE: case PUT: rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); - rData.setCompressFlag(msg.compressed); cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); if (msg.compressed){ @@ -77,7 +77,6 @@ cmd = manager.getAndRemoveCmd(msg.seq); rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); - rData.setCompressFlag(msg.compressed); Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.cs.ids.reply(cmd.receiver, rCmd); diff -r 6ebddfac7ff6 -r cb7c31848d16 src/main/java/alice/datasegment/Command.java --- a/src/main/java/alice/datasegment/Command.java Thu Apr 30 20:52:21 2015 +0900 +++ b/src/main/java/alice/datasegment/Command.java Fri May 01 18:19:16 2015 +0900 @@ -7,7 +7,6 @@ import org.msgpack.MessagePack; import alice.codesegment.CodeSegment; -import alice.codesegment.SingletonMessage; import alice.daemon.CommandMessage; import alice.daemon.Connection; @@ -28,6 +27,8 @@ private boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか。trueだとリモート private boolean compressFlag = false;//trueだったら圧縮する必要がある + private static final MessagePack packer = new MessagePack(); + /** * for PEEK/TAKE */ @@ -69,14 +70,19 @@ } return this.type + "\t" + key + "\t" + rData + "\tindex=" + index + "\tcs=" + csName; } - /** * @return serialized ByteBuffer */ - public ByteBuffer convert() {//byteArrayに変換 + public ByteBuffer convert() { + ByteBuffer buf = null; - ByteBuffer buf = null; - switch (type) { + try { + byte[] header = null; + byte[] data = null; + byte[] dataSize = null; + boolean serialized = false; + boolean compressed = false; + switch (type) { /* * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment * case UPDATE and PUT @@ -85,26 +91,52 @@ * these flags represent DataSegment status. * for example, serializeFlag is true. DataSegment had already converted, so no need convert. */ + case UPDATE: + case PUT: + case REPLY: + if (rData.compressed()) { + // have already converted + data = (byte[]) rData.getObj(); + compressed = rData.compressed(); // true + serialized = rData.serialized(); + } else { + if (!rData.serialized() && !rData.isByteArray()) { + data = packer.write(rData.getObj()); + serialized = true; + } else { // rData is RAW ByteArray or already serialized + data = (byte[]) rData.getObj(); + serialized = rData.serialized(); + } + if (compressFlag) { + rData.zip(); + compressed = true; + } + } + CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed); + if (rData.setTime) { + cm.setTime = true; + cm.time = rData.time; + cm.depth = rData.depth + 1; + } - case UPDATE: - System.out.println("update compressFlag:" + compressFlag); - break; - case PUT: - System.out.println("put compressFlag:" + compressFlag); - break; - case REPLY://ReceiveDataからREPLYするDSを取得 - System.out.println("in REPLY"); - System.out.println("reply compressFlag:" + compressFlag + ", " + type.id+ ", " + index+ ", " + seq + ", " + key+ ", " + false+ ", " + rData.serialized()+ ", " + rData.compressed()); + header = packer.write(cm); + dataSize = packer.write(data.length); + buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); + buf.put(header); + buf.put(dataSize); + buf.put(data); + break; + default: + header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag)); + buf = ByteBuffer.allocate(header.length); + buf.put(header); + break; + } - - break; - default://PEEK, TAKE, RemoveならCommandMessageとそのサイズのみセット - ReceiveData rData2 = new ReceiveData("hoge"); - System.out.println("default compressFlag:" + compressFlag); - - break; + buf.flip(); + } catch (IOException e) { + e.printStackTrace(); } - return buf; } @@ -115,11 +147,11 @@ * @param flag */ - public void setQuickFlag(boolean flag){//SEDA処理の有無フラグのsetter + public void setQuickFlag(boolean flag){ quickFlag = flag; } - public boolean getQuickFlag(){//SEDA処理の有無フラグのgetter + public boolean getQuickFlag(){ return quickFlag; } @@ -130,12 +162,11 @@ * @param flag */ - public void setCompressFlag(boolean flag){//圧縮フラグのsetter + public void setCompressFlag(boolean flag){ compressFlag = flag; } - public boolean getCompressFlag(){//圧縮フラグのgetter + public boolean getCompressFlag(){ return compressFlag; } - } diff -r 6ebddfac7ff6 -r cb7c31848d16 src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Fri May 01 18:19:16 2015 +0900 @@ -0,0 +1,176 @@ +package alice.datasegment; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; + +import alice.codesegment.CodeSegment; + +public class CompressedLocalDataSegmentManager extends DataSegmentManager { + + private String reverseKey = "local"; + private ConcurrentHashMap dataSegments = new ConcurrentHashMap(); + private Logger logger = Logger.getLogger("local"); + + private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads + Runtime.getRuntime().availableProcessors(), + Integer.MAX_VALUE, // keepAliveTime + TimeUnit.SECONDS, + new LinkedBlockingQueue()); + + public CompressedLocalDataSegmentManager() { + new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); + } + + public void setReverseKey(String s){ + reverseKey = s; + } + + private class RunCommand implements Runnable { + + DataSegmentKey key; + Command cmd; + + public RunCommand(DataSegmentKey key, Command cmd) { + this.key = key; + this.cmd = cmd; + } + + @Override + public void run() { + key.runCommand(cmd); + } + + } + + public void submitCommand(DataSegmentKey key, Command cmd) { + dataSegmentExecutor.execute(new RunCommand(key, cmd)); + } + + public DataSegmentKey getDataSegmentKey(String key) { + DataSegmentKey dsKey = dataSegments.get(key); + if (dsKey != null) + return dsKey; + if (key == null) + return null; + DataSegmentKey newDataSegmentKey = new DataSegmentKey(); + DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); + if (dataSegmentKey == null) { + dataSegmentKey = newDataSegmentKey; + } + return dataSegmentKey; + } + + public void removeDataSegmentKey(String key) { + if (key!=null) + dataSegments.remove(key); + } + + @Override + public void put(String key, ReceiveData rData, SendOption option) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + try { + rData.zip(); + } catch (IOException e) { + e.printStackTrace(); + } + Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); + cmd.setCompressFlag(option.isCompress()); + + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + /** + * Enqueue update command to the queue of each DataSegment key + */ + + @Override + public void update(String key, ReceiveData rData, SendOption option) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + try { + rData.zip(); + } catch (IOException e) { + e.printStackTrace(); + } + Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); + cmd.setCompressFlag(option.isCompress()); + + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void take(Receiver receiver, CodeSegment cs, SendOption option) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void peek(Receiver receiver, CodeSegment cs, SendOption option) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void remove(String key) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override public void finish() { + System.exit(0); + } + + @Override + public void close() { + + } + + public void recommand(Receiver receiver, CodeSegment cs) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + + } + + @Override + public void ping(String returnKey) { + + } + + @Override + public void response(String returnKey) { + + } + + @Override + public void shutdown() { + + } + + @Override + public void setSendError(boolean b) { + + } +} diff -r 6ebddfac7ff6 -r cb7c31848d16 src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java Fri May 01 18:19:16 2015 +0900 @@ -0,0 +1,172 @@ +package alice.datasegment; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; + +import alice.codesegment.CodeSegment; +import alice.daemon.Connection; +import alice.daemon.IncomingTcpConnection; +import alice.daemon.OutboundTcpConnection; + +public class CompressedRemoteDataSegmentManager extends DataSegmentManager { + protected Connection connection; + protected Logger logger; + + public CompressedRemoteDataSegmentManager(){} + + public CompressedRemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { + logger = Logger.getLogger(connectionKey); + connection = new Connection(); + connection.name = connectionKey; + final CompressedRemoteDataSegmentManager manager = this; + //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); + new Thread("Connect-" + connectionKey) { + public void run() { + boolean connect = true; + do { + try { + SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port)); + connection.socket = sc.socket(); + connection.socket.setTcpNoDelay(true); + connect = false; + logger.info("Connect to " + connection.getInfoString()); + } catch (IOException e) { + try { + Thread.sleep(50); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } while (connect); + IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey); + in.setName(reverseKey+"-IncomingTcp"); + in.setPriority(MAX_PRIORITY); + in.start(); + OutboundTcpConnection out = new OutboundTcpConnection(connection); + out.setName(connectionKey+"-OutboundTcp"); + out.setPriority(MAX_PRIORITY); + out.start(); + } + }.start(); + } + + /** + * send put command to target DataSegment + */ + @Override + public void put(String key, ReceiveData rData, SendOption option) { + try { + rData.zip(); + } catch (IOException e) { + e.printStackTrace(); + } + Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, ""); + cmd.setCompressFlag(option.isCompress());//true + + if (option.isQuick()){ + connection.write(cmd); // put command is executed right now + } else { + connection.sendCommand(cmd); // put command on the transmission thread + } + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void update(String key, ReceiveData rData, SendOption option) { + try { + rData.zip(); + } catch (IOException e) { + e.printStackTrace(); + } + Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, ""); + cmd.setCompressFlag(option.isCompress()); + + if (option.isQuick()){ + connection.write(cmd); + } else { + connection.sendCommand(cmd); + } + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void take(Receiver receiver, CodeSegment cs, SendOption option) { + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + cmd.setQuickFlag(option.isQuick()); + seqHash.put(seq, cmd); + if (option.isQuick()){ + connection.write(cmd); + } else { + connection.sendCommand(cmd); + } + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void peek(Receiver receiver, CodeSegment cs, SendOption option) { + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + cmd.setQuickFlag(option.isQuick()); + seqHash.put(seq, cmd); + if (option.isQuick()){ + connection.write(cmd); + } else { + connection.sendCommand(cmd); + } + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void remove(String key) { + Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, ""); + connection.sendCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void finish() { + Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, ""); + connection.sendCommand(cmd); + } + + @Override + public void ping(String returnKey) { + Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, ""); + connection.write(cmd); + } + + @Override + public void response(String returnKey) { + Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, ""); + connection.write(cmd); + } + + @Override + public void close() { + Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, ""); + connection.sendManager = false; + connection.sendCommand(cmd); + } + + @Override + public void shutdown() { + connection.close(); + LinkedBlockingQueue queue = connection.sendQueue; + if (!queue.isEmpty()) queue.clear(); + } + + @Override + public void setSendError(boolean b) { + connection.sendManager = b; + } +} diff -r 6ebddfac7ff6 -r cb7c31848d16 src/main/java/alice/datasegment/DataSegment.java --- a/src/main/java/alice/datasegment/DataSegment.java Thu Apr 30 20:52:21 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegment.java Fri May 01 18:19:16 2015 +0900 @@ -10,7 +10,7 @@ private static DataSegment dataSegment = new DataSegment(); private LocalDataSegmentManager local = new LocalDataSegmentManager(); - private LocalDataSegmentManager compressedLocal = new LocalDataSegmentManager();//追加 + private CompressedLocalDataSegmentManager compressedLocal = new CompressedLocalDataSegmentManager();//追加 private ConcurrentHashMap dataSegmentManagers = new ConcurrentHashMap(); //TODO Over Head private ConcurrentHashMap acceptHash = new ConcurrentHashMap(); @@ -27,7 +27,7 @@ return dataSegment.local; } - public static LocalDataSegmentManager getCompressedLocal() {//追加 + public static CompressedLocalDataSegmentManager getCompressedLocal() {//追加 return dataSegment.compressedLocal; } @@ -42,7 +42,7 @@ System.exit(0); } RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port); - RemoteDataSegmentManager compressedManager = new RemoteDataSegmentManager(connectionKey + "!", reverseKey, hostName, port); + CompressedRemoteDataSegmentManager compressedManager = new CompressedRemoteDataSegmentManager(connectionKey + "!", reverseKey, hostName, port); register(connectionKey, manager); register(connectionKey + "!", compressedManager); diff -r 6ebddfac7ff6 -r cb7c31848d16 src/main/java/alice/datasegment/LocalDataSegmentManager.java --- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java Thu Apr 30 20:52:21 2015 +0900 +++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java Fri May 01 18:19:16 2015 +0900 @@ -9,10 +9,6 @@ import alice.codesegment.CodeSegment; -/** - * localのDSに対する処理。DS自体は持っていない。→ReceivedData - * DataSegmentKey.runCommandに渡してコマンドを実行する。 - */ public class LocalDataSegmentManager extends DataSegmentManager { private String reverseKey = "local"; @@ -25,7 +21,6 @@ TimeUnit.SECONDS, new LinkedBlockingQueue()); - //コンストラクタ。スレッドが走る。 public LocalDataSegmentManager() { new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); } @@ -77,10 +72,7 @@ @Override public void put(String key, ReceiveData rData, SendOption option) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);//ここでMP変換している - cmd.setCompressFlag(option.getCompressFlag()); - rData.setCompressFlag(option.getCompressFlag()); - + Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); @@ -94,9 +86,6 @@ public void update(String key, ReceiveData rData, SendOption option) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); - cmd.setCompressFlag(option.getCompressFlag()); - rData.setCompressFlag(option.getCompressFlag()); - dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); @@ -107,8 +96,6 @@ DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setCompressFlag(option.getCompressFlag()); - dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); @@ -119,14 +106,11 @@ DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setCompressFlag(option.getCompressFlag()); - dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } - //このコマンドは? @Override public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); @@ -145,7 +129,6 @@ } - //? public void recommand(Receiver receiver, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); diff -r 6ebddfac7ff6 -r cb7c31848d16 src/main/java/alice/datasegment/ReceiveData.java --- a/src/main/java/alice/datasegment/ReceiveData.java Thu Apr 30 20:52:21 2015 +0900 +++ b/src/main/java/alice/datasegment/ReceiveData.java Fri May 01 18:19:16 2015 +0900 @@ -2,18 +2,12 @@ import java.io.*; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.LinkedList; import java.util.zip.*; -import alice.daemon.CommandMessage; -import org.apache.log4j.Logger; import org.msgpack.MessagePack; import org.msgpack.type.Value; -import alice.codesegment.SingletonMessage; -import javax.xml.bind.DatatypeConverter; -import java.io.ByteArrayInputStream; import static java.util.Arrays.*; @@ -56,7 +50,6 @@ } } - public boolean isByteArray(){ return messagePack != null | zMessagePack != null; } @@ -116,10 +109,10 @@ } if (zMessagePack != null && messagePack == null) { - messagePack = unzip(zMessagePack); + messagePack = unzip(zMessagePack, 100);///ToDo:read header and set length } - return SingletonMessage.getInstance().read(messagePack, clazz); + return packer.read(messagePack, clazz); } catch (IOException e) {// | DataFormatException e e.printStackTrace(); @@ -129,24 +122,19 @@ public int zip() throws IOException { - LinkedList inputs = new LinkedList(messagePack); - int inputIndex - LinkedList outputs; + LinkedList inputs = new LinkedList(); + int inputIndex = 0; + LinkedList outputs = new LinkedList(); + Deflater deflater = new Deflater(); int len = 0; - int INFLATE_BUFSIZE = 1024 * 100; + int INFLATE_BUFSIZE = 1024 * 100;//ToDo:fix ByteBuffer c1 = allocate(INFLATE_BUFSIZE);//for output while (inputIndex < inputs.size()) { ByteBuffer b1 = inputs.get(inputIndex++); deflater.setInput(b1.array(), b1.position(), b1.remaining()); - /** - * If we finish() stream and reset() it, Deflater start new gzip - * stream, this makes continuous zlib reader unhappy. if we remove - * finish(), Deflater.deflate() never flushes its output. The - * original zlib deflate has flush flag. I'm pretty sure this a kind - * of bug of Java library. - */ + if (inputIndex == inputs.size()){ deflater.finish(); } @@ -170,17 +158,17 @@ outputs.addLast(c1); } - zMessagePack = outputs + //zMessagePack = outputs.toArray(); deflater.reset(); return len;///return length of ziped data } - protected byte[] unzip(byte[] input) {///read header & unzip + protected byte[] unzip(byte[] input, int zippedLength) {///read header & unzip int length = input.length; - int zippedLength = byteArrayToInt(copyOfRange(input, 4, 8));///read header...Is this copy OK?? + Inflater inflater = new Inflater(); byte [] output = new byte [zippedLength];///byteArray for unziped data - inflater.setInput(input, 8, length - 8);///set unzip data without header + inflater.setInput(input, 0, length);///set unzip data without header try { System.out.println("return:" + inflater.inflate(output, 0, zippedLength));///unzip @@ -208,7 +196,6 @@ b = ByteBuffer.allocate(size); } catch (OutOfMemoryError e) { b = null; - System.err.println("multicastqueue : wait for heap : " + e); } if (b!=null) { break; @@ -216,7 +203,6 @@ try { wait(); } catch (InterruptedException e) { - System.out.println("thread has interrupted the current thread."); } } return b; diff -r 6ebddfac7ff6 -r cb7c31848d16 src/main/java/alice/datasegment/RemoteDataSegmentManager.java --- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Thu Apr 30 20:52:21 2015 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Fri May 01 18:19:16 2015 +0900 @@ -18,7 +18,7 @@ public RemoteDataSegmentManager(){} - public RemoteDataSegmentManager(final String connectionKey, final String reverseKey, final String hostName, final int port) { + public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { logger = Logger.getLogger(connectionKey); connection = new Connection(); connection.name = connectionKey; @@ -59,9 +59,9 @@ */ @Override public void put(String key, ReceiveData rData, SendOption option) { - Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");///set compressed flag - cmd.setCompressFlag(option.getCompressFlag()); - if (option.getQuickFlag()){ + Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, ""); + cmd.setCompressFlag(option.isCompress()); + if (option.isQuick()){ connection.write(cmd); // put command is executed right now } else { connection.sendCommand(cmd); // put command on the transmission thread @@ -73,8 +73,8 @@ @Override public void update(String key, ReceiveData rData, SendOption option) { Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, ""); - cmd.setCompressFlag(option.getCompressFlag()); - if (option.getQuickFlag()){ + cmd.setCompressFlag(option.isCompress()); + if (option.isQuick()){ connection.write(cmd); } else { connection.sendCommand(cmd); @@ -87,11 +87,9 @@ public void take(Receiver receiver, CodeSegment cs, SendOption option) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - //cmd.setQuickFlag(option.getQuickFlag()); - cmd.setCompressFlag(option.getCompressFlag()); - + cmd.setQuickFlag(option.isQuick()); seqHash.put(seq, cmd); - if (option.getQuickFlag()){ + if (option.isQuick()){ connection.write(cmd); } else { connection.sendCommand(cmd); @@ -104,11 +102,9 @@ public void peek(Receiver receiver, CodeSegment cs, SendOption option) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setQuickFlag(option.getQuickFlag()); - //cmd.setCompressFlag(option.getCompressFlag()); - + cmd.setQuickFlag(option.isQuick()); seqHash.put(seq, cmd); - if (option.getQuickFlag()){ + if (option.isQuick()){ connection.write(cmd); } else { connection.sendCommand(cmd); diff -r 6ebddfac7ff6 -r cb7c31848d16 src/main/java/alice/datasegment/SendOption.java --- a/src/main/java/alice/datasegment/SendOption.java Thu Apr 30 20:52:21 2015 +0900 +++ b/src/main/java/alice/datasegment/SendOption.java Fri May 01 18:19:16 2015 +0900 @@ -1,30 +1,27 @@ package alice.datasegment; -/** - * フラグを一時的に格納するだけ。 - */ public class SendOption { - private boolean quickFlag = false; - private boolean compressFlag = false; - + private boolean quick = false; + private boolean compress = false; + public SendOption(boolean qFlag, boolean cFlag) { - quickFlag = qFlag; - compressFlag = cFlag; + quick = qFlag; + compress = cFlag; } - public boolean getQuickFlag() { - return quickFlag; + public boolean isQuick() { + return quick; } - public void setQuickFlag(boolean quick) { - this.quickFlag = quick; + public void setQuick(boolean quick) { + this.quick = quick; } - public boolean getCompressFlag() { - return compressFlag; + public boolean isCompress() { + return compress; } - public void setCompressFlag(boolean compress) { - this.compressFlag = compress; + public void setCompress(boolean compress) { + this.compress = compress; } } diff -r 6ebddfac7ff6 -r cb7c31848d16 src/main/java/alice/test/topology/aquarium/fx/CheckAllFishInfoExist.java --- a/src/main/java/alice/test/topology/aquarium/fx/CheckAllFishInfoExist.java Thu Apr 30 20:52:21 2015 +0900 +++ b/src/main/java/alice/test/topology/aquarium/fx/CheckAllFishInfoExist.java Fri May 01 18:19:16 2015 +0900 @@ -10,7 +10,6 @@ import org.msgpack.type.Value; import alice.codesegment.CodeSegment; -import alice.codesegment.SingletonMessage; import alice.datasegment.CommandType; import alice.datasegment.Receiver; @@ -31,11 +30,11 @@ new SendDataOthers(info, info.key); ods.put("registeredList", info.getVal()); ArrayValue fishInfoList = info.asClass(Value.class).asArrayValue(); - MessagePack msg = SingletonMessage.getInstance(); + MessagePack packer = new MessagePack(); Group root = info1.asClass(Group.class); for (Value v : fishInfoList){ boolean exist = false; - FishInfo info = msg.convert(v, FishInfo.class); + FishInfo info = packer.convert(v, FishInfo.class); if (info.name != null) { for (Node n : root.getChildren()) { if (info.name.equals(n.getId())) {