41
|
1 package christie.topology.manager;
|
|
2
|
|
3
|
|
4 import christie.annotation.Peek;
|
|
5 import christie.annotation.Take;
|
|
6 import christie.codegear.CodeGear;
|
|
7 import christie.codegear.CodeGearManager;
|
|
8 import christie.topology.HostMessage;
|
|
9 import org.apache.log4j.Logger;
|
|
10
|
|
11 import java.util.HashMap;
|
|
12 import java.util.LinkedList;
|
|
13
|
|
14 public class IncomingHosts extends CodeGear {
|
|
15
|
44
|
16 @Take // Topology from parse file
|
|
17 HashMap<String, LinkedList<NodeInfo>> resultParse;
|
41
|
18
|
44
|
19 @Take // nodeName list
|
41
|
20 LinkedList<String> nodeNames;
|
|
21
|
|
22 @Take // new coming host info
|
44
|
23 HostMessage newHost;
|
|
24
|
50
|
25 //@Take
|
|
26 @Peek
|
44
|
27 HashMap<String, String> absCookieTable;
|
|
28
|
|
29 @Take
|
|
30 String MD5;
|
41
|
31
|
|
32
|
|
33 private Logger log = Logger.getLogger(IncomingHosts.class);
|
|
34
|
|
35 public IncomingHosts() {
|
|
36
|
|
37 }
|
|
38
|
|
39 @Override
|
|
40 public void run(CodeGearManager cgm) {
|
|
41
|
|
42 // not have or match cookie
|
|
43 String nodeName = nodeNames.poll();
|
44
|
44
|
41
|
45 // Manager connect to Node
|
50
|
46 cgm.createRemoteDGM(nodeName, newHost.hostName, newHost.port);
|
41
|
47 getDGM(nodeName).put( "host", nodeName);
|
|
48
|
|
49
|
44
|
50 absCookieTable.put(MD5, nodeName);
|
50
|
51 //getLocalDGM().put("absCookieTable", absCookieTable);
|
41
|
52
|
44
|
53 getDGM(nodeName).put("cookie", MD5);
|
41
|
54 log.info( "toplology manager connected from " + nodeName);
|
|
55
|
44
|
56 LinkedList<NodeInfo> nodeInfoList = resultParse.get(nodeName);
|
|
57 for (NodeInfo nodeInfo : nodeInfoList) {
|
50
|
58 HostMessage hostMessage = new HostMessage(newHost.hostName, newHost.port,
|
41
|
59 nodeInfo.connectionName, nodeInfo.reverseName);
|
44
|
60 hostMessage.absName = nodeName;
|
|
61 hostMessage.remoteAbsName = nodeInfo.sourceNodeName;
|
41
|
62
|
44
|
63 getLocalDGM().put("nodeInfo", hostMessage);
|
|
64 getLocalDGM().put(nodeInfo.sourceNodeName, hostMessage);
|
|
65 log.info(" write to " + nodeInfo.sourceNodeName + " config message =" + hostMessage.toString() + " remain "
|
41
|
66 + Integer.toString((nodeNames.size())));
|
44
|
67 cgm.setup(new RecordTopology());
|
41
|
68 }
|
|
69
|
|
70 log.info(" remaining configure host = " + Integer.toString(nodeNames.size()));
|
|
71 if (nodeNames.isEmpty()) {
|
|
72 // configuration finish
|
44
|
73 for (String key : resultParse.keySet()) {
|
41
|
74 log.info(" write to " + key + " end message =" + (new HostMessage("",0,"","")).toString());
|
|
75 getLocalDGM().put(key, new HostMessage("",0,"","")); // end mark
|
|
76 }
|
|
77 }
|
|
78
|
44
|
79
|
|
80 getLocalDGM().put("nodeNames", nodeNames);
|
|
81 getLocalDGM().put("resultParse", resultParse);
|
41
|
82
|
|
83 cgm.setup(new IncomingHosts());
|
|
84 }
|
|
85 }
|