Mercurial > hg > Database > Christie
view src/main/java/christie/daemon/IncomingTcpConnection.java @ 22:77583ea56656
add WaitList and implement RemoteTake but not work
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 23 Jan 2018 22:02:54 +0900 |
parents | 5baccb8f7fbd |
children | 695705dba324 |
line wrap: on
line source
package christie.daemon;

import christie.codegear.CodeGearManager;
import christie.datagear.*;
import org.msgpack.MessagePack;
import org.msgpack.unpacker.Unpacker;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Reader thread for one incoming TCP connection.
 *
 * <p>Each message on the wire is a MessagePack-encoded {@code RemoteMessage} header,
 * followed by a MessagePack int giving the payload size, followed (for PUT and REPLY)
 * by that many raw payload bytes. The thread decodes messages in a loop and dispatches
 * them to the {@code CodeGearManager} registered under the id named in the header.
 *
 * <p>The thread ends when the stream is closed, reaches end of stream, or fails with
 * any other I/O error.
 */
public class IncomingTcpConnection extends Thread {
    RemoteDataGearManager manager;
    ConcurrentHashMap<Integer, CodeGearManager> cgms;
    Connection connection;
    private MessagePack packer = new MessagePack();

    /**
     * @param connection the connection whose socket this thread reads from
     * @param cgm        a code gear manager; its {@code getCgms()} map is used to
     *                   resolve the target manager of every incoming message
     */
    public IncomingTcpConnection(Connection connection, CodeGearManager cgm) {
        this.connection = connection;
        this.cgms = cgm.getCgms();
    }

    /** Stores the remote data gear manager associated with this connection. */
    public void setManager(RemoteDataGearManager manager) {
        this.manager = manager;
    }

    /**
     * Reads and dispatches messages until the connection ends.
     *
     * @throws IllegalArgumentException if a message names a manager id that is not
     *                                  registered in {@code cgms}
     */
    @Override
    public void run() {
        InputStream in;
        Unpacker unpacker;
        try {
            in = connection.socket.getInputStream();
            unpacker = packer.createUnpacker(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        while (true) {
            try {
                RemoteMessage msg = unpacker.read(RemoteMessage.class);
                CommandType type = CommandType.getCommandTypeFromId(msg.type);
                int dataSize = unpacker.readInt();

                switch (type) {
                    case PUT: {
                        byte[] data = readPayload(in, dataSize);
                        try {
                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
                            findCgm(msg.cgmID).getLocalDGM().put(msg.key, dg);
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }
                        break;
                    }
                    case PEEK:
                    case TAKE: {
                        // No payload bytes are consumed for these commands; the request is
                        // handed to the local data gear manager together with the connection.
                        Command cmd = new Command(type, null, msg.cgmID, msg.dgmName, msg.key, null, null, connection);
                        findCgm(msg.cgmID).getLocalDGM().take(cmd);
                        break;
                    }
                    case REPLY: {
                        // Hand the received data to the command that was parked in the
                        // waitList under this key, then run that command.
                        byte[] data = readPayload(in, dataSize);
                        try {
                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
                            DataGearManager dgm = findCgm(msg.cgmID).getDGM(msg.dgmName);
                            // NOTE(review): assumes a command is waiting under msg.key — confirm
                            // what getAndRemoveCommand returns when none is.
                            Command cm = dgm.waitList.getAndRemoveCommand(msg.key);
                            cm.dg = dg;
                            dgm.runCommand(cm);
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }
                        break;
                    }
                    default:
                        break;
                }
            } catch (ClosedChannelException e) {
                return;
            } catch (EOFException e) {
                // End of stream: nothing more can ever be read, so stop instead of
                // retrying forever.
                return;
            } catch (IOException e) {
                // After an I/O failure the message framing can no longer be trusted;
                // report it and stop rather than spin on a broken stream.
                e.printStackTrace();
                return;
            }
        }
    }

    /** Returns the manager registered under {@code cgmID}, or throws if there is none. */
    private CodeGearManager findCgm(int cgmID) {
        CodeGearManager cgm = cgms.get(cgmID);
        if (cgm == null) {
            throw new IllegalArgumentException("DGM_ID:" + cgmID + "is not found");
        }
        return cgm;
    }

    /**
     * Reads exactly {@code size} bytes from {@code in}. A single {@code read} call may
     * return fewer bytes than requested, so this keeps reading until the buffer is full.
     *
     * @throws EOFException if the stream ends before {@code size} bytes were read
     * @throws IOException  if {@code size} is negative or the underlying read fails
     */
    private static byte[] readPayload(InputStream in, int size) throws IOException {
        if (size < 0) {
            throw new IOException("negative payload size: " + size);
        }
        byte[] data = new byte[size];
        int filled = 0;
        while (filled < size) {
            int n = in.read(data, filled, size - filled);
            if (n < 0) {
                throw new EOFException("stream ended after " + filled + " of " + size + " payload bytes");
            }
            filled += n;
        }
        return data;
    }
}