Mercurial > hg > Database > Alice
view src/alice/daemon/IncomingTcpConnection.java @ 28:98ab26e09a98
Configuration Manager work and implements reverseKey
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 17 Jan 2012 08:41:34 +0900 |
parents | bb075e103cd3 |
children | 92aeb6e34683 |
line wrap: on
line source
package alice.daemon; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import org.msgpack.MessagePack; import alice.datasegment.Command; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; import alice.datasegment.DataSegmentKey; import alice.datasegment.DataSegmentManager; import alice.datasegment.LocalDataSegmentManager; public class IncomingTcpConnection extends Thread { public Connection connection; public DataSegmentManager manager; public String reverseKey; public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { this.manager = manager; this.connection = connection; this.reverseKey = reverseKey; } public void run() { MessagePack msgpack = new MessagePack(); while (true) { SocketChannel ch = connection.socket.getChannel(); ByteBuffer buf = ByteBuffer.allocateDirect(4); // for int try { int allReadLen = 0; do { int readLen = ch.read(buf); if (readLen < 0) return; allReadLen += readLen; } while (allReadLen < 4); buf.rewind(); int msgLen = buf.getInt(); allReadLen = 0; ByteBuffer msgBuf = ByteBuffer.allocateDirect(msgLen); do { int readLen = ch.read(msgBuf); if (readLen < 0) return; allReadLen += readLen; } while (allReadLen < msgLen); msgBuf.flip(); CommandMessage msg = msgpack.read(msgBuf, CommandMessage.class); msgBuf.flip(); CommandType type = CommandType.getCommandTypeFromId(msg.type); LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local"); DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key); switch (type) { case UPDATE: dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PUT: dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PEEK: //Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) { dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); break; case TAKE: dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); break; case REMOVE: dsKey.addCommand(new Command(type, null, null, null, 0, 0, null, null, null)); break; case REPLY: try { manager.replyQueue.put(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); } catch (InterruptedException e) { e.printStackTrace(); } break; default: break; } } catch (IOException e) { e.printStackTrace(); } } } }