Mercurial > hg > Database > Alice
changeset 455:b004f62b83e5 dispose
refactor (remove quick method from DataSegmentManager and use flag)
author | sugi |
---|---|
date | Sun, 02 Nov 2014 18:07:43 +0900 (2014-11-02) |
parents | f8a8f869f016 |
children | 212a81cf7a86 |
files | src/main/java/alice/codesegment/InputDataSegment.java src/main/java/alice/codesegment/OutputDataSegment.java src/main/java/alice/daemon/Connection.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/daemon/IncomingUdpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/DataSegmentManager.java src/main/java/alice/datasegment/LocalDataSegmentManager.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java |
diffstat | 9 files changed, 81 insertions(+), 122 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/InputDataSegment.java Tue Oct 28 17:34:26 2014 +0900 +++ b/src/main/java/alice/codesegment/InputDataSegment.java Sun Nov 02 18:07:43 2014 +0900 @@ -7,6 +7,7 @@ import alice.datasegment.DataSegment; import alice.datasegment.ReceiveData; import alice.datasegment.Receiver; +import alice.datasegment.SendOption; /** * InputDataSegment Manager @@ -31,37 +32,41 @@ public void quickPeek(Receiver receiver) { cs.list.add(receiver); if (receiver.managerKey==null){ - DataSegment.getLocal().peek(receiver, cs); + DataSegment.getLocal().peek(receiver, cs, null); } else { - DataSegment.get(receiver.managerKey).quickPeek(receiver ,cs); + SendOption option = new SendOption(true, false); + DataSegment.get(receiver.managerKey).peek(receiver, cs, option); + } + } + + + public void peek(Receiver receiver) { + cs.list.add(receiver); + if (receiver.managerKey==null){ + DataSegment.getLocal().peek(receiver, cs, null); + } else { + SendOption option = new SendOption(false, false); + DataSegment.get(receiver.managerKey).peek(receiver, cs, option); } } public void quickTake(Receiver receiver) { cs.list.add(receiver); if (receiver.managerKey==null){ - DataSegment.getLocal().quickTake(receiver, cs); + DataSegment.getLocal().take(receiver, cs, null); } else { - DataSegment.get(receiver.managerKey).quickTake(receiver ,cs); + SendOption option = new SendOption(true, false); + DataSegment.get(receiver.managerKey).take(receiver, cs, option); } } - public void peek(Receiver receiver) { - cs.list.add(receiver); - if (receiver.managerKey==null){ - DataSegment.getLocal().peek(receiver, cs); - } else { - DataSegment.get(receiver.managerKey).peek(receiver, cs); - } - } - - public void take(Receiver receiver) { cs.list.add(receiver); if (receiver.managerKey==null){ - DataSegment.getLocal().take(receiver, cs); + DataSegment.getLocal().take(receiver, cs, null); } else { - DataSegment.get(receiver.managerKey).take(receiver, cs); + SendOption option = new SendOption(false, false); + DataSegment.get(receiver.managerKey).take(receiver, cs, option); } }
--- a/src/main/java/alice/codesegment/OutputDataSegment.java Tue Oct 28 17:34:26 2014 +0900 +++ b/src/main/java/alice/codesegment/OutputDataSegment.java Sun Nov 02 18:07:43 2014 +0900 @@ -3,24 +3,26 @@ import alice.datasegment.CommandType; import alice.datasegment.DataSegment; import alice.datasegment.Receiver; +import alice.datasegment.SendOption; public class OutputDataSegment { + private boolean compressFlag = false; /** * for local */ public void flip(Receiver receiver) { - DataSegment.getLocal().put(receiver.key, receiver.getObj()); + DataSegment.getLocal().put(receiver.key, receiver.getObj(), null); } public void flip(Receiver receiver, CommandType type) { switch (type) { case PUT: - DataSegment.getLocal().put(receiver.key, receiver.getObj()); + DataSegment.getLocal().put(receiver.key, receiver.getObj(), null); break; case UPDATE: - DataSegment.getLocal().update(receiver.key, receiver.getObj()); + DataSegment.getLocal().update(receiver.key, receiver.getObj(), null); break; default: break; @@ -28,19 +30,11 @@ } public void put(String key, Object val) { - DataSegment.getLocal().put(key, val); - } - - public void quickPut(String key, Object val) { - put(key, val); + DataSegment.getLocal().put(key, val, null); } public void update(String key, Object val) { - DataSegment.getLocal().update(key, val); - } - - public void quickuUpdate(String key, Object val) { - update(key, val); + DataSegment.getLocal().update(key, val, null); } /** @@ -48,7 +42,8 @@ */ public void put(String managerKey, String key, Object val) { if (!managerKey.equals("local")){ - DataSegment.get(managerKey).put(key,val); + SendOption option = new SendOption(false, compressFlag()); + DataSegment.get(managerKey).put(key, val, option); } else { put(key, val); } @@ -56,7 +51,8 @@ public void quickPut(String managerKey, String key, Object val) { if (!managerKey.equals("local")){ - DataSegment.get(managerKey).quickPut(key, val); + SendOption option = new SendOption(true, compressFlag()); + DataSegment.get(managerKey).put(key, val, option); } else { put(key, val); } @@ -64,7 +60,8 @@ public void update(String managerKey, String key, Object val) { if (!managerKey.equals("local")){ - DataSegment.get(managerKey).update(key, val); + SendOption option = new SendOption(false, compressFlag()); + DataSegment.get(managerKey).update(key, val, option); } else { update(key, val); } @@ -72,7 +69,8 @@ public void quickUpdate(String managerKey, String key, Object val) { if (!managerKey.equals("local")){ - DataSegment.get(managerKey).update(key, val); + SendOption option = new SendOption(true, compressFlag()); + DataSegment.get(managerKey).update(key, val, option); } else { update(key, val); } @@ -118,4 +116,12 @@ public void shutdown(String managerKey){ DataSegment.get(managerKey).shutdown(); } + + public boolean compressFlag() { + return compressFlag; + } + + public void setCompressFlag(boolean cFlag) { + compressFlag = cFlag; + } }
--- a/src/main/java/alice/daemon/Connection.java Tue Oct 28 17:34:26 2014 +0900 +++ b/src/main/java/alice/daemon/Connection.java Sun Nov 02 18:07:43 2014 +0900 @@ -63,7 +63,7 @@ public void putConnectionInfo() { ConnectionInfo c = new ConnectionInfo(socket.getInetAddress().toString() ,socket.getPort()); - DataSegment.getLocal().put("disconnect", c); + DataSegment.getLocal().put("disconnect", c, null); } }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Tue Oct 28 17:34:26 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Sun Nov 02 18:07:43 2014 +0900 @@ -87,7 +87,7 @@ DataSegment.get(reverseKey).response(msg.key); break; case RESPONSE: - DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis())); + DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()), null); break; default: break;
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java Tue Oct 28 17:34:26 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Sun Nov 02 18:07:43 2014 +0900 @@ -77,7 +77,7 @@ DataSegment.get(reverseKey).response(msg.key); break; case RESPONSE: - DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis())); + DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()), null); break; default: break;
--- a/src/main/java/alice/datasegment/Command.java Tue Oct 28 17:34:26 2014 +0900 +++ b/src/main/java/alice/datasegment/Command.java Sun Nov 02 18:07:43 2014 +0900 @@ -85,10 +85,7 @@ if (!serializeFlag) { data = (byte[]) val; } else { - long start = System.currentTimeMillis(); data = msg.write(val); - long end = System.currentTimeMillis(); - System.out.println("convert DataSegment" +(end - start)); } if (compressFlag) { data = zip(data); @@ -98,10 +95,7 @@ buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); buf.put(header); buf.put(dataSize); - long start = System.currentTimeMillis(); buf.put(data); - long end = System.currentTimeMillis(); - System.out.println("put DataSegment" +(end - start)); break; case REPLY: // only serialize if (serializeFlag) {
--- a/src/main/java/alice/datasegment/DataSegmentManager.java Tue Oct 28 17:34:26 2014 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentManager.java Sun Nov 02 18:07:43 2014 +0900 @@ -50,15 +50,10 @@ } } - public abstract void put(String key, Object val); - public abstract void update(String key, Object val); - public abstract void peek(Receiver receiver, CodeSegment cs); - public abstract void take(Receiver receiver, CodeSegment cs); - - public abstract void quickPut(String key, Object val); - public abstract void quickUpdate(String key, Object val); - public abstract void quickPeek(Receiver receiver, CodeSegment cs); - public abstract void quickTake(Receiver receiver, CodeSegment cs); + public abstract void put(String key, Object val, SendOption option); + public abstract void update(String key, Object val, SendOption option); + public abstract void peek(Receiver receiver, CodeSegment cs, SendOption option); + public abstract void take(Receiver receiver, CodeSegment cs, SendOption option); public abstract void remove(String key); public abstract void shutdown();
--- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java Tue Oct 28 17:34:26 2014 +0900 +++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java Sun Nov 02 18:07:43 2014 +0900 @@ -65,7 +65,7 @@ } @Override - public void put(String key, Object val) { + public void put(String key, Object val, SendOption option) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey); dataSegmentKey.runCommand(cmd); @@ -73,17 +73,12 @@ logger.debug(cmd.getCommandString()); } - @Override - public void quickPut(String key, Object val) { - put(key, val); - } - /** * Enqueue update command to the queue of each DataSegment key */ @Override - public void update(String key, Object val) { + public void update(String key, Object val, SendOption option) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey); dataSegmentKey.runCommand(cmd); @@ -92,12 +87,7 @@ } @Override - public void quickUpdate(String key, Object val) { - update(key, val); - } - - @Override - public void take(Receiver receiver, CodeSegment cs) { + public void take(Receiver receiver, CodeSegment cs, SendOption option) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); @@ -107,12 +97,7 @@ } @Override - public void quickTake(Receiver receiver, CodeSegment cs) { - take(receiver, cs); - } - - @Override - public void peek(Receiver receiver, CodeSegment cs) { + public void peek(Receiver receiver, CodeSegment cs, SendOption option) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); @@ -122,11 +107,6 @@ } @Override - public void quickPeek(Receiver receiver, CodeSegment cs) { - peek(receiver, cs); - } - - @Override public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Tue Oct 28 17:34:26 2014 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Sun Nov 02 18:07:43 2014 +0900 @@ -51,82 +51,61 @@ * send put command to target DataSegment */ @Override - public void put(String key, Object val) { + public void put(String key, Object val, SendOption option) { Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, ""); cmd.setSerializeFlag(true); - connection.sendCommand(cmd); // put command on the transmission thread + if (option.isQuick()){ + connection.write(cmd); // put command is executed right now + } else { + connection.sendCommand(cmd); // put command on the transmission thread + } if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override - public void quickPut(String key, Object val) { - Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, ""); - cmd.setSerializeFlag(true); - connection.write(cmd); // put command is executed right now - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); - } - - @Override - public void update(String key, Object val) { + public void update(String key, Object val, SendOption option) { Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, ""); cmd.setSerializeFlag(true); - connection.sendCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); - } - - @Override - public void quickUpdate(String key, Object val) { - Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, ""); - cmd.setSerializeFlag(true); - connection.write(cmd); + if (option.isQuick()){ + connection.write(cmd); + } else { + connection.sendCommand(cmd); + } if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override - public void take(Receiver receiver, CodeSegment cs) { + public void take(Receiver receiver, CodeSegment cs, SendOption option) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + cmd.setQuickFlag(option.isQuick()); seqHash.put(seq, cmd); - connection.sendCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); - } - - public void quickTake(Receiver receiver, CodeSegment cs) { - int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setQuickFlag(true); - seqHash.put(seq, cmd); - connection.write(cmd); + if (option.isQuick()){ + connection.write(cmd); + } else { + connection.sendCommand(cmd); + } if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override - public void peek(Receiver receiver, CodeSegment cs) { + public void peek(Receiver receiver, CodeSegment cs, SendOption option) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + cmd.setQuickFlag(option.isQuick()); seqHash.put(seq, cmd); - connection.sendCommand(cmd); + if (option.isQuick()){ + connection.write(cmd); + } else { + connection.sendCommand(cmd); + } if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } - public void quickPeek(Receiver receiver, CodeSegment cs) { - int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setQuickFlag(true); - seqHash.put(seq, cmd); - connection.write(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); - - } - @Override public void remove(String key) { Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, "");