diff src/alice/datasegment/RemoteDataSegmentManager.java @ 14:e3f1b21718b0

implements RemoteDataSegment
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Sun, 15 Jan 2012 00:56:25 +0900
parents src/alice/datasegment/RemoteDataSegment.java@30f97d776a3e
children 72dd27d952b0
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Sun Jan 15 00:56:25 2012 +0900
@@ -0,0 +1,52 @@
+package alice.datasegment;
+
+import org.msgpack.type.Value;
+
+import alice.codesegment.CodeSegment;
+import alice.daemon.Connection;
+import alice.daemon.IncomingTcpConnection;
+import alice.daemon.OutboundTcpConnection;
+
+public class RemoteDataSegmentManager extends DataSegmentManager {
+	
+	Connection connection;
+	
+	public RemoteDataSegmentManager(Connection connection) {
+		this.connection = connection;
+		new IncomingTcpConnection(connection, this).start();
+		new OutboundTcpConnection(connection).start();
+		new Thread(replyThread).start();
+	}
+	
+	@Override
+	public void put(String key, Value val) {
+		connection.sendCommand(new Command(CommandType.PUT, null, key, val, 0, 0, null, null));
+	}
+
+	@Override
+	public void update(String key, Value val) {
+		connection.sendCommand(new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null));
+	}
+
+	@Override
+	public void take(String argKey, String key, int index, CodeSegment cs) {
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(CommandType.TAKE, argKey, key, null, index, seq, replyQueue, cs);
+		seqHash.put(seq, cmd);
+		connection.sendCommand(cmd);		
+	}
+
+	@Override
+	public void peek(String argKey, String key, int index, CodeSegment cs) {
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(CommandType.PEEK, argKey, key, null, index, seq, replyQueue, cs);
+		seqHash.put(seq, cmd);
+		connection.sendCommand(cmd);
+	}
+
+	@Override
+	public void remove(String key) {
+		connection.sendCommand(new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null));
+	}
+
+}