Mercurial > hg > Database > Alice
changeset 542:bd245df5cba3 dispose
add AliceDeamon on paramater
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Mon, 03 Aug 2015 19:00:06 +0900 |
parents | ba9e8b32220d |
children | f3f7e256ee03 |
files | src/main/java/alice/codesegment/CodeSegmentManager.java src/main/java/alice/codesegment/InputDataSegment.java src/main/java/alice/codesegment/OutputDataSegment.java src/main/java/alice/daemon/AcceptThread.java src/main/java/alice/daemon/AliceDaemon.java src/main/java/alice/daemon/Connection.java src/main/java/alice/daemon/MulticastConnection.java src/main/java/alice/daemon/OutboundTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/DataSegment.java src/main/java/alice/datasegment/DataSegmentKey.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java |
diffstat | 12 files changed, 88 insertions(+), 77 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/CodeSegmentManager.java Mon Aug 03 18:40:41 2015 +0900 +++ b/src/main/java/alice/codesegment/CodeSegmentManager.java Mon Aug 03 19:00:06 2015 +0900 @@ -14,10 +14,11 @@ private ThreadFactory maxPriority; private ThreadFactory minPriority; private ThreadFactory normalPriority; + private AliceDaemon alice; private Logger logger = Logger.getLogger(CodeSegmentManager.class); - public CodeSegmentManager() { + public CodeSegmentManager(AliceDaemon aliceDaemon) { codeSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads Runtime.getRuntime().availableProcessors(), Integer.MAX_VALUE, // keepAliveTime @@ -26,14 +27,15 @@ normalPriority = codeSegmentExecutor.getThreadFactory(); maxPriority = makeThreadFactory(Thread.MAX_PRIORITY); minPriority = makeThreadFactory(Thread.MIN_PRIORITY); + alice = aliceDaemon; } - public CodeSegmentManager get() {//ToDo:あとでstatic消す - return new CodeSegmentManager(); + public CodeSegmentManager get() { + return new CodeSegmentManager(alice); } - public void submit(CodeSegment cs) {//ToDo:あとでstatic消す - CodeSegmentManager csManager = AliceDaemon.codeSegmentManager.get(); + public void submit(CodeSegment cs) { + CodeSegmentManager csManager = alice.codeSegmentManager.get(); if (cs.getPriority() < Thread.NORM_PRIORITY) { csManager.codeSegmentExecutor.setThreadFactory(csManager.minPriority); } else if (cs.getPriority() < Thread.MAX_PRIORITY) {
--- a/src/main/java/alice/codesegment/InputDataSegment.java Mon Aug 03 18:40:41 2015 +0900 +++ b/src/main/java/alice/codesegment/InputDataSegment.java Mon Aug 03 19:00:06 2015 +0900 @@ -35,23 +35,23 @@ public void quickPeek(Receiver receiver) {//SEDAで実行 cs.register(receiver); - AliceDaemon.dataSegment.get(receiver.managerKey).peek(receiver, cs, true); + alice.dataSegment.get(receiver.managerKey).peek(receiver, cs, true); } public void peek(Receiver receiver) { cs.register(receiver); - AliceDaemon.dataSegment.get(receiver.managerKey).peek(receiver, cs, false); + alice.dataSegment.get(receiver.managerKey).peek(receiver, cs, false); } public void quickTake(Receiver receiver) { cs.register(receiver); - AliceDaemon.dataSegment.get(receiver.managerKey).take(receiver, cs, true); + alice.dataSegment.get(receiver.managerKey).take(receiver, cs, true); } public void take(Receiver receiver) { cs.register(receiver); - AliceDaemon.dataSegment.get(receiver.managerKey).take(receiver, cs, false); + alice.dataSegment.get(receiver.managerKey).take(receiver, cs, false); } public void reply(Receiver receiver, Command reply) { @@ -74,7 +74,7 @@ public void receive() { if (count.decrementAndGet() == 0) { - AliceDaemon.codeSegmentManager.submit(cs); + alice.codeSegmentManager.submit(cs); } } @@ -89,7 +89,7 @@ public void recommand(Receiver receiver) { // TODO why only local? - AliceDaemon.dataSegment.getLocal().recommand(receiver, cs); + alice.dataSegment.getLocal().recommand(receiver, cs); } public void setCounter(int cnt){
--- a/src/main/java/alice/codesegment/OutputDataSegment.java Mon Aug 03 18:40:41 2015 +0900 +++ b/src/main/java/alice/codesegment/OutputDataSegment.java Mon Aug 03 19:00:06 2015 +0900 @@ -10,7 +10,7 @@ public class OutputDataSegment { private final CodeSegment cs; - private final AliceDaemon alice; + private final AliceDaemon alice; public OutputDataSegment(CodeSegment cs){ this.cs = cs; @@ -26,17 +26,17 @@ */ public void flip(Receiver receiver) { if (receiver.isCompressed()){ - AliceDaemon.dataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false); + alice.dataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false); } else { - AliceDaemon.dataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); + alice.dataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); } } public void flip(String managerKey, String key, Receiver receiver){ if (receiver.isCompressed()){ - AliceDaemon.dataSegment.get("compressed" + managerKey).put(key, receiver.getReceiveData(), false); + alice.dataSegment.get("compressed" + managerKey).put(key, receiver.getReceiveData(), false); } else { - AliceDaemon.dataSegment.get(managerKey).put(key, receiver.getReceiveData(), false); + alice.dataSegment.get(managerKey).put(key, receiver.getReceiveData(), false); } } @@ -45,16 +45,16 @@ switch (type) { case PUT: if (receiver.isCompressed()){ - AliceDaemon.dataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false);//localなら全部false。 + alice.dataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false);//localなら全部false。 } else { - AliceDaemon.dataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); + alice.dataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); } break; case UPDATE: if (receiver.isCompressed()){ - AliceDaemon.dataSegment.getCompressedLocal().update(receiver.key, receiver.getReceiveData(), false); + alice.dataSegment.getCompressedLocal().update(receiver.key, receiver.getReceiveData(), false); } else { - AliceDaemon.dataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), false); + alice.dataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), false); } break; @@ -64,56 +64,56 @@ } public void put(String key, ReceiveData rData) { - AliceDaemon.dataSegment.getLocal().put(key, rData, false); + alice.dataSegment.getLocal().put(key, rData, false); } public void put(String key, Object val) { ReceiveData rData = new ReceiveData(val); - AliceDaemon.dataSegment.getLocal().put(key, rData, false); + alice.dataSegment.getLocal().put(key, rData, false); } public void update(String key, Object val) { ReceiveData rData = new ReceiveData(val); - AliceDaemon.dataSegment.getLocal().update(key, rData, false); + alice.dataSegment.getLocal().update(key, rData, false); } /** * for remote */ public void put(String managerKey, String key, ReceiveData rData) { - AliceDaemon.dataSegment.get(managerKey).put(key, rData, false); + alice.dataSegment.get(managerKey).put(key, rData, false); } public void put(String managerKey, String key, Object val) {//追加 ReceiveData rData = new ReceiveData(val); - AliceDaemon.dataSegment.get(managerKey).put(key, rData, false); + alice.dataSegment.get(managerKey).put(key, rData, false); } public void quickPut(String managerKey, String key, ReceiveData rData) { - AliceDaemon.dataSegment.get(managerKey).put(key, rData, true); + alice.dataSegment.get(managerKey).put(key, rData, true); } public void quickPut(String managerKey, String key, Object val) { ReceiveData rData = new ReceiveData(val); - AliceDaemon.dataSegment.get(managerKey).put(key, rData, true); + alice.dataSegment.get(managerKey).put(key, rData, true); } public void update(String managerKey, String key, ReceiveData rData) { - AliceDaemon.dataSegment.get(managerKey).update(key, rData, false); + alice.dataSegment.get(managerKey).update(key, rData, false); } public void update(String managerKey, String key, Object val) { ReceiveData rData = new ReceiveData(val); - AliceDaemon.dataSegment.get(managerKey).update(key, rData, false); + alice.dataSegment.get(managerKey).update(key, rData, false); } public void quickUpdate(String managerKey, String key, ReceiveData rData) { - AliceDaemon.dataSegment.get(managerKey).update(key, rData, true); + alice.dataSegment.get(managerKey).update(key, rData, true); } public void quickUpdate(String managerKey, String key, Object val, boolean cFlag) { ReceiveData rData = new ReceiveData(val); - AliceDaemon.dataSegment.get(managerKey).update(key, rData, true); + alice.dataSegment.get(managerKey).update(key, rData, true); } /** @@ -123,19 +123,19 @@ */ public void finish(String managerKey) { - if (AliceDaemon.dataSegment.contains(managerKey)) - AliceDaemon.dataSegment.get(managerKey).finish(); + if (alice.dataSegment.contains(managerKey)) + alice.dataSegment.get(managerKey).finish(); } /** - * close socket for RemoteAliceDaemon.dataSegment after send other messages. + * close socket for Remotealice.dataSegment after send other messages. * * @param managerKey */ public void close(String managerKey) { - if (AliceDaemon.dataSegment.contains(managerKey)) - AliceDaemon.dataSegment.get(managerKey).close(); + if (alice.dataSegment.contains(managerKey)) + alice.dataSegment.get(managerKey).close(); } /** @@ -146,8 +146,8 @@ * @param returnKey */ public void ping(String managerKey, String returnKey) { - if (AliceDaemon.dataSegment.contains(managerKey)) - AliceDaemon.dataSegment.get(managerKey).ping(returnKey); + if (alice.dataSegment.contains(managerKey)) + alice.dataSegment.get(managerKey).ping(returnKey); } }
--- a/src/main/java/alice/daemon/AcceptThread.java Mon Aug 03 18:40:41 2015 +0900 +++ b/src/main/java/alice/daemon/AcceptThread.java Mon Aug 03 19:00:06 2015 +0900 @@ -10,13 +10,15 @@ public class AcceptThread extends Thread { + private final AliceDaemon alice; private ServerSocket ss; private Logger log = Logger.getLogger(AcceptThread.class); public int counter = 0; - public AcceptThread(ServerSocket ss, String name) { + public AcceptThread(ServerSocket ss, String name, AliceDaemon aliceDaemon) { super(name); this.ss = ss; + this.alice = aliceDaemon; } @Override @@ -26,13 +28,13 @@ Socket socket = ss.accept(); socket.setTcpNoDelay(true); log.info("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort()); - Connection connection = new Connection(socket); + Connection connection = new Connection(socket, alice); String key = "accept" + counter; IncomingTcpConnection in = - new IncomingTcpConnection(connection, AliceDaemon.dataSegment.get("local"), key); + new IncomingTcpConnection(connection, alice.dataSegment.get("local"), key); in.setName(connection.getInfoString()+"-IncomingTcp"); in.start(); - AliceDaemon.dataSegment.setAccept(key, in); + alice.dataSegment.setAccept(key, in); OutboundTcpConnection out = new OutboundTcpConnection(connection); out.setName(connection.getInfoString()+"-OutboundTcp"); out.start();
--- a/src/main/java/alice/daemon/AliceDaemon.java Mon Aug 03 18:40:41 2015 +0900 +++ b/src/main/java/alice/daemon/AliceDaemon.java Mon Aug 03 19:00:06 2015 +0900 @@ -27,12 +27,12 @@ public final MessagePack packer = new MessagePack(); public DataSegment dataSegment = new DataSegment(); public HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();//コマンド対応表 - public int lastId = 0;//コマンドの総数 - public CodeSegmentManager codeSegmentManager = new CodeSegmentManager(); + public CodeSegmentManager codeSegmentManager; public AliceDaemon(Config conf) { this.conf = conf; setLogger(); + codeSegmentManager = new CodeSegmentManager(this); commandInit();//static } @@ -79,7 +79,7 @@ System.out.println("AliceDaemon.listen: bind to " + a); ss.bind(a); - acceptThread = new AcceptThread(ss, "ACCEPT" + conf.localPort); + acceptThread = new AcceptThread(ss, "ACCEPT" + conf.localPort, this); acceptThread.start(); } catch (IOException e) { e.printStackTrace();
--- a/src/main/java/alice/daemon/Connection.java Mon Aug 03 18:40:41 2015 +0900 +++ b/src/main/java/alice/daemon/Connection.java Mon Aug 03 19:00:06 2015 +0900 @@ -11,16 +11,20 @@ public class Connection { + private final AliceDaemon alice; public Socket socket; public String name; public LinkedBlockingQueue<Command> sendQueue = new LinkedBlockingQueue<Command>(); public boolean sendManager = true; - public Connection(Socket socket) { + public Connection(Socket socket, AliceDaemon alice) { this.socket = socket; + this.alice = alice; } - public Connection() {} + public Connection(AliceDaemon alice) { + this.alice = alice; + } public void sendCommand(Command cmd) { try { @@ -35,8 +39,8 @@ + ":" + socket.getPort(); } - public synchronized void write(Command cmd) { - ByteBuffer buffer = cmd.convert(); + public synchronized void write(Command cmd, Object alie) { + ByteBuffer buffer = cmd.convert(alice); try { while (buffer.hasRemaining()) { socket.getChannel().write(buffer); @@ -59,9 +63,9 @@ if (name!=null){ ConnectionInfo c = new ConnectionInfo(name, socket); ReceiveData rData = new ReceiveData(c); - AliceDaemon.dataSegment.getLocal().put("_DISCONNECT", rData, false); + alice.dataSegment.getLocal().put("_DISCONNECT", rData, false); if (sendManager) { - AliceDaemon.dataSegment.get("manager").put("_DISCONNECTNODE", rData, false); + alice.dataSegment.get("manager").put("_DISCONNECTNODE", rData, false); sendManager = false; } }
--- a/src/main/java/alice/daemon/MulticastConnection.java Mon Aug 03 18:40:41 2015 +0900 +++ b/src/main/java/alice/daemon/MulticastConnection.java Mon Aug 03 19:00:06 2015 +0900 @@ -18,8 +18,8 @@ // may need to add infomation who send on ds. @Override - public synchronized void write(Command cmd){ - ByteBuffer buffer = cmd.convert(); + public synchronized void write(Command cmd, Object alie){ + ByteBuffer buffer = cmd.convert(alice); try { while (buffer.hasRemaining()){ dc.send(buffer, sAddr);
--- a/src/main/java/alice/daemon/OutboundTcpConnection.java Mon Aug 03 18:40:41 2015 +0900 +++ b/src/main/java/alice/daemon/OutboundTcpConnection.java Mon Aug 03 19:00:06 2015 +0900 @@ -27,7 +27,7 @@ default: break; } - connection.write(cmd);//ここでconvert()がよばれてる + connection.write(cmd, alie);//ここでconvert()がよばれてる } catch (InterruptedException e) { e.printStackTrace(); }
--- a/src/main/java/alice/datasegment/Command.java Mon Aug 03 18:40:41 2015 +0900 +++ b/src/main/java/alice/datasegment/Command.java Mon Aug 03 19:00:06 2015 +0900 @@ -28,8 +28,6 @@ private boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか。trueだとリモート private boolean compressFlag = false;//trueだったら圧縮する必要がある - private MessagePack packer = AliceDaemon.packer; - /** * for PEEK/TAKE */ @@ -73,9 +71,11 @@ } /** * @return serialized ByteBuffer + * @param alice */ - public ByteBuffer convert() { + public ByteBuffer convert(AliceDaemon alice) { ByteBuffer buf = null; + MessagePack packer = alice.packer; try { byte[] header = null;
--- a/src/main/java/alice/datasegment/DataSegment.java Mon Aug 03 18:40:41 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegment.java Mon Aug 03 19:00:06 2015 +0900 @@ -9,35 +9,37 @@ public class DataSegment { + private final AliceDaemon alice; //private static DataSegment dataSegment = new DataSegment();//ToDo: static消す private LocalDataSegmentManager local = new LocalDataSegmentManager(); private CompressedLocalDataSegmentManager compressedLocal = new CompressedLocalDataSegmentManager(local);//追加 private ConcurrentHashMap<String, DataSegmentManager> dataSegmentManagers = new ConcurrentHashMap<String, DataSegmentManager>(); //TODO Over Head private ConcurrentHashMap<String, IncomingTcpConnection> acceptHash = new ConcurrentHashMap<String, IncomingTcpConnection>(); - public DataSegment() { + public DataSegment(AliceDaemon alice) { dataSegmentManagers.put("local", local); dataSegmentManagers.put("compressedlocal", compressedLocal); + this.alice = alice; } public DataSegmentManager get(String key) { if (key == null){ - return AliceDaemon.dataSegment.dataSegmentManagers.get("local"); + return alice.dataSegment.dataSegmentManagers.get("local"); } else { - return AliceDaemon.dataSegment.dataSegmentManagers.get(key); + return alice.dataSegment.dataSegmentManagers.get(key); } } public LocalDataSegmentManager getLocal() { - return AliceDaemon.dataSegment.local; + return alice.dataSegment.local; } public CompressedLocalDataSegmentManager getCompressedLocal() {//追加 - return AliceDaemon.dataSegment.compressedLocal; + return alice.dataSegment.compressedLocal; } public void register(String key, DataSegmentManager manager) { - AliceDaemon.dataSegment.dataSegmentManagers.put(key, manager); + alice.dataSegment.dataSegmentManagers.put(key, manager); } public RemoteDataSegmentManager connect(String connectionKey, String reverseKey, String hostName, int port) { @@ -45,7 +47,7 @@ System.out.println("You can't use 'compressed' for DataSegmentManager name."); System.exit(0); } - RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port); + RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port, alice); CompressedRemoteDataSegmentManager compressedManager = new CompressedRemoteDataSegmentManager(manager); register(connectionKey, manager); @@ -62,18 +64,18 @@ } public void setAccept(String key, IncomingTcpConnection incoming) { - AliceDaemon.dataSegment.acceptHash.put(key, incoming); + alice.dataSegment.acceptHash.put(key, incoming); } public IncomingTcpConnection getAccept(String key) { - return AliceDaemon.dataSegment.acceptHash.get(key); + return alice.dataSegment.acceptHash.get(key); } public void remove(String key){ - AliceDaemon.dataSegment.dataSegmentManagers.remove(key); + alice.dataSegment.dataSegmentManagers.remove(key); } public boolean contains(String key){ - return AliceDaemon.dataSegment.dataSegmentManagers.containsKey(key); + return alice.dataSegment.dataSegmentManagers.containsKey(key); } }
--- a/src/main/java/alice/datasegment/DataSegmentKey.java Mon Aug 03 18:40:41 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentKey.java Mon Aug 03 19:00:06 2015 +0900 @@ -3,8 +3,6 @@ import java.util.ArrayList; import java.util.Iterator; -import alice.datasegment.Command; - /** * ここがコマンドの中身部分 * @@ -95,7 +93,7 @@ if (!cmd.getQuickFlag()) { cmd.connection.sendQueue.put(rCmd); } else { - cmd.connection.write(rCmd); + cmd.connection.write(rCmd, alie); } } catch (InterruptedException e) { e.printStackTrace();
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Mon Aug 03 18:40:41 2015 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Mon Aug 03 19:00:06 2015 +0900 @@ -5,6 +5,7 @@ import java.nio.channels.SocketChannel; import java.util.concurrent.LinkedBlockingQueue; +import alice.daemon.AliceDaemon; import org.apache.log4j.Logger; import alice.codesegment.CodeSegment; @@ -18,9 +19,9 @@ public RemoteDataSegmentManager(){} - public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { + public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port, AliceDaemon alice) { logger = Logger.getLogger(connectionKey); - connection = new Connection(); + connection = new Connection(alice); connection.name = connectionKey; final RemoteDataSegmentManager manager = this; //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); @@ -56,6 +57,8 @@ }.start(); } + + /** * send put command to target DataSegment */ @@ -68,7 +71,7 @@ public void put1(boolean quickFlag, Command cmd) { if (quickFlag){ - connection.write(cmd); // put command is executed right now + connection.write(cmd, alie); // put command is executed right now } else { connection.sendCommand(cmd); // put command on the transmission thread } @@ -97,7 +100,7 @@ seqHash.put(seq, cmd); cmd.setQuickFlag(quickFlag); if (quickFlag){ - connection.write(cmd); + connection.write(cmd, alie); } else { connection.sendCommand(cmd); } @@ -128,13 +131,13 @@ @Override public void ping(String returnKey) { Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, ""); - connection.write(cmd); + connection.write(cmd, alie); } @Override public void response(String returnKey) { Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, ""); - connection.write(cmd); + connection.write(cmd, alie); } @Override