# HG changeset patch # User akahori # Date 1545113565 -32400 # Node ID c6e4d0e4954c00c0a51cc298c936ca140a8b68c2 # Parent 136d2a6cd0f42af344af6da78598008297abe46f update datagear add shutdown diff -r 136d2a6cd0f4 -r c6e4d0e4954c scripts/local_test_run2.sh --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/local_test_run2.sh Tue Dec 18 15:12:45 2018 +0900 @@ -0,0 +1,26 @@ +#!/bin/bash + +if [ ! -d output ]; then + mkdir output +fi + +max=$1 +count=$2 +jar_path=../build/libs/Christie.jar + +mkdir -p Log + +ruby ./ring.rb $1 > Log/ring.dot +#dot -Tpng ./topology/ring.dot > ./topology/ring.png +#open ./topology/ring.png +java -cp $jar_path christie.topology.manager.StartTopologyManager --localPort 10000 --confFile Log/ring.dot & + +#sleep 3 + +cnt=0 +while [ $cnt -lt $max ] +do + java -cp $jar_path christie.test.topology.localTestTopology.StartTorqueTestTopology --managerHost `hostname` --managerPort 10000 --localPort `expr 20000 + $cnt`& + cnt=`expr $cnt + 1` +done +wait \ No newline at end of file diff -r 136d2a6cd0f4 -r c6e4d0e4954c src/main/java/christie/daemon/AcceptThread.java --- a/src/main/java/christie/daemon/AcceptThread.java Sat Dec 15 17:58:43 2018 +0900 +++ b/src/main/java/christie/daemon/AcceptThread.java Tue Dec 18 15:12:45 2018 +0900 @@ -22,14 +22,14 @@ public void run() { while (true) { try { - Socket socket = ss.accept(); + Socket socket = null; + socket = ss.accept(); socket.setTcpNoDelay(true); - //System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort()); - Connection connection = new Connection(socket); - connection.name = getName(); + System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort()); + Connection connection = new Connection(socket, cgm); String key = "accept" + counter; IncomingTcpConnection in = - new IncomingTcpConnection(connection, cgm); + new IncomingTcpConnection(connection); in.setName(connection.getInfoString()+"-IncomingTcp"); in.start(); cgm.setAccept(key, in); diff -r 136d2a6cd0f4 -r c6e4d0e4954c src/main/java/christie/daemon/Connection.java --- a/src/main/java/christie/daemon/Connection.java Sat Dec 15 17:58:43 2018 +0900 +++ b/src/main/java/christie/daemon/Connection.java Tue Dec 18 15:12:45 2018 +0900 @@ -1,21 +1,23 @@ package christie.daemon; -import java.awt.*; import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.LinkedBlockingQueue; +import christie.codegear.CodeGearManager; import christie.datagear.command.Command; public class Connection { public Socket socket; public String name; + public CodeGearManager cgm; public LinkedBlockingQueue sendQueue = new LinkedBlockingQueue(); public boolean sendManager = true; - public Connection(Socket socket) { + public Connection(Socket socket, CodeGearManager cgm) { this.socket = socket; + this.cgm = cgm; } public Connection() {} @@ -39,24 +41,24 @@ socket.shutdownOutput(); socket.shutdownInput(); socket.close(); - } catch (Exception e) { } + } catch (Exception e) { + e.printStackTrace(); + } //putConnectionInfo(); } - /* public void putConnectionInfo() { if (name!=null) { - ConnectionInfo c = new ConnectionInfo(name, socket); - ReceiveData rData = new ReceiveData(c); - DataSegment.getLocal().put("_DISCONNECT", rData, false); + ConnectionInfo connectionInfo = new ConnectionInfo(name, socket); + cgm.getLocalDGM().put("_DISCONNECT", connectionInfo); if (sendManager) { - DataSegment.get("manager").put("_DISCONNECTNODE", rData, false); + cgm.getDGM("manager").put("_DISCONNECTNODE", connectionInfo); sendManager = false; } } + }*/ - }*/ public synchronized void write(Command cmd) { ByteBuffer buffer = cmd.convert(); diff -r 136d2a6cd0f4 -r c6e4d0e4954c src/main/java/christie/daemon/IncomingTcpConnection.java --- a/src/main/java/christie/daemon/IncomingTcpConnection.java Sat Dec 15 17:58:43 2018 +0900 +++ b/src/main/java/christie/daemon/IncomingTcpConnection.java Tue Dec 18 15:12:45 2018 +0900 @@ -23,9 +23,9 @@ Connection connection; private MessagePack packer = new MessagePack(); - public IncomingTcpConnection(Connection connection, CodeGearManager cgm) { + public IncomingTcpConnection(Connection connection) { this.connection = connection; - this.cgm = cgm; + this.cgm = connection.cgm; } public void setManager(RemoteDataGearManager manager){ diff -r 136d2a6cd0f4 -r c6e4d0e4954c src/main/java/christie/datagear/DataGearManager.java --- a/src/main/java/christie/datagear/DataGearManager.java Sat Dec 15 17:58:43 2018 +0900 +++ b/src/main/java/christie/datagear/DataGearManager.java Tue Dec 18 15:12:45 2018 +0900 @@ -15,6 +15,8 @@ public abstract void resolveWaitCommand(String key, DataGear dg); public abstract void finish(); public abstract void close(); + public abstract void shutdown(); + } diff -r 136d2a6cd0f4 -r c6e4d0e4954c src/main/java/christie/datagear/LocalDataGearManager.java --- a/src/main/java/christie/datagear/LocalDataGearManager.java Sat Dec 15 17:58:43 2018 +0900 +++ b/src/main/java/christie/datagear/LocalDataGearManager.java Tue Dec 18 15:12:45 2018 +0900 @@ -89,7 +89,12 @@ } @Override - public void close() {} + public void close() { + + } + @Override + public void shutdown() { + } } diff -r 136d2a6cd0f4 -r c6e4d0e4954c src/main/java/christie/datagear/RemoteDataGearManager.java --- a/src/main/java/christie/datagear/RemoteDataGearManager.java Sat Dec 15 17:58:43 2018 +0900 +++ b/src/main/java/christie/datagear/RemoteDataGearManager.java Tue Dec 18 15:12:45 2018 +0900 @@ -10,6 +10,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; +import java.util.concurrent.LinkedBlockingQueue; import static java.lang.Thread.MAX_PRIORITY; @@ -27,7 +28,7 @@ do { try { SocketChannel sc = SocketChannel.open(new InetSocketAddress(address, port)); - connection = new Connection(sc.socket()); + connection = new Connection(sc.socket(), cgm); connection.name = dgmName; connection.socket.setTcpNoDelay(true); @@ -44,7 +45,7 @@ } } } while (!connect); - IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm); + IncomingTcpConnection in = new IncomingTcpConnection(connection); in.setManager(manager); in.setName(dgmName+"-IncomingTcp"); in.setPriority(MAX_PRIORITY); @@ -91,6 +92,7 @@ cm.setInputs(); } + @Override public void finish() { Command cmd = new FinishCommand(); @@ -100,11 +102,16 @@ @Override public void close() { Command cmd = new CloseCommand(); - connection.sendManager = false; connection.sendCommand(cmd); } - // + @Override + public void shutdown() { + connection.close(); + LinkedBlockingQueue queue = connection.sendQueue; + if (!queue.isEmpty()) queue.clear(); + } + public void connectWait(){ synchronized (lock){ while(!connect){ @@ -114,7 +121,6 @@ } } } - }