Mercurial > hg > Database > Alice
changeset 536:d2f7d02c4976 dispose
remoteDSM refactoring
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Mon, 15 Jun 2015 19:27:06 +0900 |
parents | dd20acf579bd |
children | 8f949fa80653 |
files | src/main/java/alice/daemon/AliceDaemon.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java src/main/java/alice/datasegment/DataSegment.java src/main/java/alice/datasegment/DataSegmentManager.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java src/main/java/alice/test/codesegment/remote/RemoteIncrement.java src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java |
diffstat | 8 files changed, 31 insertions(+), 28 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/daemon/AliceDaemon.java Mon Jun 15 18:22:27 2015 +0900 +++ b/src/main/java/alice/daemon/AliceDaemon.java Mon Jun 15 19:27:06 2015 +0900 @@ -50,7 +50,7 @@ // listen on any address ipv4/ipv6 InetSocketAddress a = new InetSocketAddress("::", conf.localPort); - //System.out.println("AliceDaemon.listen: bind to " + a); + System.out.println("AliceDaemon.listen: bind to " + a); ss.bind(a); acceptThread = new AcceptThread(ss, "ACCEPT" + conf.localPort); acceptThread.start();
--- a/src/main/java/alice/datasegment/Command.java Mon Jun 15 18:22:27 2015 +0900 +++ b/src/main/java/alice/datasegment/Command.java Mon Jun 15 19:27:06 2015 +0900 @@ -160,4 +160,8 @@ public boolean getCompressFlag(){ return compressFlag; } + + public void setSeq(int seq) { + this.seq = seq; + } }
--- a/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java Mon Jun 15 18:22:27 2015 +0900 +++ b/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java Mon Jun 15 19:27:06 2015 +0900 @@ -19,6 +19,7 @@ public CompressedRemoteDataSegmentManager(){} public CompressedRemoteDataSegmentManager(Connection c) { + logger = Logger.getLogger(c.name); connection = c; connection.name = "compressed" + c.name; } @@ -100,13 +101,14 @@ } else { connection.sendCommand(cmd); } - if (logger.isDebugEnabled()) + if (logger.isDebugEnabled() logger.debug(cmd.getCommandString()); } @Override public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { int seq = this.seq.getAndIncrement(); + System.err.println("CompressedDataSegment take seq :" + seq); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); cmd.setCompressFlag(true); cmd.setQuickFlag(quickFlag);
--- a/src/main/java/alice/datasegment/DataSegment.java Mon Jun 15 18:22:27 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegment.java Mon Jun 15 19:27:06 2015 +0900 @@ -41,10 +41,10 @@ System.exit(0); } RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port); - //CompressedRemoteDataSegmentManager compressedManager = new CompressedRemoteDataSegmentManager(manager.connection); + CompressedRemoteDataSegmentManager compressedManager = new CompressedRemoteDataSegmentManager(manager.connection); register(connectionKey, manager); - //register("compressed" + connectionKey, compressedManager); + register("compressed" + connectionKey, compressedManager); return manager; }
--- a/src/main/java/alice/datasegment/DataSegmentManager.java Mon Jun 15 18:22:27 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentManager.java Mon Jun 15 19:27:06 2015 +0900 @@ -39,6 +39,7 @@ }; public Command getAndRemoveCmd(int index){ + System.err.println("DSM getAndRemoveCmd seq : " + index); return seqHash.remove(index); }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Mon Jun 15 18:22:27 2015 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Mon Jun 15 19:27:06 2015 +0900 @@ -63,6 +63,10 @@ public void put(String key, ReceiveData rData, boolean quickFlag) { Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, ""); + put1(quickFlag, cmd); + } + + public void put1(boolean quickFlag, Command cmd) { if (quickFlag){ connection.write(cmd); // put command is executed right now } else { @@ -76,21 +80,22 @@ public void update(String key, ReceiveData rData, boolean quickFlag) { Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, ""); - if (quickFlag){ - connection.write(cmd); - } else { - connection.sendCommand(cmd); - } - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + put1(quickFlag, cmd); } @Override public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { + + Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); + take1(quickFlag, cmd); + } + + public void take1(boolean quickFlag, Command cmd) { int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + System.err.println("DataSegment take seq :" + seq); + cmd.setSeq(seq); + seqHash.put(seq, cmd); cmd.setQuickFlag(quickFlag); - seqHash.put(seq, cmd); if (quickFlag){ connection.write(cmd); } else { @@ -102,17 +107,8 @@ @Override public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { - int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setQuickFlag(quickFlag); - seqHash.put(seq, cmd); - if (quickFlag){ - connection.write(cmd); - } else { - connection.sendCommand(cmd); - } - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); + take1(quickFlag, cmd); } @Override
--- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Mon Jun 15 18:22:27 2015 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Mon Jun 15 19:27:06 2015 +0900 @@ -18,9 +18,9 @@ if (num == 10) System.exit(0); RemoteIncrement cs = new RemoteIncrement(); - cs.num.setKey("remote", "num"); + cs.num.setKey("compressedremote", "num"); - ods.put("local", "num", num); + ods.put("compressedlocal", "num", num); } } \ No newline at end of file
--- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Mon Jun 15 18:22:27 2015 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Mon Jun 15 19:27:06 2015 +0900 @@ -7,8 +7,8 @@ @Override public void run() { RemoteIncrement cs = new RemoteIncrement(); - cs.num.setKey("remote", "num"); + cs.num.setKey("compressedremote", "num"); - ods.put("local", "num", 0); + ods.put("compressedlocal", "num", 0); } } \ No newline at end of file