41
|
1 package christie.topology.manager;
|
|
2
|
|
3
|
|
4 import christie.annotation.Peek;
|
|
5 import christie.annotation.Take;
|
|
6 import christie.codegear.CodeGear;
|
|
7 import christie.codegear.CodeGearManager;
|
|
8 import christie.topology.HostMessage;
|
|
9 import org.apache.log4j.Logger;
|
|
10
|
|
11 import java.util.HashMap;
|
|
12 import java.util.LinkedList;
|
|
13
|
|
14 public class IncomingHosts extends CodeGear {
|
|
15
|
60
|
16 @Peek // Topology from parse file
|
44
|
17 HashMap<String, LinkedList<NodeInfo>> resultParse;
|
60
|
18 @Peek // nodeName list
|
|
19 LinkedList<String> nodeNames;
|
|
20 @Peek
|
|
21 HashMap<String, String> absCookieTable;
|
41
|
22
|
|
23 @Take // new coming host info
|
44
|
24 HostMessage newHost;
|
|
25 @Take
|
|
26 String MD5;
|
41
|
27
|
59
|
28
|
41
|
29 private Logger log = Logger.getLogger(IncomingHosts.class);
|
|
30
|
|
31 public IncomingHosts() {
|
|
32
|
|
33 }
|
|
34
|
|
35 @Override
|
57
|
36 protected void run(CodeGearManager cgm) {
|
41
|
37 // not have or match cookie
|
|
38 String nodeName = nodeNames.poll();
|
44
|
39
|
41
|
40 // Manager connect to Node
|
50
|
41 cgm.createRemoteDGM(nodeName, newHost.hostName, newHost.port);
|
41
|
42
|
44
|
43 absCookieTable.put(MD5, nodeName);
|
41
|
44
|
64
|
45 getDGM(nodeName).put( "nodeName", nodeName);
|
44
|
46 getDGM(nodeName).put("cookie", MD5);
|
41
|
47 log.info( "toplology manager connected from " + nodeName);
|
|
48
|
64
|
49
|
|
50
|
44
|
51 LinkedList<NodeInfo> nodeInfoList = resultParse.get(nodeName);
|
|
52 for (NodeInfo nodeInfo : nodeInfoList) {
|
50
|
53 HostMessage hostMessage = new HostMessage(newHost.hostName, newHost.port,
|
64
|
54 nodeInfo.connectionName, "");
|
|
55 hostMessage.nodeName = nodeName;
|
|
56 hostMessage.remoteNodeName = nodeInfo.targetNodeName;
|
41
|
57
|
44
|
58 getLocalDGM().put("nodeInfo", hostMessage);
|
64
|
59
|
44
|
60 log.info(" write to " + nodeInfo.sourceNodeName + " config message =" + hostMessage.toString() + " remain "
|
41
|
61 + Integer.toString((nodeNames.size())));
|
57
|
62
|
44
|
63 cgm.setup(new RecordTopology());
|
41
|
64 }
|
|
65
|
|
66 log.info(" remaining configure host = " + Integer.toString(nodeNames.size()));
|
|
67 if (nodeNames.isEmpty()) {
|
|
68 // configuration finish
|
44
|
69 for (String key : resultParse.keySet()) {
|
41
|
70 log.info(" write to " + key + " end message =" + (new HostMessage("",0,"","")).toString());
|
59
|
71 getLocalDGM().put("hostInfo", new HostMessage("",0,"","")); // end mark
|
41
|
72 }
|
|
73 }
|
|
74 cgm.setup(new IncomingHosts());
|
|
75 }
|
|
76 }
|