changeset 478:cf345b10a21a dispose

bug fix
author sugi
date Tue, 02 Dec 2014 17:16:34 +0900
parents f644dc9e0589
children 1550cb8429df
files src/main/java/alice/daemon/Connection.java src/main/java/alice/daemon/ConnectionInfo.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java src/main/java/alice/topology/fix/FixTopology.java src/main/java/alice/topology/fix/ReceiveDisconnectMessage.java src/main/java/alice/topology/manager/StartTopologyManager.java src/main/java/alice/topology/node/ConfigurationFinish.java src/main/java/alice/topology/node/DeleteConnection.java src/main/java/alice/topology/node/ExecuteEvent.java src/main/java/alice/topology/node/ReceiveCloseMessage.java
diffstat 11 files changed, 58 insertions(+), 28 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/daemon/Connection.java	Tue Dec 02 14:49:20 2014 +0900
+++ b/src/main/java/alice/daemon/Connection.java	Tue Dec 02 17:16:34 2014 +0900
@@ -9,12 +9,14 @@
 import alice.datasegment.Command;
 import alice.datasegment.DataSegment;
 import alice.datasegment.ReceiveData;
+import alice.datasegment.SendOption;
 
 public class Connection {
 
     public Socket socket;
     public String name;
     public LinkedBlockingQueue<Command> sendQueue = new LinkedBlockingQueue<Command>();
+    public boolean sendManager = true;
 
     public Connection(Socket socket) {
         this.socket = socket;
@@ -46,7 +48,7 @@
             putConnectionInfo();
 
         } catch (IOException e) {
-            e.printStackTrace();
+            putConnectionInfo();
         }
     }
 
@@ -55,6 +57,7 @@
             socket.shutdownOutput();
             socket.shutdownInput();
             socket.close();
+            putConnectionInfo();
         } catch (ClosedChannelException e) {
             return;
         } catch (IOException e) {
@@ -68,6 +71,10 @@
             ConnectionInfo c = new ConnectionInfo(name, socket);
             ReceiveData rData = new ReceiveData(c, false, false);
             DataSegment.getLocal().put("_DISCONNECT", rData, null);
+            if (sendManager) {
+                SendOption option = new SendOption(false, false);
+                DataSegment.get("manager").put("_DISCONNECTNODE", rData, option);
+            }
         }
 
     }
--- a/src/main/java/alice/daemon/ConnectionInfo.java	Tue Dec 02 14:49:20 2014 +0900
+++ b/src/main/java/alice/daemon/ConnectionInfo.java	Tue Dec 02 17:16:34 2014 +0900
@@ -11,6 +11,8 @@
     public String addr;
     public int port;
 
+    public ConnectionInfo() {/*messagePack need no arguments constructor*/}
+
     public ConnectionInfo(String name, Socket socket){
         this.nodeName = name;
         this.hostname = socket.getInetAddress().getHostName();
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Tue Dec 02 14:49:20 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Tue Dec 02 17:16:34 2014 +0900
@@ -92,13 +92,11 @@
                 }
             } catch (ClosedChannelException e) {
                 connection.putConnectionInfo();
-                connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, reverseKey));
                 return;
             } catch (EOFException e) {
-                connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, reverseKey));
                 return;
             } catch (IOException e) {
-                e.printStackTrace();
+                return;
             }
         }
 
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Tue Dec 02 14:49:20 2014 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Tue Dec 02 17:16:34 2014 +0900
@@ -16,8 +16,6 @@
 
     Connection connection;
     Logger logger;
-    private OutboundTcpConnection out;
-    private IncomingTcpConnection in;
 
     public RemoteDataSegmentManager(){}
 
@@ -45,10 +43,10 @@
                         }
                     }
                 } while (connect);
-                in = new IncomingTcpConnection(connection, manager, reverseKey);
+                IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey);
                 in.setPriority(MAX_PRIORITY);
                 in.start();
-                out = new OutboundTcpConnection(connection);
+                OutboundTcpConnection out = new OutboundTcpConnection(connection);
                 out.setPriority(MAX_PRIORITY);
                 out.start();
             }
@@ -143,6 +141,7 @@
     @Override
     public void close() {
         Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, "");
+        connection.sendManager = false;
         connection.sendCommand(cmd);
     }
 
@@ -154,16 +153,6 @@
             queue.clear();
         }
 
-        if (out != null && out.getState() != Thread.State.TERMINATED) {
-            Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, "");
-            queue.add(cmd); // outboundTCP Thread will stop.
-            out = null;
-        }
-
-        if (in != null || in.getState() != Thread.State.TERMINATED) {
-            in = null;
-        }
-
         if (DataSegment.contains(connection.name)) {
             DataSegment.remove(connection.name);
         }
--- a/src/main/java/alice/topology/fix/FixTopology.java	Tue Dec 02 14:49:20 2014 +0900
+++ b/src/main/java/alice/topology/fix/FixTopology.java	Tue Dec 02 17:16:34 2014 +0900
@@ -19,7 +19,7 @@
     private Receiver info4 = ids.create(CommandType.TAKE); // parentManager
 
     public FixTopology() {
-        info.setKey("_DISCONNECT");
+        info.setKey("_DISCONNECTNODE");
         info1.setKey("topology");
         info2.setKey("hostCount");
         info3.setKey("nameTable");
@@ -29,7 +29,7 @@
     @SuppressWarnings("unchecked")
     @Override
     public void run() {
-        ConnectionInfo disconnect = info.asClass(ConnectionInfo.class);
+        ConnectionInfo disconnect = info.asClass(ConnectionInfo.class); // send Data is wrong.
         HashMap<String, HostMessage> nameTable = info3.asClass(HashMap.class);
         ParentManager manager = info4.asClass(ParentManager.class);
         HostMessage disconnectNode = nameTable.get(disconnect.nodeName);
@@ -85,6 +85,15 @@
         } else {
             // disconnect message already received.
         }
+
+        // need debug option
+        for (LinkedList<HostMessage> list :topology.values()){
+            System.out.print(list.get(0).remoteAbsName+" : ");
+            for (HostMessage host : list){
+                System.out.print("[ "+host.absName+" "+host.name+" "+host.port+" "+host.connectionName+" "+host.reverseName+" "+host.remoteAbsName+" ]");
+            }
+            System.out.println();
+        }
         ods.put(info1.key, topology);
         ods.put(info2.key, info2.getReceiveData());
         ods.put(info3.key, nameTable);
--- a/src/main/java/alice/topology/fix/ReceiveDisconnectMessage.java	Tue Dec 02 14:49:20 2014 +0900
+++ b/src/main/java/alice/topology/fix/ReceiveDisconnectMessage.java	Tue Dec 02 17:16:34 2014 +0900
@@ -8,7 +8,7 @@
     private Receiver info = ids.create(CommandType.PEEK);
 
     public ReceiveDisconnectMessage() {
-        info.setKey("_DISCONNECT");
+        info.setKey("_DISCONNECTNODE");
     }
 
     @Override
--- a/src/main/java/alice/topology/manager/StartTopologyManager.java	Tue Dec 02 14:49:20 2014 +0900
+++ b/src/main/java/alice/topology/manager/StartTopologyManager.java	Tue Dec 02 17:16:34 2014 +0900
@@ -11,6 +11,7 @@
 
 import alice.codesegment.CodeSegment;
 import alice.topology.HostMessage;
+import alice.topology.fix.ReceiveDisconnectMessage;
 
 import com.alexmerz.graphviz.ParseException;
 import com.alexmerz.graphviz.Parser;
@@ -100,6 +101,7 @@
             ods.put("nameTable", nameTable);
             ods.put("hostCount", cominghostCount);
             new ComingServiceHosts();
+            new ReceiveDisconnectMessage();
         }
 
         ods.put("topology", new HashMap<String, LinkedList<HostMessage>>());
--- a/src/main/java/alice/topology/node/ConfigurationFinish.java	Tue Dec 02 14:49:20 2014 +0900
+++ b/src/main/java/alice/topology/node/ConfigurationFinish.java	Tue Dec 02 17:16:34 2014 +0900
@@ -5,7 +5,6 @@
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
 import alice.datasegment.Receiver;
-import alice.topology.manager.keeparive.CatchDisconnectEvent;
 
 public class ConfigurationFinish extends CodeSegment {
 
@@ -26,7 +25,7 @@
             Start cs = new Start(startCS);
             cs.done.setKey("manager", "start");
 
-            DisconnectEventManager.getInstance().register(CatchDisconnectEvent.class);
+            new ReceiveCloseMessage();
             DisconnectEventManager.getInstance().register(DeleteConnection.class);
             DisconnectEventManager.getInstance().setKey();
             return;
--- a/src/main/java/alice/topology/node/DeleteConnection.java	Tue Dec 02 14:49:20 2014 +0900
+++ b/src/main/java/alice/topology/node/DeleteConnection.java	Tue Dec 02 17:16:34 2014 +0900
@@ -21,11 +21,12 @@
         @SuppressWarnings("unchecked")
         List<String> list = info.asClass(List.class);
 
-        if (list.contains(name))
+        if (list.contains(name)) {
             list.remove(name);
-        if (DataSegment.contains(name))
-            DataSegment.get(name).shutdown();
-        ods.put(info.key, list);
+            if (DataSegment.contains(name))
+                DataSegment.get(name).shutdown();
+        }
+        ods.update(info.key, list);
     }
 
 }
--- a/src/main/java/alice/topology/node/ExecuteEvent.java	Tue Dec 02 14:49:20 2014 +0900
+++ b/src/main/java/alice/topology/node/ExecuteEvent.java	Tue Dec 02 17:16:34 2014 +0900
@@ -26,7 +26,7 @@
                 // set MetaInfo
                 if ("CloseEventCodeSegment".equals(clazz.getSuperclass().getSimpleName())) {
                     CloseEventCodeSegment obj = (CloseEventCodeSegment) clazz.newInstance();
-                    obj.metaInfo = info;
+                    obj.metaInfo = info1;
                     obj.ids.setKey();
                     obj.ids.receive();
                 } else if ("CodeSegment".equals(clazz.getSuperclass().getSimpleName())){
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/topology/node/ReceiveCloseMessage.java	Tue Dec 02 17:16:34 2014 +0900
@@ -0,0 +1,23 @@
+package alice.topology.node;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegment;
+import alice.datasegment.Receiver;
+
+public class ReceiveCloseMessage extends CodeSegment {
+
+    private Receiver info = ids.create(CommandType.TAKE);
+
+    public ReceiveCloseMessage() {
+        info.setKey("_CLOSEMESSEAGE");
+    }
+
+    @Override
+    public void run() {
+        String managerKey = info.asString();
+        DataSegment.get(managerKey).close();
+        new ReceiveCloseMessage();
+    }
+
+}