Mercurial > hg > Database > Alice
view src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java @ 574:ea21af9a4762 dispose
delete serializeFlag, fix MessagePack pack&unpack
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 15 Dec 2015 11:49:07 +0900 |
parents | 8c17a9e66cc7 |
children |
line wrap: on
line source
package alice.datasegment; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import alice.codesegment.CodeSegment; public class CompressedLocalDataSegmentManager extends DataSegmentManager { LocalDataSegmentManager manager; private String reverseKey = "compressedlocal"; public CompressedLocalDataSegmentManager(LocalDataSegmentManager manager) { this.manager = manager; } public void setReverseKey(String s){ reverseKey = s; } public void submitCommand(DataSegmentKey key, Command cmd) { manager.submitCommand(key, cmd); } public DataSegmentKey getDataSegmentKey(String key) { return manager.getDataSegmentKey(key); } public void removeDataSegmentKey(String key) { manager.removeDataSegmentKey(key); } @Override public void put(String key, ReceiveData rData, boolean quickFlag) { if (!rData.compressed()){ try { rData.zip(); } catch (IOException e) { e.printStackTrace(); } } Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); cmd.setCompressFlag(true); manager.put1(key, cmd); } /** * Enqueue update command to the queue of each DataSegment key */ @Override public void update(String key, ReceiveData rData, boolean quickFlag) { if (!rData.compressed()){ try { rData.zip(); } catch (IOException e) { e.printStackTrace(); } } Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); cmd.setCompressFlag(true); manager.put1(key, cmd); } @Override public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); cmd.setCompressFlag(true); manager.take1(receiver, cmd); } @Override public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); cmd.setCompressFlag(true); manager.take1(receiver, cmd); } @Override public void remove(String key) { manager.remove(key); } @Override public void finish() { manager.finish(); } @Override public void close() { } public void recommand(Receiver receiver, CodeSegment cs) { manager.recommand(receiver, cs); } @Override public void ping(String returnKey) { } @Override public void response(String returnKey) { } @Override public void shutdown() { } @Override public void setSendError(boolean b) { } }