Mercurial > hg > Database > Alice
changeset 533:b3c9554ccb1b dispose
change compressed API to set data specified DSM name
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/InputDataSegment.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/codesegment/InputDataSegment.java Mon May 04 16:02:22 2015 +0900 @@ -32,102 +32,23 @@ public void quickPeek(Receiver receiver) {//SEDAで実行 cs.register(receiver); - - if (receiver.compressedFlag){ - SendOption option = new SendOption(true, true); - if (receiver.managerKey == null){//localの場合 - DataSegment.getCompressedLocal().peek(receiver, cs, option); - } else { - if (DataSegment.contains(receiver.managerKey)) {//remoteの場合 - DataSegment.get(receiver.managerKey + "!").peek(receiver, cs, option); - } - } - } else { - SendOption option = new SendOption(true, false); - if (receiver.managerKey == null){ - DataSegment.getLocal().peek(receiver, cs, option); - } else { - if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey).peek(receiver, cs, option); - } - } - } - + DataSegment.get(receiver.managerKey).peek(receiver, cs, true); } public void peek(Receiver receiver) { cs.register(receiver); - - if (receiver.compressedFlag){ - SendOption option = new SendOption(false, true); - if (receiver.managerKey==null){ - DataSegment.getCompressedLocal().peek(receiver, cs, option); - } else { - if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey + "!").peek(receiver, cs, option); - } - } - } else { - SendOption option = new SendOption(false, false); - if (receiver.managerKey==null){ - DataSegment.getLocal().peek(receiver, cs, option); - } else { - if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey).peek(receiver, cs, option); - } - } - } - + DataSegment.get(receiver.managerKey).peek(receiver, cs, false); } public void quickTake(Receiver receiver) { cs.register(receiver); - - if (receiver.compressedFlag){ - SendOption option = new SendOption(true, true); - if (receiver.managerKey==null){ - DataSegment.getCompressedLocal().take(receiver, cs, option); - } else { - if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey + "!").take(receiver, cs, option); - } - } - } else { - SendOption option = new SendOption(true, false); - if (receiver.managerKey==null){ - DataSegment.getLocal().take(receiver, cs, option); - } else { - if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey).take(receiver, cs, option); - } - } - } + DataSegment.get(receiver.managerKey).take(receiver, cs, true); } public void take(Receiver receiver) { - System.out.println("in TAKE"); cs.register(receiver); - - if (receiver.compressedFlag){ - SendOption option = new SendOption(false, true); - if (receiver.managerKey==null){// 指定なしの場合デフォはローカルになる - DataSegment.getCompressedLocal().take(receiver, cs, option); - } else { - if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey + "!").take(receiver, cs, option); - } - } - } else { - SendOption option = new SendOption(false, false); - if (receiver.managerKey==null){ - DataSegment.getLocal().take(receiver, cs, option); - } else { - if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey).take(receiver, cs, option); - } - } - } + DataSegment.get(receiver.managerKey).take(receiver, cs, false); } public void reply(Receiver receiver, Command reply) { @@ -163,10 +84,6 @@ return new Receiver(this, type); }//Receiverを作成 - public Receiver create(CommandType type, boolean compressFlag) {//追加 - return new Receiver(this, type, compressFlag); - } - public void recommand(Receiver receiver) { // TODO why only local? DataSegment.getLocal().recommand(receiver, cs);
--- a/src/main/java/alice/codesegment/OutputDataSegment.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/codesegment/OutputDataSegment.java Mon May 04 16:02:22 2015 +0900 @@ -17,198 +17,85 @@ */ public void flip(Receiver receiver) { if (receiver.isCompressed()){ - DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), null); + DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false); } else { - DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), null); + DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); } } - public void flip(Receiver receiver, CommandType type) { + public void flip(Receiver receiver, CommandType type) {// ToDo: add remote switch (type) { - case PUT: - if (receiver.isCompressed()){ - DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), null);//localなら全部false。 - } else { - DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), null); - } - break; - case UPDATE: - if (receiver.isCompressed()){ - DataSegment.getCompressedLocal().update(receiver.key, receiver.getReceiveData(), null); - } else { - DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), null); - } + case PUT: + if (receiver.isCompressed()){ + DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false);//localなら全部false。 + } else { + DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); + } + break; + case UPDATE: + if (receiver.isCompressed()){ + DataSegment.getCompressedLocal().update(receiver.key, receiver.getReceiveData(), false); + } else { + DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), false); + } - break; - default: - break; + break; + default: + break; } } public void put(String key, ReceiveData rData) { - DataSegment.getLocal().put(key, rData, null); + DataSegment.getLocal().put(key, rData, false); } public void put(String key, Object val) { ReceiveData rData = new ReceiveData(val); - DataSegment.getLocal().put(key, rData, new SendOption(false, false)); - } - - public void put(String key, Object val, boolean cFlag) {///追加 - ReceiveData rData = new ReceiveData(val); - DataSegment.getLocal().put(key, rData, new SendOption(false, cFlag)); + DataSegment.getLocal().put(key, rData, false); } public void update(String key, Object val) { ReceiveData rData = new ReceiveData(val); - DataSegment.getLocal().update(key, rData, new SendOption(false, false)); - } - - public void update(String key, Object val, boolean cFlag) {///追加 - ReceiveData rData = new ReceiveData(val); - DataSegment.getLocal().update(key, rData, new SendOption(false, cFlag)); + DataSegment.getLocal().update(key, rData, false); } /** * for remote */ public void put(String managerKey, String key, ReceiveData rData) { - if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ - SendOption option = new SendOption(false, rData.compressed()); - //SendOption option = new SendOption(false, compressFlag); - - if (rData.compressed()){ - DataSegment.get(managerKey + "!").put(key, rData, option); - } else { - DataSegment.get(managerKey).put(key, rData, option); - } - } else { - put(key, rData); - } - } - - public void put(String managerKey, String key, Object val) { - put(managerKey, key, val, false); + DataSegment.get(managerKey).put(key, rData, false); } - public void put(String managerKey, String key, Object val, boolean cFlag) {//追加 - System.out.println("in PUT"); + public void put(String managerKey, String key, Object val) {//追加 ReceiveData rData = new ReceiveData(val); - SendOption option = new SendOption(false, cFlag); - - if (!managerKey.equals("local") && DataSegment.contains(managerKey)){///if remote - if (cFlag){ - DataSegment.get(managerKey + "!").put(key, rData, option); - } else { - DataSegment.get(managerKey).put(key, rData, option); - } - } else {// if local - if (cFlag){ - DataSegment.getCompressedLocal().put(key, rData, option); - } else { - put(key, val); - } - } + DataSegment.get(managerKey).put(key, rData, false); } public void quickPut(String managerKey, String key, ReceiveData rData) { - if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ - SendOption option = new SendOption(true, false); - if (rData.compressed()){ - DataSegment.get(managerKey + "!").put(key, rData, option); - } else { - DataSegment.get(managerKey).put(key, rData, option); - } - } else { - put(key, rData); - } + DataSegment.get(managerKey).put(key, rData, true); } public void quickPut(String managerKey, String key, Object val) { - quickPut(managerKey, key, val, false); - } - - public void quickPut(String managerKey, String key, Object val, boolean cFlag) {//追加 - if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ - ReceiveData rData = new ReceiveData(val); - SendOption option = new SendOption(true, cFlag); - - if (cFlag){ - DataSegment.get(managerKey + "!").put(key, rData, option); - } else { - DataSegment.get(managerKey).put(key, rData, option); - } - } else { - put(key, val); - } + ReceiveData rData = new ReceiveData(val); + DataSegment.get(managerKey).put(key, rData, true); } public void update(String managerKey, String key, ReceiveData rData) { - if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ - SendOption option = new SendOption(false, rData.compressed()); - if (rData.compressed()){ - DataSegment.get(managerKey + "!").update(key, rData, option); - } else { - DataSegment.get(managerKey).update(key, rData, option); - } - } else { - update(key, rData); - } + DataSegment.get(managerKey).update(key, rData, false); } public void update(String managerKey, String key, Object val) { - update(managerKey, key, val, false); - } - - public void update(String managerKey, String key, Object val, boolean cFlag) {//追加 ReceiveData rData = new ReceiveData(val); - SendOption option = new SendOption(false, cFlag); - - if (!managerKey.equals("local") && DataSegment.contains(managerKey)){///if remote - if (cFlag){ - DataSegment.get(managerKey + "!").update(key, rData, option); - } else { - DataSegment.get(managerKey).update(key, rData, option); - } - } else {// if local - if (cFlag){ - DataSegment.getCompressedLocal().update(key, rData, option); - } else { - update(key, val); - } - } + DataSegment.get(managerKey).update(key, rData, false); } public void quickUpdate(String managerKey, String key, ReceiveData rData) { - if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ - SendOption option = new SendOption(true, rData.compressed()); - if (rData.compressed()){ - DataSegment.get(managerKey + "!").update(key, rData, option); - } else { - DataSegment.get(managerKey).update(key, rData, option); - } - } else { - update(key, rData); - } + DataSegment.get(managerKey).update(key, rData, true); } - public void quickUpdate(String managerKey, String key, Object val) { - quickUpdate(managerKey, key, val, false); - } - - public void quickUpdate(String managerKey, String key, Object val, boolean cFlag) {//追加 - if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ - ReceiveData rData = new ReceiveData(val); - SendOption option = new SendOption(true, cFlag); - //rData.setCompressFlag(cFlag); - if (cFlag){ - DataSegment.get(managerKey + "!").update(key, rData, option); - } else { - DataSegment.get(managerKey).update(key, rData, option); - } - } else { - update(key, val); - } + public void quickUpdate(String managerKey, String key, Object val, boolean cFlag) { + ReceiveData rData = new ReceiveData(val); + DataSegment.get(managerKey).update(key, rData, true); } /**
--- a/src/main/java/alice/daemon/Connection.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/daemon/Connection.java Mon May 04 16:02:22 2015 +0900 @@ -59,10 +59,9 @@ if (name!=null){ ConnectionInfo c = new ConnectionInfo(name, socket); ReceiveData rData = new ReceiveData(c); - DataSegment.getLocal().put("_DISCONNECT", rData, null); + DataSegment.getLocal().put("_DISCONNECT", rData, false); if (sendManager) { - SendOption option = new SendOption(false, false); - DataSegment.get("manager").put("_DISCONNECTNODE", rData, option); + DataSegment.get("manager").put("_DISCONNECTNODE", rData, false); sendManager = false; } }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Mon May 04 16:02:22 2015 +0900 @@ -59,8 +59,11 @@ switch (type) { case UPDATE: case PUT: - System.out.println("in TCP PUT"); - rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), msg.compressed, msg.dataSize);///read rData + if (msg.compressed) { + rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize); + } else { + rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize); + } if (msg.setTime) { rData.setTime = true; @@ -80,7 +83,6 @@ break; case PEEK: case TAKE: - System.out.println("in TCP TAKE"); cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection); cmd.setCompressFlag(msg.compressed); @@ -96,12 +98,13 @@ lmanager.getDataSegmentKey(msg.key).runCommand(cmd);//ToDo:fix break; case REPLY: - System.out.println("in TCP REPLY"); cmd = manager.getAndRemoveCmd(msg.seq); - // ToDo: do not pack again on compressedMessagePack - // after size of copressedData get to ByteBuffer of compressed data using - // readPayloadAsReference().toByteBuffer() - rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), msg.compressed, msg.dataSize); + + if (msg.compressed) { + rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize); + } else { + rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize); + } Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.setCompressFlag(msg.compressed); @@ -113,7 +116,7 @@ break; case RESPONSE: rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis())); - DataSegment.getLocal().put(msg.key, rData, null); + DataSegment.getLocal().put(msg.key, rData, false); break; default: break;
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Mon May 04 16:02:22 2015 +0900 @@ -87,7 +87,7 @@ break; case RESPONSE: rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis())); - DataSegment.getLocal().put(msg.key, rData, null); + DataSegment.getLocal().put(msg.key, rData, false); break; default: break;
--- a/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Mon May 04 16:02:22 2015 +0900 @@ -71,15 +71,18 @@ } @Override - public void put(String key, ReceiveData rData, SendOption option) { + public void put(String key, ReceiveData rData, boolean quickFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - try { - rData.zip(); - } catch (IOException e) { - e.printStackTrace(); + if (!rData.compressed()){ + try { + rData.zip(); + } catch (IOException e) { + e.printStackTrace(); + } } + Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); - cmd.setCompressFlag(option.isCompress()); + cmd.setCompressFlag(true); dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) @@ -91,15 +94,18 @@ */ @Override - public void update(String key, ReceiveData rData, SendOption option) { + public void update(String key, ReceiveData rData, boolean quickFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - try { - rData.zip(); - } catch (IOException e) { - e.printStackTrace(); + if (!rData.compressed()){ + try { + rData.zip(); + } catch (IOException e) { + e.printStackTrace(); + } } + Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); - cmd.setCompressFlag(option.isCompress()); + cmd.setCompressFlag(true); dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) @@ -107,11 +113,11 @@ } @Override - public void take(Receiver receiver, CodeSegment cs, SendOption option) { + public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setCompressFlag(option.isCompress()); + cmd.setCompressFlag(true); dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) @@ -119,11 +125,11 @@ } @Override - public void peek(Receiver receiver, CodeSegment cs, SendOption option) { + public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setCompressFlag(option.isCompress()); + cmd.setCompressFlag(true); dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled())
--- a/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java Mon May 04 16:02:22 2015 +0900 @@ -58,16 +58,18 @@ * send put command to target DataSegment */ @Override - public void put(String key, ReceiveData rData, SendOption option) { - try { - rData.zip(); - } catch (IOException e) { - e.printStackTrace(); + public void put(String key, ReceiveData rData, boolean quickFlag) { + if (!rData.compressed()){ + try { + rData.zip(); + } catch (IOException e) { + e.printStackTrace(); + } } Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, ""); - cmd.setCompressFlag(option.isCompress());//true + cmd.setCompressFlag(true); - if (option.isQuick()){ + if (quickFlag){ connection.write(cmd); // put command is executed right now } else { connection.sendCommand(cmd); // put command on the transmission thread @@ -77,16 +79,18 @@ } @Override - public void update(String key, ReceiveData rData, SendOption option) { - try { - rData.zip(); - } catch (IOException e) { - e.printStackTrace(); + public void update(String key, ReceiveData rData, boolean quickFlag) { + if (!rData.compressed()){ + try { + rData.zip(); + } catch (IOException e) { + e.printStackTrace(); + } } Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, ""); - cmd.setCompressFlag(option.isCompress()); + cmd.setCompressFlag(true); - if (option.isQuick()){ + if (quickFlag){ connection.write(cmd); } else { connection.sendCommand(cmd); @@ -96,14 +100,14 @@ } @Override - public void take(Receiver receiver, CodeSegment cs, SendOption option) { + public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setCompressFlag(option.isCompress()); - cmd.setQuickFlag(option.isQuick()); + cmd.setCompressFlag(true); + cmd.setQuickFlag(quickFlag); seqHash.put(seq, cmd); - if (option.isQuick()){ + if (quickFlag){ connection.write(cmd); } else { connection.sendCommand(cmd); @@ -113,14 +117,14 @@ } @Override - public void peek(Receiver receiver, CodeSegment cs, SendOption option) { + public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setCompressFlag(option.isCompress()); - cmd.setQuickFlag(option.isQuick()); + cmd.setCompressFlag(true); + cmd.setQuickFlag(quickFlag); seqHash.put(seq, cmd); - if (option.isQuick()){ + if (quickFlag){ connection.write(cmd); } else { connection.sendCommand(cmd); @@ -173,4 +177,5 @@ public void setSendError(boolean b) { connection.sendManager = b; } + }
--- a/src/main/java/alice/datasegment/DataSegment.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegment.java Mon May 04 16:02:22 2015 +0900 @@ -16,7 +16,7 @@ private DataSegment() { dataSegmentManagers.put("local", local); - dataSegmentManagers.put("local!", compressedLocal); + dataSegmentManagers.put("compressedlocal", compressedLocal); } public static DataSegmentManager get(String key) { @@ -36,16 +36,15 @@ } public static RemoteDataSegmentManager connect(String connectionKey, String reverseKey, String hostName, int port) { - if (connectionKey.matches(".*!.*")){//!が含まれていたらエラーを返して終了 - Logger logger = Logger.getLogger("DataSegment"); - logger.error("You can't use '!' for DataSegmentManager name."); + if (connectionKey.startsWith("compressed")){//compressedが含まれていたらエラーを返して終了 + System.out.println("You can't use 'compressed' for DataSegmentManager name."); System.exit(0); } RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port); CompressedRemoteDataSegmentManager compressedManager = new CompressedRemoteDataSegmentManager(connectionKey + "!", reverseKey, hostName, port); register(connectionKey, manager); - register(connectionKey + "!", compressedManager); + register("compressed" + connectionKey, compressedManager); return manager; }
--- a/src/main/java/alice/datasegment/DataSegmentManager.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentManager.java Mon May 04 16:02:22 2015 +0900 @@ -51,10 +51,10 @@ } //各コマンドの抽象クラス - public abstract void put(String key, ReceiveData rData, SendOption option); - public abstract void update(String key, ReceiveData rData, SendOption option); - public abstract void peek(Receiver receiver, CodeSegment cs, SendOption option); - public abstract void take(Receiver receiver, CodeSegment cs, SendOption option); + public abstract void put(String key, ReceiveData rData, boolean quickFlag); + public abstract void update(String key, ReceiveData rData, boolean quickFlag); + public abstract void peek(Receiver receiver, CodeSegment cs, boolean quickFlag); + public abstract void take(Receiver receiver, CodeSegment cs, boolean quickFlag); public abstract void remove(String key); public abstract void shutdown();
--- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java Mon May 04 16:02:22 2015 +0900 @@ -70,7 +70,7 @@ } @Override - public void put(String key, ReceiveData rData, SendOption option) { + public void put(String key, ReceiveData rData, boolean quickFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); dataSegmentKey.runCommand(cmd); @@ -83,7 +83,7 @@ */ @Override - public void update(String key, ReceiveData rData, SendOption option) { + public void update(String key, ReceiveData rData, boolean quickFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); dataSegmentKey.runCommand(cmd); @@ -92,7 +92,7 @@ } @Override - public void take(Receiver receiver, CodeSegment cs, SendOption option) { + public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); @@ -102,7 +102,7 @@ } @Override - public void peek(Receiver receiver, CodeSegment cs, SendOption option) { + public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); @@ -136,7 +136,6 @@ dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); - } @Override
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Mon May 04 16:02:22 2015 +0900 @@ -47,7 +47,7 @@ in.setPriority(MAX_PRIORITY); in.start(); OutboundTcpConnection out = new OutboundTcpConnection(connection); - out.setName(connectionKey+"-OutboundTcp"); + out.setName(connectionKey + "-OutboundTcp"); out.setPriority(MAX_PRIORITY); out.start(); } @@ -58,10 +58,10 @@ * send put command to target DataSegment */ @Override - public void put(String key, ReceiveData rData, SendOption option) { + public void put(String key, ReceiveData rData, boolean quickFlag) { Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, ""); - cmd.setCompressFlag(option.isCompress()); - if (option.isQuick()){ + + if (quickFlag){ connection.write(cmd); // put command is executed right now } else { connection.sendCommand(cmd); // put command on the transmission thread @@ -71,10 +71,10 @@ } @Override - public void update(String key, ReceiveData rData, SendOption option) { + public void update(String key, ReceiveData rData, boolean quickFlag) { Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, ""); - cmd.setCompressFlag(option.isCompress()); - if (option.isQuick()){ + + if (quickFlag){ connection.write(cmd); } else { connection.sendCommand(cmd); @@ -84,12 +84,12 @@ } @Override - public void take(Receiver receiver, CodeSegment cs, SendOption option) { + public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setQuickFlag(option.isQuick()); + cmd.setQuickFlag(quickFlag); seqHash.put(seq, cmd); - if (option.isQuick()){ + if (quickFlag){ connection.write(cmd); } else { connection.sendCommand(cmd); @@ -99,12 +99,12 @@ } @Override - public void peek(Receiver receiver, CodeSegment cs, SendOption option) { + public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setQuickFlag(option.isQuick()); + cmd.setQuickFlag(quickFlag); seqHash.put(seq, cmd); - if (option.isQuick()){ + if (quickFlag){ connection.write(cmd); } else { connection.sendCommand(cmd); @@ -157,4 +157,5 @@ public void setSendError(boolean b) { connection.sendManager = b; } + }
--- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Mon May 04 16:02:22 2015 +0900 @@ -6,7 +6,7 @@ public class RemoteIncrement extends CodeSegment { - public Receiver num = ids.create(CommandType.TAKE, true);//true でCompressedDSMからtake + public Receiver num = ids.create(CommandType.TAKE);//true でCompressedDSMからtake /** * Increment DataSegment value up to 10 @@ -18,9 +18,9 @@ if (num == 10) System.exit(0); RemoteIncrement cs = new RemoteIncrement(); - cs.num.setKey("remote", "num"); + cs.num.setKey("compressedremote", "num"); - ods.put("local", "num", num, true); + ods.put("compressedlocal", "num", num); } } \ No newline at end of file
--- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Sun May 03 19:40:24 2015 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Mon May 04 16:02:22 2015 +0900 @@ -7,8 +7,8 @@ @Override public void run() { RemoteIncrement cs = new RemoteIncrement(); - cs.num.setKey("remote", "num"); + cs.num.setKey("compressedremote", "num"); - ods.put("local", "num", 0, true); + ods.put("compressedlocal", "num", 0); } } \ No newline at end of file