comparison src/main/java/alice/daemon/IncomingUdpConnection.java @ 443:2f2623484b77 dispose

change protocol
author sugi
date Sat, 18 Oct 2014 19:30:13 +0900
parents 0c24894db37e
children d30451d1882f
comparison
equal deleted inserted replaced
442:2338b1ef29e8 443:2f2623484b77
14 import alice.datasegment.DataSegmentManager; 14 import alice.datasegment.DataSegmentManager;
15 import alice.topology.manager.keeparive.RespondData; 15 import alice.topology.manager.keeparive.RespondData;
16 16
17 public class IncomingUdpConnection extends IncomingTcpConnection { 17 public class IncomingUdpConnection extends IncomingTcpConnection {
18 // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment. 18 // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment.
19 // and this implementation has a problem: if more than 4096 bytes of data are received, they cannot be read. 19 // and this implementation has a problem: if more than 65507 bytes of data are received, they cannot be read.
20 // but the max data length is 65507 bytes because of the max length of a UDP payload
20 21
21 public MulticastConnection receiver; 22 public MulticastConnection receiver;
22 public MulticastConnection sender; 23 public MulticastConnection sender;
23 24
24 public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) { 25 public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) {
28 } 29 }
29 30
30 @Override 31 @Override
31 public void run() { 32 public void run() {
32 while (true){ 33 while (true){
33 try { 34 try {
34 ByteBuffer receive = ByteBuffer.allocate(4096); 35 // Max data length is 65507 because of the max length of UDP payload
36 ByteBuffer receive = ByteBuffer.allocate(65507);
35 receiver.receive(receive); 37 receiver.receive(receive);
36 Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); 38 Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive);
37 receive.flip(); 39 receive.flip();
38 CommandMessage msg = unpacker.read(CommandMessage.class); 40 CommandMessage msg = unpacker.read(CommandMessage.class);
41 byte[] val = unpacker.readByteArray();
39 CommandType type = CommandType.getCommandTypeFromId(msg.type); 42 CommandType type = CommandType.getCommandTypeFromId(msg.type);
40 switch (type) { 43 switch (type) {
41 case UPDATE: 44 case UPDATE:
42 getLocalDataSegmentManager().getDataSegmentKey(msg.key) 45 getLocalDataSegmentManager().getDataSegmentKey(msg.key)
43 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); 46 .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey));
44 break; 47 break;
45 case PUT: 48 case PUT:
46 getLocalDataSegmentManager().getDataSegmentKey(msg.key) 49 getLocalDataSegmentManager().getDataSegmentKey(msg.key)
47 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); 50 .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey));
48 break; 51 break;
49 case PEEK: 52 case PEEK:
50 getLocalDataSegmentManager().getDataSegmentKey(msg.key) 53 getLocalDataSegmentManager().getDataSegmentKey(msg.key)
51 .runCommand(new Command(type, null, null, null, msg.index, msg.seq, sender, null, null, msg.flag)); 54 .runCommand(new Command(type, null, null, null, msg.index, msg.seq, sender, null, null, msg.flag));
52 break; 55 break;
58 getLocalDataSegmentManager().getDataSegmentKey(msg.key) 61 getLocalDataSegmentManager().getDataSegmentKey(msg.key)
59 .runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); 62 .runCommand(new Command(type, null, null, null, 0, 0, null, null, null));
60 break; 63 break;
61 case REPLY: 64 case REPLY:
62 Command cmd = manager.getAndRemoveCmd(msg.seq); 65 Command cmd = manager.getAndRemoveCmd(msg.seq);
63 cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); 66 cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, null));
64 cmd=null; 67 cmd=null;
65 break; 68 break;
66 case PING: 69 case PING:
67 DataSegment.get(reverseKey).response(msg.key); 70 DataSegment.get(reverseKey).response(msg.key);
68 break; 71 break;