Mercurial > hg > Database > Alice
view src/main/java/alice/daemon/IncomingTcpConnection.java @ 447:d30451d1882f dispose
fix IncomingUdpConnection and passed UdpTest
author | sugi |
---|---|
date | Mon, 27 Oct 2014 01:29:32 +0900 |
parents | a91890dff56e |
children | 09a80f83c605 |
line wrap: on
line source
package alice.daemon; import java.io.EOFException; import java.io.IOException; import java.nio.channels.ClosedChannelException; import org.msgpack.unpacker.MessagePackUnpacker; import org.msgpack.unpacker.Unpacker; import alice.codesegment.SingletonMessage; import alice.datasegment.Command; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; import alice.datasegment.DataSegmentManager; import alice.datasegment.LocalDataSegmentManager; import alice.topology.manager.keeparive.RespondData; public class IncomingTcpConnection extends Thread { public Connection connection; public DataSegmentManager manager; public String reverseKey; private LocalDataSegmentManager lmanager = DataSegment.getLocal(); public IncomingTcpConnection(DataSegmentManager manager) { this.manager = manager; } public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { this.manager = manager; this.connection = connection; this.reverseKey = reverseKey; } public LocalDataSegmentManager getLocalDataSegmentManager(){ return lmanager; } /** * pipeline thread for receiving */ public void run() { Unpacker unpacker = null; try { unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); } catch (IOException e) { e.printStackTrace(); } if (unpacker == null) { return; } while (true) { try { Command cmd = null; byte[] val = null; CommandMessage msg = unpacker.read(CommandMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: case PUT: val = getSerializedByteArray(unpacker); cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey); lmanager.getDataSegmentKey(msg.key).runCommand(cmd); break; case PEEK: case TAKE: cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection); cmd.setQuickFlag(msg.flag); lmanager.getDataSegmentKey(msg.key).runCommand(cmd); break; case REMOVE: cmd = new Command(type, null, null, null, 0, 0, null, null, ""); lmanager.getDataSegmentKey(msg.key).runCommand(cmd); break; case REPLY: cmd = manager.getAndRemoveCmd(msg.seq); val = getSerializedByteArray(unpacker); cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, "")); break; case PING: DataSegment.get(reverseKey).response(msg.key); break; case RESPONSE: DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis())); break; default: break; } } catch (ClosedChannelException e) { connection.putConnectionInfo(); connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, reverseKey)); return; } catch (EOFException e) { connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, reverseKey)); return; } catch (IOException e) { e.printStackTrace(); } } } private byte[] getSerializedByteArray(Unpacker unpacker) { int len; byte[] b = null; try { len = unpacker.readInt(); b = ((MessagePackUnpacker) unpacker).getSerializedByteArray(len); } catch (IOException e) { e.printStackTrace(); } return b; } }