Mercurial > hg > Database > Christie
changeset 116:55e11b053234
refactor msgpack
author | akahori |
---|---|
date | Thu, 06 Dec 2018 02:42:10 +0900 |
parents | e1e919f12ed9 |
children | 53e31b403815 |
files | src/main/java/christie/daemon/AcceptThread.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/datagear/RemoteDataGearManager.java src/main/java/christie/datagear/command/ConvertedCommand.java src/main/java/christie/datagear/command/PutCommand.java src/main/java/christie/datagear/command/RemoteTakeCommand.java |
diffstat | 6 files changed, 91 insertions(+), 151 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/christie/daemon/AcceptThread.java Tue Dec 04 14:20:26 2018 +0900 +++ b/src/main/java/christie/daemon/AcceptThread.java Thu Dec 06 02:42:10 2018 +0900 @@ -24,8 +24,9 @@ try { Socket socket = ss.accept(); socket.setTcpNoDelay(true); - System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort()); + //System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort()); Connection connection = new Connection(socket); + connection.name = getName(); String key = "accept" + counter; IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm);
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java Tue Dec 04 14:20:26 2018 +0900 +++ b/src/main/java/christie/daemon/IncomingTcpConnection.java Thu Dec 06 02:42:10 2018 +0900 @@ -6,10 +6,11 @@ import christie.datagear.command.CommandType; import christie.datagear.RemoteMessage; -import christie.datagear.command.ConvertedCommand; import christie.datagear.command.RemotePeekCommand; import christie.datagear.command.RemoteTakeCommand; +import christie.datagear.dg.DataGear; import christie.datagear.dg.MessagePackDataGear; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.ObjectMapper; import org.msgpack.jackson.dataformat.MessagePackFactory; @@ -17,13 +18,10 @@ //import org.msgpack.unpacker.Unpacker; - import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; -import java.nio.channels.SocketChannel; public class IncomingTcpConnection extends Thread { @@ -31,91 +29,79 @@ CodeGearManager cgm; Connection connection; ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); - //private MessagePack packer = new MessagePack(); public IncomingTcpConnection(Connection connection, CodeGearManager cgm) { this.connection = connection; this.cgm = cgm; } - public void setManager(RemoteDataGearManager manager){ + public void setManager(RemoteDataGearManager manager) { this.manager = manager; } public void run() { - //Unpacker unpacker = null; - //MessageUnpacker unpacker = null; + + objectMapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); - /*try { - //unpacker = packer.createUnpacker(connection.socket.getInputStream()); + InputStream in = null; + try { + in = connection.socket.getInputStream(); } catch (IOException e) { e.printStackTrace(); } - if (unpacker == null) { + if (in == null) { + System.out.println("is null"); return; - }*/ - - ByteBuffer buf = ByteBuffer.allocate(2048); - SocketChannel channel= connection.socket.getChannel(); - + } while (true) { try { - //RemoteMessage msg = unpacker.read(RemoteMessage.class); + + RemoteMessage msg = objectMapper.readValue(in, RemoteMessage.class); - if (channel.read(buf) > 0) { - ConvertedCommand convertedCommand = objectMapper.readValue(buf.array(), ConvertedCommand.class); - RemoteMessage msg = objectMapper.readValue(convertedCommand.getCommand(), RemoteMessage.class); - - CommandType type = CommandType.getCommandTypeFromId(msg.type); - byte[] data; - + CommandType type = CommandType.getCommandTypeFromId(msg.type); + System.out.println("msgkey " + msg.key); - switch (type) { - case PUT: - //data = new byte[unpacker.readInt()]; - //data = new byte[objectMapper.readValue(connectionInputStream, Integer.class)]; - //connection.socket.getInputStream().read(data); - - data = convertedCommand.getData(); + switch (type) { + case PUT: + byte[] msgpackdg = objectMapper.readValue(in, byte[].class); - try { - MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz)); - cgm.getLocalDGM().put(msg.key, dg); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } + try { + MessagePackDataGear dg = new MessagePackDataGear(msgpackdg, Class.forName(msg.clazz)); + cgm.getLocalDGM().put(msg.key, dg); + } catch (Exception e) { + e.printStackTrace(); + } - break; + break; - case REMOTEPEEK: - cgm.getLocalDGM().runCommand(new RemotePeekCommand(msg, connection)); - break; - case REMOTETAKE: - cgm.getLocalDGM().runCommand(new RemoteTakeCommand(msg, connection)); + case REMOTEPEEK: + cgm.getLocalDGM().runCommand(new RemotePeekCommand(msg, connection)); + break; + case REMOTETAKE: + cgm.getLocalDGM().runCommand(new RemoteTakeCommand(msg, connection)); - break; - case REPLY://待っていたwaitListに渡してcsにセット - //data = new byte[objectMapper.readValue(convertedCommand, Integer.class)]; - //connection.socket.getInputStream().read(data); - data = convertedCommand.getData(); - try { - MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz)); - cgm.getDGM(msg.fromDgmName).resolveWaitCommand(msg.key, dg); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } + break; + case REPLY://待っていたwaitListに渡してcsにセット + //data = new byte[objectMapper.readValue(convertedCommand, Integer.class)]; + //connection.socket.getInputStream().read(data); + try { + MessagePackDataGear dg = new MessagePackDataGear(null, Class.forName(msg.clazz)); + cgm.getDGM(msg.fromDgmName).resolveWaitCommand(msg.key, dg); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } - break; - default: - break; - } - buf.clear(); + break; + default: + break; } + } catch (ClosedChannelException e) { + e.printStackTrace(); return; - }catch (EOFException e) { - return; + } catch (EOFException e) { + e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java Tue Dec 04 14:20:26 2018 +0900 +++ b/src/main/java/christie/datagear/RemoteDataGearManager.java Thu Dec 06 02:42:10 2018 +0900 @@ -11,26 +11,31 @@ import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; -public class RemoteDataGearManager extends DataGearManager{ +import static java.lang.Thread.MAX_PRIORITY; - public Connection connection; - public CodeGearManager cgm; +public class RemoteDataGearManager extends DataGearManager{ + private Connection connection; + private CodeGearManager cgm; + boolean connect = false; + Object lock = new Object(); public RemoteDataGearManager(final String dgmName, final String address, final int port, CodeGearManager cgm) { - connection = new Connection(); - connection.name = dgmName; this.cgm = cgm; RemoteDataGearManager manager = this; - new Thread("Connect-" + dgmName) { public void run() { - boolean connect = true; do { try { SocketChannel sc = SocketChannel.open(new InetSocketAddress(address, port)); - connection.socket = sc.socket(); + connection = new Connection(sc.socket()); + connection.name = dgmName; connection.socket.setTcpNoDelay(true); - connect = false; + + + synchronized (lock){ + connect = true; + lock.notify(); + } } catch (IOException e) { try { Thread.sleep(50); @@ -38,7 +43,7 @@ e1.printStackTrace(); } } - } while (connect); + } while (!connect); IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm); in.setManager(manager); in.setName(dgmName+"-IncomingTcp"); @@ -50,11 +55,17 @@ out.start(); } }.start(); + } @Override public void put(String key, Object data) { + Command cm = new PutCommand(0, null, key, new DataGear(data)); + // これ入れないと, connectionがnullの時があるのでしょうがなくwait. + // コンストラクタで呼び出されるThreadをやめて実効すればいんだけどね... + if(!connect) connectWait(); + connection.write(cm); } @@ -93,5 +104,18 @@ connection.sendCommand(cmd); } + // + public void connectWait(){ + synchronized (lock){ + while(!connect){ + try { + lock.wait(); + } catch (InterruptedException e) { + } + } + } + + } + }
--- a/src/main/java/christie/datagear/command/ConvertedCommand.java Tue Dec 04 14:20:26 2018 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,28 +0,0 @@ -package christie.datagear.command; - - -public class ConvertedCommand { - byte[] command; - byte[] dataSize; - byte[] data; - - public ConvertedCommand(){} - - public ConvertedCommand(byte[] command, byte[] dataSize, byte[] data){ - this.command = command; - this.data = data; - this.dataSize = dataSize; - } - - public byte[] getCommand() { - return command; - } - - public byte[] getData() { - return data; - } - - public byte[] getDataSize() { - return dataSize; - } -}
--- a/src/main/java/christie/datagear/command/PutCommand.java Tue Dec 04 14:20:26 2018 +0900 +++ b/src/main/java/christie/datagear/command/PutCommand.java Thu Dec 06 02:42:10 2018 +0900 @@ -11,6 +11,7 @@ import java.nio.ByteBuffer; public class PutCommand extends Command { + ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); public PutCommand(int cgmID, String toDgmName, String key, DataGear dg){ this.type = CommandType.PUT; @@ -21,26 +22,15 @@ this.clazz = dg.getClazz(); } - /* @Override public ByteBuffer convert() { ByteBuffer buf = null; - //MessagePack packer = new MessagePack(); - - ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); - - try { - //byte[] command = packer.write(createRemoteMessage()); byte[] command = objectMapper.writeValueAsBytes(createRemoteMessage()); - byte[] data = new MessagePackDataGear(dg.getData()).getMessagePack(); - //byte[] dataSize = packer.write(data.length); - byte[] dataSize = objectMapper.writeValueAsBytes(data.length); + byte[] data = objectMapper.writeValueAsBytes(new MessagePackDataGear(dg.getData()).getMessagePack()); - - buf = ByteBuffer.allocate(command.length+dataSize.length+data.length); + buf = ByteBuffer.allocate(command.length+data.length); buf.put(command); - buf.put(dataSize); buf.put(data); buf.flip(); @@ -49,32 +39,6 @@ } return buf; - }*/ - @Override - public ByteBuffer convert() { - ByteBuffer buf = null; - //MessagePack packer = new MessagePack(); - - ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); - //objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); + } - try { - //byte[] command = packer.write(createRemoteMessage()); - byte[] command = objectMapper.writeValueAsBytes(createRemoteMessage()); - byte[] data = new MessagePackDataGear(dg.getData()).getMessagePack(); - //byte[] dataSize = packer.write(data.length); - byte[] dataSize = objectMapper.writeValueAsBytes(data.length); - - ConvertedCommand convertedCommand = new ConvertedCommand(command, dataSize, data); - byte[] byteCommand = objectMapper.writeValueAsBytes(convertedCommand); - buf = ByteBuffer.allocate(byteCommand.length); - buf.put(byteCommand); - buf.flip(); - - } catch (IOException e) { - e.printStackTrace(); - } - - return buf; - } }
--- a/src/main/java/christie/datagear/command/RemoteTakeCommand.java Tue Dec 04 14:20:26 2018 +0900 +++ b/src/main/java/christie/datagear/command/RemoteTakeCommand.java Thu Dec 06 02:42:10 2018 +0900 @@ -13,6 +13,8 @@ public class RemoteTakeCommand extends Command { + ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); + public RemoteTakeCommand(RemoteMessage msg, Connection cn) { this.type = CommandType.REMOTETAKE; this.fromDgmName = msg.fromDgmName; @@ -38,22 +40,13 @@ @Override public ByteBuffer convert() { ByteBuffer buf = null; - ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); - - //MessagePack packer = new MessagePack(); try { - //byte[] command = packer.write(createRemoteMessage()); byte[] command = objectMapper.writeValueAsBytes(createRemoteMessage()); - ConvertedCommand convertedCommand = new ConvertedCommand(command, null, null); - byte[] byteCommand = objectMapper.writeValueAsBytes(convertedCommand); - - buf = ByteBuffer.allocate(byteCommand.length); - buf.put(byteCommand); - //buf = ByteBuffer.allocate(command.length); - //buf.put(command); + buf = ByteBuffer.allocate(command.length); + buf.put(command); buf.flip(); } catch (IOException e) {