Mercurial > hg > Database > Christie
changeset 123:d92f0bbad1eb
fix msgpack downgrade
author | akahori |
---|---|
date | Sat, 15 Dec 2018 17:49:57 +0900 |
parents | 3a0f8a93c84a |
children | cd4b67334b17 |
files | build.gradle src/main/java/christie/daemon/Connection.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/datagear/RemoteMessage.java src/main/java/christie/datagear/command/PutCommand.java src/main/java/christie/datagear/dg/MessagePackDataGear.java src/main/java/christie/test/topology/localTestTopology/StartLocalTestTopology.java src/main/java/christie/topology/HostMessage.java src/main/java/christie/topology/node/ConfigurationFinish.java |
diffstat | 9 files changed, 47 insertions(+), 59 deletions(-) [+] |
line wrap: on
line diff
--- a/build.gradle Tue Dec 11 20:51:29 2018 +0900 +++ b/build.gradle Sat Dec 15 17:49:57 2018 +0900 @@ -16,8 +16,10 @@ dependencies { compile fileTree(dir: 'lib', include: '*.jar') testCompile('org.junit.jupiter:junit-jupiter-api:5.2.0') - compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.16' - compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16' + //compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.16' + //compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16' + compile group: 'org.msgpack', name: 'msgpack', version: '0.6.12' + }
--- a/src/main/java/christie/daemon/Connection.java Tue Dec 11 20:51:29 2018 +0900 +++ b/src/main/java/christie/daemon/Connection.java Sat Dec 15 17:49:57 2018 +0900 @@ -64,7 +64,7 @@ while (buffer.hasRemaining()) { socket.getChannel().write(buffer); } - System.out.println("write : " + cmd.key); + //System.out.println("write : " + cmd.key); } catch (Exception e) { e.printStackTrace();
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java Tue Dec 11 20:51:29 2018 +0900 +++ b/src/main/java/christie/daemon/IncomingTcpConnection.java Sat Dec 15 17:49:57 2018 +0900 @@ -8,19 +8,12 @@ import christie.datagear.RemoteMessage; import christie.datagear.command.RemotePeekCommand; import christie.datagear.command.RemoteTakeCommand; -import christie.datagear.dg.DataGear; import christie.datagear.dg.MessagePackDataGear; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.msgpack.jackson.dataformat.MessagePackFactory; - -//import org.msgpack.MessagePack; -//import org.msgpack.unpacker.Unpacker; - +import org.msgpack.MessagePack; +import org.msgpack.unpacker.Unpacker; import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; import java.nio.channels.ClosedChannelException; public class IncomingTcpConnection extends Thread { @@ -28,48 +21,42 @@ RemoteDataGearManager manager; CodeGearManager cgm; Connection connection; - ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); + private MessagePack packer = new MessagePack(); public IncomingTcpConnection(Connection connection, CodeGearManager cgm) { this.connection = connection; this.cgm = cgm; } - public void setManager(RemoteDataGearManager manager) { + public void setManager(RemoteDataGearManager manager){ this.manager = manager; } public void run() { - - objectMapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); - - InputStream in = null; + Unpacker unpacker = null; try { - in = connection.socket.getInputStream(); + unpacker = packer.createUnpacker(connection.socket.getInputStream()); } catch (IOException e) { e.printStackTrace(); } - if (in == null) { - System.out.println("return"); + if (unpacker == null) { return; } - while (true) { try { - - RemoteMessage msg = objectMapper.readValue(in, RemoteMessage.class); - + RemoteMessage msg = unpacker.read(RemoteMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); - System.out.println("read : " + msg.key); + //System.out.println("read " + msg.key); + byte[] data; switch (type) { case PUT: - byte[] msgpackdg = objectMapper.readValue(in, byte[].class); - + data = new byte[unpacker.readInt()]; + connection.socket.getInputStream().read(data); try { - MessagePackDataGear dg = new MessagePackDataGear(msgpackdg, Class.forName(msg.clazz)); + MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz)); cgm.getLocalDGM().put(msg.key, dg); - } catch (Exception e) { + } catch (ClassNotFoundException e) { e.printStackTrace(); } @@ -83,10 +70,11 @@ break; case REPLY://待っていたwaitListに渡してcsにセット - //data = new byte[objectMapper.readValue(convertedCommand, Integer.class)]; - //connection.socket.getInputStream().read(data); + data = new byte[unpacker.readInt()]; + connection.socket.getInputStream().read(data); + try { - MessagePackDataGear dg = new MessagePackDataGear(null, Class.forName(msg.clazz)); + MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz)); cgm.getDGM(msg.fromDgmName).resolveWaitCommand(msg.key, dg); } catch (ClassNotFoundException e) { e.printStackTrace(); @@ -96,12 +84,10 @@ default: break; } - } catch (ClosedChannelException e) { - e.printStackTrace(); return; - } catch (EOFException e) { - e.printStackTrace(); + }catch (EOFException e) { + return; } catch (IOException e) { e.printStackTrace(); }
--- a/src/main/java/christie/datagear/RemoteMessage.java Tue Dec 11 20:51:29 2018 +0900 +++ b/src/main/java/christie/datagear/RemoteMessage.java Sat Dec 15 17:49:57 2018 +0900 @@ -1,5 +1,8 @@ package christie.datagear; +import org.msgpack.annotation.Message; + +@Message public class RemoteMessage { public int type;//PUT, PEEKなどのコマンドタイプ public String fromDgmName;//送り元のdsmName。REPLYのときに使用。
--- a/src/main/java/christie/datagear/command/PutCommand.java Tue Dec 11 20:51:29 2018 +0900 +++ b/src/main/java/christie/datagear/command/PutCommand.java Sat Dec 15 17:49:57 2018 +0900 @@ -4,14 +4,12 @@ import christie.datagear.command.CommandType; import christie.datagear.dg.DataGear; import christie.datagear.dg.MessagePackDataGear; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.msgpack.jackson.dataformat.MessagePackFactory; +import org.msgpack.MessagePack; import java.io.IOException; import java.nio.ByteBuffer; public class PutCommand extends Command { - ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); public PutCommand(int cgmID, String toDgmName, String key, DataGear dg){ this.type = CommandType.PUT; @@ -25,20 +23,23 @@ @Override public ByteBuffer convert() { ByteBuffer buf = null; + MessagePack packer = new MessagePack(); + try { - byte[] command = objectMapper.writeValueAsBytes(createRemoteMessage()); - byte[] data = objectMapper.writeValueAsBytes(new MessagePackDataGear(dg.getData()).getMessagePack()); + byte[] command = packer.write(createRemoteMessage()); + byte[] data = new MessagePackDataGear(dg.getData()).getMessagePack(); + byte[] dataSize = packer.write(data.length); - buf = ByteBuffer.allocate(command.length+data.length); + buf = ByteBuffer.allocate(command.length+dataSize.length+data.length); buf.put(command); + buf.put(dataSize); buf.put(data); + buf.flip(); - } catch (IOException e) { e.printStackTrace(); } return buf; } - }
--- a/src/main/java/christie/datagear/dg/MessagePackDataGear.java Tue Dec 11 20:51:29 2018 +0900 +++ b/src/main/java/christie/datagear/dg/MessagePackDataGear.java Sat Dec 15 17:49:57 2018 +0900 @@ -1,18 +1,13 @@ package christie.datagear.dg; -//import org.msgpack.MessagePack; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.msgpack.jackson.dataformat.MessagePackFactory; - +import org.msgpack.MessagePack; import java.io.IOException; public class MessagePackDataGear<T> extends DataGear {//必ずmessagePack形式を持つDataGear private byte[] messagePack = null; private int dataSize; - //private MessagePack packer = new MessagePack(); - ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); - + private MessagePack packer = new MessagePack(); public MessagePackDataGear(T data){ super(data); @@ -33,7 +28,7 @@ return messagePack; } else { try { - messagePack = objectMapper.writeValueAsBytes(data); + messagePack = packer.write(data); setDataSize(messagePack.length); } catch (IOException e) { e.printStackTrace(); @@ -46,7 +41,7 @@ public synchronized T getData(){ if (data == null){ try { - setData(objectMapper.readValue(messagePack, clazz)); + setData(packer.read(messagePack, clazz)); } catch (IOException e) { e.printStackTrace(); }
--- a/src/main/java/christie/test/topology/localTestTopology/StartLocalTestTopology.java Tue Dec 11 20:51:29 2018 +0900 +++ b/src/main/java/christie/test/topology/localTestTopology/StartLocalTestTopology.java Sat Dec 15 17:49:57 2018 +0900 @@ -22,17 +22,17 @@ String[] managerArg = {"--localPort", String.valueOf(managerPort), "--confFile", "scripts/ring.dot"}; TopologyManagerConfig topologyManagerConfig = new TopologyManagerConfig(managerArg); new StartTopologyManager(topologyManagerConfig); - + CodeGearManager nodeCGM = null; for (int i = 1; i<=nodeNum; i++) { - CodeGearManager nodeCGM = createCGM(managerPort + i); + nodeCGM = createCGM(managerPort + i); String[] nodeArg = {"--managerPort", String.valueOf(managerPort), "--managerHost", "localhost"}; TopologyNodeConfig cs = new TopologyNodeConfig(nodeArg); new StartTopologyNode(nodeCGM, cs, new LTRemoteIncrement()); - nodeCGM.getLocalDGM().put("num", 0); } + nodeCGM.getLocalDGM().put("num", 0); }
--- a/src/main/java/christie/topology/HostMessage.java Tue Dec 11 20:51:29 2018 +0900 +++ b/src/main/java/christie/topology/HostMessage.java Sat Dec 15 17:49:57 2018 +0900 @@ -1,9 +1,11 @@ package christie.topology; +import org.msgpack.annotation.Message; + import java.net.InetAddress; import java.net.UnknownHostException; - +@Message public class HostMessage { private String hostName; private int port;
--- a/src/main/java/christie/topology/node/ConfigurationFinish.java Tue Dec 11 20:51:29 2018 +0900 +++ b/src/main/java/christie/topology/node/ConfigurationFinish.java Sat Dec 15 17:49:57 2018 +0900 @@ -25,7 +25,6 @@ @Override protected void run(CodeGearManager cgm) { reverseCount++; - System.out.println(reverseCount + " " + connectNodeNum); if (reverseCount == connectNodeNum) { getDGM(topologyNodeConfig.getManagerKey()).put("nodePrepareDone", "done"); cgm.setup(new Start());