Mercurial > hg > Database > Alice
view src/main/java/alice/daemon/IncomingUdpConnection.java @ 525:30a74eee59c7 dispose
working TestRemoteAlice
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 16 Apr 2015 20:33:53 +0900 |
parents | 145c425db88d |
children | 928907206d21 |
line wrap: on
line source
package alice.daemon; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import org.msgpack.unpacker.Unpacker; import alice.codesegment.SingletonMessage; import alice.datasegment.Command; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; import alice.datasegment.DataSegmentManager; import alice.datasegment.ReceiveData; import alice.topology.manager.keeparive.RespondData; public class IncomingUdpConnection extends IncomingTcpConnection { // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment. // and this implement has problem. If over 65507 data receive, can not read. // but Max data length is 65507 because of the max length of UDP payload public MulticastConnection receiver; public MulticastConnection sender; public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) { super(manager); receiver = r; sender = s; reverseKey = "multicast"; } @Override public void run() { while (true){ try { Command cmd = null; ReceiveData rData = null; // Max data length is 65507 because of the max length of UDP payload ByteBuffer receive = ByteBuffer.allocate(65507); receiver.receive(receive); Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); receive.flip(); CommandMessage msg = unpacker.read(CommandMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: case PUT: rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); rData.setCompressFlag(msg.compressed); cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); break; case PEEK: case TAKE: cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null ,sender); cmd.setQuickFlag(msg.quickFlag); getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); break; case REMOVE: cmd = new Command(type, null, null, null, 0, 0, null, null, ""); getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); break; case REPLY: cmd = manager.getAndRemoveCmd(msg.seq); rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); rData.setCompressFlag(msg.compressed); Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.cs.ids.reply(cmd.receiver, rCmd); break; case PING: if (DataSegment.contains(reverseKey)) DataSegment.get(reverseKey).response(msg.key); break; case RESPONSE: rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis())); DataSegment.getLocal().put(msg.key, rData, false); break; default: break; } } catch (ClosedChannelException e) { return; } catch (EOFException e) { return; } catch (IOException e) { e.printStackTrace(); } } } }