# HG changeset patch
# User sugi
# Date 1376647543 -32400
# Node ID c0712e0b0a2418802206c746eb1ac80b4d319e32
# Parent 41312b857f3445938ba210f50ab757f2bdf208cc
creating reconnect Manager
diff -r 41312b857f34 -r c0712e0b0a24 .classpath
--- a/.classpath Tue Aug 13 06:06:02 2013 +0900
+++ b/.classpath Fri Aug 16 19:05:43 2013 +0900
@@ -5,9 +5,9 @@
-
+
diff -r 41312b857f34 -r c0712e0b0a24 .settings/org.eclipse.core.resources.prefs
--- a/.settings/org.eclipse.core.resources.prefs Tue Aug 13 06:06:02 2013 +0900
+++ b/.settings/org.eclipse.core.resources.prefs Fri Aug 16 19:05:43 2013 +0900
@@ -2,3 +2,4 @@
encoding//src/alice/test/codesegment/local/bitonicsort/SortTest.java=UTF-8
encoding//src/alice/test/codesegment/local/wordcount/SeparateArray.java=UTF-8
encoding//src/alice/test/codesegment/local/wordcount/WordCount.java=UTF-8
+encoding//src/alice/topology/manager/IncomingHosts.java=UTF-8
diff -r 41312b857f34 -r c0712e0b0a24 src/alice/topology/HostMessage.java
--- a/src/alice/topology/HostMessage.java Tue Aug 13 06:06:02 2013 +0900
+++ b/src/alice/topology/HostMessage.java Fri Aug 16 19:05:43 2013 +0900
@@ -1,15 +1,15 @@
package alice.topology;
import org.msgpack.annotation.Message;
-import org.msgpack.annotation.Optional;
@Message
public class HostMessage {
public String name;
public int port;
- @Optional public String connectionName;
- @Optional public String reverseName;
+ public String connectionName;
+ public String reverseName;
+ public String absName;
public HostMessage() { }
public HostMessage(String name, int port) {
@@ -24,4 +24,8 @@
this.reverseName = reverseName;
}
+ public void setABSName(String name){
+ absName = name;
+ }
+
}
diff -r 41312b857f34 -r c0712e0b0a24 src/alice/topology/manager/IncomingHosts.java
--- a/src/alice/topology/manager/IncomingHosts.java Tue Aug 13 06:06:02 2013 +0900
+++ b/src/alice/topology/manager/IncomingHosts.java Fri Aug 16 19:05:43 2013 +0900
@@ -1,5 +1,6 @@
package alice.topology.manager;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -16,6 +17,7 @@
HashMap> topology;
LinkedList nodeNames;
Receiver host = ids.create(CommandType.TAKE);
+ Receiver connection = ids.create(CommandType.TAKE);
public IncomingHosts(HashMap> topology, LinkedList nodeNames) {
this.topology = topology;
@@ -25,15 +27,30 @@
@Override
public void run() {
HostMessage host = this.host.asClass(HostMessage.class);
+ @SuppressWarnings("unchecked")
+ HashMap> connectionList = this.connection.asClass(HashMap.class);
+
String nodeName = nodeNames.poll();
// Manager connect to Node
DataSegment.connect(nodeName, "", host.name, host.port);
ods.put(nodeName, "host", nodeName);
LinkedList nodes = topology.get(nodeName);
+ ArrayList list;
for (NodeInfo nodeInfo : nodes) {
+ //System.out.println(host.name+" "+host.port+" "+nodeInfo.connectionName+" "+ nodeInfo.reverseName+" "+nodeInfo.sourceNodeName);
HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName, nodeInfo.reverseName);
ods.put("local", nodeInfo.sourceNodeName, newHost);
+
+ if (connectionList.containsKey(nodeInfo.sourceNodeName)){
+ list = connectionList.get(nodeInfo.sourceNodeName);
+ } else {
+ list = new ArrayList();
+ connectionList.put(nodeInfo.sourceNodeName, list);
+
+ }
+ list.add(newHost);
}
+ ods.update("local", "connection", connectionList);
if (nodeNames.isEmpty()) {
// configuration finish
@@ -43,6 +60,13 @@
} else {
IncomingHosts cs = new IncomingHosts(topology, nodeNames);
cs.host.setKey("local", "host");
+ cs.connection.setKey("local", "connection");
+ }
+ if (connectionList.containsKey("node0")){
+ ArrayList maplist = connectionList.get("node0");
+ for (HostMessage i : maplist){
+ System.out.println(i.name+" "+i.port+" "+i.connectionName+" "+i.reverseName);
+ }
}
}
diff -r 41312b857f34 -r c0712e0b0a24 src/alice/topology/manager/StartTopologyManager.java
--- a/src/alice/topology/manager/StartTopologyManager.java Tue Aug 13 06:06:02 2013 +0900
+++ b/src/alice/topology/manager/StartTopologyManager.java Fri Aug 16 19:05:43 2013 +0900
@@ -10,6 +10,7 @@
import org.apache.log4j.Logger;
import alice.codesegment.CodeSegment;
+import alice.topology.HostMessage;
import com.alexmerz.graphviz.ParseException;
import com.alexmerz.graphviz.Parser;
@@ -73,9 +74,12 @@
logger.error("File format error: " + conf.confFilePath);
e.printStackTrace();
}
+ HashMap> connectionList = new HashMap>();
+ ods.put("local", "connection", connectionList);
IncomingHosts cs1 = new IncomingHosts(topology, nodeNames);
cs1.host.setKey("local", "host");
+ cs1.connection.setKey("local", "connection");
TopologyFinish cs2 = new TopologyFinish();
cs2.finish.setKey("local", "finish");
diff -r 41312b857f34 -r c0712e0b0a24 src/alice/topology/node/IncomingAbstractHostName.java
--- a/src/alice/topology/node/IncomingAbstractHostName.java Tue Aug 13 06:06:02 2013 +0900
+++ b/src/alice/topology/node/IncomingAbstractHostName.java Fri Aug 16 19:05:43 2013 +0900
@@ -3,16 +3,24 @@
import alice.codesegment.CodeSegment;
import alice.datasegment.CommandType;
import alice.datasegment.Receiver;
+import alice.topology.HostMessage;
public class IncomingAbstractHostName extends CodeSegment {
public Receiver absName = ids.create(CommandType.PEEK);
-
+ private HostMessage host; // need CodeSegment
+
+ public IncomingAbstractHostName(HostMessage _host){
+ host = _host;
+ }
+
@Override
public void run() {
String absName = this.absName.asString();
IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, 0);
cs.hostInfo.setKey("manager", absName);
+ host.setABSName(absName);
+ ods.put("manager","_ABS", host);
}
}
diff -r 41312b857f34 -r c0712e0b0a24 src/alice/topology/node/StartTopologyNode.java
--- a/src/alice/topology/node/StartTopologyNode.java Tue Aug 13 06:06:02 2013 +0900
+++ b/src/alice/topology/node/StartTopologyNode.java Fri Aug 16 19:05:43 2013 +0900
@@ -33,7 +33,7 @@
HostMessage host = new HostMessage(localHostName, conf.localPort);
ods.put("manager", "host", host);
- IncomingAbstractHostName cs = new IncomingAbstractHostName();
+ IncomingAbstractHostName cs = new IncomingAbstractHostName(host);
cs.absName.setKey("local", "host");
IncomingReverseKey cs2 = new IncomingReverseKey();