Mercurial > hg > Database > Alice
comparison src/main/java/alice/daemon/IncomingUdpConnection.java @ 443:2f2623484b77 dispose
change protocol
author | sugi |
---|---|
date | Sat, 18 Oct 2014 19:30:13 +0900 |
parents | 0c24894db37e |
children | d30451d1882f |
comparison
equal
deleted
inserted
replaced
442:2338b1ef29e8 | 443:2f2623484b77 |
---|---|
14 import alice.datasegment.DataSegmentManager; | 14 import alice.datasegment.DataSegmentManager; |
15 import alice.topology.manager.keeparive.RespondData; | 15 import alice.topology.manager.keeparive.RespondData; |
16 | 16 |
17 public class IncomingUdpConnection extends IncomingTcpConnection { | 17 public class IncomingUdpConnection extends IncomingTcpConnection { |
18 // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment. | 18 // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment. |
19 // and this implement has problem. If over 4096 data receive, can not read. | 19 // and this implement has problem. If over 65507 data receive, can not read. |
20 // but Max data length is 65507 because of the max length of UDP payload | |
20 | 21 |
21 public MulticastConnection receiver; | 22 public MulticastConnection receiver; |
22 public MulticastConnection sender; | 23 public MulticastConnection sender; |
23 | 24 |
24 public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) { | 25 public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) { |
28 } | 29 } |
29 | 30 |
30 @Override | 31 @Override |
31 public void run() { | 32 public void run() { |
32 while (true){ | 33 while (true){ |
33 try { | 34 try { |
34 ByteBuffer receive = ByteBuffer.allocate(4096); | 35 // Max data length is 65507 because of the max length of UDP payload |
36 ByteBuffer receive = ByteBuffer.allocate(65507); | |
35 receiver.receive(receive); | 37 receiver.receive(receive); |
36 Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); | 38 Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); |
37 receive.flip(); | 39 receive.flip(); |
38 CommandMessage msg = unpacker.read(CommandMessage.class); | 40 CommandMessage msg = unpacker.read(CommandMessage.class); |
41 byte[] val = unpacker.readByteArray(); | |
39 CommandType type = CommandType.getCommandTypeFromId(msg.type); | 42 CommandType type = CommandType.getCommandTypeFromId(msg.type); |
40 switch (type) { | 43 switch (type) { |
41 case UPDATE: | 44 case UPDATE: |
42 getLocalDataSegmentManager().getDataSegmentKey(msg.key) | 45 getLocalDataSegmentManager().getDataSegmentKey(msg.key) |
43 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); | 46 .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey)); |
44 break; | 47 break; |
45 case PUT: | 48 case PUT: |
46 getLocalDataSegmentManager().getDataSegmentKey(msg.key) | 49 getLocalDataSegmentManager().getDataSegmentKey(msg.key) |
47 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); | 50 .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey)); |
48 break; | 51 break; |
49 case PEEK: | 52 case PEEK: |
50 getLocalDataSegmentManager().getDataSegmentKey(msg.key) | 53 getLocalDataSegmentManager().getDataSegmentKey(msg.key) |
51 .runCommand(new Command(type, null, null, null, msg.index, msg.seq, sender, null, null, msg.flag)); | 54 .runCommand(new Command(type, null, null, null, msg.index, msg.seq, sender, null, null, msg.flag)); |
52 break; | 55 break; |
58 getLocalDataSegmentManager().getDataSegmentKey(msg.key) | 61 getLocalDataSegmentManager().getDataSegmentKey(msg.key) |
59 .runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); | 62 .runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); |
60 break; | 63 break; |
61 case REPLY: | 64 case REPLY: |
62 Command cmd = manager.getAndRemoveCmd(msg.seq); | 65 Command cmd = manager.getAndRemoveCmd(msg.seq); |
63 cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); | 66 cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, null)); |
64 cmd=null; | 67 cmd=null; |
65 break; | 68 break; |
66 case PING: | 69 case PING: |
67 DataSegment.get(reverseKey).response(msg.key); | 70 DataSegment.get(reverseKey).response(msg.key); |
68 break; | 71 break; |