Mercurial > hg > Database > Alice
changeset 454:f8a8f869f016 dispose
bug fix
author | sugi |
---|---|
date | Tue, 28 Oct 2014 17:34:26 +0900 |
parents | 8470db2523d5 |
children | b004f62b83e5 |
files | src/main/java/alice/daemon/IncomingUdpConnection.java |
diffstat | 1 files changed, 75 insertions(+), 68 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java Tue Oct 28 17:25:30 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Tue Oct 28 17:34:26 2014 +0900 @@ -15,75 +15,82 @@ import alice.topology.manager.keeparive.RespondData; public class IncomingUdpConnection extends IncomingTcpConnection { - // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment. - // and this implement has problem. If over 65507 data receive, can not read. + // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment. + // and this implement has problem. If over 65507 data receive, can not read. // but Max data length is 65507 because of the max length of UDP payload - - public MulticastConnection receiver; - public MulticastConnection sender; + + public MulticastConnection receiver; + public MulticastConnection sender; + + public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) { + super(manager); + receiver = r; + sender = s; + reverseKey = "multicast"; + } - public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) { - super(manager); - receiver = r; - sender = s; - reverseKey = "multicast"; - } - - @Override - public void run() { - while (true){ - try { - Command cmd = null; - byte[] val = null; - // Max data length is 65507 because of the max length of UDP payload - ByteBuffer receive = ByteBuffer.allocate(65507); - receiver.receive(receive); - Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); - receive.flip(); - CommandMessage msg = unpacker.read(CommandMessage.class); - CommandType type = CommandType.getCommandTypeFromId(msg.type); - switch (type) { - case UPDATE: - case PUT: - val = new byte[unpacker.readInt()]; - receive.get(val); - cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey); - getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); - break; - case PEEK: - case TAKE: - cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null ,sender); - cmd.setQuickFlag(msg.quickFlag); - getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); - break; - case REMOVE: - cmd = new Command(type, null, null, null, 0, 0, null, null, ""); - getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); - break; - case REPLY: - cmd = manager.getAndRemoveCmd(msg.seq); - val = new byte[unpacker.readInt()]; - receive.get(val); - cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, "")); - break; - case PING: - DataSegment.get(reverseKey).response(msg.key); - break; - case RESPONSE: - DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis())); - break; - default: - break; - } - - } catch (ClosedChannelException e) { - return; - } catch (EOFException e) { - return; - } catch (IOException e) { - e.printStackTrace(); - } - } - } + @Override + public void run() { + while (true){ + try { + Command cmd = null; + byte[] val = null; + // Max data length is 65507 because of the max length of UDP payload + ByteBuffer receive = ByteBuffer.allocate(65507); + receiver.receive(receive); + Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); + receive.flip(); + CommandMessage msg = unpacker.read(CommandMessage.class); + CommandType type = CommandType.getCommandTypeFromId(msg.type); + switch (type) { + case UPDATE: + case PUT: + val = new byte[unpacker.readInt()]; + receive.get(val); + cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey); + // these flags express DataSegment status + cmd.setCompressFlag(msg.compressed); + cmd.setSerializeFlag(msg.serialized); + getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); + break; + case PEEK: + case TAKE: + cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null ,sender); + cmd.setQuickFlag(msg.quickFlag); + getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); + break; + case REMOVE: + cmd = new Command(type, null, null, null, 0, 0, null, null, ""); + getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); + break; + case REPLY: + cmd = manager.getAndRemoveCmd(msg.seq); + val = new byte[unpacker.readInt()]; + receive.get(val); + Command rCmd = new Command(type, null, null, val, msg.index, msg.seq, null, null, ""); + // these flags express DataSegment status + rCmd.setCompressFlag(msg.compressed); + rCmd.setSerializeFlag(msg.serialized); + cmd.cs.ids.reply(cmd.receiver, rCmd); + break; + case PING: + DataSegment.get(reverseKey).response(msg.key); + break; + case RESPONSE: + DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis())); + break; + default: + break; + } + + } catch (ClosedChannelException e) { + return; + } catch (EOFException e) { + return; + } catch (IOException e) { + e.printStackTrace(); + } + } + } }