Mercurial > hg > Database > Alice
changeset 526:928907206d21 dispose
remove CompressedRDSM & CompressedLDSM class
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/InputDataSegment.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/codesegment/InputDataSegment.java Sat Apr 18 19:09:15 2015 +0900 @@ -100,14 +100,15 @@ } public void take(Receiver receiver) { + System.out.println("in TAKE"); cs.register(receiver); if (receiver.compressedFlag){ - if (receiver.managerKey==null){ + if (receiver.managerKey==null){// 指定なしの場合デフォはローカルになる DataSegment.getCompressedLocal().take(receiver, cs, false); } else { if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey + "!").take(receiver, cs, false); + DataSegment.get(receiver.managerKey + "!").take(receiver, cs, false, true); } } } else {
--- a/src/main/java/alice/codesegment/OutputDataSegment.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/codesegment/OutputDataSegment.java Sat Apr 18 19:09:15 2015 +0900 @@ -93,17 +93,26 @@ } public void put(String managerKey, String key, Object val, boolean cFlag) {//追加 - if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ - ReceiveData rData = new ReceiveData(val); - rData.setCompressFlag(cFlag); + System.out.println("in PUT"); + ReceiveData rData = new ReceiveData(val); + rData.setCompressFlag(cFlag); + + if (!managerKey.equals("local") && DataSegment.contains(managerKey)){///if remote if (cFlag){ DataSegment.get(managerKey + "!").put(key, rData, false); } else { DataSegment.get(managerKey).put(key, rData, false); } - } else { - put(key, val); + } else {// if local + if (cFlag){ + DataSegment.getCompressedLocal().put(key, rData, false); + } else { + put(key, val); + } } + + + } public void quickPut(String managerKey, String key, ReceiveData rData) {
--- a/src/main/java/alice/daemon/Connection.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/daemon/Connection.java Sat Apr 18 19:09:15 2015 +0900 @@ -60,7 +60,6 @@ ReceiveData rData = new ReceiveData(c); DataSegment.getLocal().put("_DISCONNECT", rData, false); if (sendManager) { - SendOption option = new SendOption(false, false); DataSegment.get("manager").put("_DISCONNECTNODE", rData, false); } }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Sat Apr 18 19:09:15 2015 +0900 @@ -21,6 +21,7 @@ protected DataSegmentManager manager; protected String reverseKey; private LocalDataSegmentManager lmanager = DataSegment.getLocal(); + private LocalDataSegmentManager compressedlmanager = DataSegment.getCompressedLocal(); public IncomingTcpConnection(DataSegmentManager manager) { this.manager = manager; @@ -36,6 +37,10 @@ return lmanager; } + public LocalDataSegmentManager getCompressedLocalDataSegmentManager(){//追加 + return compressedlmanager; + } + /** * pipeline thread for receiving */ @@ -58,30 +63,50 @@ switch (type) { case UPDATE: case PUT: + System.out.println("in TCP PUT"); rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); rData.setCompressFlag(msg.compressed); + if (msg.setTime) { rData.setTime = true; rData.time = msg.time; rData.depth = msg.depth; } + cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); - lmanager.getDataSegmentKey(msg.key).runCommand(cmd); + + if (msg.compressed){ + compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); + } else { + lmanager.getDataSegmentKey(msg.key).runCommand(cmd); + } + break; case PEEK: case TAKE: + System.out.println("in TCP TAKE"); cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection); cmd.setQuickFlag(msg.quickFlag); - lmanager.getDataSegmentKey(msg.key).runCommand(cmd); + cmd.setCompressFlag(msg.compressed); + + if (msg.compressed) { + compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); + } else { + lmanager.getDataSegmentKey(msg.key).runCommand(cmd); + } + break; case REMOVE: cmd = new Command(type, null, null, null, 0, 0, null, null, ""); lmanager.getDataSegmentKey(msg.key).runCommand(cmd); break; case REPLY: + System.out.println("in TCP REPLY"); cmd = manager.getAndRemoveCmd(msg.seq); + rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); rData.setCompressFlag(msg.compressed); + Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.cs.ids.reply(cmd.receiver, rCmd); break;
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Sat Apr 18 19:09:15 2015 +0900 @@ -49,22 +49,36 @@ rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); rData.setCompressFlag(msg.compressed); cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); - getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); + + if (msg.compressed){ + getCompressedLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); + } else { + getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); + } break; case PEEK: case TAKE: cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null ,sender); cmd.setQuickFlag(msg.quickFlag); - getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); + cmd.setCompressFlag(msg.compressed); + + if (msg.compressed) { + getCompressedLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); + } else { + getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); + } break; case REMOVE: cmd = new Command(type, null, null, null, 0, 0, null, null, ""); getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); break; case REPLY: + System.out.println("in UDP REPLY"); cmd = manager.getAndRemoveCmd(msg.seq); + rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); rData.setCompressFlag(msg.compressed); + Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.cs.ids.reply(cmd.receiver, rCmd); break;
--- a/src/main/java/alice/datasegment/Command.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/datasegment/Command.java Sat Apr 18 19:09:15 2015 +0900 @@ -80,8 +80,7 @@ byte[] header = null;//DSのメタデータ用byteArray byte[] data = null;//DS本体用byteArray byte[] dataSize = null;//DSのサイズ - boolean serialized = false;//DSがシリアライズ状態かのフラグ - boolean compressed = false;//DSが圧縮されているかのフラグ + switch (type) { /* * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment @@ -94,15 +93,15 @@ case UPDATE: case PUT: case REPLY://ReceiveDataからREPLYするDSを取得 - if (rData.serialized()) {//純粋なオブジェクトの場合 - data = msg.write(rData.getObj());//シリアライズ - serialized = true; - } else { // rData is RAW ByteArray or already serialized ///or Compressed + System.out.println("in REPLY"); + + if (!rData.serialized()) {//純粋なオブジェクトの場合シリアライズ + data = msg.write(rData.getObj()); + } else { // rData is RAW ByteArray or already serialized data = rData.asByteArray(); - serialized = rData.serialized(); } - CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed); + CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, rData.serialized(), rData.compressed()); if (rData.setTime) {//AliceVNCの計測用(消してもいい) cm.setTime = true; cm.time = rData.time;
--- a/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Thu Apr 16 20:33:53 2015 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,27 +0,0 @@ -package alice.datasegment; - -import alice.codesegment.CodeSegment; -import org.apache.log4j.Logger; - -import java.util.concurrent.ConcurrentHashMap; - -/** - * Created by e125769 on 4/8/15. - */ -public class CompressedLocalDataSegmentManager extends LocalDataSegmentManager{ - private String reverseKey = "compressedLocal"; - //private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); - private Logger logger = Logger.getLogger("compressedLocal"); - - - @Override - public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {//とりあえずコピペ - DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); - int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - dataSegmentKey.runCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); - } - -}
--- a/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java Thu Apr 16 20:33:53 2015 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,54 +0,0 @@ -package alice.datasegment; - -import alice.daemon.Connection; -import alice.daemon.IncomingTcpConnection; -import alice.daemon.OutboundTcpConnection; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SocketChannel; - -/** - * Created by e125769 on 4/11/15. - */ -public class CompressedRemoteDataSegmentManager extends RemoteDataSegmentManager { - - public CompressedRemoteDataSegmentManager(){} - - public CompressedRemoteDataSegmentManager(final String connectionKey, final String reverseKey, final String hostName, final int port) { - logger = Logger.getLogger(connectionKey); - connection = new Connection(); - connection.name = connectionKey; - final RemoteDataSegmentManager manager = this; - //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); - new Thread("Connect-" + connectionKey) { - public void run() { - boolean connect = true; - do { - try { - SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port)); - connection.socket = sc.socket(); - connection.socket.setTcpNoDelay(true); - connect = false; - logger.info("Connect to " + connection.getInfoString()); - } catch (IOException e) { - try { - Thread.sleep(50); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - } - } while (connect); - IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey); - in.setName(reverseKey+"-IncomingTcp"); - in.setPriority(MAX_PRIORITY); - in.start(); - OutboundTcpConnection out = new OutboundTcpConnection(connection); - out.setName(connectionKey+"-OutboundTcp"); - out.setPriority(MAX_PRIORITY); - out.start(); - } - }.start(); - } -}
--- a/src/main/java/alice/datasegment/DataSegment.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegment.java Sat Apr 18 19:09:15 2015 +0900 @@ -10,12 +10,13 @@ private static DataSegment dataSegment = new DataSegment(); private LocalDataSegmentManager local = new LocalDataSegmentManager(); - private CompressedLocalDataSegmentManager compressedLocal = new CompressedLocalDataSegmentManager();//追加 + private LocalDataSegmentManager compressedLocal = new LocalDataSegmentManager();//追加 private ConcurrentHashMap<String, DataSegmentManager> dataSegmentManagers = new ConcurrentHashMap<String, DataSegmentManager>(); //TODO Over Head private ConcurrentHashMap<String, IncomingTcpConnection> acceptHash = new ConcurrentHashMap<String, IncomingTcpConnection>(); private DataSegment() { dataSegmentManagers.put("local", local); + dataSegmentManagers.put("local!", compressedLocal); } public static DataSegmentManager get(String key) { @@ -26,7 +27,7 @@ return dataSegment.local; } - public static CompressedLocalDataSegmentManager getCompressedLocal() {//追加 + public static LocalDataSegmentManager getCompressedLocal() {//追加 return dataSegment.compressedLocal; } @@ -41,10 +42,10 @@ System.exit(0); } RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port); - CompressedRemoteDataSegmentManager compresedManager = new CompressedRemoteDataSegmentManager(connectionKey + "!", reverseKey, hostName, port); + RemoteDataSegmentManager compressedManager = new RemoteDataSegmentManager(connectionKey + "!", reverseKey, hostName, port); register(connectionKey, manager); - register(connectionKey + "!", compresedManager); + register(connectionKey + "!", compressedManager); return manager; }
--- a/src/main/java/alice/datasegment/DataSegmentManager.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentManager.java Sat Apr 18 19:09:15 2015 +0900 @@ -56,6 +56,8 @@ public abstract void peek(Receiver receiver, CodeSegment cs, boolean quickFlag); public abstract void take(Receiver receiver, CodeSegment cs, boolean quickFlag); + public abstract void take(Receiver receiver, CodeSegment cs, boolean quickFlag, boolean compressFlag); + public abstract void remove(String key); public abstract void shutdown(); public abstract void close();
--- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java Sat Apr 18 19:09:15 2015 +0900 @@ -102,9 +102,15 @@ @Override public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { + take(receiver, cs, quickFlag, false); + } + + @Override + public void take(Receiver receiver, CodeSegment cs, boolean quickFlag, boolean compressFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + cmd.setCompressFlag(compressFlag); dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString());
--- a/src/main/java/alice/datasegment/ReceiveData.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/datasegment/ReceiveData.java Sat Apr 18 19:09:15 2015 +0900 @@ -2,12 +2,17 @@ import java.io.*; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.zip.*; import org.apache.log4j.Logger; import org.msgpack.type.Value; import alice.codesegment.SingletonMessage; +import javax.xml.bind.DatatypeConverter; + +import static java.util.Arrays.*; + /** * 送られてきたDSを一時的に取っておくクラス。inputでも使用。 */ @@ -16,8 +21,6 @@ private byte[] messagePack;//byteArray(serialized)のDS private byte[] zMessagePack;//byteArray(compressed)のDS private Class<?> clazz; - private Logger logger = Logger.getLogger("MessagePack"); - private boolean compressFlag = false; public long time;//測定用 public boolean setTime = false; @@ -29,25 +32,8 @@ * @param obj DS本体(Object) */ public ReceiveData(Object obj) { + clazz = obj.getClass(); val = obj; - if (compressFlag){ - try { - messagePack = asByteArray();//(byte[]) val; だとキャストできない - } catch (IOException e) { - e.printStackTrace(); - } - ByteBuffer buf = null; - - try { - zMessagePack = zip(messagePack); - buf = ByteBuffer.allocate(zMessagePack.length + 1); - buf.put((byte) 0xc1); - buf.put(zMessagePack); - zMessagePack = buf.array(); - } catch (IOException e) { - e.printStackTrace(); - } - } } /** @@ -56,15 +42,15 @@ * @param messagePack DS本体(byteArray) */ public ReceiveData(byte[] messagePack) { + clazz = messagePack.getClass(); if (messagePack[0] == 0xc1){ + System.out.println("ReceiveData is zMessagePack"); this.zMessagePack = messagePack; } else { + System.out.println("ReceiveData is MessagePack"); this.messagePack = messagePack; } - - logger.debug(this.messagePack); - logger.debug(this.zMessagePack); } @@ -77,7 +63,7 @@ } public boolean serialized(){ - return val != null; + return val == null; } public Object getObj(){//Object型のDS本体を取得するメソッド。 @@ -97,9 +83,9 @@ } public Value getVal(){//Value型としてDSを取得するメソッド - if (val == null){//もとはval != null + if (val == null){//val != null return asClass(Value.class); - } else { + } else {//ここに圧縮のときの処理入れるべきなのでは try { return SingletonMessage.getInstance().unconvert(val);//MassagePackでvalue型に変換。できなければnullを返す。 } catch (IOException e) { @@ -119,27 +105,20 @@ * @return */ public <T> T asClass(Class<T> clazz) {//javasist + System.out.println("in asClass val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack); + try { if (val != null) { return (T) val; } - byte[] b = messagePack; if (zMessagePack != null && messagePack == null) { - logger.debug("zMessagePack = " + zMessagePack); - messagePack = unzip(zMessagePack); - b = messagePack; + messagePack = unzip(copyOfRange(zMessagePack, 1, zMessagePack.length - 1));//先頭0xC1を削除したものを解凍 } - logger.debug("MessagePack = " + messagePack); + return SingletonMessage.getInstance().read(messagePack, clazz); - if (val == null) { - this.clazz = clazz; - return SingletonMessage.getInstance().read(b, clazz);//messagePackから解凍 - } else { - return (T) val; - } - } catch (Exception e) { + } catch (IOException | DataFormatException e) { e.printStackTrace(); return null; } @@ -178,6 +157,36 @@ return os.toByteArray(); } + public void setCompressFlag(boolean cFlag) { + if (cFlag){ + try { + messagePack = asByteArray(); + } catch (IOException e) { + e.printStackTrace(); + } + + ByteBuffer buf = null; + + try { + System.out.println("in zip"); + byte[] z = zip(messagePack); + buf = ByteBuffer.allocate(z.length + 1); + buf.put((byte) 0xc1); + buf.put(z); + zMessagePack = buf.array(); + val = null; + messagePack = null; + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public Class getClaszz(){ + return clazz; + } + + public byte[] asByteArray() throws IOException{ ByteArrayOutputStream buff = new ByteArrayOutputStream(); ObjectOutput out = new ObjectOutputStream(buff); @@ -188,8 +197,4 @@ return bytes; } - public void setCompressFlag(boolean cFlag){ - compressFlag = cFlag; - } - }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Sat Apr 18 19:09:15 2015 +0900 @@ -85,9 +85,15 @@ @Override public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { + take(receiver, cs, quickFlag, false); + } + + public void take(Receiver receiver, CodeSegment cs, boolean quickFlag, boolean compressFlag) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); cmd.setQuickFlag(quickFlag); + cmd.setCompressFlag(compressFlag); + seqHash.put(seq, cmd); if (quickFlag){ connection.write(cmd);
--- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Sat Apr 18 19:09:15 2015 +0900 @@ -6,7 +6,7 @@ public class RemoteIncrement extends CodeSegment { - public Receiver num = ids.create(CommandType.TAKE);//true でCompressedDSMからtake + public Receiver num = ids.create(CommandType.TAKE, true);//true でCompressedDSMからtake /** * Increment DataSegment value up to 10 @@ -20,8 +20,7 @@ RemoteIncrement cs = new RemoteIncrement(); cs.num.setKey("remote", "num"); - ods.put("local", "num", num); - //ods.put("remote", "num", num, true); + ods.put("local", "num", num, true); } } \ No newline at end of file
--- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Thu Apr 16 20:33:53 2015 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Sat Apr 18 19:09:15 2015 +0900 @@ -9,8 +9,7 @@ RemoteIncrement cs = new RemoteIncrement(); cs.num.setKey("remote", "num"); - ods.put("local", "num", 0); - //ods.put("remote", "num", 0, true); + ods.put("local", "num", 0, true); } }