changeset 22:2ca2d961a8d2

implements outline of TopologyManager
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 17 Jan 2012 00:40:27 +0900
parents 145667a554ad
children 54bf607118ae
files .classpath lib/com.alexmerz.graphviz.jar src/alice/daemon/AliceDaemon.java src/alice/daemon/Config.java src/alice/test/codesegment/RemoteIncrement.java src/alice/test/codesegment/RemoteStartCodeSegment.java src/alice/test/codesegment/StartCodeSegment.java src/alice/test/codesegment/TestCodeSegment.java src/alice/test/codesegment/TestLocalAlice.java src/alice/test/codesegment/TestRemoteAlice.java src/alice/test/codesegment/local/StartCodeSegment.java src/alice/test/codesegment/local/TestCodeSegment.java src/alice/test/codesegment/local/TestLocalAlice.java src/alice/test/codesegment/remote/RemoteIncrement.java src/alice/test/codesegment/remote/RemoteStartCodeSegment.java src/alice/test/codesegment/remote/TestRemoteAlice.java src/alice/test/codesegment/remote/TestRemoteConfig.java src/alice/topology/HostMessage.java src/topology/manager/IncomingHosts.java src/topology/manager/NodeInfo.java src/topology/manager/StartTopologyManager.java src/topology/manager/TopologyManager.java src/topology/manager/TopologyManagerConfig.java
diffstat 23 files changed, 354 insertions(+), 172 deletions(-) [+]
line wrap: on
line diff
--- a/.classpath	Sun Jan 15 19:02:01 2012 +0900
+++ b/.classpath	Tue Jan 17 00:40:27 2012 +0900
@@ -7,5 +7,6 @@
 	<classpathentry kind="lib" path="lib/slf4j-api-1.6.1.jar"/>
 	<classpathentry kind="lib" path="lib/slf4j-log4j12-1.6.1.jar"/>
 	<classpathentry kind="lib" path="lib/msgpack-0.6.5-SNAPSHOT.jar" sourcepath="lib/msgpack-0.6.5-SNAPSHOT-sources.jar"/>
+	<classpathentry kind="lib" path="lib/com.alexmerz.graphviz.jar"/>
 	<classpathentry kind="output" path="bin"/>
 </classpath>
Binary file lib/com.alexmerz.graphviz.jar has changed
--- a/src/alice/daemon/AliceDaemon.java	Sun Jan 15 19:02:01 2012 +0900
+++ b/src/alice/daemon/AliceDaemon.java	Tue Jan 17 00:40:27 2012 +0900
@@ -20,8 +20,8 @@
 			ServerSocketChannel ssChannel = ServerSocketChannel.open();
 			ServerSocket ss = ssChannel.socket();
 			ss.setReuseAddress(true);
-			ss.bind(new InetSocketAddress(InetAddress.getLocalHost(), conf.port));
-			acceptThread = new AcceptThread(ss, "ACCEPT" + conf.port);
+			ss.bind(new InetSocketAddress(InetAddress.getLocalHost(), conf.localPort));
+			acceptThread = new AcceptThread(ss, "ACCEPT" + conf.localPort);
 			acceptThread.start();
 			
 		} catch (IOException e) {
--- a/src/alice/daemon/Config.java	Sun Jan 15 19:02:01 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,24 +0,0 @@
-package alice.daemon;
-
-public class Config {
-
-	public int port = 10000;
-	public String hostname;
-	public int connectPort = 10000;
-	public String key;
-
-	public Config(String[] args) {
-		for (int i = 0; i< args.length; i++) {
-			if ("-p".equals(args[i])) {
-				port = Integer.parseInt(args[++i]);
-			} else if ("-h".equals(args[i])) {
-				hostname = args[++i];
-			} else if ("-cp".equals(args[i])) {
-				connectPort = Integer.parseInt(args[++i]);
-			} else if ("-key".equals(args[i])) {
-				key = args[++i];
-			}
-		}
-	}
-
-}
--- a/src/alice/test/codesegment/RemoteIncrement.java	Sun Jan 15 19:02:01 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,25 +0,0 @@
-package alice.test.codesegment;
-
-import org.msgpack.type.ValueFactory;
-
-import alice.codesegment.CodeSegment;
-import alice.datasegment.CommandType;
-import alice.datasegment.DataSegmentReceiver;
-
-public class RemoteIncrement extends CodeSegment {
-
-	public DataSegmentReceiver num = new DataSegmentReceiver(ids, CommandType.TAKE);
-
-	@Override
-	public void run() {
-		int num = this.num.val.asIntegerValue().getInt();
-		System.out.println("[CodeSegment] " + num++);
-		if (num == 10) System.exit(0);
-
-		RemoteIncrement cs = new RemoteIncrement();
-		cs.num.setKey("remote", "num");
-		
-		ods.put("local", "num", ValueFactory.createIntegerValue(num));
-	}
-
-}
--- a/src/alice/test/codesegment/RemoteStartCodeSegment.java	Sun Jan 15 19:02:01 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,19 +0,0 @@
-package alice.test.codesegment;
-
-import org.msgpack.type.Value;
-import org.msgpack.type.ValueFactory;
-
-import alice.codesegment.CodeSegment;
-
-public class RemoteStartCodeSegment extends CodeSegment {
-	
-	@Override
-	public void run() {
-		RemoteIncrement cs = new RemoteIncrement();
-		cs.num.setKey("remote", "num");
-		
-		Value num = ValueFactory.createIntegerValue(0);
-		ods.put("local", "num", num);
-	}
-
-}
--- a/src/alice/test/codesegment/StartCodeSegment.java	Sun Jan 15 19:02:01 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,22 +0,0 @@
-package alice.test.codesegment;
-
-import org.msgpack.type.Value;
-import org.msgpack.type.ValueFactory;
-
-import alice.codesegment.CodeSegment;
-
-public class StartCodeSegment extends CodeSegment {
-
-	@Override
-	public void run() {
-		System.out.println("run StartCodeSegment");
-		
-		TestCodeSegment cs = new TestCodeSegment();
-		cs.arg1.setKey("local", "key1");
-		System.out.println("create TestCodeSegment");
-		
-		Value val = ValueFactory.createRawValue("String data");
-		ods.update("local", "key1", val);
-	}
-
-}
--- a/src/alice/test/codesegment/TestCodeSegment.java	Sun Jan 15 19:02:01 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,32 +0,0 @@
-package alice.test.codesegment;
-
-import org.msgpack.type.Value;
-import org.msgpack.type.ValueFactory;
-
-import alice.codesegment.CodeSegment;
-import alice.datasegment.CommandType;
-import alice.datasegment.DataSegmentReceiver;
-
-public class TestCodeSegment extends CodeSegment {
-	
-	DataSegmentReceiver arg1 = new DataSegmentReceiver(ids, CommandType.PEEK);
-	
-	@Override
-	public void run() {
-		System.out.println("index = " + arg1.index);
-		System.out.println("data = " + arg1.val);
-		System.out.println(arg1.val.getType());
-		
-		if (arg1.index == 10) {
-			System.exit(0);
-			return;
-		}
-		
-		TestCodeSegment cs = new TestCodeSegment();
-		cs.arg1.setKey("local", "key1", arg1.index);
-		
-		Value val = ValueFactory.createRawValue("String data");
-		ods.update("local", "key1", val);
-	}
-
-}
--- a/src/alice/test/codesegment/TestLocalAlice.java	Sun Jan 15 19:02:01 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,8 +0,0 @@
-package alice.test.codesegment;
-
-public class TestLocalAlice {
-	public static void main(String args[]) {
-		new StartCodeSegment().execute();
-	}
-	
-}
--- a/src/alice/test/codesegment/TestRemoteAlice.java	Sun Jan 15 19:02:01 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,40 +0,0 @@
-package alice.test.codesegment;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
-
-import alice.daemon.AliceDaemon;
-import alice.daemon.Config;
-import alice.daemon.Connection;
-import alice.datasegment.DataSegment;
-import alice.datasegment.RemoteDataSegmentManager;
-
-public class TestRemoteAlice {
-
-	public static void main(String[] args) {
-		Config conf = new Config(args);
-
-		new AliceDaemon(conf).listen();
-		
-		boolean connect = true;
-		do {
-			try {
-				SocketChannel sc = SocketChannel.open(new InetSocketAddress(conf.hostname, conf.connectPort));
-				Connection connection = new Connection(sc.socket());
-				RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection);
-				DataSegment.regist(conf.key, manager);
-				connect = false;
-			} catch (IOException e) {
-				try {
-					Thread.sleep(500);
-				} catch (InterruptedException e1) {
-					e1.printStackTrace();
-				}
-			}
-		} while (connect);
-		
-		new RemoteStartCodeSegment().execute();
-	}
-
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/local/StartCodeSegment.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,22 @@
+package alice.test.codesegment.local;
+
+import org.msgpack.type.Value;
+import org.msgpack.type.ValueFactory;
+
+import alice.codesegment.CodeSegment;
+
+public class StartCodeSegment extends CodeSegment {
+
+	@Override
+	public void run() {
+		System.out.println("run StartCodeSegment");
+		
+		TestCodeSegment cs = new TestCodeSegment();
+		cs.arg1.setKey("local", "key1");
+		System.out.println("create TestCodeSegment");
+		
+		Value val = ValueFactory.createRawValue("String data");
+		ods.update("local", "key1", val);
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/local/TestCodeSegment.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,32 @@
+package alice.test.codesegment.local;
+
+import org.msgpack.type.Value;
+import org.msgpack.type.ValueFactory;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegmentReceiver;
+
+public class TestCodeSegment extends CodeSegment {
+	
+	DataSegmentReceiver arg1 = new DataSegmentReceiver(ids, CommandType.PEEK);
+	
+	@Override
+	public void run() {
+		System.out.println("index = " + arg1.index);
+		System.out.println("data = " + arg1.val);
+		System.out.println(arg1.val.getType());
+		
+		if (arg1.index == 10) {
+			System.exit(0);
+			return;
+		}
+		
+		TestCodeSegment cs = new TestCodeSegment();
+		cs.arg1.setKey("local", "key1", arg1.index);
+		
+		Value val = ValueFactory.createRawValue("String data");
+		ods.update("local", "key1", val);
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/local/TestLocalAlice.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,8 @@
+package alice.test.codesegment.local;
+
+public class TestLocalAlice {
+	public static void main(String args[]) {
+		new StartCodeSegment().execute();
+	}
+	
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/remote/RemoteIncrement.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,25 @@
+package alice.test.codesegment.remote;
+
+import org.msgpack.type.ValueFactory;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegmentReceiver;
+
+public class RemoteIncrement extends CodeSegment {
+
+	public DataSegmentReceiver num = new DataSegmentReceiver(ids, CommandType.TAKE);
+
+	@Override
+	public void run() {
+		int num = this.num.val.asIntegerValue().getInt();
+		System.out.println("[CodeSegment] " + num++);
+		if (num == 10) System.exit(0);
+
+		RemoteIncrement cs = new RemoteIncrement();
+		cs.num.setKey("remote", "num");
+		
+		ods.put("local", "num", ValueFactory.createIntegerValue(num));
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,19 @@
+package alice.test.codesegment.remote;
+
+import org.msgpack.type.Value;
+import org.msgpack.type.ValueFactory;
+
+import alice.codesegment.CodeSegment;
+
+public class RemoteStartCodeSegment extends CodeSegment {
+	
+	@Override
+	public void run() {
+		RemoteIncrement cs = new RemoteIncrement();
+		cs.num.setKey("remote", "num");
+		
+		Value num = ValueFactory.createIntegerValue(0);
+		ods.put("local", "num", num);
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/remote/TestRemoteAlice.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,39 @@
+package alice.test.codesegment.remote;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+
+import alice.daemon.AliceDaemon;
+import alice.daemon.Connection;
+import alice.datasegment.DataSegment;
+import alice.datasegment.RemoteDataSegmentManager;
+
+public class TestRemoteAlice {
+
+	public static void main(String[] args) {
+		TestRemoteConfig conf = new TestRemoteConfig(args);
+
+		new AliceDaemon(conf).listen();
+		
+		boolean connect = true;
+		do {
+			try {
+				SocketChannel sc = SocketChannel.open(new InetSocketAddress(conf.hostname, conf.connectPort));
+				Connection connection = new Connection(sc.socket());
+				RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection);
+				DataSegment.regist(conf.key, manager);
+				connect = false;
+			} catch (IOException e) {
+				try {
+					Thread.sleep(500);
+				} catch (InterruptedException e1) {
+					e1.printStackTrace();
+				}
+			}
+		} while (connect);
+		
+		new RemoteStartCodeSegment().execute();
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/remote/TestRemoteConfig.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,24 @@
+package alice.test.codesegment.remote;
+
+import alice.daemon.Config;
+
+public class TestRemoteConfig extends Config {
+
+	public String hostname;
+	public int connectPort = 10000;
+	public String key;
+
+	public TestRemoteConfig(String[] args) {
+		super(args);
+		for (int i = 0; i< args.length; i++) {
+			if ("-h".equals(args[i])) {
+				hostname = args[++i];
+			} else if ("-cp".equals(args[i])) {
+				connectPort = Integer.parseInt(args[++i]);
+			} else if ("-key".equals(args[i])) {
+				key = args[++i];
+			}
+		}
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/HostMessage.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,25 @@
+package alice.topology;
+
+import org.msgpack.annotation.Message;
+import org.msgpack.annotation.Optional;
+
+@Message
+public class HostMessage {
+	
+	public String name;
+	public int port;
+	@Optional public String connectionName;
+	
+	public HostMessage() { }
+	public HostMessage(String name, int port) {
+		this.name = name;
+		this.port = port;
+	}
+
+	public HostMessage(String name, int port, String connectionName) {
+		this.name = name;
+		this.port = port;
+		this.connectionName = connectionName;
+	}
+	
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/topology/manager/IncomingHosts.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,48 @@
+package topology.manager;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+
+import org.apache.log4j.Logger;
+import org.msgpack.MessagePack;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegmentReceiver;
+import alice.topology.HostMessage;
+
+public class IncomingHosts extends CodeSegment {
+	
+	HashMap<String, NodeInfo> topology;
+	LinkedList<String> nodeNames;
+	DataSegmentReceiver host = new DataSegmentReceiver(ids, CommandType.TAKE);
+	Logger logger = Logger.getLogger(IncomingHosts.class);
+	
+	public IncomingHosts(HashMap<String, NodeInfo> topology, LinkedList<String> nodeNames) {
+		this.topology = topology;
+		this.nodeNames = nodeNames;
+	}
+
+	@Override
+	public void run() {
+		MessagePack msgpack = new MessagePack();
+		try {
+			HostMessage host = msgpack.convert(this.host.val, HostMessage.class);
+			String nodeName = nodeNames.poll();
+			
+			// TODO: send nodeName to node
+			
+		} catch (IOException e) {
+			logger.error("HostMessage format error");
+			e.printStackTrace();
+		}
+		
+		if (!nodeNames.isEmpty()) {
+			IncomingHosts cs = new IncomingHosts(topology, nodeNames);
+			cs.host.setKey("local", "host");
+		}
+
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/topology/manager/NodeInfo.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,14 @@
+package topology.manager;
+
+public class NodeInfo {
+	
+	public String sourceNodeName;
+	public String connectionName;
+	
+	public NodeInfo(String source, String connection) {
+		this.sourceNodeName = source;
+		this.connectionName = connection;
+		
+	}
+	
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/topology/manager/StartTopologyManager.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,64 @@
+package topology.manager;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+
+import org.apache.log4j.Logger;
+
+import com.alexmerz.graphviz.ParseException;
+import com.alexmerz.graphviz.Parser;
+import com.alexmerz.graphviz.objects.Edge;
+import com.alexmerz.graphviz.objects.Graph;
+import com.alexmerz.graphviz.objects.Node;
+
+import alice.codesegment.CodeSegment;
+
+public class StartTopologyManager extends CodeSegment {
+
+	TopologyManagerConfig conf;
+	Logger logger = Logger.getLogger(StartTopologyManager.class);
+
+	public StartTopologyManager(TopologyManagerConfig conf) {
+		conf = this.conf;
+	}
+
+	@Override
+	public void run() {
+		LinkedList<String> nodeNames = new LinkedList<String>();
+		HashMap<String, NodeInfo> topology = new HashMap<String, NodeInfo>();
+		try {
+			FileReader reader = new FileReader(new File(conf.confFilePath));
+			Parser parser = new Parser();
+			parser.parse(reader);
+			ArrayList<Graph> graphs = parser.getGraphs();
+			for (Graph graph : graphs) {
+				ArrayList<Node> nodes = graph.getNodes(false);
+				for (Node node : nodes) {
+					nodeNames.add(node.getId().getId());
+				}
+				ArrayList<Edge> edges = graph.getEdges();
+				for (Edge edge : edges) {
+					String connection = edge.getAttribute("label");
+					String source = edge.getSource().getNode().getId().getId();
+					String target = edge.getSource().getNode().getId().getId();
+					topology.put(target, new NodeInfo(source, connection));
+				}
+			}
+			
+		} catch (FileNotFoundException e) {
+			logger.error("File not found: " + conf.confFilePath);
+			e.printStackTrace();
+		} catch (ParseException e) {
+			logger.error("File format error: " + conf.confFilePath);
+			e.printStackTrace();
+		}
+		
+		IncomingHosts cs = new IncomingHosts(topology, nodeNames);
+		cs.host.setKey("local", "host");
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/topology/manager/TopologyManager.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,13 @@
+package topology.manager;
+
+import alice.daemon.AliceDaemon;
+
+public class TopologyManager {
+
+	public static void main(String[] args) {
+		TopologyManagerConfig conf = new TopologyManagerConfig(args);
+		new AliceDaemon(conf).listen();
+		new StartTopologyManager(conf).execute();
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/topology/manager/TopologyManagerConfig.java	Tue Jan 17 00:40:27 2012 +0900
@@ -0,0 +1,18 @@
+package topology.manager;
+
+import alice.daemon.Config;
+
+public class TopologyManagerConfig extends Config {
+	
+	public String confFilePath;
+	
+	public TopologyManagerConfig(String[] args) {
+		super(args);
+		for (int i = 0; i < args.length; i++) {
+			if ("-conf".equals(args[i])) {
+				confFilePath = args[++i];
+			}
+		}
+	}
+
+}