Mercurial > hg > Database > Alice
annotate src/main/java/alice/daemon/IncomingTcpConnection.java @ 488:7ef0ebb40c9b dispose
add measurement data in protocol
author | sugi |
---|---|
date | Mon, 08 Dec 2014 23:11:26 +0900 |
parents | c06070403ed4 |
children | 118e150ac9f3 |
rev | line source |
---|---|
345 | 1 package alice.daemon; |
2 | |
3 import java.io.EOFException; | |
4 import java.io.IOException; | |
5 import java.nio.channels.ClosedChannelException; | |
6 | |
443 | 7 import org.msgpack.unpacker.MessagePackUnpacker; |
345 | 8 import org.msgpack.unpacker.Unpacker; |
9 | |
10 import alice.codesegment.SingletonMessage; | |
11 import alice.datasegment.Command; | |
12 import alice.datasegment.CommandType; | |
13 import alice.datasegment.DataSegment; | |
14 import alice.datasegment.DataSegmentManager; | |
15 import alice.datasegment.LocalDataSegmentManager; | |
458 | 16 import alice.datasegment.ReceiveData; |
345 | 17 import alice.topology.manager.keeparive.RespondData; |
18 | |
19 public class IncomingTcpConnection extends Thread { | |
419 | 20 |
480 | 21 private Connection connection; |
22 protected DataSegmentManager manager; | |
23 protected String reverseKey; | |
419 | 24 private LocalDataSegmentManager lmanager = DataSegment.getLocal(); |
25 | |
26 public IncomingTcpConnection(DataSegmentManager manager) { | |
27 this.manager = manager; | |
28 } | |
29 | |
30 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { | |
31 this.manager = manager; | |
32 this.connection = connection; | |
33 this.reverseKey = reverseKey; | |
34 } | |
35 | |
36 public LocalDataSegmentManager getLocalDataSegmentManager(){ | |
37 return lmanager; | |
38 } | |
345 | 39 |
419 | 40 /** |
41 * pipeline thread for receiving | |
42 */ | |
43 public void run() { | |
44 Unpacker unpacker = null; | |
45 try { | |
445 | 46 unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); |
419 | 47 } catch (IOException e) { |
48 e.printStackTrace(); | |
49 } | |
50 if (unpacker == null) { | |
51 return; | |
52 } | |
53 while (true) { | |
54 try { | |
447 | 55 Command cmd = null; |
458 | 56 ReceiveData rData = null; |
419 | 57 CommandMessage msg = unpacker.read(CommandMessage.class); |
58 CommandType type = CommandType.getCommandTypeFromId(msg.type); | |
59 switch (type) { | |
60 case UPDATE: | |
61 case PUT: | |
458 | 62 rData = new ReceiveData(getSerializedByteArray(unpacker), msg.compressed, msg.serialized); |
488 | 63 if (msg.setTime) { |
64 rData.setTime = true; | |
65 rData.time = msg.time; | |
66 rData.depth = msg.depth; | |
67 } | |
458 | 68 cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); |
446 | 69 lmanager.getDataSegmentKey(msg.key).runCommand(cmd); |
419 | 70 break; |
71 case PEEK: | |
72 case TAKE: | |
446 | 73 cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection); |
449 | 74 cmd.setQuickFlag(msg.quickFlag); |
446 | 75 lmanager.getDataSegmentKey(msg.key).runCommand(cmd); |
467 | 76 break; |
419 | 77 case REMOVE: |
446 | 78 cmd = new Command(type, null, null, null, 0, 0, null, null, ""); |
79 lmanager.getDataSegmentKey(msg.key).runCommand(cmd); | |
419 | 80 break; |
81 case REPLY: | |
446 | 82 cmd = manager.getAndRemoveCmd(msg.seq); |
458 | 83 rData = new ReceiveData(getSerializedByteArray(unpacker), msg.compressed, msg.serialized); |
467 | 84 Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); |
452 | 85 cmd.cs.ids.reply(cmd.receiver, rCmd); |
419 | 86 break; |
87 case PING: | |
471
be0b61986ff7
checking having DataSegmentManger before get DataSegmentManager
sugi
parents:
467
diff
changeset
|
88 if (DataSegment.contains(reverseKey)) |
be0b61986ff7
checking having DataSegmentManger before get DataSegmentManager
sugi
parents:
467
diff
changeset
|
89 DataSegment.get(reverseKey).response(msg.key); |
419 | 90 break; |
91 case RESPONSE: | |
458 | 92 rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis()), false, false); |
93 DataSegment.getLocal().put(msg.key, rData, null); | |
419 | 94 break; |
95 default: | |
96 break; | |
97 } | |
98 } catch (ClosedChannelException e) { | |
99 return; | |
100 } catch (EOFException e) { | |
101 return; | |
102 } catch (IOException e) { | |
478 | 103 return; |
419 | 104 } |
105 } | |
467 | 106 |
419 | 107 } |
345 | 108 |
443 | 109 private byte[] getSerializedByteArray(Unpacker unpacker) { |
110 int len; | |
111 byte[] b = null; | |
112 try { | |
113 len = unpacker.readInt(); | |
114 b = ((MessagePackUnpacker) unpacker).getSerializedByteArray(len); | |
115 } catch (IOException e) { | |
116 e.printStackTrace(); | |
117 } | |
118 return b; | |
119 } | |
480 | 120 |
121 public void setReverseKey(String name) { | |
122 this.reverseKey = name; | |
123 setName(name+"-IncomingTcp"); | |
124 } | |
345 | 125 } |