Mercurial > hg > Database > Alice
changeset 400:3a0056e03040 dispose
remove reconnection package
author | sugi |
---|---|
date | Sun, 22 Jun 2014 23:10:23 +0900 |
parents | 743ac2baf8d4 |
children | 91e1b063a89f |
files | src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/DataSegment.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java src/main/java/alice/test/codesegment/remote/TestRemoteAlice.java src/main/java/alice/topology/manager/ConfigWaiter.java src/main/java/alice/topology/manager/IncomingHosts.java src/main/java/alice/topology/manager/NodeInfo.java src/main/java/alice/topology/manager/StartTopologyManager.java src/main/java/alice/topology/manager/TopologyFinish.java src/main/java/alice/topology/manager/TopologyManager.java src/main/java/alice/topology/manager/TopologyManagerConfig.java src/main/java/alice/topology/manager/createABSIPList.java src/main/java/alice/topology/manager/reconnection/CheckABSName.java src/main/java/alice/topology/manager/reconnection/CheckConnectionList.java src/main/java/alice/topology/manager/reconnection/ReceiveError.java src/main/java/alice/topology/manager/reconnection/ReceiveReconnectData.java src/main/java/alice/topology/manager/reconnection/SendError.java src/main/java/alice/topology/node/ConfigurationFinish.java src/main/java/alice/topology/node/IncomingConnectionInfo.java src/main/java/alice/topology/node/StartTopologyNode.java |
diffstat | 20 files changed, 9 insertions(+), 495 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Sun Jun 22 21:43:41 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Sun Jun 22 23:10:23 2014 +0900 @@ -12,9 +12,7 @@ import alice.datasegment.DataSegment; import alice.datasegment.DataSegmentManager; import alice.datasegment.LocalDataSegmentManager; -import alice.topology.HostMessage; import alice.topology.manager.keeparive.RespondData; -import alice.topology.manager.reconnection.SendError; public class IncomingTcpConnection extends Thread { @@ -93,7 +91,6 @@ connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); return; } catch (EOFException e) { - new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute(); connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); return; } catch (IOException e) {
--- a/src/main/java/alice/datasegment/DataSegment.java Sun Jun 22 21:43:41 2014 +0900 +++ b/src/main/java/alice/datasegment/DataSegment.java Sun Jun 22 23:10:23 2014 +0900 @@ -27,8 +27,8 @@ dataSegment.dataSegmentManagers.put(key, manager); } - public static RemoteDataSegmentManager connect(String connectionKey, String reverseKey, String hostName, int port, boolean rFlag) { - RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port, rFlag); + public static RemoteDataSegmentManager connect(String connectionKey, String reverseKey, String hostName, int port) { + RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port); register(connectionKey, manager); return manager; }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Sun Jun 22 21:43:41 2014 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Sun Jun 22 23:10:23 2014 +0900 @@ -10,8 +10,6 @@ import alice.daemon.Connection; import alice.daemon.IncomingTcpConnection; import alice.daemon.OutboundTcpConnection; -import alice.topology.HostMessage; -import alice.topology.manager.reconnection.SendError; public class RemoteDataSegmentManager extends DataSegmentManager { @@ -20,7 +18,7 @@ public RemoteDataSegmentManager(){} - public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port, final boolean rFlag) { + public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { logger = Logger.getLogger(connectionKey); connection = new Connection(); final RemoteDataSegmentManager manager = this; @@ -42,13 +40,9 @@ e1.printStackTrace(); } } - } while (connect&&!rFlag); + } while (connect); new IncomingTcpConnection(connection, manager, reverseKey).start(); - new OutboundTcpConnection(connection).start(); - // if connection failed need to stop these thread - if (connect){ - new SendError(new HostMessage(hostName, port)).execute(); - } + new OutboundTcpConnection(connection).start(); } }.start(); }
--- a/src/main/java/alice/test/codesegment/remote/TestRemoteAlice.java Sun Jun 22 21:43:41 2014 +0900 +++ b/src/main/java/alice/test/codesegment/remote/TestRemoteAlice.java Sun Jun 22 23:10:23 2014 +0900 @@ -9,7 +9,7 @@ TestRemoteConfig conf = new TestRemoteConfig(args); new AliceDaemon(conf).listen(); - DataSegment.connect(conf.key, "", conf.hostname, conf.connectPort, false); + DataSegment.connect(conf.key, "", conf.hostname, conf.connectPort); new RemoteStartCodeSegment().execute(); }
--- a/src/main/java/alice/topology/manager/ConfigWaiter.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,31 +0,0 @@ -package alice.topology.manager; - -import org.msgpack.type.ValueFactory; - -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.Receiver; -import alice.topology.manager.reconnection.ReceiveError; - -public class ConfigWaiter extends CodeSegment { - - public Receiver done = ids.create(CommandType.TAKE); - public int count; - - public ConfigWaiter(int nodeNum) { - this.count = nodeNum; - } - - @Override - public void run() { - count--; - if (count == 0) { - ods.put("local", "start", ValueFactory.createNilValue()); - new ReceiveError(); - return; - } - ConfigWaiter cs3 = new ConfigWaiter(count); - cs3.done.setKey("local", "done"); - } - -}
--- a/src/main/java/alice/topology/manager/IncomingHosts.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,67 +0,0 @@ -package alice.topology.manager; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; - -import org.msgpack.type.ValueFactory; - -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.DataSegment; -import alice.datasegment.Receiver; -import alice.topology.HostMessage; - -public class IncomingHosts extends CodeSegment { - - HashMap<String, LinkedList<NodeInfo>> topology; - LinkedList<String> nodeNames; - Receiver host = ids.create(CommandType.TAKE); - Receiver connection = ids.create(CommandType.TAKE); - - public IncomingHosts(HashMap<String, LinkedList<NodeInfo>> topology, LinkedList<String> nodeNames) { - this.topology = topology; - this.nodeNames = nodeNames; - } - - @Override - public void run() { - HostMessage host = this.host.asClass(HostMessage.class); - @SuppressWarnings("unchecked") - HashMap<String, ArrayList<HostMessage>> connectionList = this.connection.asClass(HashMap.class); - - String nodeName = nodeNames.poll(); - // Manager connect to Node - - DataSegment.connect(nodeName, "", host.name, host.port, host.reconnectFlag); - ods.put(nodeName, "host", nodeName); - LinkedList<NodeInfo> nodes = topology.get(nodeName); - ArrayList<HostMessage> list; - for (NodeInfo nodeInfo : nodes) { - HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName, nodeInfo.reverseName); - ods.put("local", nodeInfo.sourceNodeName, newHost); - - if (connectionList.containsKey(nodeInfo.sourceNodeName)){ - list = connectionList.get(nodeInfo.sourceNodeName); - } else { - list = new ArrayList<HostMessage>(); - connectionList.put(nodeInfo.sourceNodeName, list); - - } - list.add(newHost); - } - ods.update("local", "connection", connectionList); - - if (nodeNames.isEmpty()) { - // configuration finish - for (String key : topology.keySet()) { - ods.put("local", key, ValueFactory.createNilValue()); - } - } else { - IncomingHosts cs = new IncomingHosts(topology, nodeNames); - cs.host.setKey("local", "host"); - cs.connection.setKey("local", "connection"); - } - } - -}
--- a/src/main/java/alice/topology/manager/NodeInfo.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,14 +0,0 @@ -package alice.topology.manager; - -public class NodeInfo { - - public String sourceNodeName; - public String connectionName; - public String reverseName; - - public NodeInfo(String source, String connection) { - this.sourceNodeName = source; - this.connectionName = connection; - } - -}
--- a/src/main/java/alice/topology/manager/StartTopologyManager.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,93 +0,0 @@ -package alice.topology.manager; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; - -import org.apache.log4j.Logger; - -import alice.codesegment.CodeSegment; -import alice.topology.HostMessage; - -import com.alexmerz.graphviz.ParseException; -import com.alexmerz.graphviz.Parser; -import com.alexmerz.graphviz.objects.Edge; -import com.alexmerz.graphviz.objects.Graph; -import com.alexmerz.graphviz.objects.Node; - -public class StartTopologyManager extends CodeSegment { - - TopologyManagerConfig conf; - Logger logger = Logger.getLogger(StartTopologyManager.class); - - public StartTopologyManager(TopologyManagerConfig conf) { - this.conf = conf; - } - - @Override - public void run() { - LinkedList<String> nodeNames = new LinkedList<String>(); - HashMap<String, LinkedList<NodeInfo>> topology = new HashMap<String, LinkedList<NodeInfo>>(); - int nodeNum = 0; - try { - FileReader reader = new FileReader(new File(conf.confFilePath)); - Parser parser = new Parser(); - parser.parse(reader); - ArrayList<Graph> graphs = parser.getGraphs(); - for (Graph graph : graphs) { - ArrayList<Node> nodes = graph.getNodes(false); - nodeNum = nodes.size(); - for (Node node : nodes) { - String nodeName = node.getId().getId(); - nodeNames.add(nodeName); - topology.put(nodeName, new LinkedList<NodeInfo>()); - } - ArrayList<Edge> edges = graph.getEdges(); - HashMap<String, NodeInfo> hash = new HashMap<String, NodeInfo>(); - for (Edge edge : edges) { - String connection = edge.getAttribute("label"); - String source = edge.getSource().getNode().getId().getId(); - String target = edge.getTarget().getNode().getId().getId(); - LinkedList<NodeInfo> sources = topology.get(target); - NodeInfo nodeInfo = new NodeInfo(source, connection); - sources.add(nodeInfo); - hash.put(source + "," + target, nodeInfo); - } - for (Edge edge : edges) { - String connection = edge.getAttribute("label"); - String source = edge.getSource().getNode().getId().getId(); - String target = edge.getTarget().getNode().getId().getId(); - NodeInfo nodeInfo = hash.get(target + "," + source); - if (nodeInfo != null) { - nodeInfo.reverseName = connection; - } - } - } - - } catch (FileNotFoundException e) { - logger.error("File not found: " + conf.confFilePath); - e.printStackTrace(); - } catch (ParseException e) { - logger.error("File format error: " + conf.confFilePath); - e.printStackTrace(); - } - HashMap<String, ArrayList<HostMessage>> connectionList = new HashMap<String, ArrayList<HostMessage>>(); - ods.put("connection", connectionList); - ods.put("_ABSIPTABLE" ,new ArrayList<HostMessage>()); - - new createABSIPList(); - - IncomingHosts cs1 = new IncomingHosts(topology, nodeNames); - cs1.host.setKey("host"); - cs1.connection.setKey("connection"); - - ConfigWaiter cs3 = new ConfigWaiter(nodeNum); - cs3.done.setKey("local", "done"); - - - } - -}
--- a/src/main/java/alice/topology/manager/TopologyFinish.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,14 +0,0 @@ -package alice.topology.manager; - -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.Receiver; - -public class TopologyFinish extends CodeSegment { - public Receiver finish = ids.create(CommandType.TAKE); - @Override - public void run() { - System.exit(0); - } - -}
--- a/src/main/java/alice/topology/manager/TopologyManager.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,13 +0,0 @@ -package alice.topology.manager; - -import alice.daemon.AliceDaemon; - -public class TopologyManager { - - public static void main(String[] args) { - TopologyManagerConfig conf = new TopologyManagerConfig(args); - new AliceDaemon(conf).listen(); - new StartTopologyManager(conf).execute(); - } - -}
--- a/src/main/java/alice/topology/manager/TopologyManagerConfig.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,18 +0,0 @@ -package alice.topology.manager; - -import alice.daemon.Config; - -public class TopologyManagerConfig extends Config { - - public String confFilePath; - - public TopologyManagerConfig(String[] args) { - super(args); - for (int i = 0; i < args.length; i++) { - if ("-conf".equals(args[i])) { - confFilePath = args[++i]; - } - } - } - -}
--- a/src/main/java/alice/topology/manager/createABSIPList.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,31 +0,0 @@ -package alice.topology.manager; - -import java.util.List; - -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.Receiver; -import alice.topology.HostMessage; - -public class createABSIPList extends CodeSegment{ - private Receiver rData = ids.create(CommandType.TAKE); - private Receiver hlist = ids.create(CommandType.PEEK); - - public createABSIPList(){ - rData.setKey("_ABS_IP"); - hlist.setKey("_ABSIPTABLE"); - } - - @Override - public void run() { - HostMessage host = rData.asClass(HostMessage.class); - @SuppressWarnings("unchecked") - List<HostMessage> ABSIPList = hlist.asClass(List.class); - ABSIPList.add(host); - ods.update("_ABSIPTABLE", ABSIPList); - - new createABSIPList(); - - } - -}
--- a/src/main/java/alice/topology/manager/reconnection/CheckABSName.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,47 +0,0 @@ -package alice.topology.manager.reconnection; - -import java.util.List; - -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.DataSegment; -import alice.datasegment.Receiver; -import alice.topology.HostMessage; - -public class CheckABSName extends CodeSegment{ - private Receiver abs = ids.create(CommandType.PEEK); - private Receiver host = ids.create(CommandType.TAKE); - private HostMessage message; - - public CheckABSName(HostMessage mes) { - message = mes; - abs.setKey("_ABSIPTABLE"); - host.setKey("host"); - } - - @Override - public void run() { - System.out.println("CheckABSName"); - @SuppressWarnings("unchecked") - List<HostMessage> ABSIPList = abs.asClass(List.class); - HostMessage hostInfo = host.asClass(HostMessage.class); - for (HostMessage mes : ABSIPList){ - if (mes.name.equals(message.name)&&mes.port == message.port){ - DataSegment.remove(mes.getABSName()); - DataSegment.connect(mes.getABSName(), "", hostInfo.name, hostInfo.port, hostInfo.reconnectFlag); - ods.put(mes.getABSName(), "host", mes.getABSName()); - ods.put("_RECABSNAME", mes.getABSName()); - ods.put("_HMCLONE", new HostMessage(mes.name, mes.port)); - mes.port = hostInfo.port; - mes.name = hostInfo.name; - ods.update("_ABSIPTABLE", ABSIPList); - new CheckConnectionList(); - return; - } - } - ods.put("host", hostInfo); - new ReceiveError(); - System.out.println("not match"); - } - -}
--- a/src/main/java/alice/topology/manager/reconnection/CheckConnectionList.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,70 +0,0 @@ -package alice.topology.manager.reconnection; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import org.msgpack.type.ValueFactory; - -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.Receiver; -import alice.topology.HostMessage; - -public class CheckConnectionList extends CodeSegment { - - private Receiver name = ids.create(CommandType.TAKE); - private Receiver connection = ids.create(CommandType.TAKE); - private Receiver abs = ids.create(CommandType.PEEK); - private Receiver clone = ids.create(CommandType.TAKE); - - public CheckConnectionList(){ - name.setKey("_RECABSNAME"); - connection.setKey("connection"); - abs.setKey("_ABSIPTABLE"); - clone.setKey("_HMCLONE"); - } - - @Override - public void run() { - @SuppressWarnings("unchecked") - HashMap<String, ArrayList<HostMessage>> connectionList = connection.asClass(HashMap.class); - @SuppressWarnings("unchecked") - List<HostMessage> ABSIPList = abs.asClass(List.class); - String absName = name.asClass(String.class); - HostMessage oldInfo = clone.asClass(HostMessage.class); - HostMessage newInfo = null; - for (HostMessage mes : ABSIPList){ - if (mes.absName.equals(absName)){ - newInfo = mes; - break; - } - } - - List<HostMessage> clist = connectionList.get(absName); - for (HostMessage mes : clist){ - mes.setFlag(); - ods.put(absName, mes); - for (HostMessage mes2 : ABSIPList){ - if (mes.name.equals(mes2.name)&&mes.port == mes2.port){ - String absName2 = mes2.getABSName(); - List<HostMessage> clist2 = connectionList.get(absName2); - for (HostMessage mes3 : clist2){ - if (mes3.name.equals(oldInfo.name)&&mes3.port == oldInfo.port){ - mes3.name = newInfo.name; - mes3.port = newInfo.port; - mes3.setFlag(); - ods.put(absName2, "_RECODATA", mes3); - break; - } - } - break; - } - } - } - System.out.println("SendHost Data"); - ods.put(absName, ValueFactory.createNilValue()); - ods.update("connection", connectionList); - new ReceiveError(); - } -} \ No newline at end of file
--- a/src/main/java/alice/topology/manager/reconnection/ReceiveError.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,33 +0,0 @@ -package alice.topology.manager.reconnection; - -import java.io.IOException; -import java.net.Socket; -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.Receiver; -import alice.topology.HostMessage; - -public class ReceiveError extends CodeSegment { - public Receiver reportInfo = ids.create(CommandType.TAKE); - - public ReceiveError(){ - reportInfo.setKey("_ERROR"); - } - - @Override - public void run() { - HostMessage message = reportInfo.asClass(HostMessage.class); - try { - System.out.println("Receive Error "+ message.port); - Socket socket = new Socket(message.name, message.port); - socket.close(); - System.out.println("alive "+message.port); - } catch (IOException e) { - System.out.println("Receive Error "+ message.port); - new CheckABSName(message); - return; - } - new ReceiveError(); - } - -}
--- a/src/main/java/alice/topology/manager/reconnection/ReceiveReconnectData.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,26 +0,0 @@ -package alice.topology.manager.reconnection; - -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.DataSegment; -import alice.datasegment.Receiver; -import alice.topology.HostMessage; - -public class ReceiveReconnectData extends CodeSegment{ - private Receiver hostData = ids.create(CommandType.TAKE); - - public ReceiveReconnectData(){ - hostData.setKey("_RECODATA"); - } - - @Override - public void run() { - HostMessage host = hostData.asClass(HostMessage.class); - DataSegment.remove(host.connectionName); - DataSegment.connect(host.connectionName, host.reverseName, host.name, host.port, host.reconnectFlag); - ods.put(host.connectionName, "reverseKey", host.reverseName); - new ReceiveReconnectData(); - } - - -}
--- a/src/main/java/alice/topology/manager/reconnection/SendError.java Sun Jun 22 21:43:41 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,18 +0,0 @@ -package alice.topology.manager.reconnection; - -import alice.codesegment.CodeSegment; -import alice.topology.HostMessage; - -public class SendError extends CodeSegment{ - private HostMessage message; - - public SendError(HostMessage mes){ - message = mes; - } - - @Override - public void run() { - ods.put("manager", "_ERROR", message); - } - -}
--- a/src/main/java/alice/topology/node/ConfigurationFinish.java Sun Jun 22 21:43:41 2014 +0900 +++ b/src/main/java/alice/topology/node/ConfigurationFinish.java Sun Jun 22 23:10:23 2014 +0900 @@ -5,7 +5,6 @@ import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.Receiver; -import alice.topology.manager.reconnection.ReceiveReconnectData; public class ConfigurationFinish extends CodeSegment { @@ -20,8 +19,7 @@ @Override public void run() { if (reverseCount.getVal().equals(configNodeNum.getVal())) { - new ReceiveReconnectData(); - + ods.put("manager", "done", ValueFactory.createNilValue()); Start cs = new Start(startCS); cs.done.setKey("manager", "start");
--- a/src/main/java/alice/topology/node/IncomingConnectionInfo.java Sun Jun 22 21:43:41 2014 +0900 +++ b/src/main/java/alice/topology/node/IncomingConnectionInfo.java Sun Jun 22 23:10:23 2014 +0900 @@ -37,7 +37,7 @@ HostMessage hostInfo = this.hostInfo.asClass(HostMessage.class); //System.out.println(hostInfo.reconnectFlag); - DataSegment.connect(hostInfo.connectionName, hostInfo.reverseName, hostInfo.name, hostInfo.port, hostInfo.reconnectFlag); + DataSegment.connect(hostInfo.connectionName, hostInfo.reverseName, hostInfo.name, hostInfo.port); ods.put(hostInfo.connectionName, "reverseKey", hostInfo.reverseName); connectionList.add(hostInfo.connectionName); ods.update("_CLIST", connectionList);
--- a/src/main/java/alice/topology/node/StartTopologyNode.java Sun Jun 22 21:43:41 2014 +0900 +++ b/src/main/java/alice/topology/node/StartTopologyNode.java Sun Jun 22 23:10:23 2014 +0900 @@ -18,7 +18,7 @@ @Override public void run() { - DataSegment.connect("manager", "", conf.getManagerHostName(), conf.getManagerPort(), false); + DataSegment.connect("manager", "", conf.getManagerHostName(), conf.getManagerPort()); String localHostName = null; try { localHostName = InetAddress.getLocalHost().getHostName();