Mercurial > hg > Members > tatsuki > Alice
changeset 27:f54dcbebde3a
topology manager work!
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 17 Jan 2012 03:52:39 +0900 |
parents | 9c6b9e032338 |
children | 98ab26e09a98 |
files | src/alice/datasegment/RemoteDataSegmentManager.java src/alice/topology/manager/IncomingHosts.java src/alice/topology/manager/NodeInfo.java src/alice/topology/manager/StartTopologyManager.java src/alice/topology/manager/TopologyManager.java src/alice/topology/manager/TopologyManagerConfig.java src/alice/topology/node/IncomingConnectionInfo.java src/alice/topology/node/StartTopologyNode.java src/topology/manager/IncomingHosts.java src/topology/manager/NodeInfo.java src/topology/manager/StartTopologyManager.java src/topology/manager/TopologyManager.java src/topology/manager/TopologyManagerConfig.java |
diffstat | 13 files changed, 174 insertions(+), 176 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/datasegment/RemoteDataSegmentManager.java Tue Jan 17 03:11:23 2012 +0900 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Tue Jan 17 03:52:39 2012 +0900 @@ -31,9 +31,7 @@ public RemoteDataSegmentManager(String key, final String hostName, final int port) { connection = new Connection(); final RemoteDataSegmentManager manager = this; - new Thread(replyThread, "RemoteDataSegmentManager-" - + connection.socket.getInetAddress().getHostName() - + ":" + connection.socket.getPort()).start(); + new Thread(replyThread, "RemoteDataSegmentManager-" + key).start(); new Thread("Connect-" + key) { public void run() { boolean connect = true;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/manager/IncomingHosts.java Tue Jan 17 03:52:39 2012 +0900 @@ -0,0 +1,59 @@ +package alice.topology.manager; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; + +import org.apache.log4j.Logger; +import org.msgpack.MessagePack; +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; +import alice.datasegment.DataSegmentManager; +import alice.datasegment.DataSegmentReceiver; +import alice.topology.HostMessage; + +public class IncomingHosts extends CodeSegment { + + HashMap<String, LinkedList<NodeInfo>> topology; + LinkedList<String> nodeNames; + DataSegmentReceiver host = new DataSegmentReceiver(ids, CommandType.TAKE); + Logger logger = Logger.getLogger(IncomingHosts.class); + + public IncomingHosts(HashMap<String, LinkedList<NodeInfo>> topology, LinkedList<String> nodeNames) { + this.topology = topology; + this.nodeNames = nodeNames; + } + + @Override + public void run() { + MessagePack msgpack = new MessagePack(); + try { + HostMessage host = msgpack.convert(this.host.val, HostMessage.class); + String nodeName = nodeNames.poll(); + DataSegmentManager manager = DataSegment.connect(nodeName, host.name, host.port); + manager.put("host", ValueFactory.createRawValue(nodeName)); + LinkedList<NodeInfo> nodes = topology.get(nodeName); + for (NodeInfo nodeInfo : nodes) { + HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName); + ods.put("local", nodeInfo.sourceNodeName, msgpack.unconvert(newHost)); + } + } catch (IOException e) { + logger.error("HostMessage format error"); + e.printStackTrace(); + } + + if (nodeNames.isEmpty()) { + // configuration finish + for (String key : topology.keySet()) { + ods.put("local", key, ValueFactory.createNilValue()); + } + } else { + IncomingHosts cs = new IncomingHosts(topology, nodeNames); + cs.host.setKey("local", "host"); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/manager/NodeInfo.java Tue Jan 17 03:52:39 2012 +0900 @@ -0,0 +1,14 @@ +package alice.topology.manager; + +public class NodeInfo { + + public String sourceNodeName; + public String connectionName; + + public NodeInfo(String source, String connection) { + this.sourceNodeName = source; + this.connectionName = connection; + + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/manager/StartTopologyManager.java Tue Jan 17 03:52:39 2012 +0900 @@ -0,0 +1,67 @@ +package alice.topology.manager; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; + +import org.apache.log4j.Logger; + +import com.alexmerz.graphviz.ParseException; +import com.alexmerz.graphviz.Parser; +import com.alexmerz.graphviz.objects.Edge; +import com.alexmerz.graphviz.objects.Graph; +import com.alexmerz.graphviz.objects.Node; + +import alice.codesegment.CodeSegment; + +public class StartTopologyManager extends CodeSegment { + + TopologyManagerConfig conf; + Logger logger = Logger.getLogger(StartTopologyManager.class); + + public StartTopologyManager(TopologyManagerConfig conf) { + this.conf = conf; + } + + @Override + public void run() { + LinkedList<String> nodeNames = new LinkedList<String>(); + HashMap<String, LinkedList<NodeInfo>> topology = new HashMap<String, LinkedList<NodeInfo>>(); + try { + FileReader reader = new FileReader(new File(conf.confFilePath)); + Parser parser = new Parser(); + parser.parse(reader); + ArrayList<Graph> graphs = parser.getGraphs(); + for (Graph graph : graphs) { + ArrayList<Node> nodes = graph.getNodes(false); + for (Node node : nodes) { + String nodeName = node.getId().getId(); + nodeNames.add(nodeName); + topology.put(nodeName, new LinkedList<NodeInfo>()); + } + ArrayList<Edge> edges = graph.getEdges(); + for (Edge edge : edges) { + String connection = edge.getAttribute("label"); + String source = edge.getSource().getNode().getId().getId(); + String target = edge.getSource().getNode().getId().getId(); + LinkedList<NodeInfo> sources = topology.get(target); + sources.add(new NodeInfo(source, connection)); + } + } + + } catch (FileNotFoundException e) { + logger.error("File not found: " + conf.confFilePath); + e.printStackTrace(); + } catch (ParseException e) { + logger.error("File format error: " + conf.confFilePath); + e.printStackTrace(); + } + + IncomingHosts cs = new IncomingHosts(topology, nodeNames); + cs.host.setKey("local", "host"); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/manager/TopologyManager.java Tue Jan 17 03:52:39 2012 +0900 @@ -0,0 +1,13 @@ +package alice.topology.manager; + +import alice.daemon.AliceDaemon; + +public class TopologyManager { + + public static void main(String[] args) { + TopologyManagerConfig conf = new TopologyManagerConfig(args); + new AliceDaemon(conf).listen(); + new StartTopologyManager(conf).execute(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/manager/TopologyManagerConfig.java Tue Jan 17 03:52:39 2012 +0900 @@ -0,0 +1,18 @@ +package alice.topology.manager; + +import alice.daemon.Config; + +public class TopologyManagerConfig extends Config { + + public String confFilePath; + + public TopologyManagerConfig(String[] args) { + super(args); + for (int i = 0; i < args.length; i++) { + if ("-conf".equals(args[i])) { + confFilePath = args[++i]; + } + } + } + +}
--- a/src/alice/topology/node/IncomingConnectionInfo.java Tue Jan 17 03:11:23 2012 +0900 +++ b/src/alice/topology/node/IncomingConnectionInfo.java Tue Jan 17 03:52:39 2012 +0900 @@ -23,7 +23,7 @@ @Override public void run() { - if (this.hostInfo.val.isNilValue()) { + if (this.hostInfo.val == null) { System.out.println("Configuration finished"); if (clazz == null) return;
--- a/src/alice/topology/node/StartTopologyNode.java Tue Jan 17 03:11:23 2012 +0900 +++ b/src/alice/topology/node/StartTopologyNode.java Tue Jan 17 03:52:39 2012 +0900 @@ -23,7 +23,7 @@ @Override public void run() { - DataSegmentManager manager = DataSegment.connect("topology_manager", conf.managerHostName, conf.managerPort); + DataSegmentManager manager = DataSegment.connect("manager", conf.managerHostName, conf.managerPort); try { HostMessage host; host = new HostMessage(InetAddress.getLocalHost().getHostName(), conf.localPort);
--- a/src/topology/manager/IncomingHosts.java Tue Jan 17 03:11:23 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,59 +0,0 @@ -package topology.manager; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; - -import org.apache.log4j.Logger; -import org.msgpack.MessagePack; -import org.msgpack.type.ValueFactory; - -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.DataSegment; -import alice.datasegment.DataSegmentManager; -import alice.datasegment.DataSegmentReceiver; -import alice.topology.HostMessage; - -public class IncomingHosts extends CodeSegment { - - HashMap<String, LinkedList<NodeInfo>> topology; - LinkedList<String> nodeNames; - DataSegmentReceiver host = new DataSegmentReceiver(ids, CommandType.TAKE); - Logger logger = Logger.getLogger(IncomingHosts.class); - - public IncomingHosts(HashMap<String, LinkedList<NodeInfo>> topology, LinkedList<String> nodeNames) { - this.topology = topology; - this.nodeNames = nodeNames; - } - - @Override - public void run() { - MessagePack msgpack = new MessagePack(); - try { - HostMessage host = msgpack.convert(this.host.val, HostMessage.class); - String nodeName = nodeNames.poll(); - DataSegmentManager manager = DataSegment.connect(nodeName, host.name, host.port); - manager.put("host", ValueFactory.createRawValue(nodeName)); - LinkedList<NodeInfo> nodes = topology.get(nodeName); - for (NodeInfo nodeInfo : nodes) { - HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName); - ods.put("local", nodeInfo.sourceNodeName, msgpack.unconvert(newHost)); - } - } catch (IOException e) { - logger.error("HostMessage format error"); - e.printStackTrace(); - } - - if (nodeNames.isEmpty()) { - // configuration finish - for (String key : topology.keySet()) { - ods.put("local", key, ValueFactory.createNilValue()); - } - } else { - IncomingHosts cs = new IncomingHosts(topology, nodeNames); - cs.host.setKey("local", "host"); - } - } - -}
--- a/src/topology/manager/NodeInfo.java Tue Jan 17 03:11:23 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,14 +0,0 @@ -package topology.manager; - -public class NodeInfo { - - public String sourceNodeName; - public String connectionName; - - public NodeInfo(String source, String connection) { - this.sourceNodeName = source; - this.connectionName = connection; - - } - -}
--- a/src/topology/manager/StartTopologyManager.java Tue Jan 17 03:11:23 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,67 +0,0 @@ -package topology.manager; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; - -import org.apache.log4j.Logger; - -import com.alexmerz.graphviz.ParseException; -import com.alexmerz.graphviz.Parser; -import com.alexmerz.graphviz.objects.Edge; -import com.alexmerz.graphviz.objects.Graph; -import com.alexmerz.graphviz.objects.Node; - -import alice.codesegment.CodeSegment; - -public class StartTopologyManager extends CodeSegment { - - TopologyManagerConfig conf; - Logger logger = Logger.getLogger(StartTopologyManager.class); - - public StartTopologyManager(TopologyManagerConfig conf) { - conf = this.conf; - } - - @Override - public void run() { - LinkedList<String> nodeNames = new LinkedList<String>(); - HashMap<String, LinkedList<NodeInfo>> topology = new HashMap<String, LinkedList<NodeInfo>>(); - try { - FileReader reader = new FileReader(new File(conf.confFilePath)); - Parser parser = new Parser(); - parser.parse(reader); - ArrayList<Graph> graphs = parser.getGraphs(); - for (Graph graph : graphs) { - ArrayList<Node> nodes = graph.getNodes(false); - for (Node node : nodes) { - String nodeName = node.getId().getId(); - nodeNames.add(nodeName); - topology.put(nodeName, new LinkedList<NodeInfo>()); - } - ArrayList<Edge> edges = graph.getEdges(); - for (Edge edge : edges) { - String connection = edge.getAttribute("label"); - String source = edge.getSource().getNode().getId().getId(); - String target = edge.getSource().getNode().getId().getId(); - LinkedList<NodeInfo> sources = topology.get(target); - sources.add(new NodeInfo(source, connection)); - } - } - - } catch (FileNotFoundException e) { - logger.error("File not found: " + conf.confFilePath); - e.printStackTrace(); - } catch (ParseException e) { - logger.error("File format error: " + conf.confFilePath); - e.printStackTrace(); - } - - IncomingHosts cs = new IncomingHosts(topology, nodeNames); - cs.host.setKey("local", "host"); - } - -}
--- a/src/topology/manager/TopologyManager.java Tue Jan 17 03:11:23 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,13 +0,0 @@ -package topology.manager; - -import alice.daemon.AliceDaemon; - -public class TopologyManager { - - public static void main(String[] args) { - TopologyManagerConfig conf = new TopologyManagerConfig(args); - new AliceDaemon(conf).listen(); - new StartTopologyManager(conf).execute(); - } - -}
--- a/src/topology/manager/TopologyManagerConfig.java Tue Jan 17 03:11:23 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,18 +0,0 @@ -package topology.manager; - -import alice.daemon.Config; - -public class TopologyManagerConfig extends Config { - - public String confFilePath; - - public TopologyManagerConfig(String[] args) { - super(args); - for (int i = 0; i < args.length; i++) { - if ("-conf".equals(args[i])) { - confFilePath = args[++i]; - } - } - } - -}