Mercurial > hg > Database > Alice
changeset 538:8c17a9e66cc7 dispose work-compressedDSM
Compressed LDSM refactoring & flip refactoring
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Fri, 19 Jun 2015 14:06:10 +0900 |
parents | 8f949fa80653 |
children | 0832af83583f 3841f137cbae 767d93626b88 5a9b83c64ddf ae13e4854c55 |
files | src/main/java/alice/codesegment/OutputDataSegment.java src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java src/main/java/alice/datasegment/DataSegment.java src/main/java/alice/datasegment/LocalDataSegmentManager.java |
diffstat | 4 files changed, 52 insertions(+), 75 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/OutputDataSegment.java Mon Jun 15 19:34:00 2015 +0900 +++ b/src/main/java/alice/codesegment/OutputDataSegment.java Fri Jun 19 14:06:10 2015 +0900 @@ -23,7 +23,16 @@ } } - public void flip(Receiver receiver, CommandType type) {// ToDo: add remote + public void flip(String managerKey, String key, Receiver receiver){ + if (receiver.isCompressed()){ + DataSegment.get("compressed" + managerKey).put(key, receiver.getReceiveData(), false); + } else { + DataSegment.get(managerKey).put(key, receiver.getReceiveData(), false); + } + + } + + public void flip(Receiver receiver, CommandType type) { switch (type) { case PUT: if (receiver.isCompressed()){
--- a/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Mon Jun 15 19:34:00 2015 +0900 +++ b/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Fri Jun 19 14:06:10 2015 +0900 @@ -12,18 +12,12 @@ public class CompressedLocalDataSegmentManager extends DataSegmentManager { - private String reverseKey = "local"; - private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); - private Logger logger = Logger.getLogger("local"); + LocalDataSegmentManager manager; + private String reverseKey = "compressedlocal"; - private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads - Runtime.getRuntime().availableProcessors(), - Integer.MAX_VALUE, // keepAliveTime - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>()); - - public CompressedLocalDataSegmentManager() { - new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); + public CompressedLocalDataSegmentManager(LocalDataSegmentManager manager) { + this.manager = manager; + new Thread(replyThread, "CompressedLocalDataSegmentManager-replyCommand").start(); } public void setReverseKey(String s){ @@ -48,31 +42,19 @@ } public void submitCommand(DataSegmentKey key, Command cmd) { - dataSegmentExecutor.execute(new RunCommand(key, cmd)); + manager.submitCommand(key, cmd); } public DataSegmentKey getDataSegmentKey(String key) { - DataSegmentKey dsKey = dataSegments.get(key); - if (dsKey != null) - return dsKey; - if (key == null) - return null; - DataSegmentKey newDataSegmentKey = new DataSegmentKey(); - DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); - if (dataSegmentKey == null) { - dataSegmentKey = newDataSegmentKey; - } - return dataSegmentKey; + return manager.getDataSegmentKey(key); } public void removeDataSegmentKey(String key) { - if (key!=null) - dataSegments.remove(key); + manager.removeDataSegmentKey(key); } @Override public void put(String key, ReceiveData rData, boolean quickFlag) { - DataSegmentKey dataSegmentKey = getDataSegmentKey(key); if (!rData.compressed()){ try { rData.zip(); @@ -84,9 +66,7 @@ Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); cmd.setCompressFlag(true); - dataSegmentKey.runCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + manager.put1(key, cmd); } /** @@ -95,7 +75,7 @@ @Override public void update(String key, ReceiveData rData, boolean quickFlag) { - DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + if (!rData.compressed()){ try { rData.zip(); @@ -107,46 +87,32 @@ Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); cmd.setCompressFlag(true); - dataSegmentKey.runCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + manager.put1(key, cmd); } @Override public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { - DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); - int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); cmd.setCompressFlag(true); - dataSegmentKey.runCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + manager.take1(receiver, cmd); } @Override public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { - DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); - int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); cmd.setCompressFlag(true); - dataSegmentKey.runCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + manager.take1(receiver, cmd); } @Override public void remove(String key) { - DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); - dataSegmentKey.runCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + manager.remove(key); } @Override public void finish() { - System.exit(0); + manager.finish(); } @Override @@ -155,13 +121,7 @@ } public void recommand(Receiver receiver, CodeSegment cs) { - DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); - int seq = this.seq.getAndIncrement(); - Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - dataSegmentKey.runCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); - + manager.recommand(receiver, cs); } @Override
--- a/src/main/java/alice/datasegment/DataSegment.java Mon Jun 15 19:34:00 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegment.java Fri Jun 19 14:06:10 2015 +0900 @@ -10,7 +10,7 @@ private static DataSegment dataSegment = new DataSegment(); private LocalDataSegmentManager local = new LocalDataSegmentManager(); - private CompressedLocalDataSegmentManager compressedLocal = new CompressedLocalDataSegmentManager();//追加 + private CompressedLocalDataSegmentManager compressedLocal = new CompressedLocalDataSegmentManager(local);//追加 private ConcurrentHashMap<String, DataSegmentManager> dataSegmentManagers = new ConcurrentHashMap<String, DataSegmentManager>(); //TODO Over Head private ConcurrentHashMap<String, IncomingTcpConnection> acceptHash = new ConcurrentHashMap<String, IncomingTcpConnection>(); @@ -20,7 +20,11 @@ } public static DataSegmentManager get(String key) { - return dataSegment.dataSegmentManagers.get(key); + if (key == null){ + return dataSegment.dataSegmentManagers.get("local"); + } else { + return dataSegment.dataSegmentManagers.get(key); + } } public static LocalDataSegmentManager getLocal() { @@ -41,7 +45,7 @@ System.exit(0); } RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port); - CompressedRemoteDataSegmentManager compressedManager = new CompressedRemoteDataSegmentManager(manager.connection); + CompressedRemoteDataSegmentManager compressedManager = new CompressedRemoteDataSegmentManager(manager); register(connectionKey, manager); register("compressed" + connectionKey, compressedManager);
--- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java Mon Jun 15 19:34:00 2015 +0900 +++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java Fri Jun 19 14:06:10 2015 +0900 @@ -71,8 +71,12 @@ @Override public void put(String key, ReceiveData rData, boolean quickFlag) { + Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); + put1(key, cmd); + } + + public void put1(String key, Command cmd) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); @@ -84,18 +88,22 @@ @Override public void update(String key, ReceiveData rData, boolean quickFlag) { - DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); - dataSegmentKey.runCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + put1(key, cmd); } @Override public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { - DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); + + Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); + take1(receiver, cmd); + } + + public void take1(Receiver receiver, Command cmd) { int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + cmd.setSeq(seq); + //seqHash.put(seq, cmd); + DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); @@ -103,12 +111,8 @@ @Override public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { - DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); - int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - dataSegmentKey.runCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); + take1(receiver, cmd); } @Override