41
|
1 package christie.topology.manager;
|
|
2
|
|
3
|
|
4 import christie.annotation.Peek;
|
|
5 import christie.annotation.Take;
|
|
6 import christie.codegear.CodeGear;
|
|
7 import christie.codegear.CodeGearManager;
|
|
8 import christie.topology.HostMessage;
|
176
|
9 import christie.topology.Message;
|
161
|
10 import christie.topology.TopologyDataGear;
|
41
|
11
|
|
12 import java.util.HashMap;
|
|
13 import java.util.LinkedList;
|
|
14
|
|
15 public class IncomingHosts extends CodeGear {
|
|
16
|
129
|
17 @Take
|
94
|
18 HashMap<String, LinkedList<HostMessage>> resultParse;
|
129
|
19 @Take
|
60
|
20 LinkedList<String> nodeNames;
|
129
|
21 @Take
|
60
|
22 HashMap<String, String> absCookieTable;
|
41
|
23
|
|
24 @Take // new coming host info
|
44
|
25 HostMessage newHost;
|
|
26 @Take
|
|
27 String MD5;
|
41
|
28
|
170
|
29 @Take
|
|
30 HashMap<String, LinkedList<HostMessage>> topology;
|
|
31
|
160
|
32 @Peek
|
161
|
33 TopologyDataGear topoDG;
|
158
|
34
|
41
|
35 public IncomingHosts() {
|
|
36
|
|
37 }
|
|
38
|
|
39 @Override
|
57
|
40 protected void run(CodeGearManager cgm) {
|
41
|
41 // not have or match cookie
|
|
42 String nodeName = nodeNames.poll();
|
129
|
43 getLocalDGM().put("nodeNames", nodeNames);
|
44
|
44
|
41
|
45 // Manager connect to Node
|
158
|
46 cgm.createRemoteDGM(nodeName,
|
|
47 newHost.getHostName(),
|
|
48 newHost.getPort());
|
41
|
49
|
44
|
50 absCookieTable.put(MD5, nodeName);
|
129
|
51 getLocalDGM().put("absCookieTable", absCookieTable);
|
41
|
52
|
44
|
53 getDGM(nodeName).put("cookie", MD5);
|
41
|
54
|
94
|
55 LinkedList<HostMessage> nodeInfoList = resultParse.get(nodeName);
|
170
|
56 put(nodeName, "connectNodeNum", nodeInfoList.size());
|
176
|
57 if(nodeInfoList.size() == 0) put(nodeName,"_CONNECTIONMESSAGE", new Message());
|
170
|
58
|
94
|
59 for (HostMessage nodeInfo : nodeInfoList) {
|
158
|
60
|
170
|
61
|
158
|
62 nodeInfo.setHostAndPort(newHost);
|
41
|
63
|
170
|
64 //getLocalDGM().put("nodeInfo", nodeInfo);
|
|
65
|
|
66 //getDGM(nodeName).put("remoteNodeInfo", nodeInfo);
|
|
67 //cgm.setup(new RecordTopology());
|
|
68
|
|
69 String nodeInfoNodeName = nodeInfo.getNodeName();
|
|
70 LinkedList<HostMessage> connections;
|
|
71
|
|
72 if (!topology.containsKey(nodeInfoNodeName)) {
|
|
73 connections = new LinkedList<HostMessage>();
|
|
74 } else {
|
|
75 connections = topology.get(nodeInfoNodeName);
|
|
76 }
|
|
77 connections.add(nodeInfo);
|
|
78 topology.put(nodeInfoNodeName, connections);
|
41
|
79 }
|
167
|
80
|
170
|
81 getLocalDGM().put("topology", topology);
|
|
82
|
|
83
|
168
|
84 if (nodeNames.isEmpty()) {
|
167
|
85 // configuration finish
|
|
86 for (String key : resultParse.keySet()) {
|
170
|
87 topoDG.setNodeName(key);
|
|
88 getDGM(key).put("topoDG", topoDG);
|
|
89 if(topology.containsKey(key)){
|
|
90 LinkedList<HostMessage> connections = topology.get(key);
|
|
91 for(HostMessage connection: connections) {
|
|
92 put(key, "remoteNodeInfo", connection);
|
|
93 }
|
|
94 }
|
198
|
95 //put(key, "remoteNodeInfo", new HostMessage()); // end mark
|
167
|
96 }
|
176
|
97 }else{
|
|
98 cgm.setup(new IncomingHosts());
|
168
|
99 }
|
129
|
100 getLocalDGM().put("resultParse", resultParse);
|
|
101
|
41
|
102 }
|
|
103 }
|