41
|
1 package christie.topology.manager;
|
|
2
|
|
3
|
|
4 import christie.annotation.Peek;
|
|
5 import christie.annotation.Take;
|
|
6 import christie.codegear.CodeGear;
|
|
7 import christie.codegear.CodeGearManager;
|
|
8 import christie.topology.HostMessage;
|
|
9 import org.apache.log4j.Logger;
|
|
10
|
|
11 import java.util.HashMap;
|
|
12 import java.util.LinkedList;
|
|
13
|
|
14 public class IncomingHosts extends CodeGear {
|
|
15
|
44
|
16 @Take // Topology from parse file
|
|
17 HashMap<String, LinkedList<NodeInfo>> resultParse;
|
41
|
18
|
44
|
19 @Take // nodeName list
|
41
|
20 LinkedList<String> nodeNames;
|
|
21
|
|
22 @Take // new coming host info
|
44
|
23 HostMessage newHost;
|
|
24
|
|
25 @Take
|
|
26 HashMap<String, String> absCookieTable;
|
|
27
|
|
28 @Take
|
|
29 String MD5;
|
41
|
30
|
|
31
|
|
32 private Logger log = Logger.getLogger(IncomingHosts.class);
|
|
33
|
|
34 public IncomingHosts() {
|
|
35
|
|
36 }
|
|
37
|
|
38 @Override
|
|
39 public void run(CodeGearManager cgm) {
|
|
40
|
|
41 // not have or match cookie
|
|
42 String nodeName = nodeNames.poll();
|
44
|
43
|
41
|
44 // Manager connect to Node
|
44
|
45 cgm.createRemoteDGM(nodeName, newHost.name, newHost.port);
|
41
|
46 getDGM(nodeName).put( "host", nodeName);
|
|
47
|
|
48
|
44
|
49 absCookieTable.put(MD5, nodeName);
|
|
50 getLocalDGM().put("absCookieTable", absCookieTable);
|
41
|
51
|
44
|
52 getDGM(nodeName).put("cookie", MD5);
|
41
|
53 log.info( "toplology manager connected from " + nodeName);
|
|
54
|
44
|
55 LinkedList<NodeInfo> nodeInfoList = resultParse.get(nodeName);
|
|
56 for (NodeInfo nodeInfo : nodeInfoList) {
|
|
57 HostMessage hostMessage = new HostMessage(newHost.name, newHost.port,
|
41
|
58 nodeInfo.connectionName, nodeInfo.reverseName);
|
44
|
59 hostMessage.absName = nodeName;
|
|
60 hostMessage.remoteAbsName = nodeInfo.sourceNodeName;
|
41
|
61
|
44
|
62 getLocalDGM().put("nodeInfo", hostMessage);
|
|
63 getLocalDGM().put(nodeInfo.sourceNodeName, hostMessage);
|
|
64 log.info(" write to " + nodeInfo.sourceNodeName + " config message =" + hostMessage.toString() + " remain "
|
41
|
65 + Integer.toString((nodeNames.size())));
|
44
|
66 cgm.setup(new RecordTopology());
|
41
|
67 }
|
|
68
|
|
69 log.info(" remaining configure host = " + Integer.toString(nodeNames.size()));
|
|
70 if (nodeNames.isEmpty()) {
|
|
71 // configuration finish
|
44
|
72 for (String key : resultParse.keySet()) {
|
41
|
73 log.info(" write to " + key + " end message =" + (new HostMessage("",0,"","")).toString());
|
|
74 getLocalDGM().put(key, new HostMessage("",0,"","")); // end mark
|
|
75 }
|
|
76 }
|
|
77
|
44
|
78
|
|
79 getLocalDGM().put("nodeNames", nodeNames);
|
|
80 getLocalDGM().put("resultParse", resultParse);
|
41
|
81
|
|
82 cgm.setup(new IncomingHosts());
|
|
83 }
|
|
84 }
|