41
|
1 package christie.topology.manager;
|
|
2
|
|
3
|
|
4 import christie.annotation.Peek;
|
|
5 import christie.annotation.Take;
|
|
6 import christie.codegear.CodeGear;
|
|
7 import christie.codegear.CodeGearManager;
|
|
8 import christie.topology.HostMessage;
|
|
9 import org.apache.log4j.Logger;
|
|
10
|
|
11 import java.util.HashMap;
|
|
12 import java.util.LinkedList;
|
|
13
|
|
14 public class IncomingHosts extends CodeGear {
|
|
15
|
|
16 @Peek // Topology from parse file
|
|
17 HashMap<String, LinkedList<NodeInfo>> topology;
|
|
18
|
|
19 @Peek // nodeName list
|
|
20 LinkedList<String> nodeNames;
|
|
21
|
|
22 @Take // new coming host info
|
|
23 HostMessage host;
|
|
24
|
|
25
|
|
26 //private Receiver absCookieTable = ids.create(CommandType.TAKE); // cookie, AbsName HashMap
|
|
27 //private Receiver cookie = ids.create(CommandType.TAKE); // MD5
|
|
28 private Logger log = Logger.getLogger(IncomingHosts.class);
|
|
29
|
|
30 public IncomingHosts() {
|
|
31
|
|
32 }
|
|
33
|
|
34 @Override
|
|
35 public void run(CodeGearManager cgm) {
|
|
36
|
|
37 // not have or match cookie
|
|
38 String nodeName = nodeNames.poll();
|
|
39 // Manager connect to Node
|
|
40
|
|
41 cgm.createRemoteDGM(nodeName, host.name, host.port);
|
|
42 getDGM(nodeName).put( "host", nodeName);
|
|
43
|
|
44
|
|
45 /* cookie
|
|
46 String cookie = this.cookie.asString();
|
|
47 absCookieTable.put(cookie, nodeName);
|
|
48 ods.put(this.absCookieTable.key, absCookieTable);
|
|
49
|
|
50 ods.put(nodeName, "cookie", cookie);
|
|
51 */
|
|
52 log.info( "toplology manager connected from " + nodeName);
|
|
53
|
|
54 LinkedList<NodeInfo> nodes = topology.get(nodeName);
|
|
55 for (NodeInfo nodeInfo : nodes) {
|
|
56 HostMessage newHost = new HostMessage(host.name, host.port,
|
|
57 nodeInfo.connectionName, nodeInfo.reverseName);
|
|
58 newHost.absName = nodeName;
|
|
59 newHost.remoteAbsName = nodeInfo.sourceNodeName;
|
|
60
|
|
61 getLocalDGM().put("nodeInfo", newHost);
|
|
62 getLocalDGM().put(nodeInfo.sourceNodeName, newHost);
|
|
63 log.info(" write to " + nodeInfo.sourceNodeName + " config message =" + newHost.toString() + " remain "
|
|
64 + Integer.toString((nodeNames.size())));
|
|
65 new RecordTopology();
|
|
66 }
|
|
67
|
|
68 log.info(" remaining configure host = " + Integer.toString(nodeNames.size()));
|
|
69 if (nodeNames.isEmpty()) {
|
|
70 // configuration finish
|
|
71 for (String key : topology.keySet()) {
|
|
72 log.info(" write to " + key + " end message =" + (new HostMessage("",0,"","")).toString());
|
|
73 getLocalDGM().put(key, new HostMessage("",0,"","")); // end mark
|
|
74 }
|
|
75 }
|
|
76
|
|
77 // idsのときはkeyメソッドが使えたけど, 今はlistなのでエラーが出る.
|
|
78 // これが何をする処理か読めていないので, 今はコメントアウト.
|
|
79 //getLocalDGM().put(this.nodeNames.key, nodeNames);
|
|
80 //getLocalDGM().put(this.topology.key, topology);
|
|
81
|
|
82 cgm.setup(new IncomingHosts());
|
|
83 }
|
|
84 }
|