Mercurial > hg > Database > Alice
changeset 266:c0712e0b0a24
creating reconnect Manager
author | sugi |
---|---|
date | Fri, 16 Aug 2013 19:05:43 +0900 |
parents | 41312b857f34 |
children | fac206b7849c |
files | .classpath .settings/org.eclipse.core.resources.prefs src/alice/topology/HostMessage.java src/alice/topology/manager/IncomingHosts.java src/alice/topology/manager/StartTopologyManager.java src/alice/topology/node/IncomingAbstractHostName.java src/alice/topology/node/StartTopologyNode.java |
diffstat | 7 files changed, 47 insertions(+), 6 deletions(-) [+] |
line wrap: on
line diff
--- a/.classpath Tue Aug 13 06:06:02 2013 +0900 +++ b/.classpath Fri Aug 16 19:05:43 2013 +0900 @@ -5,9 +5,9 @@ <classpathentry exported="true" kind="lib" path="lib/log4j-1.2.16.jar"/> <classpathentry exported="true" kind="lib" path="lib/slf4j-api-1.6.1.jar"/> <classpathentry exported="true" kind="lib" path="lib/slf4j-log4j12-1.6.1.jar"/> - <classpathentry exported="true" kind="lib" path="lib/com.alexmerz.graphviz.jar"/> <classpathentry exported="true" kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/> <classpathentry kind="lib" path="lib/msgpack-0.6.8-SNAPSHOT-sources.jar"/> <classpathentry kind="lib" path="lib/msgpack-0.6.8-SNAPSHOT.jar" sourcepath="lib/msgpack-0.6.8-SNAPSHOT-sources.jar"/> + <classpathentry kind="lib" path="lib/com.alexmerz.graphviz.jar"/> <classpathentry kind="output" path="bin"/> </classpath>
--- a/.settings/org.eclipse.core.resources.prefs Tue Aug 13 06:06:02 2013 +0900 +++ b/.settings/org.eclipse.core.resources.prefs Fri Aug 16 19:05:43 2013 +0900 @@ -2,3 +2,4 @@ encoding//src/alice/test/codesegment/local/bitonicsort/SortTest.java=UTF-8 encoding//src/alice/test/codesegment/local/wordcount/SeparateArray.java=UTF-8 encoding//src/alice/test/codesegment/local/wordcount/WordCount.java=UTF-8 +encoding//src/alice/topology/manager/IncomingHosts.java=UTF-8
--- a/src/alice/topology/HostMessage.java Tue Aug 13 06:06:02 2013 +0900 +++ b/src/alice/topology/HostMessage.java Fri Aug 16 19:05:43 2013 +0900 @@ -1,15 +1,15 @@ package alice.topology; import org.msgpack.annotation.Message; -import org.msgpack.annotation.Optional; @Message public class HostMessage { public String name; public int port; - @Optional public String connectionName; - @Optional public String reverseName; + public String connectionName; + public String reverseName; + public String absName; public HostMessage() { } public HostMessage(String name, int port) { @@ -24,4 +24,8 @@ this.reverseName = reverseName; } + public void setABSName(String name){ + absName = name; + } + }
--- a/src/alice/topology/manager/IncomingHosts.java Tue Aug 13 06:06:02 2013 +0900 +++ b/src/alice/topology/manager/IncomingHosts.java Fri Aug 16 19:05:43 2013 +0900 @@ -1,5 +1,6 @@ package alice.topology.manager; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; @@ -16,6 +17,7 @@ HashMap<String, LinkedList<NodeInfo>> topology; LinkedList<String> nodeNames; Receiver host = ids.create(CommandType.TAKE); + Receiver connection = ids.create(CommandType.TAKE); public IncomingHosts(HashMap<String, LinkedList<NodeInfo>> topology, LinkedList<String> nodeNames) { this.topology = topology; @@ -25,15 +27,30 @@ @Override public void run() { HostMessage host = this.host.asClass(HostMessage.class); + @SuppressWarnings("unchecked") + HashMap<String, ArrayList<HostMessage>> connectionList = this.connection.asClass(HashMap.class); + String nodeName = nodeNames.poll(); // Manager connect to Node DataSegment.connect(nodeName, "", host.name, host.port); ods.put(nodeName, "host", nodeName); LinkedList<NodeInfo> nodes = topology.get(nodeName); + ArrayList<HostMessage> list; for (NodeInfo nodeInfo : nodes) { + //System.out.println(host.name+" "+host.port+" "+nodeInfo.connectionName+" "+ nodeInfo.reverseName+" "+nodeInfo.sourceNodeName); HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName, nodeInfo.reverseName); ods.put("local", nodeInfo.sourceNodeName, newHost); + + if (connectionList.containsKey(nodeInfo.sourceNodeName)){ + list = connectionList.get(nodeInfo.sourceNodeName); + } else { + list = new ArrayList<HostMessage>(); + connectionList.put(nodeInfo.sourceNodeName, list); + + } + list.add(newHost); } + ods.update("local", "connection", connectionList); if (nodeNames.isEmpty()) { // configuration finish @@ -43,6 +60,13 @@ } else { IncomingHosts cs = new IncomingHosts(topology, nodeNames); cs.host.setKey("local", "host"); + cs.connection.setKey("local", "connection"); + } + if (connectionList.containsKey("node0")){ + ArrayList<HostMessage> maplist = connectionList.get("node0"); + for (HostMessage i : maplist){ + System.out.println(i.name+" "+i.port+" "+i.connectionName+" "+i.reverseName); + } } }
--- a/src/alice/topology/manager/StartTopologyManager.java Tue Aug 13 06:06:02 2013 +0900 +++ b/src/alice/topology/manager/StartTopologyManager.java Fri Aug 16 19:05:43 2013 +0900 @@ -10,6 +10,7 @@ import org.apache.log4j.Logger; import alice.codesegment.CodeSegment; +import alice.topology.HostMessage; import com.alexmerz.graphviz.ParseException; import com.alexmerz.graphviz.Parser; @@ -73,9 +74,12 @@ logger.error("File format error: " + conf.confFilePath); e.printStackTrace(); } + HashMap<String, ArrayList<HostMessage>> connectionList = new HashMap<String, ArrayList<HostMessage>>(); + ods.put("local", "connection", connectionList); IncomingHosts cs1 = new IncomingHosts(topology, nodeNames); cs1.host.setKey("local", "host"); + cs1.connection.setKey("local", "connection"); TopologyFinish cs2 = new TopologyFinish(); cs2.finish.setKey("local", "finish");
--- a/src/alice/topology/node/IncomingAbstractHostName.java Tue Aug 13 06:06:02 2013 +0900 +++ b/src/alice/topology/node/IncomingAbstractHostName.java Fri Aug 16 19:05:43 2013 +0900 @@ -3,16 +3,24 @@ import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.Receiver; +import alice.topology.HostMessage; public class IncomingAbstractHostName extends CodeSegment { public Receiver absName = ids.create(CommandType.PEEK); - + private HostMessage host; // need CodeSegment + + public IncomingAbstractHostName(HostMessage _host){ + host = _host; + } + @Override public void run() { String absName = this.absName.asString(); IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, 0); cs.hostInfo.setKey("manager", absName); + host.setABSName(absName); + ods.put("manager","_ABS", host); } }
--- a/src/alice/topology/node/StartTopologyNode.java Tue Aug 13 06:06:02 2013 +0900 +++ b/src/alice/topology/node/StartTopologyNode.java Fri Aug 16 19:05:43 2013 +0900 @@ -33,7 +33,7 @@ HostMessage host = new HostMessage(localHostName, conf.localPort); ods.put("manager", "host", host); - IncomingAbstractHostName cs = new IncomingAbstractHostName(); + IncomingAbstractHostName cs = new IncomingAbstractHostName(host); cs.absName.setKey("local", "host"); IncomingReverseKey cs2 = new IncomingReverseKey();