Mercurial > hg > Database > Alice
changeset 23:54bf607118ae
change method to create RemoteDSM
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 17 Jan 2012 01:10:29 +0900 |
parents | 2ca2d961a8d2 |
children | ebd91e607b63 |
files | src/alice/daemon/Config.java src/alice/daemon/Connection.java src/alice/datasegment/DataSegment.java src/alice/datasegment/RemoteDataSegmentManager.java src/topology/manager/IncomingHosts.java |
diffstat | 5 files changed, 63 insertions(+), 3 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/daemon/Config.java Tue Jan 17 01:10:29 2012 +0900 @@ -0,0 +1,16 @@ +package alice.daemon; + +public class Config { + + public int localPort = 10000; + + public Config(String[] args) { + for (int i = 0; i< args.length; i++) { + if ("-p".equals(args[i])) { + localPort = Integer.parseInt(args[++i]); + return; + } + } + } + +}
--- a/src/alice/daemon/Connection.java Tue Jan 17 00:40:27 2012 +0900 +++ b/src/alice/daemon/Connection.java Tue Jan 17 01:10:29 2012 +0900 @@ -14,6 +14,8 @@ this.socket = socket; } + public Connection() { } + public void sendCommand(Command cmd) { try { sendQueue.put(cmd);
--- a/src/alice/datasegment/DataSegment.java Tue Jan 17 00:40:27 2012 +0900 +++ b/src/alice/datasegment/DataSegment.java Tue Jan 17 01:10:29 2012 +0900 @@ -19,4 +19,10 @@ dataSegment.dataSegmentManageres.put(key, manager); } + public static RemoteDataSegmentManager connect(String key, String hostName, int port) { + RemoteDataSegmentManager manager = new RemoteDataSegmentManager(key, hostName, port); + regist(key, manager); + return manager; + } + }
--- a/src/alice/datasegment/RemoteDataSegmentManager.java Tue Jan 17 00:40:27 2012 +0900 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Tue Jan 17 01:10:29 2012 +0900 @@ -1,5 +1,9 @@ package alice.datasegment; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; + import org.msgpack.type.Value; import alice.codesegment.CodeSegment; @@ -11,6 +15,8 @@ Connection connection; + // TODO: delete this constructor later + @Deprecated public RemoteDataSegmentManager(Connection connection) { this.connection = connection; new IncomingTcpConnection(connection, this).start(); @@ -20,6 +26,34 @@ + ":" + connection.socket.getPort()).start(); } + public RemoteDataSegmentManager(String key, final String hostName, final int port) { + connection = new Connection(); + final RemoteDataSegmentManager manager = this; + new Thread(replyThread, "RemoteDataSegmentManager-" + + connection.socket.getInetAddress().getHostName() + + ":" + connection.socket.getPort()).start(); + new Thread("Connect-" + key) { + public void run() { + boolean connect = true; + do { + try { + SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port)); + connection.socket = sc.socket(); + connect = false; + } catch (IOException e) { + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } while (connect); + new IncomingTcpConnection(connection, manager).start(); + new OutboundTcpConnection(connection).start(); + } + }.start(); + } + @Override public void put(String key, Value val) { connection.sendCommand(new Command(CommandType.PUT, null, key, val, 0, 0, null, null));
--- a/src/topology/manager/IncomingHosts.java Tue Jan 17 00:40:27 2012 +0900 +++ b/src/topology/manager/IncomingHosts.java Tue Jan 17 01:10:29 2012 +0900 @@ -6,9 +6,12 @@ import org.apache.log4j.Logger; import org.msgpack.MessagePack; +import org.msgpack.type.ValueFactory; import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; +import alice.datasegment.DataSegmentManager; import alice.datasegment.DataSegmentReceiver; import alice.topology.HostMessage; @@ -30,9 +33,8 @@ try { HostMessage host = msgpack.convert(this.host.val, HostMessage.class); String nodeName = nodeNames.poll(); - - // TODO: send nodeName to node - + DataSegmentManager manager = DataSegment.connect(nodeName, host.name, host.port); + manager.put("host", ValueFactory.createRawValue(nodeName)); } catch (IOException e) { logger.error("HostMessage format error"); e.printStackTrace();