Mercurial > hg > Database > Alice
changeset 470:780ae843cdac dispose
Delete disconnect managerKey from connection list
author | sugi |
---|---|
date | Mon, 24 Nov 2014 19:52:13 +0900 |
parents | b31b1d197c42 |
children | be0b61986ff7 |
files | src/main/java/alice/codesegment/CloseEventCodeSegment.java src/main/java/alice/daemon/AcceptThread.java src/main/java/alice/daemon/Connection.java src/main/java/alice/daemon/ConnectionInfo.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java src/main/java/alice/topology/manager/keeparive/CatchDisconnectEvent.java src/main/java/alice/topology/node/CloseEventCodeSegment.java src/main/java/alice/topology/node/ConfigurationFinish.java src/main/java/alice/topology/node/DeleteConnection.java src/main/java/alice/topology/node/DisconnectEventManager.java src/main/java/alice/topology/node/ExecuteEvent.java |
diffstat | 11 files changed, 94 insertions(+), 31 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/codesegment/CloseEventCodeSegment.java Mon Nov 24 19:52:13 2014 +0900 @@ -0,0 +1,15 @@ +package alice.codesegment; + +import alice.daemon.ConnectionInfo; +import alice.datasegment.CommandType; +import alice.datasegment.Receiver; + +public abstract class CloseEventCodeSegment extends CodeSegment { + + public Receiver metaInfo = ids.create(CommandType.PEEK); + + public ConnectionInfo getConnectionInfo() { + return metaInfo.asClass(ConnectionInfo.class); + } + +}
--- a/src/main/java/alice/daemon/AcceptThread.java Sun Nov 23 22:14:30 2014 +0900 +++ b/src/main/java/alice/daemon/AcceptThread.java Mon Nov 24 19:52:13 2014 +0900 @@ -30,11 +30,9 @@ String key = "accept" + counter; IncomingTcpConnection incoming = new IncomingTcpConnection(connection, DataSegment.get("local"), key); - incoming.setPriority(MAX_PRIORITY); incoming.start(); DataSegment.setAccept(key, incoming); OutboundTcpConnection outbound = new OutboundTcpConnection(connection); - outbound.setPriority(MAX_PRIORITY); outbound.start(); counter++; } catch (IOException e) {
--- a/src/main/java/alice/daemon/Connection.java Sun Nov 23 22:14:30 2014 +0900 +++ b/src/main/java/alice/daemon/Connection.java Mon Nov 24 19:52:13 2014 +0900 @@ -13,6 +13,7 @@ public class Connection { public Socket socket; + public String name; public LinkedBlockingQueue<Command> sendQueue = new LinkedBlockingQueue<Command>(); public Connection(Socket socket) { @@ -55,17 +56,19 @@ socket.shutdownInput(); socket.close(); } catch (ClosedChannelException e) { - putConnectionInfo(); + return; } catch (IOException e) { - e.printStackTrace(); + return; } } public void putConnectionInfo() { - ConnectionInfo c = new ConnectionInfo(socket.getInetAddress().toString(), socket.getPort()); - ReceiveData rData = new ReceiveData(c, false, false); - DataSegment.getLocal().put("disconnect", rData, null); + if (name!=null){ + ConnectionInfo c = new ConnectionInfo(name, socket.getInetAddress().getHostAddress(), socket.getPort()); + ReceiveData rData = new ReceiveData(c, false, false); + DataSegment.getLocal().put("_DISCONNECT", rData, null); + } } }
--- a/src/main/java/alice/daemon/ConnectionInfo.java Sun Nov 23 22:14:30 2014 +0900 +++ b/src/main/java/alice/daemon/ConnectionInfo.java Mon Nov 24 19:52:13 2014 +0900 @@ -4,12 +4,14 @@ @Message public class ConnectionInfo { + public String name; public String addr; public int port; public ConnectionInfo(){} - public ConnectionInfo(String addr, int port) { + public ConnectionInfo(String name, String addr, int port) { + this.name = name; this.addr = addr; this.port = port; }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Sun Nov 23 22:14:30 2014 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Mon Nov 24 19:52:13 2014 +0900 @@ -3,6 +3,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; @@ -15,14 +16,17 @@ Connection connection; Logger logger; + private OutboundTcpConnection out; + private IncomingTcpConnection in; public RemoteDataSegmentManager(){} public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { logger = Logger.getLogger(connectionKey); connection = new Connection(); + connection.name = connectionKey; final RemoteDataSegmentManager manager = this; - new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); + //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); new Thread("Connect-" + connectionKey) { public void run() { boolean connect = true; @@ -41,8 +45,12 @@ } } } while (connect); - new IncomingTcpConnection(connection, manager, reverseKey).start(); - new OutboundTcpConnection(connection).start(); + in = new IncomingTcpConnection(connection, manager, reverseKey); + in.setPriority(MAX_PRIORITY); + in.start(); + out = new OutboundTcpConnection(connection); + out.setPriority(MAX_PRIORITY); + out.start(); } }.start(); } @@ -141,6 +149,26 @@ @Override public void shutdown() { connection.close(); + LinkedBlockingQueue<Command> queue = connection.sendQueue; + if (!queue.isEmpty()) { + queue.clear(); + } + + System.out.println(out.getState()); + if (out != null && out.getState() != Thread.State.TERMINATED) { + Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, ""); + queue.add(cmd); // outboundTCP Thread will stop. + out = null; + } + + System.out.println(in.getState()); + if (in != null || in.getState() != Thread.State.TERMINATED) { + in = null; + } + + if (DataSegment.contains(connection.name)) { + DataSegment.remove(connection.name); + } } }
--- a/src/main/java/alice/topology/manager/keeparive/CatchDisconnectEvent.java Sun Nov 23 22:14:30 2014 +0900 +++ b/src/main/java/alice/topology/manager/keeparive/CatchDisconnectEvent.java Mon Nov 24 19:52:13 2014 +0900 @@ -1,7 +1,7 @@ package alice.topology.manager.keeparive; +import alice.codesegment.CloseEventCodeSegment; import alice.daemon.ConnectionInfo; -import alice.topology.node.CloseEventCodeSegment; public class CatchDisconnectEvent extends CloseEventCodeSegment{
--- a/src/main/java/alice/topology/node/CloseEventCodeSegment.java Sun Nov 23 22:14:30 2014 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,16 +0,0 @@ -package alice.topology.node; - -import alice.codesegment.CodeSegment; -import alice.daemon.ConnectionInfo; -import alice.datasegment.CommandType; -import alice.datasegment.Receiver; - -public abstract class CloseEventCodeSegment extends CodeSegment { - - public Receiver metaInfo = ids.create(CommandType.PEEK); - - public ConnectionInfo getConnectionInfo() { - return metaInfo.asClass(ConnectionInfo.class); - } - -}
--- a/src/main/java/alice/topology/node/ConfigurationFinish.java Sun Nov 23 22:14:30 2014 +0900 +++ b/src/main/java/alice/topology/node/ConfigurationFinish.java Mon Nov 24 19:52:13 2014 +0900 @@ -26,8 +26,9 @@ Start cs = new Start(startCS); cs.done.setKey("manager", "start"); + DisconnectEventManager.getInstance().register(CatchDisconnectEvent.class); + DisconnectEventManager.getInstance().register(DeleteConnection.class); DisconnectEventManager.getInstance().setKey(); - DisconnectEventManager.getInstance().register(CatchDisconnectEvent.class); return; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/topology/node/DeleteConnection.java Mon Nov 24 19:52:13 2014 +0900 @@ -0,0 +1,31 @@ +package alice.topology.node; + +import java.util.List; + +import alice.codesegment.CloseEventCodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; +import alice.datasegment.Receiver; + +public class DeleteConnection extends CloseEventCodeSegment { + + private Receiver info = ids.create(CommandType.TAKE); + + public DeleteConnection() { + info.setKey("_CLIST"); + } + + @Override + public void run() { + String name = getConnectionInfo().name; + @SuppressWarnings("unchecked") + List<String> list = info.asClass(List.class); + + if (list.contains(name)) + list.remove(name); + + DataSegment.get(name).shutdown(); + ods.put(info.key, list); + } + +}
--- a/src/main/java/alice/topology/node/DisconnectEventManager.java Sun Nov 23 22:14:30 2014 +0900 +++ b/src/main/java/alice/topology/node/DisconnectEventManager.java Mon Nov 24 19:52:13 2014 +0900 @@ -21,7 +21,7 @@ public void setKey() { ids.init(); - info.setKey("disconnect"); + info.setKey("_DISCONNECT"); } @SuppressWarnings("rawtypes")
--- a/src/main/java/alice/topology/node/ExecuteEvent.java Sun Nov 23 22:14:30 2014 +0900 +++ b/src/main/java/alice/topology/node/ExecuteEvent.java Mon Nov 24 19:52:13 2014 +0900 @@ -3,6 +3,7 @@ import java.lang.reflect.Field; import java.util.List; +import alice.codesegment.CloseEventCodeSegment; import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.Receiver; @@ -14,7 +15,7 @@ public ExecuteEvent() { info.setKey("_DEVENTLIST"); - info1.setKey("disconnect"); + info1.setKey("_DISCONNECT"); } @SuppressWarnings("unchecked")