Mercurial > hg > Database > Alice
changeset 401:91e1b063a89f dispose
haven't to remove manager package
author | sugi |
---|---|
date | Sun, 22 Jun 2014 23:12:02 +0900 |
parents | 3a0056e03040 |
children | ca2947303438 |
files | src/main/java/alice/topology/manager/ConfigWaiter.java src/main/java/alice/topology/manager/IncomingHosts.java src/main/java/alice/topology/manager/NodeInfo.java src/main/java/alice/topology/manager/StartTopologyManager.java src/main/java/alice/topology/manager/TopologyFinish.java src/main/java/alice/topology/manager/TopologyManager.java src/main/java/alice/topology/manager/TopologyManagerConfig.java |
diffstat | 7 files changed, 224 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/topology/manager/ConfigWaiter.java Sun Jun 22 23:12:02 2014 +0900 @@ -0,0 +1,29 @@ +package alice.topology.manager; + +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.Receiver; + +public class ConfigWaiter extends CodeSegment { + + public Receiver done = ids.create(CommandType.TAKE); + public int count; + + public ConfigWaiter(int nodeNum) { + this.count = nodeNum; + } + + @Override + public void run() { + count--; + if (count == 0) { + ods.put("local", "start", ValueFactory.createNilValue()); + return; + } + ConfigWaiter cs3 = new ConfigWaiter(count); + cs3.done.setKey("local", "done"); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/topology/manager/IncomingHosts.java Sun Jun 22 23:12:02 2014 +0900 @@ -0,0 +1,49 @@ +package alice.topology.manager; + +import java.util.HashMap; +import java.util.LinkedList; + +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; +import alice.datasegment.Receiver; +import alice.topology.HostMessage; + +public class IncomingHosts extends CodeSegment { + + HashMap<String, LinkedList<NodeInfo>> topology; + LinkedList<String> nodeNames; + Receiver host = ids.create(CommandType.TAKE); + + public IncomingHosts(HashMap<String, LinkedList<NodeInfo>> topology, LinkedList<String> nodeNames) { + this.topology = topology; + this.nodeNames = nodeNames; + } + + @Override + public void run() { + HostMessage host = this.host.asClass(HostMessage.class); + String nodeName = nodeNames.poll(); + // Manager connect to Node + DataSegment.connect(nodeName, "", host.name, host.port); + ods.put(nodeName, "host", nodeName); + LinkedList<NodeInfo> nodes = topology.get(nodeName); + for (NodeInfo nodeInfo : nodes) { + HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName, nodeInfo.reverseName); + ods.put("local", nodeInfo.sourceNodeName, newHost); + } + + if (nodeNames.isEmpty()) { + // configuration finish + for (String key : topology.keySet()) { + ods.put("local", key, ValueFactory.createNilValue()); + } + } else { + IncomingHosts cs = new IncomingHosts(topology, nodeNames); + cs.host.setKey("local", "host"); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/topology/manager/NodeInfo.java Sun Jun 22 23:12:02 2014 +0900 @@ -0,0 +1,14 @@ +package alice.topology.manager; + +public class NodeInfo { + + public String sourceNodeName; + public String connectionName; + public String reverseName; + + public NodeInfo(String source, String connection) { + this.sourceNodeName = source; + this.connectionName = connection; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/topology/manager/StartTopologyManager.java Sun Jun 22 23:12:02 2014 +0900 @@ -0,0 +1,87 @@ +package alice.topology.manager; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; + +import org.apache.log4j.Logger; + +import alice.codesegment.CodeSegment; + +import com.alexmerz.graphviz.ParseException; +import com.alexmerz.graphviz.Parser; +import com.alexmerz.graphviz.objects.Edge; +import com.alexmerz.graphviz.objects.Graph; +import com.alexmerz.graphviz.objects.Node; + +public class StartTopologyManager extends CodeSegment { + + TopologyManagerConfig conf; + Logger logger = Logger.getLogger(StartTopologyManager.class); + + public StartTopologyManager(TopologyManagerConfig conf) { + this.conf = conf; + } + + @Override + public void run() { + LinkedList<String> nodeNames = new LinkedList<String>(); + HashMap<String, LinkedList<NodeInfo>> topology = new HashMap<String, LinkedList<NodeInfo>>(); + int nodeNum = 0; + try { + FileReader reader = new FileReader(new File(conf.confFilePath)); + Parser parser = new Parser(); + parser.parse(reader); + ArrayList<Graph> graphs = parser.getGraphs(); + for (Graph graph : graphs) { + ArrayList<Node> nodes = graph.getNodes(false); + nodeNum = nodes.size(); + for (Node node : nodes) { + String nodeName = node.getId().getId(); + nodeNames.add(nodeName); + topology.put(nodeName, new LinkedList<NodeInfo>()); + } + ArrayList<Edge> edges = graph.getEdges(); + HashMap<String, NodeInfo> hash = new HashMap<String, NodeInfo>(); + for (Edge edge : edges) { + String connection = edge.getAttribute("label"); + String source = edge.getSource().getNode().getId().getId(); + String target = edge.getTarget().getNode().getId().getId(); + LinkedList<NodeInfo> sources = topology.get(target); + NodeInfo nodeInfo = new NodeInfo(source, connection); + sources.add(nodeInfo); + hash.put(source + "," + target, nodeInfo); + } + for (Edge edge : edges) { + String connection = edge.getAttribute("label"); + String source = edge.getSource().getNode().getId().getId(); + String target = edge.getTarget().getNode().getId().getId(); + NodeInfo nodeInfo = hash.get(target + "," + source); + if (nodeInfo != null) { + nodeInfo.reverseName = connection; + } + } + } + + } catch (FileNotFoundException e) { + logger.error("File not found: " + conf.confFilePath); + e.printStackTrace(); + } catch (ParseException e) { + logger.error("File format error: " + conf.confFilePath); + e.printStackTrace(); + } + + IncomingHosts cs1 = new IncomingHosts(topology, nodeNames); + cs1.host.setKey("local", "host"); + + TopologyFinish cs2 = new TopologyFinish(); + cs2.finish.setKey("local", "finish"); + + ConfigWaiter cs3 = new ConfigWaiter(nodeNum); + cs3.done.setKey("local", "done"); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/topology/manager/TopologyFinish.java Sun Jun 22 23:12:02 2014 +0900 @@ -0,0 +1,14 @@ +package alice.topology.manager; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.Receiver; + +public class TopologyFinish extends CodeSegment { + public Receiver finish = ids.create(CommandType.TAKE); + @Override + public void run() { + System.exit(0); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/topology/manager/TopologyManager.java Sun Jun 22 23:12:02 2014 +0900 @@ -0,0 +1,13 @@ +package alice.topology.manager; + +import alice.daemon.AliceDaemon; + +public class TopologyManager { + + public static void main(String[] args) { + TopologyManagerConfig conf = new TopologyManagerConfig(args); + new AliceDaemon(conf).listen(); + new StartTopologyManager(conf).execute(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/topology/manager/TopologyManagerConfig.java Sun Jun 22 23:12:02 2014 +0900 @@ -0,0 +1,18 @@ +package alice.topology.manager; + +import alice.daemon.Config; + +public class TopologyManagerConfig extends Config { + + public String confFilePath; + + public TopologyManagerConfig(String[] args) { + super(args); + for (int i = 0; i < args.length; i++) { + if ("-conf".equals(args[i])) { + confFilePath = args[++i]; + } + } + } + +}