view src/main/java/christie/topology/manager/IncomingHosts.java @ 64:f884c1bd0d36

fix IncomingHosts and RecordTopology. and refactor.
author akahori
date Thu, 30 Aug 2018 10:55:37 +0900
parents cfd79a71f9cd
children d74a64a3940a
line wrap: on
line source

package christie.topology.manager;


import christie.annotation.Peek;
import christie.annotation.Take;
import christie.codegear.CodeGear;
import christie.codegear.CodeGearManager;
import christie.topology.HostMessage;
import org.apache.log4j.Logger;

import java.util.HashMap;
import java.util.LinkedList;

public class IncomingHosts extends CodeGear {

    @Peek // Topology from parse file
    HashMap<String, LinkedList<NodeInfo>> resultParse;
    @Peek // nodeName list
    LinkedList<String> nodeNames;
    @Peek
    HashMap<String, String> absCookieTable;

    @Take // new coming host info
    HostMessage newHost;
    @Take
    String MD5;


    private Logger log = Logger.getLogger(IncomingHosts.class);

    public IncomingHosts() {

    }

    @Override
    protected void run(CodeGearManager cgm) {
        // not have or match cookie
        String nodeName = nodeNames.poll();

        // Manager connect to Node
        cgm.createRemoteDGM(nodeName, newHost.hostName, newHost.port);

        absCookieTable.put(MD5, nodeName);

        getDGM(nodeName).put( "nodeName", nodeName);
        getDGM(nodeName).put("cookie", MD5);
        log.info( "toplology manager connected from " + nodeName);



        LinkedList<NodeInfo> nodeInfoList = resultParse.get(nodeName);
        for (NodeInfo nodeInfo : nodeInfoList) {
            HostMessage hostMessage = new HostMessage(newHost.hostName, newHost.port,
                    nodeInfo.connectionName, "");
            hostMessage.nodeName = nodeName;
            hostMessage.remoteNodeName = nodeInfo.targetNodeName;

            getLocalDGM().put("nodeInfo", hostMessage);

            log.info(" write to " + nodeInfo.sourceNodeName + " config message =" + hostMessage.toString() + " remain "
                    + Integer.toString((nodeNames.size())));

            cgm.setup(new RecordTopology());
        }

        log.info(" remaining configure host = " + Integer.toString(nodeNames.size()));
        if (nodeNames.isEmpty()) {
            // configuration finish
            for (String key : resultParse.keySet()) {
                log.info(" write to " + key + " end message =" + (new HostMessage("",0,"","")).toString());
                getLocalDGM().put("hostInfo", new HostMessage("",0,"","")); // end mark
            }
        }
        cgm.setup(new IncomingHosts());
    }
}