Mercurial > hg > Database > Alice
changeset 478:cf345b10a21a dispose
bug fix
line wrap: on
line diff
--- a/src/main/java/alice/daemon/Connection.java Tue Dec 02 14:49:20 2014 +0900 +++ b/src/main/java/alice/daemon/Connection.java Tue Dec 02 17:16:34 2014 +0900 @@ -9,12 +9,14 @@ import alice.datasegment.Command; import alice.datasegment.DataSegment; import alice.datasegment.ReceiveData; +import alice.datasegment.SendOption; public class Connection { public Socket socket; public String name; public LinkedBlockingQueue<Command> sendQueue = new LinkedBlockingQueue<Command>(); + public boolean sendManager = true; public Connection(Socket socket) { this.socket = socket; @@ -46,7 +48,7 @@ putConnectionInfo(); } catch (IOException e) { - e.printStackTrace(); + putConnectionInfo(); } } @@ -55,6 +57,7 @@ socket.shutdownOutput(); socket.shutdownInput(); socket.close(); + putConnectionInfo(); } catch (ClosedChannelException e) { return; } catch (IOException e) { @@ -68,6 +71,10 @@ ConnectionInfo c = new ConnectionInfo(name, socket); ReceiveData rData = new ReceiveData(c, false, false); DataSegment.getLocal().put("_DISCONNECT", rData, null); + if (sendManager) { + SendOption option = new SendOption(false, false); + DataSegment.get("manager").put("_DISCONNECTNODE", rData, option); + } } }
--- a/src/main/java/alice/daemon/ConnectionInfo.java Tue Dec 02 14:49:20 2014 +0900 +++ b/src/main/java/alice/daemon/ConnectionInfo.java Tue Dec 02 17:16:34 2014 +0900 @@ -11,6 +11,8 @@ public String addr; public int port; + public ConnectionInfo() {/*messagePack need no arguments constructor*/} + public ConnectionInfo(String name, Socket socket){ this.nodeName = name; this.hostname = socket.getInetAddress().getHostName();
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Tue Dec 02 14:49:20 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Tue Dec 02 17:16:34 2014 +0900 @@ -92,13 +92,11 @@ } } catch (ClosedChannelException e) { connection.putConnectionInfo(); - connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, reverseKey)); return; } catch (EOFException e) { - connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, reverseKey)); return; } catch (IOException e) { - e.printStackTrace(); + return; } }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Tue Dec 02 14:49:20 2014 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Tue Dec 02 17:16:34 2014 +0900 @@ -16,8 +16,6 @@ Connection connection; Logger logger; - private OutboundTcpConnection out; - private IncomingTcpConnection in; public RemoteDataSegmentManager(){} @@ -45,10 +43,10 @@ } } } while (connect); - in = new IncomingTcpConnection(connection, manager, reverseKey); + IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey); in.setPriority(MAX_PRIORITY); in.start(); - out = new OutboundTcpConnection(connection); + OutboundTcpConnection out = new OutboundTcpConnection(connection); out.setPriority(MAX_PRIORITY); out.start(); } @@ -143,6 +141,7 @@ @Override public void close() { Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, ""); + connection.sendManager = false; connection.sendCommand(cmd); } @@ -154,16 +153,6 @@ queue.clear(); } - if (out != null && out.getState() != Thread.State.TERMINATED) { - Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, ""); - queue.add(cmd); // outboundTCP Thread will stop. - out = null; - } - - if (in != null || in.getState() != Thread.State.TERMINATED) { - in = null; - } - if (DataSegment.contains(connection.name)) { DataSegment.remove(connection.name); }
--- a/src/main/java/alice/topology/fix/FixTopology.java Tue Dec 02 14:49:20 2014 +0900 +++ b/src/main/java/alice/topology/fix/FixTopology.java Tue Dec 02 17:16:34 2014 +0900 @@ -19,7 +19,7 @@ private Receiver info4 = ids.create(CommandType.TAKE); // parentManager public FixTopology() { - info.setKey("_DISCONNECT"); + info.setKey("_DISCONNECTNODE"); info1.setKey("topology"); info2.setKey("hostCount"); info3.setKey("nameTable"); @@ -29,7 +29,7 @@ @SuppressWarnings("unchecked") @Override public void run() { - ConnectionInfo disconnect = info.asClass(ConnectionInfo.class); + ConnectionInfo disconnect = info.asClass(ConnectionInfo.class); // send Data is wrong. HashMap<String, HostMessage> nameTable = info3.asClass(HashMap.class); ParentManager manager = info4.asClass(ParentManager.class); HostMessage disconnectNode = nameTable.get(disconnect.nodeName); @@ -85,6 +85,15 @@ } else { // disconnect message already received. } + + // need debug option + for (LinkedList<HostMessage> list :topology.values()){ + System.out.print(list.get(0).remoteAbsName+" : "); + for (HostMessage host : list){ + System.out.print("[ "+host.absName+" "+host.name+" "+host.port+" "+host.connectionName+" "+host.reverseName+" "+host.remoteAbsName+" ]"); + } + System.out.println(); + } ods.put(info1.key, topology); ods.put(info2.key, info2.getReceiveData()); ods.put(info3.key, nameTable);
--- a/src/main/java/alice/topology/fix/ReceiveDisconnectMessage.java Tue Dec 02 14:49:20 2014 +0900 +++ b/src/main/java/alice/topology/fix/ReceiveDisconnectMessage.java Tue Dec 02 17:16:34 2014 +0900 @@ -8,7 +8,7 @@ private Receiver info = ids.create(CommandType.PEEK); public ReceiveDisconnectMessage() { - info.setKey("_DISCONNECT"); + info.setKey("_DISCONNECTNODE"); } @Override
--- a/src/main/java/alice/topology/manager/StartTopologyManager.java Tue Dec 02 14:49:20 2014 +0900 +++ b/src/main/java/alice/topology/manager/StartTopologyManager.java Tue Dec 02 17:16:34 2014 +0900 @@ -11,6 +11,7 @@ import alice.codesegment.CodeSegment; import alice.topology.HostMessage; +import alice.topology.fix.ReceiveDisconnectMessage; import com.alexmerz.graphviz.ParseException; import com.alexmerz.graphviz.Parser; @@ -100,6 +101,7 @@ ods.put("nameTable", nameTable); ods.put("hostCount", cominghostCount); new ComingServiceHosts(); + new ReceiveDisconnectMessage(); } ods.put("topology", new HashMap<String, LinkedList<HostMessage>>());
--- a/src/main/java/alice/topology/node/ConfigurationFinish.java Tue Dec 02 14:49:20 2014 +0900 +++ b/src/main/java/alice/topology/node/ConfigurationFinish.java Tue Dec 02 17:16:34 2014 +0900 @@ -5,7 +5,6 @@ import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.Receiver; -import alice.topology.manager.keeparive.CatchDisconnectEvent; public class ConfigurationFinish extends CodeSegment { @@ -26,7 +25,7 @@ Start cs = new Start(startCS); cs.done.setKey("manager", "start"); - DisconnectEventManager.getInstance().register(CatchDisconnectEvent.class); + new ReceiveCloseMessage(); DisconnectEventManager.getInstance().register(DeleteConnection.class); DisconnectEventManager.getInstance().setKey(); return;
--- a/src/main/java/alice/topology/node/DeleteConnection.java Tue Dec 02 14:49:20 2014 +0900 +++ b/src/main/java/alice/topology/node/DeleteConnection.java Tue Dec 02 17:16:34 2014 +0900 @@ -21,11 +21,12 @@ @SuppressWarnings("unchecked") List<String> list = info.asClass(List.class); - if (list.contains(name)) + if (list.contains(name)) { list.remove(name); - if (DataSegment.contains(name)) - DataSegment.get(name).shutdown(); - ods.put(info.key, list); + if (DataSegment.contains(name)) + DataSegment.get(name).shutdown(); + } + ods.update(info.key, list); } }
--- a/src/main/java/alice/topology/node/ExecuteEvent.java Tue Dec 02 14:49:20 2014 +0900 +++ b/src/main/java/alice/topology/node/ExecuteEvent.java Tue Dec 02 17:16:34 2014 +0900 @@ -26,7 +26,7 @@ // set MetaInfo if ("CloseEventCodeSegment".equals(clazz.getSuperclass().getSimpleName())) { CloseEventCodeSegment obj = (CloseEventCodeSegment) clazz.newInstance(); - obj.metaInfo = info; + obj.metaInfo = info1; obj.ids.setKey(); obj.ids.receive(); } else if ("CodeSegment".equals(clazz.getSuperclass().getSimpleName())){
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/topology/node/ReceiveCloseMessage.java Tue Dec 02 17:16:34 2014 +0900 @@ -0,0 +1,23 @@ +package alice.topology.node; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; +import alice.datasegment.Receiver; + +public class ReceiveCloseMessage extends CodeSegment { + + private Receiver info = ids.create(CommandType.TAKE); + + public ReceiveCloseMessage() { + info.setKey("_CLOSEMESSEAGE"); + } + + @Override + public void run() { + String managerKey = info.asString(); + DataSegment.get(managerKey).close(); + new ReceiveCloseMessage(); + } + +}