Mercurial > hg > Database > Alice
changeset 22:2ca2d961a8d2
implements outline of TopologyManager
line wrap: on
line diff
--- a/.classpath Sun Jan 15 19:02:01 2012 +0900 +++ b/.classpath Tue Jan 17 00:40:27 2012 +0900 @@ -7,5 +7,6 @@ <classpathentry kind="lib" path="lib/slf4j-api-1.6.1.jar"/> <classpathentry kind="lib" path="lib/slf4j-log4j12-1.6.1.jar"/> <classpathentry kind="lib" path="lib/msgpack-0.6.5-SNAPSHOT.jar" sourcepath="lib/msgpack-0.6.5-SNAPSHOT-sources.jar"/> + <classpathentry kind="lib" path="lib/com.alexmerz.graphviz.jar"/> <classpathentry kind="output" path="bin"/> </classpath>
--- a/src/alice/daemon/AliceDaemon.java Sun Jan 15 19:02:01 2012 +0900 +++ b/src/alice/daemon/AliceDaemon.java Tue Jan 17 00:40:27 2012 +0900 @@ -20,8 +20,8 @@ ServerSocketChannel ssChannel = ServerSocketChannel.open(); ServerSocket ss = ssChannel.socket(); ss.setReuseAddress(true); - ss.bind(new InetSocketAddress(InetAddress.getLocalHost(), conf.port)); - acceptThread = new AcceptThread(ss, "ACCEPT" + conf.port); + ss.bind(new InetSocketAddress(InetAddress.getLocalHost(), conf.localPort)); + acceptThread = new AcceptThread(ss, "ACCEPT" + conf.localPort); acceptThread.start(); } catch (IOException e) {
--- a/src/alice/daemon/Config.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,24 +0,0 @@ -package alice.daemon; - -public class Config { - - public int port = 10000; - public String hostname; - public int connectPort = 10000; - public String key; - - public Config(String[] args) { - for (int i = 0; i< args.length; i++) { - if ("-p".equals(args[i])) { - port = Integer.parseInt(args[++i]); - } else if ("-h".equals(args[i])) { - hostname = args[++i]; - } else if ("-cp".equals(args[i])) { - connectPort = Integer.parseInt(args[++i]); - } else if ("-key".equals(args[i])) { - key = args[++i]; - } - } - } - -}
--- a/src/alice/test/codesegment/RemoteIncrement.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,25 +0,0 @@ -package alice.test.codesegment; - -import org.msgpack.type.ValueFactory; - -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.DataSegmentReceiver; - -public class RemoteIncrement extends CodeSegment { - - public DataSegmentReceiver num = new DataSegmentReceiver(ids, CommandType.TAKE); - - @Override - public void run() { - int num = this.num.val.asIntegerValue().getInt(); - System.out.println("[CodeSegment] " + num++); - if (num == 10) System.exit(0); - - RemoteIncrement cs = new RemoteIncrement(); - cs.num.setKey("remote", "num"); - - ods.put("local", "num", ValueFactory.createIntegerValue(num)); - } - -}
--- a/src/alice/test/codesegment/RemoteStartCodeSegment.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,19 +0,0 @@ -package alice.test.codesegment; - -import org.msgpack.type.Value; -import org.msgpack.type.ValueFactory; - -import alice.codesegment.CodeSegment; - -public class RemoteStartCodeSegment extends CodeSegment { - - @Override - public void run() { - RemoteIncrement cs = new RemoteIncrement(); - cs.num.setKey("remote", "num"); - - Value num = ValueFactory.createIntegerValue(0); - ods.put("local", "num", num); - } - -}
--- a/src/alice/test/codesegment/StartCodeSegment.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,22 +0,0 @@ -package alice.test.codesegment; - -import org.msgpack.type.Value; -import org.msgpack.type.ValueFactory; - -import alice.codesegment.CodeSegment; - -public class StartCodeSegment extends CodeSegment { - - @Override - public void run() { - System.out.println("run StartCodeSegment"); - - TestCodeSegment cs = new TestCodeSegment(); - cs.arg1.setKey("local", "key1"); - System.out.println("create TestCodeSegment"); - - Value val = ValueFactory.createRawValue("String data"); - ods.update("local", "key1", val); - } - -}
--- a/src/alice/test/codesegment/TestCodeSegment.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,32 +0,0 @@ -package alice.test.codesegment; - -import org.msgpack.type.Value; -import org.msgpack.type.ValueFactory; - -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.DataSegmentReceiver; - -public class TestCodeSegment extends CodeSegment { - - DataSegmentReceiver arg1 = new DataSegmentReceiver(ids, CommandType.PEEK); - - @Override - public void run() { - System.out.println("index = " + arg1.index); - System.out.println("data = " + arg1.val); - System.out.println(arg1.val.getType()); - - if (arg1.index == 10) { - System.exit(0); - return; - } - - TestCodeSegment cs = new TestCodeSegment(); - cs.arg1.setKey("local", "key1", arg1.index); - - Value val = ValueFactory.createRawValue("String data"); - ods.update("local", "key1", val); - } - -}
--- a/src/alice/test/codesegment/TestLocalAlice.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,8 +0,0 @@ -package alice.test.codesegment; - -public class TestLocalAlice { - public static void main(String args[]) { - new StartCodeSegment().execute(); - } - -}
--- a/src/alice/test/codesegment/TestRemoteAlice.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,40 +0,0 @@ -package alice.test.codesegment; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SocketChannel; - -import alice.daemon.AliceDaemon; -import alice.daemon.Config; -import alice.daemon.Connection; -import alice.datasegment.DataSegment; -import alice.datasegment.RemoteDataSegmentManager; - -public class TestRemoteAlice { - - public static void main(String[] args) { - Config conf = new Config(args); - - new AliceDaemon(conf).listen(); - - boolean connect = true; - do { - try { - SocketChannel sc = SocketChannel.open(new InetSocketAddress(conf.hostname, conf.connectPort)); - Connection connection = new Connection(sc.socket()); - RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection); - DataSegment.regist(conf.key, manager); - connect = false; - } catch (IOException e) { - try { - Thread.sleep(500); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - } - } while (connect); - - new RemoteStartCodeSegment().execute(); - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/local/StartCodeSegment.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,22 @@ +package alice.test.codesegment.local; + +import org.msgpack.type.Value; +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; + +public class StartCodeSegment extends CodeSegment { + + @Override + public void run() { + System.out.println("run StartCodeSegment"); + + TestCodeSegment cs = new TestCodeSegment(); + cs.arg1.setKey("local", "key1"); + System.out.println("create TestCodeSegment"); + + Value val = ValueFactory.createRawValue("String data"); + ods.update("local", "key1", val); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/local/TestCodeSegment.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,32 @@ +package alice.test.codesegment.local; + +import org.msgpack.type.Value; +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; + +public class TestCodeSegment extends CodeSegment { + + DataSegmentReceiver arg1 = new DataSegmentReceiver(ids, CommandType.PEEK); + + @Override + public void run() { + System.out.println("index = " + arg1.index); + System.out.println("data = " + arg1.val); + System.out.println(arg1.val.getType()); + + if (arg1.index == 10) { + System.exit(0); + return; + } + + TestCodeSegment cs = new TestCodeSegment(); + cs.arg1.setKey("local", "key1", arg1.index); + + Value val = ValueFactory.createRawValue("String data"); + ods.update("local", "key1", val); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/local/TestLocalAlice.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,8 @@ +package alice.test.codesegment.local; + +public class TestLocalAlice { + public static void main(String args[]) { + new StartCodeSegment().execute(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/remote/RemoteIncrement.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,25 @@ +package alice.test.codesegment.remote; + +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; + +public class RemoteIncrement extends CodeSegment { + + public DataSegmentReceiver num = new DataSegmentReceiver(ids, CommandType.TAKE); + + @Override + public void run() { + int num = this.num.val.asIntegerValue().getInt(); + System.out.println("[CodeSegment] " + num++); + if (num == 10) System.exit(0); + + RemoteIncrement cs = new RemoteIncrement(); + cs.num.setKey("remote", "num"); + + ods.put("local", "num", ValueFactory.createIntegerValue(num)); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/remote/RemoteStartCodeSegment.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,19 @@ +package alice.test.codesegment.remote; + +import org.msgpack.type.Value; +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; + +public class RemoteStartCodeSegment extends CodeSegment { + + @Override + public void run() { + RemoteIncrement cs = new RemoteIncrement(); + cs.num.setKey("remote", "num"); + + Value num = ValueFactory.createIntegerValue(0); + ods.put("local", "num", num); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/remote/TestRemoteAlice.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,39 @@ +package alice.test.codesegment.remote; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; + +import alice.daemon.AliceDaemon; +import alice.daemon.Connection; +import alice.datasegment.DataSegment; +import alice.datasegment.RemoteDataSegmentManager; + +public class TestRemoteAlice { + + public static void main(String[] args) { + TestRemoteConfig conf = new TestRemoteConfig(args); + + new AliceDaemon(conf).listen(); + + boolean connect = true; + do { + try { + SocketChannel sc = SocketChannel.open(new InetSocketAddress(conf.hostname, conf.connectPort)); + Connection connection = new Connection(sc.socket()); + RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection); + DataSegment.regist(conf.key, manager); + connect = false; + } catch (IOException e) { + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } while (connect); + + new RemoteStartCodeSegment().execute(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/remote/TestRemoteConfig.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,24 @@ +package alice.test.codesegment.remote; + +import alice.daemon.Config; + +public class TestRemoteConfig extends Config { + + public String hostname; + public int connectPort = 10000; + public String key; + + public TestRemoteConfig(String[] args) { + super(args); + for (int i = 0; i< args.length; i++) { + if ("-h".equals(args[i])) { + hostname = args[++i]; + } else if ("-cp".equals(args[i])) { + connectPort = Integer.parseInt(args[++i]); + } else if ("-key".equals(args[i])) { + key = args[++i]; + } + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/HostMessage.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,25 @@ +package alice.topology; + +import org.msgpack.annotation.Message; +import org.msgpack.annotation.Optional; + +@Message +public class HostMessage { + + public String name; + public int port; + @Optional public String connectionName; + + public HostMessage() { } + public HostMessage(String name, int port) { + this.name = name; + this.port = port; + } + + public HostMessage(String name, int port, String connectionName) { + this.name = name; + this.port = port; + this.connectionName = connectionName; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/topology/manager/IncomingHosts.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,48 @@ +package topology.manager; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; + +import org.apache.log4j.Logger; +import org.msgpack.MessagePack; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; +import alice.topology.HostMessage; + +public class IncomingHosts extends CodeSegment { + + HashMap<String, NodeInfo> topology; + LinkedList<String> nodeNames; + DataSegmentReceiver host = new DataSegmentReceiver(ids, CommandType.TAKE); + Logger logger = Logger.getLogger(IncomingHosts.class); + + public IncomingHosts(HashMap<String, NodeInfo> topology, LinkedList<String> nodeNames) { + this.topology = topology; + this.nodeNames = nodeNames; + } + + @Override + public void run() { + MessagePack msgpack = new MessagePack(); + try { + HostMessage host = msgpack.convert(this.host.val, HostMessage.class); + String nodeName = nodeNames.poll(); + + // TODO: send nodeName to node + + } catch (IOException e) { + logger.error("HostMessage format error"); + e.printStackTrace(); + } + + if (!nodeNames.isEmpty()) { + IncomingHosts cs = new IncomingHosts(topology, nodeNames); + cs.host.setKey("local", "host"); + } + + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/topology/manager/NodeInfo.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,14 @@ +package topology.manager; + +public class NodeInfo { + + public String sourceNodeName; + public String connectionName; + + public NodeInfo(String source, String connection) { + this.sourceNodeName = source; + this.connectionName = connection; + + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/topology/manager/StartTopologyManager.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,64 @@ +package topology.manager; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; + +import org.apache.log4j.Logger; + +import com.alexmerz.graphviz.ParseException; +import com.alexmerz.graphviz.Parser; +import com.alexmerz.graphviz.objects.Edge; +import com.alexmerz.graphviz.objects.Graph; +import com.alexmerz.graphviz.objects.Node; + +import alice.codesegment.CodeSegment; + +public class StartTopologyManager extends CodeSegment { + + TopologyManagerConfig conf; + Logger logger = Logger.getLogger(StartTopologyManager.class); + + public StartTopologyManager(TopologyManagerConfig conf) { + conf = this.conf; + } + + @Override + public void run() { + LinkedList<String> nodeNames = new LinkedList<String>(); + HashMap<String, NodeInfo> topology = new HashMap<String, NodeInfo>(); + try { + FileReader reader = new FileReader(new File(conf.confFilePath)); + Parser parser = new Parser(); + parser.parse(reader); + ArrayList<Graph> graphs = parser.getGraphs(); + for (Graph graph : graphs) { + ArrayList<Node> nodes = graph.getNodes(false); + for (Node node : nodes) { + nodeNames.add(node.getId().getId()); + } + ArrayList<Edge> edges = graph.getEdges(); + for (Edge edge : edges) { + String connection = edge.getAttribute("label"); + String source = edge.getSource().getNode().getId().getId(); + String target = edge.getSource().getNode().getId().getId(); + topology.put(target, new NodeInfo(source, connection)); + } + } + + } catch (FileNotFoundException e) { + logger.error("File not found: " + conf.confFilePath); + e.printStackTrace(); + } catch (ParseException e) { + logger.error("File format error: " + conf.confFilePath); + e.printStackTrace(); + } + + IncomingHosts cs = new IncomingHosts(topology, nodeNames); + cs.host.setKey("local", "host"); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/topology/manager/TopologyManager.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,13 @@ +package topology.manager; + +import alice.daemon.AliceDaemon; + +public class TopologyManager { + + public static void main(String[] args) { + TopologyManagerConfig conf = new TopologyManagerConfig(args); + new AliceDaemon(conf).listen(); + new StartTopologyManager(conf).execute(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/topology/manager/TopologyManagerConfig.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,18 @@ +package topology.manager; + +import alice.daemon.Config; + +public class TopologyManagerConfig extends Config { + + public String confFilePath; + + public TopologyManagerConfig(String[] args) { + super(args); + for (int i = 0; i < args.length; i++) { + if ("-conf".equals(args[i])) { + confFilePath = args[++i]; + } + } + } + +}