41
|
1 package christie.topology.manager;
|
|
2
|
|
3
|
|
4 import christie.annotation.Peek;
|
|
5 import christie.annotation.Take;
|
|
6 import christie.codegear.CodeGear;
|
|
7 import christie.codegear.CodeGearManager;
|
|
8 import christie.topology.HostMessage;
|
|
9 import org.apache.log4j.Logger;
|
|
10
|
|
11 import java.util.HashMap;
|
|
12 import java.util.LinkedList;
|
|
13
|
|
14 public class IncomingHosts extends CodeGear {
|
|
15
|
60
|
16 @Peek // Topology from parse file
|
44
|
17 HashMap<String, LinkedList<NodeInfo>> resultParse;
|
60
|
18 @Peek // nodeName list
|
|
19 LinkedList<String> nodeNames;
|
|
20 @Peek
|
|
21 HashMap<String, String> absCookieTable;
|
41
|
22
|
|
23 @Take // new coming host info
|
44
|
24 HostMessage newHost;
|
|
25 @Take
|
|
26 String MD5;
|
41
|
27
|
59
|
28
|
41
|
29 private Logger log = Logger.getLogger(IncomingHosts.class);
|
|
30
|
|
31 public IncomingHosts() {
|
|
32
|
|
33 }
|
|
34
|
|
35 @Override
|
57
|
36 protected void run(CodeGearManager cgm) {
|
41
|
37 // not have or match cookie
|
|
38 String nodeName = nodeNames.poll();
|
44
|
39
|
41
|
40 // Manager connect to Node
|
50
|
41 cgm.createRemoteDGM(nodeName, newHost.hostName, newHost.port);
|
41
|
42
|
44
|
43 absCookieTable.put(MD5, nodeName);
|
41
|
44
|
64
|
45 getDGM(nodeName).put( "nodeName", nodeName);
|
44
|
46 getDGM(nodeName).put("cookie", MD5);
|
41
|
47 log.info( "toplology manager connected from " + nodeName);
|
|
48
|
44
|
49 LinkedList<NodeInfo> nodeInfoList = resultParse.get(nodeName);
|
68
|
50 getDGM(nodeName).put("connectNodeNum", nodeInfoList.size());
|
44
|
51 for (NodeInfo nodeInfo : nodeInfoList) {
|
50
|
52 HostMessage hostMessage = new HostMessage(newHost.hostName, newHost.port,
|
64
|
53 nodeInfo.connectionName, "");
|
|
54 hostMessage.nodeName = nodeName;
|
|
55 hostMessage.remoteNodeName = nodeInfo.targetNodeName;
|
41
|
56
|
44
|
57 getLocalDGM().put("nodeInfo", hostMessage);
|
64
|
58
|
44
|
59 log.info(" write to " + nodeInfo.sourceNodeName + " config message =" + hostMessage.toString() + " remain "
|
41
|
60 + Integer.toString((nodeNames.size())));
|
57
|
61
|
68
|
62
|
44
|
63 cgm.setup(new RecordTopology());
|
41
|
64 }
|
|
65
|
|
66 log.info(" remaining configure host = " + Integer.toString(nodeNames.size()));
|
68
|
67
|
41
|
68 cgm.setup(new IncomingHosts());
|
|
69 }
|
|
70 }
|