Mercurial > hg > Members > tatsuki > Alice
comparison src/main/java/alice/daemon/IncomingTcpConnection.java @ 345:8f71c3e6f11d
Change directory structure Maven standard
author | sugi |
---|---|
date | Wed, 16 Apr 2014 18:26:07 +0900 |
parents | |
children | 11ba40caa93b |
comparison
equal
deleted
inserted
replaced
344:9f97ec18f8c5 | 345:8f71c3e6f11d |
---|---|
1 package alice.daemon; | |
2 | |
3 import java.io.EOFException; | |
4 import java.io.IOException; | |
5 import java.nio.channels.ClosedChannelException; | |
6 | |
7 import org.msgpack.unpacker.Unpacker; | |
8 | |
9 import alice.codesegment.SingletonMessage; | |
10 import alice.datasegment.Command; | |
11 import alice.datasegment.CommandType; | |
12 import alice.datasegment.DataSegment; | |
13 import alice.datasegment.DataSegmentKey; | |
14 import alice.datasegment.DataSegmentManager; | |
15 import alice.datasegment.LocalDataSegmentManager; | |
16 import alice.topology.HostMessage; | |
17 import alice.topology.manager.keeparive.RespondData; | |
18 import alice.topology.manager.reconnection.SendError; | |
19 | |
20 public class IncomingTcpConnection extends Thread { | |
21 | |
22 public Connection connection; | |
23 public DataSegmentManager manager; | |
24 public String reverseKey; | |
25 private LocalDataSegmentManager lmanager = DataSegment.getLocal(); | |
26 | |
27 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { | |
28 this.manager = manager; | |
29 this.connection = connection; | |
30 this.reverseKey = reverseKey; | |
31 } | |
32 | |
33 /** | |
34 * pipeline thread for receiving | |
35 */ | |
36 public void run() { | |
37 Unpacker unpacker = this.getUnpacker(); | |
38 if (unpacker == null) { | |
39 return; | |
40 } | |
41 while (true) { | |
42 try { | |
43 CommandMessage msg = unpacker.read(CommandMessage.class); | |
44 CommandType type = CommandType.getCommandTypeFromId(msg.type); | |
45 switch (type) { | |
46 case UPDATE: | |
47 getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); | |
48 break; | |
49 case PUT: | |
50 getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); | |
51 break; | |
52 case PEEK: | |
53 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); | |
54 break; | |
55 case TAKE: | |
56 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); | |
57 break; | |
58 case REMOVE: | |
59 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); | |
60 break; | |
61 case REPLY: | |
62 Command cmd = manager.getAndRemoveCmd(msg.seq); | |
63 cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); | |
64 cmd=null; | |
65 break; | |
66 case PING: | |
67 DataSegment.get(reverseKey).response(msg.key); | |
68 break; | |
69 case RESPONSE: | |
70 DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis())); | |
71 break; | |
72 default: | |
73 break; | |
74 } | |
75 } catch (ClosedChannelException e) { | |
76 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); | |
77 return; | |
78 } catch (EOFException e) { | |
79 new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute(); | |
80 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); | |
81 return; | |
82 } catch (IOException e) { | |
83 e.printStackTrace(); | |
84 } | |
85 } | |
86 } | |
87 | |
88 private Unpacker getUnpacker() { | |
89 Unpacker unpacker = null; | |
90 try { | |
91 unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); | |
92 } catch (IOException e2) { | |
93 e2.printStackTrace(); | |
94 } | |
95 return unpacker; | |
96 } | |
97 | |
98 private DataSegmentKey getDataSegmentKey(CommandMessage msg) { | |
99 return lmanager.getDataSegmentKey(msg.key); | |
100 } | |
101 } |