Mercurial > hg > Members > tatsuki > Alice
annotate src/alice/daemon/IncomingTcpConnection.java @ 58:ebdcab7b9b04
add comment
author | one |
---|---|
date | Wed, 08 Feb 2012 17:06:39 +0900 |
parents | f9c82d9a4936 |
children | 7aaadd08288c |
rev | line source |
---|---|
13 | 1 package alice.daemon; |
2 | |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
3 import java.io.EOFException; |
13 | 4 import java.io.IOException; |
42 | 5 import java.nio.channels.ClosedChannelException; |
13 | 6 |
7 import org.msgpack.MessagePack; | |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
8 import org.msgpack.unpacker.Unpacker; |
13 | 9 |
10 import alice.datasegment.Command; | |
11 import alice.datasegment.CommandType; | |
12 import alice.datasegment.DataSegment; | |
13 import alice.datasegment.DataSegmentKey; | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
14 import alice.datasegment.DataSegmentManager; |
13 | 15 import alice.datasegment.LocalDataSegmentManager; |
16 | |
17 public class IncomingTcpConnection extends Thread { | |
18 | |
53
f9c82d9a4936
move msgpack local val to static field;
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
51
diff
changeset
|
19 private static MessagePack MSGPACK = new MessagePack(); |
13 | 20 public Connection connection; |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
21 public DataSegmentManager manager; |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
22 public String reverseKey; |
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
23 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
24 this.manager = manager; |
13 | 25 this.connection = connection; |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
26 this.reverseKey = reverseKey; |
13 | 27 } |
28 | |
58 | 29 /** |
30 * pipeline thread for receiving | |
31 */ | |
13 | 32 public void run() { |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
33 Unpacker unpacker = null; |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
34 try { |
53
f9c82d9a4936
move msgpack local val to static field;
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
51
diff
changeset
|
35 unpacker = MSGPACK.createUnpacker(connection.socket.getInputStream()); |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
36 } catch (IOException e2) { |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
37 e2.printStackTrace(); |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
38 } |
13 | 39 while (true) { |
40 try { | |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
41 CommandMessage msg = unpacker.read(CommandMessage.class); |
13 | 42 CommandType type = CommandType.getCommandTypeFromId(msg.type); |
16 | 43 LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local"); |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
44 DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key); |
13 | 45 switch (type) { |
46 case UPDATE: | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
47 dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); |
13 | 48 break; |
49 case PUT: | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
50 dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); |
13 | 51 break; |
52 case PEEK: | |
53 //Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) { | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
54 dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); |
13 | 55 break; |
56 case TAKE: | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
57 dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); |
13 | 58 break; |
59 case REMOVE: | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
60 dsKey.addCommand(new Command(type, null, null, null, 0, 0, null, null, null)); |
13 | 61 break; |
62 case REPLY: | |
63 try { | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
64 manager.replyQueue.put(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); |
13 | 65 } catch (InterruptedException e) { |
66 e.printStackTrace(); | |
67 } | |
68 break; | |
69 default: | |
70 break; | |
71 } | |
42 | 72 } catch (ClosedChannelException e) { |
43 | 73 try { |
74 connection.sendQueue.put(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); | |
75 } catch (InterruptedException e1) { | |
76 e1.printStackTrace(); | |
77 } | |
42 | 78 return; |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
79 } catch (EOFException e) { |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
80 try { |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
81 connection.sendQueue.put(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
82 } catch (InterruptedException e1) { |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
83 e1.printStackTrace(); |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
84 } |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
85 return; |
13 | 86 } catch (IOException e) { |
87 e.printStackTrace(); | |
88 } | |
89 } | |
90 } | |
91 | |
92 } |