# HG changeset patch # User akahori # Date 1533026792 -32400 # Node ID cf5a75bc3e5544514fce93f2e9de78b387d86595 # Parent 342931aea0b8ff992492bff74e17da68a45ea5f4 add diff -r 342931aea0b8 -r cf5a75bc3e55 src/main/java/christie/topology/manager/CheckComingHost.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/topology/manager/CheckComingHost.java Tue Jul 31 17:46:32 2018 +0900 @@ -0,0 +1,52 @@ +package christie.topology.manager; + +import christie.annotation.Peek; +import christie.annotation.Take; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; +import christie.topology.HostMessage; + +import java.util.HashMap; + +public class CheckComingHost extends CodeGear { + + @Take + HostMessage hostMessage; + + @Peek + HashMap absCookieTable; + + public CheckComingHost(){ + } + + @Override + public void run(CodeGearManager cgm) { + + boolean match = false; + // check cookie + if (hostMessage.cookie != null) { + if (absCookieTable.containsKey(hostMessage.cookie)){ + match = true; + hostMessage.absName = absCookieTable.get(hostMessage.cookie); + System.out.println("match"); + } + } + + if (match){ + // coming host has ever joined this App + getLocalDGM().put("reconnectHost", hostMessage); + + // TODO: これから実装する + //new SearchHostName(); + } else { + getLocalDGM().put("orderHash", "order"); + getLocalDGM().put("newHost", hostMessage); + } + + cgm.setup(new CheckComingHost()); + } + + + + +} diff -r 342931aea0b8 -r cf5a75bc3e55 src/main/java/christie/topology/manager/ConfigWaiter.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/topology/manager/ConfigWaiter.java Tue Jul 31 17:46:32 2018 +0900 @@ -0,0 +1,43 @@ +package christie.topology.manager; + + +import christie.annotation.Take; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; +import org.msgpack.type.ValueFactory; + +public class ConfigWaiter extends CodeGear { + + + // Question; done 変数がわからない...何やっているんだろう. + // public Receiver done = ids.create(CommandType.TAKE); + @Take + boolean done; + + @Take + int nodeNum; + + + + public ConfigWaiter() { } + + @Override + public void run(CodeGearManager cgm) { + nodeNum--; + if (nodeNum == 0) { + put("local", "start", ValueFactory.createNilValue()); + put("startTime", System.currentTimeMillis()); + put("done", true); + //update("running", true); + + return; + } + //ConfigWaiter cs3 = new ConfigWaiter(count); + //cs3.done.setKey("local", "done"); + + cgm.setup(new ConfigWaiter()); + getLocalDGM().put("nodeNum", nodeNum); + getLocalDGM().put("done", false); + } + +} diff -r 342931aea0b8 -r cf5a75bc3e55 src/main/java/christie/topology/manager/IncomingHosts.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/topology/manager/IncomingHosts.java Tue Jul 31 17:46:32 2018 +0900 @@ -0,0 +1,84 @@ +package christie.topology.manager; + + +import christie.annotation.Peek; +import christie.annotation.Take; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; +import christie.topology.HostMessage; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.LinkedList; + +public class IncomingHosts extends CodeGear { + + @Peek // Topology from parse file + HashMap> topology; + + @Peek // nodeName list + LinkedList nodeNames; + + @Take // new coming host info + HostMessage host; + + + //private Receiver absCookieTable = ids.create(CommandType.TAKE); // cookie, AbsName HashMap + //private Receiver cookie = ids.create(CommandType.TAKE); // MD5 + private Logger log = Logger.getLogger(IncomingHosts.class); + + public IncomingHosts() { + + } + + @Override + public void run(CodeGearManager cgm) { + + // not have or match cookie + String nodeName = nodeNames.poll(); + // Manager connect to Node + + cgm.createRemoteDGM(nodeName, host.name, host.port); + getDGM(nodeName).put( "host", nodeName); + + + /* cookie + String cookie = this.cookie.asString(); + absCookieTable.put(cookie, nodeName); + ods.put(this.absCookieTable.key, absCookieTable); + + ods.put(nodeName, "cookie", cookie); + */ + log.info( "toplology manager connected from " + nodeName); + + LinkedList nodes = topology.get(nodeName); + for (NodeInfo nodeInfo : nodes) { + HostMessage newHost = new HostMessage(host.name, host.port, + nodeInfo.connectionName, nodeInfo.reverseName); + newHost.absName = nodeName; + newHost.remoteAbsName = nodeInfo.sourceNodeName; + + getLocalDGM().put("nodeInfo", newHost); + getLocalDGM().put(nodeInfo.sourceNodeName, newHost); + log.info(" write to " + nodeInfo.sourceNodeName + " config message =" + newHost.toString() + " remain " + + Integer.toString((nodeNames.size()))); + new RecordTopology(); + } + + log.info(" remaining configure host = " + Integer.toString(nodeNames.size())); + if (nodeNames.isEmpty()) { + // configuration finish + for (String key : topology.keySet()) { + log.info(" write to " + key + " end message =" + (new HostMessage("",0,"","")).toString()); + getLocalDGM().put(key, new HostMessage("",0,"","")); // end mark + } + } + + // idsのときはkeyメソッドが使えたけど, 今はlistなのでエラーが出る. + // これが何をする処理か読めていないので, 今はコメントアウト. + //getLocalDGM().put(this.nodeNames.key, nodeNames); + //getLocalDGM().put(this.topology.key, topology); + + cgm.setup(new IncomingHosts()); + } +} diff -r 342931aea0b8 -r cf5a75bc3e55 src/main/java/christie/topology/manager/NodeInfo.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/topology/manager/NodeInfo.java Tue Jul 31 17:46:32 2018 +0900 @@ -0,0 +1,15 @@ +package christie.topology.manager; + +public class NodeInfo { + + public String sourceNodeName; + public String connectionName; + public String reverseName; + + + public NodeInfo(String source, String connection) { + this.sourceNodeName = source; + this.connectionName = connection; + } + +} diff -r 342931aea0b8 -r cf5a75bc3e55 src/main/java/christie/topology/manager/RecordTopology.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/topology/manager/RecordTopology.java Tue Jul 31 17:46:32 2018 +0900 @@ -0,0 +1,44 @@ +package christie.topology.manager; + + +import christie.annotation.Peek; +import christie.annotation.Take; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; +import christie.topology.HostMessage; + +import java.util.HashMap; +import java.util.LinkedList; + +public class RecordTopology extends CodeGear { + + @Take + HostMessage hostInfo; + + @Peek + HashMap> topology; + public RecordTopology(){ } + + @Override + public void run(CodeGearManager cgm) { + LinkedList connections; + if (!topology.containsKey(hostInfo.remoteAbsName)) { + connections = new LinkedList(); + } else { + connections = topology.get(hostInfo.remoteAbsName); + } + connections.add(hostInfo); + topology.put(hostInfo.remoteAbsName, connections); +// need debug option +// for (LinkedList list :topology.values()){ +// System.out.print(list.get(0).remoteAbsName+" : "); +// for (HostMessage host : list){ +// System.out.print("[ "+host.absName+" "+host.name+" "+host.port+" "+host.connectionName+" "+host.reverseName+" "+host.remoteAbsName+" ]"); +// } +// System.out.println(); +// } + + //ods.update(info1.key, topology); + } + +} diff -r 342931aea0b8 -r cf5a75bc3e55 src/main/java/christie/topology/manager/TopologyManager.java --- a/src/main/java/christie/topology/manager/TopologyManager.java Tue Jul 31 16:12:27 2018 +0900 +++ b/src/main/java/christie/topology/manager/TopologyManager.java Tue Jul 31 17:46:32 2018 +0900 @@ -1,14 +1,123 @@ package christie.topology.manager; +import christie.annotation.Peek; import christie.codegear.CodeGear; import christie.codegear.CodeGearManager; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; + +import org.apache.log4j.Logger; + +import com.alexmerz.graphviz.ParseException; +import com.alexmerz.graphviz.Parser; +import com.alexmerz.graphviz.objects.*; + public class TopologyManager extends CodeGear { + @Peek + TopologyManagerConfig config; + + Logger logger = Logger.getLogger(TopologyManager.class); + + @Override protected void run(CodeGearManager cgm) { + cgm.setup(new CheckComingHost()); + // if (!conf.dynamic) は, conf.dynamic = trueの動作がわからないので, 省いた. + + LinkedList nodeNames = new LinkedList(); + HashMap> topology = new HashMap>(); + int nodeNum = 0; + + try { + FileReader reader = new FileReader(new File(config.confFilePath)); + Parser parser = new Parser(); + parser.parse(reader); + + + ArrayList digraphs = parser.getGraphs(); + + + for (Graph digraph : digraphs) { + ArrayList nodes = digraph.getNodes(false); + nodeNum = nodes.size(); + + for (Node node : nodes) { + String nodeName = node.getId().getId(); + nodeNames.add(nodeName); + topology.put(nodeName, new LinkedList()); + } + + ArrayList edges = digraph.getEdges(); + HashMap hash = new HashMap(); + + String connection; + String source; + String target; + + NodeInfo nodeInfo; + + // まず1回グラフを読み込む + for (Edge edge : edges) { + connection = edge.getAttribute("label"); + source = edge.getSource().getNode().getId().getId(); + target = edge.getTarget().getNode().getId().getId(); + nodeInfo = new NodeInfo(source, connection); + + //Question: この下の2行はどんな役目? + //LinkedList sources = topology.get(target); + //sources.add(nodeInfo); // addしてその後の使い道は...? + + hash.put(source + "," + target, nodeInfo); + } + + // hash.get(target + "," + source); をして, グラフを逆にたどって, reverseNameにlabelを入れてる. + for (Edge edge : edges) { + connection = edge.getAttribute("label"); + source = edge.getSource().getNode().getId().getId(); + target = edge.getTarget().getNode().getId().getId(); + nodeInfo = hash.get(target + "," + source); + + if (nodeInfo != null) { + nodeInfo.reverseName = connection; + } + + } + } + + } catch (FileNotFoundException e) { + logger.error("File not found: " + config.confFilePath); + e.printStackTrace(); + } catch (ParseException e) { + logger.error("File format error: " + config.confFilePath); + e.printStackTrace(); + } + + // for recode topology information + // cookie List + getLocalDGM().put("running", false); + getLocalDGM().put("resultParse", topology); + getLocalDGM().put("nodeNames", nodeNames); + + cgm.setup(new IncomingHosts()); + + + + // Question: この処理何をやっているのかわからない. 一応, その下にそれっぽいコードを書いた. + // ConfigWaiter cs3 = new ConfigWaiter(nodeNum); + // cs3.done.setKey("local", "done"); + + cgm.setup(new ConfigWaiter()); + getLocalDGM().put("nodeNum", nodeNum); + getLocalDGM().put("done", false); } + } diff -r 342931aea0b8 -r cf5a75bc3e55 src/main/java/christie/topology/manager/TopologyManagerConfig.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/topology/manager/TopologyManagerConfig.java Tue Jul 31 17:46:32 2018 +0900 @@ -0,0 +1,27 @@ +package christie.topology.manager; + +public class TopologyManagerConfig { + public boolean showTime = false; + public String confFilePath; + public TopologyType type = TopologyType.Tree; + public int hasChild = 2; + + public TopologyManagerConfig(String[] args) { + + for (int i = 0; i < args.length; i++) { + if ("-conf".equals(args[i])) { + confFilePath = args[++i]; + } else if ("--Topology".equals(args[i])) { + String typeName = args[++i]; + if ("tree".equals(typeName)) { + type = TopologyType.Tree; + } + } else if ("--Child".equals(args[i])) { + hasChild = Integer.parseInt(args[++i]); + } else if ("--showTime".equals(args[i])) { + showTime = true; + } + } + + } +} diff -r 342931aea0b8 -r cf5a75bc3e55 src/main/java/christie/topology/manager/TopologyType.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/topology/manager/TopologyType.java Tue Jul 31 17:46:32 2018 +0900 @@ -0,0 +1,15 @@ +package christie.topology.manager; + +public enum TopologyType { + Tree(1); + + + private final int id; + private TopologyType(final int id) { + this.id = id; + } + + public int getId() { + return id; + } +}