Mercurial > hg > Members > tatsuki > Alice
annotate src/alice/daemon/IncomingTcpConnection.java @ 270:23e53aaa8720
reconnect manager worked.
author | sugi |
---|---|
date | Wed, 21 Aug 2013 15:33:49 +0900 |
parents | 88be2824a989 |
children | 7188fe3f7c95 |
rev | line source |
---|---|
13 | 1 package alice.daemon; |
2 | |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
3 import java.io.EOFException; |
13 | 4 import java.io.IOException; |
42 | 5 import java.nio.channels.ClosedChannelException; |
13 | 6 |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
7 import org.msgpack.unpacker.Unpacker; |
13 | 8 |
126 | 9 import alice.codesegment.SingletonMessage; |
13 | 10 import alice.datasegment.Command; |
11 import alice.datasegment.CommandType; | |
12 import alice.datasegment.DataSegment; | |
13 import alice.datasegment.DataSegmentKey; | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
14 import alice.datasegment.DataSegmentManager; |
13 | 15 import alice.datasegment.LocalDataSegmentManager; |
270 | 16 import alice.topology.HostMessage; |
17 import alice.topology.manager.reconnection.SendError; | |
13 | 18 |
19 public class IncomingTcpConnection extends Thread { | |
20 | |
21 public Connection connection; | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
22 public DataSegmentManager manager; |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
23 public String reverseKey; |
127 | 24 private LocalDataSegmentManager lmanager = DataSegment.getLocal(); |
69 | 25 |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
26 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
27 this.manager = manager; |
13 | 28 this.connection = connection; |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
29 this.reverseKey = reverseKey; |
13 | 30 } |
31 | |
58 | 32 /** |
33 * pipeline thread for receiving | |
34 */ | |
13 | 35 public void run() { |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
36 Unpacker unpacker = null; |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
37 try { |
126 | 38 unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
39 } catch (IOException e2) { |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
40 e2.printStackTrace(); |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
41 } |
13 | 42 while (true) { |
43 try { | |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
44 CommandMessage msg = unpacker.read(CommandMessage.class); |
13 | 45 CommandType type = CommandType.getCommandTypeFromId(msg.type); |
46 switch (type) { | |
47 case UPDATE: | |
240 | 48 getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); |
13 | 49 break; |
50 case PUT: | |
240 | 51 getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); |
13 | 52 break; |
53 case PEEK: | |
251 | 54 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); |
13 | 55 break; |
56 case TAKE: | |
251 | 57 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); |
13 | 58 break; |
59 case REMOVE: | |
240 | 60 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); |
13 | 61 break; |
62 case REPLY: | |
240 | 63 Command cmd = manager.getAndRemoveCmd(msg.seq); |
64 cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); | |
65 cmd=null; | |
13 | 66 break; |
67 default: | |
68 break; | |
69 } | |
42 | 70 } catch (ClosedChannelException e) { |
69 | 71 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); |
42 | 72 return; |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
73 } catch (EOFException e) { |
270 | 74 new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute(); |
69 | 75 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
76 return; |
13 | 77 } catch (IOException e) { |
78 e.printStackTrace(); | |
79 } | |
80 } | |
81 } | |
64 | 82 private DataSegmentKey getDataSegmentKey(CommandMessage msg) { |
83 return lmanager.getDataSegmentKey(msg.key); | |
84 } | |
13 | 85 } |