Mercurial > hg > Database > Alice
annotate src/alice/daemon/IncomingTcpConnection.java @ 44:73158dc54c59
fix peek api
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Sat, 04 Feb 2012 01:34:57 +0900 |
parents | ff33af300567 |
children | 919389066887 |
rev | line source |
---|---|
13 | 1 package alice.daemon; |
2 | |
3 import java.io.IOException; | |
4 import java.nio.ByteBuffer; | |
42 | 5 import java.nio.channels.ClosedChannelException; |
13 | 6 import java.nio.channels.SocketChannel; |
7 | |
8 import org.msgpack.MessagePack; | |
9 | |
10 import alice.datasegment.Command; | |
11 import alice.datasegment.CommandType; | |
12 import alice.datasegment.DataSegment; | |
13 import alice.datasegment.DataSegmentKey; | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
14 import alice.datasegment.DataSegmentManager; |
13 | 15 import alice.datasegment.LocalDataSegmentManager; |
16 | |
17 public class IncomingTcpConnection extends Thread { | |
18 | |
19 public Connection connection; | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
20 public DataSegmentManager manager; |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
21 public String reverseKey; |
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
22 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
23 this.manager = manager; |
13 | 24 this.connection = connection; |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
25 this.reverseKey = reverseKey; |
13 | 26 } |
27 | |
28 public void run() { | |
29 MessagePack msgpack = new MessagePack(); | |
30 while (true) { | |
31 SocketChannel ch = connection.socket.getChannel(); | |
32 ByteBuffer buf = ByteBuffer.allocateDirect(4); // for int | |
33 try { | |
34 int allReadLen = 0; | |
35 do { | |
36 int readLen = ch.read(buf); | |
16 | 37 if (readLen < 0) return; |
13 | 38 allReadLen += readLen; |
39 } while (allReadLen < 4); | |
40 buf.rewind(); | |
41 int msgLen = buf.getInt(); | |
42 allReadLen = 0; | |
43 ByteBuffer msgBuf = ByteBuffer.allocateDirect(msgLen); | |
44 do { | |
45 int readLen = ch.read(msgBuf); | |
16 | 46 if (readLen < 0) return; |
13 | 47 allReadLen += readLen; |
48 } while (allReadLen < msgLen); | |
16 | 49 msgBuf.flip(); |
13 | 50 CommandMessage msg = msgpack.read(msgBuf, CommandMessage.class); |
16 | 51 msgBuf.flip(); |
13 | 52 CommandType type = CommandType.getCommandTypeFromId(msg.type); |
16 | 53 LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local"); |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
54 DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key); |
13 | 55 switch (type) { |
56 case UPDATE: | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
57 dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); |
13 | 58 break; |
59 case PUT: | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
60 dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); |
13 | 61 break; |
62 case PEEK: | |
63 //Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) { | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
64 dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); |
13 | 65 break; |
66 case TAKE: | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
67 dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); |
13 | 68 break; |
69 case REMOVE: | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
70 dsKey.addCommand(new Command(type, null, null, null, 0, 0, null, null, null)); |
13 | 71 break; |
72 case REPLY: | |
73 try { | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
74 manager.replyQueue.put(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); |
13 | 75 } catch (InterruptedException e) { |
76 e.printStackTrace(); | |
77 } | |
78 break; | |
79 default: | |
80 break; | |
81 } | |
42 | 82 } catch (ClosedChannelException e) { |
43 | 83 try { |
84 connection.sendQueue.put(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); | |
85 } catch (InterruptedException e1) { | |
86 e1.printStackTrace(); | |
87 } | |
42 | 88 return; |
13 | 89 } catch (IOException e) { |
90 e.printStackTrace(); | |
91 } | |
92 } | |
93 } | |
94 | |
95 } |