Mercurial > hg > Database > Alice
changeset 530:4aeebea0c9b5 dispose
can't unzip
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Sun, 03 May 2015 10:04:28 +0900 |
parents | cb7c31848d16 |
children | b6049fb123d8 |
files | src/main/java/alice/codesegment/OutputDataSegment.java src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java src/main/java/alice/datasegment/DataSegmentKey.java src/main/java/alice/datasegment/ReceiveData.java |
diffstat | 8 files changed, 140 insertions(+), 60 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/OutputDataSegment.java Fri May 01 18:19:16 2015 +0900 +++ b/src/main/java/alice/codesegment/OutputDataSegment.java Sun May 03 10:04:28 2015 +0900 @@ -7,7 +7,6 @@ import alice.datasegment.SendOption; public class OutputDataSegment { - private boolean compressFlag = false;//圧縮するかどうか /** * for local @@ -96,7 +95,6 @@ System.out.println("in PUT"); ReceiveData rData = new ReceiveData(val); SendOption option = new SendOption(false, cFlag); - //rData.setCompressFlag(cFlag); if (!managerKey.equals("local") && DataSegment.contains(managerKey)){///if remote if (cFlag){ @@ -134,7 +132,6 @@ if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ ReceiveData rData = new ReceiveData(val); SendOption option = new SendOption(true, cFlag); - //rData.setCompressFlag(cFlag); if (cFlag){ DataSegment.get(managerKey + "!").put(key, rData, option); @@ -166,7 +163,6 @@ public void update(String managerKey, String key, Object val, boolean cFlag) {//追加 ReceiveData rData = new ReceiveData(val); SendOption option = new SendOption(false, cFlag); - //rData.setCompressFlag(cFlag); if (!managerKey.equals("local") && DataSegment.contains(managerKey)){///if remote if (cFlag){ @@ -249,11 +245,4 @@ DataSegment.get(managerKey).ping(returnKey); } - public boolean compressFlag() { - return compressFlag; - } - - public void setCompressFlag(boolean cFlag) { - compressFlag = cFlag; - } }
--- a/src/main/java/alice/daemon/CommandMessage.java Fri May 01 18:19:16 2015 +0900 +++ b/src/main/java/alice/daemon/CommandMessage.java Sun May 03 10:04:28 2015 +0900 @@ -14,7 +14,7 @@ public boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか public boolean serialized = false;//シリアライズされているかどうか public boolean compressed = false;//圧縮されているかどうか - public int datasize; + public int dataSize = 0; public boolean setTime = false;//? public long time;//? @@ -23,7 +23,7 @@ public CommandMessage() {} public CommandMessage(int type, int index, int seq, String key - , boolean qFlag, boolean sFlag, boolean cFlag) {//コンストラクタ. setter. + , boolean qFlag, boolean sFlag, boolean cFlag, int datasize) { this.type = type; this.index = index; this.seq = seq; @@ -31,6 +31,6 @@ this.quickFlag = qFlag; this.serialized = sFlag; this.compressed = cFlag; - ///this.datasize = datasize; + this.dataSize = datasize; } }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Fri May 01 18:19:16 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Sun May 03 10:04:28 2015 +0900 @@ -6,6 +6,7 @@ import alice.datasegment.*; import org.msgpack.MessagePack; +import org.msgpack.type.Value; import org.msgpack.unpacker.Unpacker; import alice.topology.manager.keeparive.RespondData; @@ -54,13 +55,13 @@ try { Command cmd = null; ReceiveData rData = null; - CommandMessage msg = unpacker.read(CommandMessage.class); + CommandMessage msg = unpacker.read(CommandMessage.class);///read header CommandType type = CommandType.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: case PUT: System.out.println("in TCP PUT"); - rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed); + rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.dataSize);///read rData if (msg.setTime) { rData.setTime = true; @@ -69,30 +70,49 @@ } cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); + cmd.setCompressFlag(msg.compressed); - lmanager.getDataSegmentKey(msg.key).runCommand(cmd); - + if (rData.compressed()){ + compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); + } else { + lmanager.getDataSegmentKey(msg.key).runCommand(cmd); + } break; case PEEK: case TAKE: - System.out.println("in TCP TAKE"); + System.out.println("in TCP TAKE:" + msg.compressed); cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection); + cmd.setCompressFlag(msg.compressed); - lmanager.getDataSegmentKey(msg.key).runCommand(cmd); + if (msg.compressed){ + compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); + } else { + lmanager.getDataSegmentKey(msg.key).runCommand(cmd); + } break; case REMOVE: cmd = new Command(type, null, null, null, 0, 0, null, null, ""); - lmanager.getDataSegmentKey(msg.key).runCommand(cmd); + lmanager.getDataSegmentKey(msg.key).runCommand(cmd);//ToDo:fix break; case REPLY: - System.out.println("in TCP REPLY"); + System.out.println("in TCP REPLY:"); + System.out.println("After DataSize:" + msg.dataSize); cmd = manager.getAndRemoveCmd(msg.seq); + byte[] unpack = unpacker.getSerializedByteArray(unpacker.readInt()); + //Value unpack2 = packer.read(byte []); + System.out.print("REPLY unpacker: "); + for (int i = 0; i < unpack.length; i++) { + System.out.print(Integer.toHexString(unpack[i] & 0xff)); + } + System.out.print("\n"); - rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); + + rData = new ReceiveData(unpack, msg.compressed, msg.dataSize); Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); + cmd.setCompressFlag(msg.compressed); cmd.cs.ids.reply(cmd.receiver, rCmd); break; case PING:
--- a/src/main/java/alice/datasegment/Command.java Fri May 01 18:19:16 2015 +0900 +++ b/src/main/java/alice/datasegment/Command.java Sun May 03 10:04:28 2015 +0900 @@ -94,31 +94,30 @@ case UPDATE: case PUT: case REPLY: - if (rData.compressed()) { - // have already converted - data = (byte[]) rData.getObj(); - compressed = rData.compressed(); // true - serialized = rData.serialized(); + System.out.println("Command reply compressFlag:" + compressFlag); + if(compressFlag){ + System.out.println("Command get zMP:" + rData.getZMessagePack()); + data = packer.write(rData.getZMessagePack()); + compressed = true; } else { - if (!rData.serialized() && !rData.isByteArray()) { - data = packer.write(rData.getObj()); - serialized = true; - } else { // rData is RAW ByteArray or already serialized - data = (byte[]) rData.getObj(); - serialized = rData.serialized(); - } - if (compressFlag) { - rData.zip(); - compressed = true; - } + data = rData.getMessagePack(); + serialized = true; } - CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed); + + System.out.println("Before DataSize:" + rData.getDataSize()); + CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed, rData.getDataSize()); if (rData.setTime) { cm.setTime = true; cm.time = rData.time; cm.depth = rData.depth + 1; } + System.out.print("Command packer: "); + for (int i = 0; i < data.length; i++) { + System.out.print(Integer.toHexString(data[i] & 0xff)); + } + System.out.print("\n"); + header = packer.write(cm); dataSize = packer.write(data.length); buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); @@ -127,7 +126,8 @@ buf.put(data); break; default: - header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag)); + System.out.println("default compressFlag:" + compressFlag); + header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag, 0)); buf = ByteBuffer.allocate(header.length); buf.put(header); break;
--- a/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Fri May 01 18:19:16 2015 +0900 +++ b/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Sun May 03 10:04:28 2015 +0900 @@ -111,6 +111,8 @@ DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + cmd.setCompressFlag(option.isCompress()); + dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); @@ -121,6 +123,8 @@ DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + cmd.setCompressFlag(option.isCompress()); + dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString());
--- a/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java Fri May 01 18:19:16 2015 +0900 +++ b/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java Sun May 03 10:04:28 2015 +0900 @@ -99,7 +99,9 @@ public void take(Receiver receiver, CodeSegment cs, SendOption option) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + cmd.setCompressFlag(option.isCompress()); cmd.setQuickFlag(option.isQuick()); + seqHash.put(seq, cmd); if (option.isQuick()){ connection.write(cmd); @@ -114,7 +116,9 @@ public void peek(Receiver receiver, CodeSegment cs, SendOption option) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + cmd.setCompressFlag(option.isCompress()); cmd.setQuickFlag(option.isQuick()); + seqHash.put(seq, cmd); if (option.isQuick()){ connection.write(cmd);
--- a/src/main/java/alice/datasegment/DataSegmentKey.java Fri May 01 18:19:16 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentKey.java Sun May 03 10:04:28 2015 +0900 @@ -31,6 +31,7 @@ dataList.add(dsv); // Process waiting peek and take commands for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) { + System.out.println("in DSkey PUTfor"); Command waitCmd = iter.next(); if (waitCmd.index < index) { System.out.println("DSKey cmdFlag:" + cmd.getCompressFlag()); @@ -51,7 +52,8 @@ boolean waitFlag2 = true; for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - replyValue(cmd, data); + replyValue(cmd, data, cmd.getCompressFlag()); + //replyValue(cmd, data); waitFlag2 = false; break; } @@ -89,7 +91,7 @@ public void replyValue(Command cmd, DataSegmentValue data){ Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from); - cmd.setCompressFlag(true); + if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local. cmd.cs.ids.reply(cmd.receiver, rCmd); } else { @@ -108,6 +110,7 @@ public void replyValue(Command cmd, DataSegmentValue data, boolean cFlag){ Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from); rCmd.setCompressFlag(cFlag); + if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local. cmd.cs.ids.reply(cmd.receiver, rCmd); } else {
--- a/src/main/java/alice/datasegment/ReceiveData.java Fri May 01 18:19:16 2015 +0900 +++ b/src/main/java/alice/datasegment/ReceiveData.java Sun May 03 10:04:28 2015 +0900 @@ -2,9 +2,11 @@ import java.io.*; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.LinkedList; import java.util.zip.*; +import javassist.bytecode.ByteArray; import org.msgpack.MessagePack; import org.msgpack.type.Value; @@ -18,6 +20,7 @@ private Object val;//for Object DS private byte[] messagePack;//for byteArray(serialized) DS private byte[] zMessagePack;//for byteArray(compressed) DS + private int dataSize; private Class<?> clazz; public long time;//測定用 @@ -42,7 +45,9 @@ * * @param messagePack DS本体(byteArray) */ - public ReceiveData(byte[] messagePack, boolean compressed) { + public ReceiveData(byte[] messagePack, boolean compressed, int datasize) { + this.dataSize = datasize; + System.out.println("rData datasize:" + datasize); if (compressed){ this.zMessagePack = messagePack; } else { @@ -109,7 +114,8 @@ } if (zMessagePack != null && messagePack == null) { - messagePack = unzip(zMessagePack, 100);///ToDo:read header and set length + System.out.println("unzip datasize:" + dataSize); + messagePack = unzip(zMessagePack, dataSize);///ToDo:read header and set length } return packer.read(messagePack, clazz); @@ -120,13 +126,51 @@ } } + public byte[] getMessagePack(){ + if (messagePack != null){ + return messagePack; + } else { + try { + messagePack = packer.write(val); + System.out.print("make messagePack1: "); + for (int i = 0; i < messagePack.length; i++) { + System.out.print(Integer.toHexString(messagePack[i] & 0xff)); + } + System.out.print("\n"); + System.out.println("mpLength:" + messagePack.length); + setDataSize(messagePack.length); + } catch (IOException e) { + e.printStackTrace(); + } - public int zip() throws IOException { + return messagePack; + } + } + + public byte[] getZMessagePack(){ + if (zMessagePack != null){ + System.out.println("have zMessagePack"); + return zMessagePack; + } else { + try { + zip(); + + } catch (IOException e) { + e.printStackTrace(); + } + + return zMessagePack; + } + } + + public void zip() throws IOException { + System.out.println("in zip"); LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>(); int inputIndex = 0; LinkedList<ByteBuffer> outputs = new LinkedList<ByteBuffer>(); Deflater deflater = new Deflater(); + inputs.add(ByteBuffer.wrap(getMessagePack())); int len = 0; int INFLATE_BUFSIZE = 1024 * 100;//ToDo:fix ByteBuffer c1 = allocate(INFLATE_BUFSIZE);//for output @@ -141,7 +185,7 @@ int len1 = 0; do { - len1 = deflater.deflate(c1.array(), c1.position(), c1.remaining());///Bytearray for ziped data、start offset、length + len1 = deflater.deflate(c1.array(), c1.position(), c1.remaining()); if (len1 > 0) { len += len1; c1.position(c1.position() + len1); @@ -151,28 +195,46 @@ c1 = allocate(INFLATE_BUFSIZE); } } - } while (len1 > 0 || !deflater.needsInput());//needsInput()...true if setInput is empty + } while (len1 > 0 || !deflater.needsInput()); } if (c1.position() != 0) { c1.flip(); outputs.addLast(c1); } - //zMessagePack = outputs.toArray(); deflater.reset(); - return len;///return length of ziped data + + zMessagePack = new byte[len]; + int tmp = 0; + for (int i = 0; i < outputs.size(); i++){ + System.arraycopy(outputs.get(i).array(), 0, zMessagePack, 0 + tmp, outputs.get(i).limit());//limit? remaining? + tmp += outputs.get(i).limit(); + } + System.out.println("in make zMessagePack1:" + zMessagePack); + System.out.print("in make zMessagePack2: "); + for (int i = 0; i < zMessagePack.length; i++) { + System.out.print(Integer.toHexString(zMessagePack[i] & 0xff)); + } + System.out.print("\n"); } protected byte[] unzip(byte[] input, int zippedLength) {///read header & unzip int length = input.length; Inflater inflater = new Inflater(); + System.out.print("unziped input: "); + for (int i = 0; i < input.length; i++) { + System.out.print(Integer.toHexString(input[i] & 0xff)); + } + System.out.print("\n"); + byte [] output = new byte [zippedLength];///byteArray for unziped data inflater.setInput(input, 0, length);///set unzip data without header try { System.out.println("return:" + inflater.inflate(output, 0, zippedLength));///unzip } catch (DataFormatException e) { + System.out.println("unzip exception:" + e.toString()); e.printStackTrace(); } @@ -208,16 +270,6 @@ return b; } - public byte[] asByteArray() throws IOException{///ToDo : delete - ByteArrayOutputStream buff = new ByteArrayOutputStream(); - ObjectOutput out = new ObjectOutputStream(buff); - out.writeObject(this.val); - byte[] bytes = buff.toByteArray(); - out.close(); - buff.close(); - return bytes; - } - public static int byteArrayToInt(byte[] b) { return b[3] & 0xFF | @@ -236,4 +288,12 @@ }; } + public int getDataSize(){ + return this.dataSize; + } + + public void setDataSize(int datasize){ + this.dataSize = datasize; + } + }