Mercurial > hg > Database > Alice
changeset 572:ef3dc954eb43 dispose
delete serializeFlag
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Sun, 13 Dec 2015 23:49:46 +0900 |
parents | 80a6c4a1c601 |
children | fa3c8424dea4 |
files | src/main/java/alice/codesegment/OutputDataSegment.java src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/daemon/IncomingUdpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java src/main/java/alice/datasegment/DataSegment.java src/main/java/alice/datasegment/DataSegmentKey.java src/main/java/alice/datasegment/LocalDataSegmentManager.java src/main/java/alice/datasegment/ReceiveData.java src/main/java/alice/test/codesegment/remote/RemoteIncrement.java src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java |
diffstat | 12 files changed, 35 insertions(+), 262 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/OutputDataSegment.java Tue Dec 08 16:29:09 2015 +0900 +++ b/src/main/java/alice/codesegment/OutputDataSegment.java Sun Dec 13 23:49:46 2015 +0900 @@ -16,11 +16,7 @@ * input→ds変更→outputのときコピーを防ぐ */ public void flip(Receiver receiver) { - if (receiver.isCompressed()){ - DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false); - } else { - DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); - } + DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); } public void flip(String managerKey, String key, Receiver receiver){ @@ -35,19 +31,10 @@ public void flip(Receiver receiver, CommandType type) { switch (type) { case PUT: - if (receiver.isCompressed()){ - DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false);//localなら全部false。 - } else { - DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); - } + DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); break; case UPDATE: - if (receiver.isCompressed()){ - DataSegment.getCompressedLocal().update(receiver.key, receiver.getReceiveData(), false); - } else { - DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), false); - } - + DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), false); break; default: break;
--- a/src/main/java/alice/daemon/CommandMessage.java Tue Dec 08 16:29:09 2015 +0900 +++ b/src/main/java/alice/daemon/CommandMessage.java Sun Dec 13 23:49:46 2015 +0900 @@ -12,7 +12,6 @@ public int seq;//DSの待ち合わせを行っているCSを表すunique number public String key;//DS key public boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか - public boolean serialized = false;//シリアライズされているかどうか public boolean compressed = false;//圧縮されているかどうか public int dataSize = 0;//圧縮前のサイズ @@ -26,13 +25,12 @@ public CommandMessage() {} public CommandMessage(int type, int index, int seq, String key - , boolean qFlag, boolean sFlag, boolean cFlag, int datasize) { + , boolean qFlag, boolean cFlag, int datasize) { this.type = type; this.index = index; this.seq = seq; this.key = key; this.quickFlag = qFlag; - this.serialized = sFlag; this.compressed = cFlag; this.dataSize = datasize; }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Tue Dec 08 16:29:09 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Sun Dec 13 23:49:46 2015 +0900 @@ -16,7 +16,6 @@ protected DataSegmentManager manager; protected String reverseKey; private LocalDataSegmentManager lmanager = DataSegment.getLocal(); - private CompressedLocalDataSegmentManager compressedlmanager = DataSegment.getCompressedLocal(); private static final MessagePack packer = new MessagePack(); public IncomingTcpConnection(DataSegmentManager manager) { @@ -33,10 +32,6 @@ return lmanager; } - public CompressedLocalDataSegmentManager getCompressedLocalDataSegmentManager(){ - return compressedlmanager; - } - /** * pipeline thread for receiving */ @@ -60,11 +55,8 @@ case UPDATE: case PUT: int dataSize = unpacker.readInt(); - if (msg.compressed) { - rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(dataSize), byte[].class), true, msg.dataSize); - } else { - rData = new ReceiveData(unpacker.getSerializedByteArray(dataSize), false, msg.dataSize); - } + rData = new ReceiveData(unpacker.getSerializedByteArray(dataSize), msg.compressed, msg.dataSize); + if (msg.setTime) { rData.setTimes(msg.time, true, msg.depth); @@ -78,11 +70,7 @@ cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); cmd.setCompressFlag(msg.compressed); - if (rData.compressed()){ - compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); - } else { - lmanager.getDataSegmentKey(msg.key).runCommand(cmd); - } + lmanager.getDataSegmentKey(msg.key).runCommand(cmd); break; case PEEK: @@ -90,11 +78,7 @@ cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection); cmd.setCompressFlag(msg.compressed); - if (msg.compressed){ - compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); - } else { - lmanager.getDataSegmentKey(msg.key).runCommand(cmd); - } + lmanager.getDataSegmentKey(msg.key).runCommand(cmd); break; case REMOVE: @@ -104,11 +88,7 @@ case REPLY: cmd = manager.getAndRemoveCmd(msg.seq); - if (msg.compressed) { - rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize); - } else { - rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize); - } + rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.dataSize); Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.setCompressFlag(msg.compressed);
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java Tue Dec 08 16:29:09 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Sun Dec 13 23:49:46 2015 +0900 @@ -50,11 +50,8 @@ rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); - if (msg.compressed){ - getCompressedLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); - } else { - getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); - } + getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); + break; case PEEK: case TAKE: @@ -62,11 +59,8 @@ cmd.setQuickFlag(msg.quickFlag); cmd.setCompressFlag(msg.compressed); - if (msg.compressed) { - getCompressedLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); - } else { - getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); - } + getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); + break; case REMOVE: cmd = new Command(type, null, null, null, 0, 0, null, null, "");
--- a/src/main/java/alice/datasegment/Command.java Tue Dec 08 16:29:09 2015 +0900 +++ b/src/main/java/alice/datasegment/Command.java Sun Dec 13 23:49:46 2015 +0900 @@ -80,8 +80,6 @@ byte[] header = null; byte[] data = null; byte[] dataSize = null; - boolean serialized = false; - boolean compressed = false; switch (type) { /* * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment @@ -95,15 +93,12 @@ case PUT: case REPLY: if(compressFlag){ - // ToDo: Do not pack again - data = packer.write(rData.getZMessagePack()); - compressed = true; + data = rData.getZMessagePack(); } else { data = rData.getMessagePack(); - serialized = true; } - CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed, rData.getDataSize()); + CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, compressFlag, rData.getDataSize()); if (rData.getSetTime()) { cm.setTime = true; cm.time = rData.getTime(); @@ -123,7 +118,7 @@ buf.put(data); break; default: - header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag, 0)); + header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, compressFlag, 0)); buf = ByteBuffer.allocate(header.length); buf.put(header); break;
--- a/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Tue Dec 08 16:29:09 2015 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,146 +0,0 @@ -package alice.datasegment; - -import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.log4j.Logger; - -import alice.codesegment.CodeSegment; - -public class CompressedLocalDataSegmentManager extends DataSegmentManager { - - LocalDataSegmentManager manager; - private String reverseKey = "compressedlocal"; - - public CompressedLocalDataSegmentManager(LocalDataSegmentManager manager) { - this.manager = manager; - new Thread(replyThread, "CompressedLocalDataSegmentManager-replyCommand").start(); - } - - public void setReverseKey(String s){ - reverseKey = s; - } - - private class RunCommand implements Runnable { - - DataSegmentKey key; - Command cmd; - - public RunCommand(DataSegmentKey key, Command cmd) { - this.key = key; - this.cmd = cmd; - } - - @Override - public void run() { - key.runCommand(cmd); - } - - } - - public void submitCommand(DataSegmentKey key, Command cmd) { - manager.submitCommand(key, cmd); - } - - public DataSegmentKey getDataSegmentKey(String key) { - return manager.getDataSegmentKey(key); - } - - public void removeDataSegmentKey(String key) { - manager.removeDataSegmentKey(key); - } - - @Override - public void put(String key, ReceiveData rData, boolean quickFlag) { - if (!rData.compressed()){ - try { - rData.zip(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); - cmd.setCompressFlag(true); - - manager.put1(key, cmd); - } - - /** - * Enqueue update command to the queue of each DataSegment key - */ - - @Override - public void update(String key, ReceiveData rData, boolean quickFlag) { - - if (!rData.compressed()){ - try { - rData.zip(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); - cmd.setCompressFlag(true); - - manager.put1(key, cmd); - } - - @Override - public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { - Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); - cmd.setCompressFlag(true); - - manager.take1(receiver, cmd); - } - - @Override - public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { - Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); - cmd.setCompressFlag(true); - - manager.take1(receiver, cmd); - } - - @Override - public void remove(String key) { - manager.remove(key); - } - - @Override public void finish() { - manager.finish(); - } - - @Override - public void close() { - - } - - public void recommand(Receiver receiver, CodeSegment cs) { - manager.recommand(receiver, cs); - } - - @Override - public void ping(String returnKey) { - - } - - @Override - public void response(String returnKey) { - - } - - @Override - public void shutdown() { - - } - - @Override - public void setSendError(boolean b) { - - } -}
--- a/src/main/java/alice/datasegment/DataSegment.java Tue Dec 08 16:29:09 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegment.java Sun Dec 13 23:49:46 2015 +0900 @@ -10,13 +10,11 @@ private static DataSegment dataSegment = new DataSegment(); private LocalDataSegmentManager local = new LocalDataSegmentManager(); - private CompressedLocalDataSegmentManager compressedLocal = new CompressedLocalDataSegmentManager(local);//追加 private ConcurrentHashMap<String, DataSegmentManager> dataSegmentManagers = new ConcurrentHashMap<String, DataSegmentManager>(); //TODO Over Head private ConcurrentHashMap<String, IncomingTcpConnection> acceptHash = new ConcurrentHashMap<String, IncomingTcpConnection>(); private DataSegment() { dataSegmentManagers.put("local", local); - dataSegmentManagers.put("compressedlocal", compressedLocal); } public static DataSegmentManager get(String key) { @@ -31,16 +29,12 @@ return dataSegment.local; } - public static CompressedLocalDataSegmentManager getCompressedLocal() {//追加 - return dataSegment.compressedLocal; - } - public static void register(String key, DataSegmentManager manager) { dataSegment.dataSegmentManagers.put(key, manager); } - public static RemoteDataSegmentManager connect(String connectionKey, String reverseKey, String hostName, int port) { - if (connectionKey.startsWith("compressed")){//compressedが含まれていたらエラーを返して終了 + public static RemoteDataSegmentManager connect(String connectionKey, String reverseKey, String hostName, int port) {//create RemoteDSM + if (connectionKey.startsWith("compressed")){ System.out.println("You can't use 'compressed' for DataSegmentManager name."); System.exit(0); }
--- a/src/main/java/alice/datasegment/DataSegmentKey.java Tue Dec 08 16:29:09 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentKey.java Sun Dec 13 23:49:46 2015 +0900 @@ -6,7 +6,7 @@ import alice.datasegment.Command; /** - * run command + * This class has DataSegment value and run command method * * Synchronized DataSegment for each DataSegment key * @author kazz
--- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java Tue Dec 08 16:29:09 2015 +0900 +++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java Sun Dec 13 23:49:46 2015 +0900 @@ -15,12 +15,6 @@ private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); private Logger logger = Logger.getLogger("local"); - private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads - Runtime.getRuntime().availableProcessors(), - Integer.MAX_VALUE, // keepAliveTime - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>()); - public LocalDataSegmentManager() { //new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); } @@ -29,27 +23,6 @@ reverseKey = s; } - private class RunCommand implements Runnable { - - DataSegmentKey key; - Command cmd; - - public RunCommand(DataSegmentKey key, Command cmd) { - this.key = key; - this.cmd = cmd; - } - - @Override - public void run() { - key.runCommand(cmd); - } - - } - - public void submitCommand(DataSegmentKey key, Command cmd) { - dataSegmentExecutor.execute(new RunCommand(key, cmd)); - } - public DataSegmentKey getDataSegmentKey(String key) { DataSegmentKey dsKey = dataSegments.get(key); if (dsKey != null) @@ -72,10 +45,10 @@ @Override public void put(String key, ReceiveData rData, boolean quickFlag) { Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); - put1(key, cmd); + runOdsAPI(key, cmd); } - public void put1(String key, Command cmd) { + private void runOdsAPI(String key, Command cmd) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) @@ -89,20 +62,18 @@ @Override public void update(String key, ReceiveData rData, boolean quickFlag) { Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); - put1(key, cmd); + runOdsAPI(key, cmd); } @Override public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { - Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); - take1(receiver, cmd); + runIdsAPI(receiver, cmd); } - public void take1(Receiver receiver, Command cmd) { + private void runIdsAPI(Receiver receiver, Command cmd) { int seq = this.seq.getAndIncrement(); cmd.setSeq(seq); - //seqHash.put(seq, cmd); DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) @@ -112,7 +83,7 @@ @Override public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); - take1(receiver, cmd); + runIdsAPI(receiver, cmd); } @Override
--- a/src/main/java/alice/datasegment/ReceiveData.java Tue Dec 08 16:29:09 2015 +0900 +++ b/src/main/java/alice/datasegment/ReceiveData.java Sun Dec 13 23:49:46 2015 +0900 @@ -104,15 +104,15 @@ public <T> T asClass(Class<T> clazz) {///javasist try { - if (val != null) { - return (T) val; + if (val == null) { + if (zMessagePack != null && messagePack == null) { + messagePack = unzip(zMessagePack, dataSize); + } + + val = packer.read(messagePack, clazz); } - if (zMessagePack != null && messagePack == null) { - messagePack = unzip(zMessagePack, dataSize); - } - - return packer.read(messagePack, clazz); + return (T) val; } catch (IOException e) {// | DataFormatException e e.printStackTrace();
--- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Tue Dec 08 16:29:09 2015 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Sun Dec 13 23:49:46 2015 +0900 @@ -18,9 +18,9 @@ if (num == 10) System.exit(0); RemoteIncrement cs = new RemoteIncrement(); + + ods.put("local", "num", num); cs.num.setKey("compressedremote", "num"); - - ods.put("compressedlocal", "num", num); } } \ No newline at end of file
--- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Tue Dec 08 16:29:09 2015 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Sun Dec 13 23:49:46 2015 +0900 @@ -7,8 +7,8 @@ @Override public void run() { RemoteIncrement cs = new RemoteIncrement(); + ods.put("local", "num", 0); cs.num.setKey("compressedremote", "num"); - ods.put("compressedlocal", "num", 0); } } \ No newline at end of file