# HG changeset patch # User kazz # Date 1326757294 -32400 # Node ID 98ab26e09a98067d7ffc7cc190ba71ed1401e222 # Parent f54dcbebde3a365cf8f3587807624d4ad6dfc596 Configuration Manager work and implements reverseKey diff -r f54dcbebde3a -r 98ab26e09a98 build.xml --- a/build.xml Tue Jan 17 03:52:39 2012 +0900 +++ b/build.xml Tue Jan 17 08:41:34 2012 +0900 @@ -37,6 +37,7 @@ + diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/codesegment/InputDataSegment.java --- a/src/alice/codesegment/InputDataSegment.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/codesegment/InputDataSegment.java Tue Jan 17 08:41:34 2012 +0900 @@ -35,6 +35,7 @@ public void reply(DataSegmentReceiver receiver, DataSegmentValue val) { receiver.index = val.index; receiver.val = val.val; + receiver.from = val.from; receive(); } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/daemon/AcceptThread.java --- a/src/alice/daemon/AcceptThread.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/daemon/AcceptThread.java Tue Jan 17 08:41:34 2012 +0900 @@ -12,7 +12,7 @@ private ServerSocket ss; private Logger log = Logger.getLogger(AcceptThread.class); - + public int counter = 0; public AcceptThread(ServerSocket ss, String name) { super(name); @@ -26,8 +26,13 @@ Socket socket = ss.accept(); log.info("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort()); Connection connection = new Connection(socket); - new IncomingTcpConnection(connection, DataSegment.get("local")).start(); + String key = "accept" + counter; + IncomingTcpConnection incoming = + new IncomingTcpConnection(connection, DataSegment.get("local"), key); + incoming.start(); + DataSegment.setAccept(key, incoming); new OutboundTcpConnection(connection).start(); + counter++; } catch (IOException e) { e.printStackTrace(); } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/daemon/IncomingTcpConnection.java --- a/src/alice/daemon/IncomingTcpConnection.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/daemon/IncomingTcpConnection.java Tue Jan 17 08:41:34 2012 +0900 @@ -17,10 +17,11 @@ public Connection connection; public DataSegmentManager manager; - - public IncomingTcpConnection(Connection connection, DataSegmentManager manager) { + public String reverseKey; + public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { this.manager = manager; this.connection = connection; + this.reverseKey = reverseKey; } public void run() { @@ -52,24 +53,24 @@ DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key); switch (type) { case UPDATE: - dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null)); + dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PUT: - dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null)); + dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PEEK: //Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue replyQueue, CodeSegment cs) { - dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null)); + dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); break; case TAKE: - dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null)); + dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); break; case REMOVE: - dsKey.addCommand(new Command(type, null, null, null, 0, 0, null, null)); + dsKey.addCommand(new Command(type, null, null, null, 0, 0, null, null, null)); break; case REPLY: try { - manager.replyQueue.put(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null)); + manager.replyQueue.put(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); } catch (InterruptedException e) { e.printStackTrace(); } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/datasegment/Command.java --- a/src/alice/datasegment/Command.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/datasegment/Command.java Tue Jan 17 08:41:34 2012 +0900 @@ -15,8 +15,9 @@ public int seq; public BlockingQueue replyQueue; public CodeSegment cs; + public String reverseKey; - public Command(CommandType cmdType, DataSegmentReceiver receiver, String key, Value val, int index, int seq, BlockingQueue replyQueue, CodeSegment cs) { + public Command(CommandType cmdType, DataSegmentReceiver receiver, String key, Value val, int index, int seq, BlockingQueue replyQueue, CodeSegment cs, String reverseKey) { this.type = cmdType; this.receiver = receiver; this.key = key; @@ -25,6 +26,7 @@ this.seq = seq; this.replyQueue = replyQueue; this.cs = cs; + this.reverseKey = reverseKey; } } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/datasegment/DataSegment.java --- a/src/alice/datasegment/DataSegment.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/datasegment/DataSegment.java Tue Jan 17 08:41:34 2012 +0900 @@ -2,10 +2,13 @@ import java.util.concurrent.ConcurrentHashMap; +import alice.daemon.IncomingTcpConnection; + public class DataSegment { private static DataSegment dataSegment = new DataSegment(); private ConcurrentHashMap dataSegmentManageres = new ConcurrentHashMap(); + private ConcurrentHashMap acceptHash = new ConcurrentHashMap(); private DataSegment() { dataSegmentManageres.put("local", new LocalDataSegmentManager()); @@ -19,10 +22,18 @@ dataSegment.dataSegmentManageres.put(key, manager); } - public static RemoteDataSegmentManager connect(String key, String hostName, int port) { - RemoteDataSegmentManager manager = new RemoteDataSegmentManager(key, hostName, port); - regist(key, manager); + public static RemoteDataSegmentManager connect(String connectionKey, String reverseKey, String hostName, int port) { + RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port); + regist(connectionKey, manager); return manager; } + public static void setAccept(String key, IncomingTcpConnection incoming) { + dataSegment.acceptHash.put(key, incoming); + } + + public static IncomingTcpConnection getAccept(String key) { + return dataSegment.acceptHash.get(key); + } + } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/datasegment/DataSegmentKey.java --- a/src/alice/datasegment/DataSegmentKey.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Tue Jan 17 08:41:34 2012 +0900 @@ -38,14 +38,14 @@ } case PUT: int index = tailIndex.getAndIncrement(); - DataSegmentValue dsv = new DataSegmentValue(index, cmd.val); + DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); dataList.add(dsv); // run waiting peek and take boolean takeFlag = true; for (Iterator iter = waitList.iterator(); iter.hasNext() && takeFlag; ) { Command waitCmd = iter.next(); if (waitCmd.index < index) { - waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null)); + waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey)); iter.remove(); if (waitCmd.type == CommandType.TAKE) { // delete data, if it run take cmd. dataList.remove(dsv); @@ -61,7 +61,7 @@ } for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null)); + cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); break; } } @@ -76,7 +76,7 @@ for (Iterator iter = dataList.iterator(); iter.hasNext(); ) { DataSegmentValue data = iter.next(); if (data.index > cmd.index) { - cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null)); + cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); iter.remove(); waitFlag = false; break; diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/datasegment/DataSegmentManager.java --- a/src/alice/datasegment/DataSegmentManager.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Tue Jan 17 08:41:34 2012 +0900 @@ -22,7 +22,7 @@ try { Command reply = replyQueue.take(); Command cmd = seqHash.get(reply.seq); - cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val)); + cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.reverseKey)); } catch (InterruptedException e) { e.printStackTrace(); } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/datasegment/DataSegmentReceiver.java --- a/src/alice/datasegment/DataSegmentReceiver.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/datasegment/DataSegmentReceiver.java Tue Jan 17 08:41:34 2012 +0900 @@ -8,6 +8,7 @@ public InputDataSegment ids; public int index; public Value val; + public String from; public CommandType type; diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/datasegment/DataSegmentValue.java --- a/src/alice/datasegment/DataSegmentValue.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/datasegment/DataSegmentValue.java Tue Jan 17 08:41:34 2012 +0900 @@ -6,10 +6,12 @@ public int index; public Value val; + public String from; - public DataSegmentValue(int index, Value val) { + public DataSegmentValue(int index, Value val, String reverseKey) { this.index = index; this.val = val; + this.from = reverseKey; } } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/datasegment/LocalDataSegmentManager.java --- a/src/alice/datasegment/LocalDataSegmentManager.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Tue Jan 17 08:41:34 2012 +0900 @@ -7,6 +7,8 @@ public class LocalDataSegmentManager extends DataSegmentManager { + public String reverseKey = "local"; + public LocalDataSegmentManager() { new Thread(replyThread, "LocalDataSegmentManager").start(); } @@ -27,20 +29,20 @@ @Override public void put(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.PUT, null, null, val, 0, 0, replyQueue, null)); + dataSegmentKey.addCommand(new Command(CommandType.PUT, null, null, val, 0, 0, replyQueue, null, reverseKey)); } @Override public void update(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, null, val, 0, 0, replyQueue, null)); + dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, null, val, 0, 0, replyQueue, null, reverseKey)); } @Override public void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, receiver, null, null, index, seq, replyQueue, cs); + Command cmd = new Command(CommandType.TAKE, receiver, null, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @@ -49,7 +51,7 @@ public void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, receiver, null, null, index, seq, replyQueue, cs); + Command cmd = new Command(CommandType.PEEK, receiver, null, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @@ -57,7 +59,7 @@ @Override public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, null, 0, 0, replyQueue, null)); + dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, null, 0, 0, replyQueue, null, null)); } } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/datasegment/RemoteDataSegmentManager.java --- a/src/alice/datasegment/RemoteDataSegmentManager.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Tue Jan 17 08:41:34 2012 +0900 @@ -21,18 +21,18 @@ @Deprecated public RemoteDataSegmentManager(Connection connection) { this.connection = connection; - new IncomingTcpConnection(connection, this).start(); + new IncomingTcpConnection(connection, this, "").start(); new OutboundTcpConnection(connection).start(); new Thread(replyThread, "RemoteDataSegmentManager-" + connection.socket.getInetAddress().getHostName() + ":" + connection.socket.getPort()).start(); } - public RemoteDataSegmentManager(String key, final String hostName, final int port) { + public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { connection = new Connection(); final RemoteDataSegmentManager manager = this; - new Thread(replyThread, "RemoteDataSegmentManager-" + key).start(); - new Thread("Connect-" + key) { + new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); + new Thread("Connect-" + connectionKey) { public void run() { boolean connect = true; do { @@ -51,7 +51,7 @@ } } } while (connect); - new IncomingTcpConnection(connection, manager).start(); + new IncomingTcpConnection(connection, manager, reverseKey).start(); new OutboundTcpConnection(connection).start(); } }.start(); @@ -59,18 +59,18 @@ @Override public void put(String key, Value val) { - connection.sendCommand(new Command(CommandType.PUT, null, key, val, 0, 0, null, null)); + connection.sendCommand(new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null)); } @Override public void update(String key, Value val) { - connection.sendCommand(new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null)); + connection.sendCommand(new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null)); } @Override public void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) { int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs); + Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); connection.sendCommand(cmd); } @@ -78,14 +78,14 @@ @Override public void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) { int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs); + Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); connection.sendCommand(cmd); } @Override public void remove(String key) { - connection.sendCommand(new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null)); + connection.sendCommand(new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null)); } } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/topology/HostMessage.java --- a/src/alice/topology/HostMessage.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/topology/HostMessage.java Tue Jan 17 08:41:34 2012 +0900 @@ -9,6 +9,7 @@ public String name; public int port; @Optional public String connectionName; + @Optional public String reverseName; public HostMessage() { } public HostMessage(String name, int port) { @@ -16,10 +17,11 @@ this.port = port; } - public HostMessage(String name, int port, String connectionName) { + public HostMessage(String name, int port, String connectionName, String reverseName) { this.name = name; this.port = port; this.connectionName = connectionName; + this.reverseName = reverseName; } } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/topology/manager/IncomingHosts.java --- a/src/alice/topology/manager/IncomingHosts.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/topology/manager/IncomingHosts.java Tue Jan 17 08:41:34 2012 +0900 @@ -33,11 +33,11 @@ try { HostMessage host = msgpack.convert(this.host.val, HostMessage.class); String nodeName = nodeNames.poll(); - DataSegmentManager manager = DataSegment.connect(nodeName, host.name, host.port); + DataSegmentManager manager = DataSegment.connect(nodeName, "", host.name, host.port); manager.put("host", ValueFactory.createRawValue(nodeName)); LinkedList nodes = topology.get(nodeName); for (NodeInfo nodeInfo : nodes) { - HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName); + HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName, nodeInfo.reverseName); ods.put("local", nodeInfo.sourceNodeName, msgpack.unconvert(newHost)); } } catch (IOException e) { diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/topology/manager/NodeInfo.java --- a/src/alice/topology/manager/NodeInfo.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/topology/manager/NodeInfo.java Tue Jan 17 08:41:34 2012 +0900 @@ -4,11 +4,11 @@ public String sourceNodeName; public String connectionName; + public String reverseName; public NodeInfo(String source, String connection) { this.sourceNodeName = source; this.connectionName = connection; - } } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/topology/manager/StartTopologyManager.java --- a/src/alice/topology/manager/StartTopologyManager.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/topology/manager/StartTopologyManager.java Tue Jan 17 08:41:34 2012 +0900 @@ -43,12 +43,25 @@ topology.put(nodeName, new LinkedList()); } ArrayList edges = graph.getEdges(); + HashMap hash = new HashMap(); for (Edge edge : edges) { String connection = edge.getAttribute("label"); String source = edge.getSource().getNode().getId().getId(); - String target = edge.getSource().getNode().getId().getId(); + String target = edge.getTarget().getNode().getId().getId(); LinkedList sources = topology.get(target); - sources.add(new NodeInfo(source, connection)); + NodeInfo nodeInfo = new NodeInfo(source, connection); + sources.add(nodeInfo); + hash.put(source + target, nodeInfo); + } + for (Edge edge : edges) { + String connection = edge.getAttribute("label"); + String source = edge.getSource().getNode().getId().getId(); + String target = edge.getTarget().getNode().getId().getId(); + NodeInfo nodeInfo = hash.get(target + source); + if (nodeInfo != null) { + nodeInfo.reverseName = connection; + } + } } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/topology/node/ConfigurationFinish.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/node/ConfigurationFinish.java Tue Jan 17 08:41:34 2012 +0900 @@ -0,0 +1,38 @@ +package alice.topology.node; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; + +public class ConfigurationFinish extends CodeSegment { + + public DataSegmentReceiver reverseCount = new DataSegmentReceiver(ids, CommandType.PEEK); + public DataSegmentReceiver configNodeNum = new DataSegmentReceiver(ids, CommandType.PEEK); + private Class clazz; + + public ConfigurationFinish(Class clazz) { + this.clazz = clazz; + } + + @Override + public void run() { + if (reverseCount.val.equals(configNodeNum.val)) { + System.out.println("Configuration finished"); + if (clazz == null) + return; + try { + clazz.newInstance().execute(); + } catch (InstantiationException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + return; + } + + ConfigurationFinish cs3 = new ConfigurationFinish(clazz); + cs3.reverseCount.setKey("local", "reverseCount"); + cs3.configNodeNum.setKey("local", "configNodeNum"); + } + +} diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/topology/node/IncomingAbstractHostName.java --- a/src/alice/topology/node/IncomingAbstractHostName.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/topology/node/IncomingAbstractHostName.java Tue Jan 17 08:41:34 2012 +0900 @@ -7,16 +7,11 @@ public class IncomingAbstractHostName extends CodeSegment { public DataSegmentReceiver absName = new DataSegmentReceiver(ids, CommandType.PEEK); - private Class clazz; - - public IncomingAbstractHostName(Class clazz) { - this.clazz = clazz; - } @Override public void run() { String absName = this.absName.val.asRawValue().getString(); - IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, clazz); + IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, 0); cs.hostInfo.setKey("manager", absName); } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/topology/node/IncomingConnectionInfo.java --- a/src/alice/topology/node/IncomingConnectionInfo.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/topology/node/IncomingConnectionInfo.java Tue Jan 17 08:41:34 2012 +0900 @@ -3,10 +3,12 @@ import java.io.IOException; import org.msgpack.MessagePack; +import org.msgpack.type.ValueFactory; import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; +import alice.datasegment.DataSegmentManager; import alice.datasegment.DataSegmentReceiver; import alice.topology.HostMessage; @@ -14,37 +16,30 @@ public DataSegmentReceiver hostInfo = new DataSegmentReceiver(ids, CommandType.TAKE); private String absName; - private Class clazz; + private int count; - public IncomingConnectionInfo(String absName, Class clazz) { + public IncomingConnectionInfo(String absName, int count) { this.absName = absName; - this.clazz = clazz; + this.count = count; } @Override public void run() { if (this.hostInfo.val == null) { - System.out.println("Configuration finished"); - if (clazz == null) - return; - try { - clazz.newInstance().execute(); - } catch (InstantiationException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } + ods.put("local", "configNodeNum", ValueFactory.createIntegerValue(count)); + return; } MessagePack msgpack = new MessagePack(); try { HostMessage hostInfo = msgpack.convert(this.hostInfo.val, HostMessage.class); - DataSegment.connect(hostInfo.connectionName, hostInfo.name, hostInfo.port); + DataSegmentManager manager = DataSegment.connect(hostInfo.connectionName, hostInfo.reverseName, hostInfo.name, hostInfo.port); + manager.put("reverseKey", ValueFactory.createRawValue(hostInfo.reverseName)); } catch (IOException e) { e.printStackTrace(); } - IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, clazz); + IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, ++count); cs.hostInfo.setKey("manager", absName); } diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/topology/node/IncomingReverseKey.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/node/IncomingReverseKey.java Tue Jan 17 08:41:34 2012 +0900 @@ -0,0 +1,30 @@ +package alice.topology.node; + +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; +import alice.datasegment.DataSegmentReceiver; + +public class IncomingReverseKey extends CodeSegment { + + public DataSegmentReceiver reverseKey = new DataSegmentReceiver(ids, CommandType.TAKE); + public DataSegmentReceiver reverseCount = new DataSegmentReceiver(ids, CommandType.PEEK); + @Override + public void run() { + String reverseKey = this.reverseKey.val.asRawValue().getString(); + String from = this.reverseKey.from; + DataSegment.getAccept(from).reverseKey = reverseKey; + + int reverseCount = this.reverseCount.val.asIntegerValue().getInt(); + reverseCount++; + ods.update("local", "reverseCount", ValueFactory.createIntegerValue(reverseCount)); + + + IncomingReverseKey cs = new IncomingReverseKey(); + cs.reverseKey.setKey("local", "reverseKey"); + cs.reverseCount.setKey("local", "reverseCount"); + } + +} diff -r f54dcbebde3a -r 98ab26e09a98 src/alice/topology/node/StartTopologyNode.java --- a/src/alice/topology/node/StartTopologyNode.java Tue Jan 17 03:52:39 2012 +0900 +++ b/src/alice/topology/node/StartTopologyNode.java Tue Jan 17 08:41:34 2012 +0900 @@ -5,6 +5,7 @@ import java.net.UnknownHostException; import org.msgpack.MessagePack; +import org.msgpack.type.ValueFactory; import alice.codesegment.CodeSegment; import alice.datasegment.DataSegment; @@ -23,7 +24,7 @@ @Override public void run() { - DataSegmentManager manager = DataSegment.connect("manager", conf.managerHostName, conf.managerPort); + DataSegmentManager manager = DataSegment.connect("manager", "", conf.managerHostName, conf.managerPort); try { HostMessage host; host = new HostMessage(InetAddress.getLocalHost().getHostName(), conf.localPort); @@ -35,8 +36,18 @@ e.printStackTrace(); } - IncomingAbstractHostName cs = new IncomingAbstractHostName(clazz); - cs.absName.setKey("local", "host"); + IncomingAbstractHostName cs1 = new IncomingAbstractHostName(); + cs1.absName.setKey("local", "host"); + + IncomingReverseKey cs2 = new IncomingReverseKey(); + cs2.reverseKey.setKey("local", "reverseKey"); + cs2.reverseCount.setKey("local", "reverseCount"); + + ods.put("local", "reverseCount", ValueFactory.createIntegerValue(0)); + + ConfigurationFinish cs3 = new ConfigurationFinish(clazz); + cs3.reverseCount.setKey("local", "reverseCount"); + cs3.configNodeNum.setKey("local", "configNodeNum"); } }