Mercurial > hg > Members > tatsuki > Alice
view src/alice/daemon/IncomingTcpConnection.java @ 284:7188fe3f7c95
Fix error handling
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Fri, 18 Oct 2013 02:55:37 +0900 |
parents | 23e53aaa8720 |
children | a8255a831ade |
line wrap: on
line source
package alice.daemon; import java.io.EOFException; import java.io.IOException; import java.nio.channels.ClosedChannelException; import org.msgpack.unpacker.Unpacker; import alice.codesegment.SingletonMessage; import alice.datasegment.Command; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; import alice.datasegment.DataSegmentKey; import alice.datasegment.DataSegmentManager; import alice.datasegment.LocalDataSegmentManager; import alice.topology.HostMessage; import alice.topology.manager.reconnection.SendError; public class IncomingTcpConnection extends Thread { public Connection connection; public DataSegmentManager manager; public String reverseKey; private LocalDataSegmentManager lmanager = DataSegment.getLocal(); public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { this.manager = manager; this.connection = connection; this.reverseKey = reverseKey; } /** * pipeline thread for receiving */ public void run() { Unpacker unpacker = this.getUnpacker(); if (unpacker == null) { return; } while (true) { try { CommandMessage msg = unpacker.read(CommandMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PUT: getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PEEK: getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); break; case TAKE: getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); break; case REMOVE: getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); break; case REPLY: Command cmd = manager.getAndRemoveCmd(msg.seq); cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); cmd=null; break; default: break; } } catch (ClosedChannelException e) { connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); return; } catch (EOFException e) { new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute(); connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); return; } catch (IOException e) { e.printStackTrace(); } } } private Unpacker getUnpacker() { Unpacker unpacker = null; try { unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); } catch (IOException e2) { e2.printStackTrace(); } return unpacker; } private DataSegmentKey getDataSegmentKey(CommandMessage msg) { return lmanager.getDataSegmentKey(msg.key); } }