comparison src/main/java/alice/daemon/IncomingTcpConnection.java @ 345:8f71c3e6f11d

Change directory structure Maven standard
author sugi
date Wed, 16 Apr 2014 18:26:07 +0900
parents
children 11ba40caa93b
comparison
equal deleted inserted replaced
344:9f97ec18f8c5 345:8f71c3e6f11d
1 package alice.daemon;
2
3 import java.io.EOFException;
4 import java.io.IOException;
5 import java.nio.channels.ClosedChannelException;
6
7 import org.msgpack.unpacker.Unpacker;
8
9 import alice.codesegment.SingletonMessage;
10 import alice.datasegment.Command;
11 import alice.datasegment.CommandType;
12 import alice.datasegment.DataSegment;
13 import alice.datasegment.DataSegmentKey;
14 import alice.datasegment.DataSegmentManager;
15 import alice.datasegment.LocalDataSegmentManager;
16 import alice.topology.HostMessage;
17 import alice.topology.manager.keeparive.RespondData;
18 import alice.topology.manager.reconnection.SendError;
19
20 public class IncomingTcpConnection extends Thread {
21
22 public Connection connection;
23 public DataSegmentManager manager;
24 public String reverseKey;
25 private LocalDataSegmentManager lmanager = DataSegment.getLocal();
26
27 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) {
28 this.manager = manager;
29 this.connection = connection;
30 this.reverseKey = reverseKey;
31 }
32
33 /**
34 * pipeline thread for receiving
35 */
36 public void run() {
37 Unpacker unpacker = this.getUnpacker();
38 if (unpacker == null) {
39 return;
40 }
41 while (true) {
42 try {
43 CommandMessage msg = unpacker.read(CommandMessage.class);
44 CommandType type = CommandType.getCommandTypeFromId(msg.type);
45 switch (type) {
46 case UPDATE:
47 getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
48 break;
49 case PUT:
50 getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
51 break;
52 case PEEK:
53 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
54 break;
55 case TAKE:
56 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
57 break;
58 case REMOVE:
59 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, 0, 0, null, null, null));
60 break;
61 case REPLY:
62 Command cmd = manager.getAndRemoveCmd(msg.seq);
63 cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
64 cmd=null;
65 break;
66 case PING:
67 DataSegment.get(reverseKey).response(msg.key);
68 break;
69 case RESPONSE:
70 DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
71 break;
72 default:
73 break;
74 }
75 } catch (ClosedChannelException e) {
76 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
77 return;
78 } catch (EOFException e) {
79 new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute();
80 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
81 return;
82 } catch (IOException e) {
83 e.printStackTrace();
84 }
85 }
86 }
87
88 private Unpacker getUnpacker() {
89 Unpacker unpacker = null;
90 try {
91 unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());
92 } catch (IOException e2) {
93 e2.printStackTrace();
94 }
95 return unpacker;
96 }
97
98 private DataSegmentKey getDataSegmentKey(CommandMessage msg) {
99 return lmanager.getDataSegmentKey(msg.key);
100 }
101 }