345
|
1 package alice.daemon;
|
|
2
|
|
3 import java.io.EOFException;
|
|
4 import java.io.IOException;
|
|
5 import java.nio.channels.ClosedChannelException;
|
|
6
|
|
7 import org.msgpack.unpacker.Unpacker;
|
|
8
|
|
9 import alice.codesegment.SingletonMessage;
|
|
10 import alice.datasegment.Command;
|
|
11 import alice.datasegment.CommandType;
|
|
12 import alice.datasegment.DataSegment;
|
|
13 import alice.datasegment.DataSegmentManager;
|
|
14 import alice.datasegment.LocalDataSegmentManager;
|
|
15 import alice.topology.HostMessage;
|
|
16 import alice.topology.manager.keeparive.RespondData;
|
|
17 import alice.topology.manager.reconnection.SendError;
|
|
18
|
|
19 public class IncomingTcpConnection extends Thread {
|
|
20
|
|
21 public Connection connection;
|
|
22 public DataSegmentManager manager;
|
|
23 public String reverseKey;
|
363
|
24 protected LocalDataSegmentManager lmanager = DataSegment.getLocal();
|
345
|
25
|
|
26 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) {
|
|
27 this.manager = manager;
|
|
28 this.connection = connection;
|
|
29 this.reverseKey = reverseKey;
|
|
30 }
|
|
31
|
|
32 /**
|
|
33 * pipeline thread for receiving
|
|
34 */
|
|
35 public void run() {
|
363
|
36 Unpacker unpacker = null;
|
|
37 try {
|
|
38 unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());
|
|
39 } catch (IOException e) {
|
|
40 e.printStackTrace();
|
|
41 }
|
345
|
42 if (unpacker == null) {
|
|
43 return;
|
|
44 }
|
|
45 while (true) {
|
|
46 try {
|
|
47 CommandMessage msg = unpacker.read(CommandMessage.class);
|
|
48 CommandType type = CommandType.getCommandTypeFromId(msg.type);
|
|
49 switch (type) {
|
|
50 case UPDATE:
|
359
|
51 lmanager.getDataSegmentKey(msg.key)
|
|
52 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
|
345
|
53 break;
|
|
54 case PUT:
|
359
|
55 lmanager.getDataSegmentKey(msg.key)
|
|
56 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
|
345
|
57 break;
|
|
58 case PEEK:
|
359
|
59 lmanager.getDataSegmentKey(msg.key)
|
|
60 .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
|
345
|
61 break;
|
|
62 case TAKE:
|
359
|
63 lmanager.getDataSegmentKey(msg.key)
|
|
64 .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
|
345
|
65 break;
|
|
66 case REMOVE:
|
359
|
67 lmanager.getDataSegmentKey(msg.key)
|
|
68 .runCommand(new Command(type, null, null, null, 0, 0, null, null, null));
|
345
|
69 break;
|
|
70 case REPLY:
|
|
71 Command cmd = manager.getAndRemoveCmd(msg.seq);
|
|
72 cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
|
|
73 cmd=null;
|
|
74 break;
|
|
75 case PING:
|
|
76 DataSegment.get(reverseKey).response(msg.key);
|
|
77 break;
|
|
78 case RESPONSE:
|
|
79 DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
|
|
80 break;
|
|
81 default:
|
|
82 break;
|
|
83 }
|
|
84 } catch (ClosedChannelException e) {
|
|
85 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
|
|
86 return;
|
|
87 } catch (EOFException e) {
|
|
88 new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute();
|
|
89 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
|
|
90 return;
|
|
91 } catch (IOException e) {
|
|
92 e.printStackTrace();
|
|
93 }
|
|
94 }
|
|
95 }
|
|
96
|
|
97 }
|