Mercurial > hg > Database > Alice
changeset 488:7ef0ebb40c9b dispose
add measurement data in protocol
author | sugi |
---|---|
date | Mon, 08 Dec 2014 23:11:26 +0900 |
parents | c1cf44777eef |
children | 9a7dd7591ddc |
files | src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/ReceiveData.java src/main/java/alice/topology/manager/keeparive/ListManager.java src/main/java/alice/topology/manager/keeparive/RespondPing.java src/main/java/alice/topology/node/ConfigurationFinish.java |
diffstat | 7 files changed, 33 insertions(+), 12 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/daemon/CommandMessage.java Mon Dec 08 20:55:10 2014 +0900 +++ b/src/main/java/alice/daemon/CommandMessage.java Mon Dec 08 23:11:26 2014 +0900 @@ -12,6 +12,10 @@ public boolean serialized = false; public boolean compressed = false; + public boolean setTime = false; + public long time; + public int depth; + public CommandMessage() {} public CommandMessage(int type, int index, int seq, String key
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Mon Dec 08 20:55:10 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Mon Dec 08 23:11:26 2014 +0900 @@ -60,6 +60,11 @@ case UPDATE: case PUT: rData = new ReceiveData(getSerializedByteArray(unpacker), msg.compressed, msg.serialized); + if (msg.setTime) { + rData.setTime = true; + rData.time = msg.time; + rData.depth = msg.depth; + } cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); lmanager.getDataSegmentKey(msg.key).runCommand(cmd); break;
--- a/src/main/java/alice/datasegment/Command.java Mon Dec 08 20:55:10 2014 +0900 +++ b/src/main/java/alice/datasegment/Command.java Mon Dec 08 23:11:26 2014 +0900 @@ -102,8 +102,14 @@ compressed = true; } } + CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed); + if (rData.setTime) { + cm.setTime = true; + cm.time = rData.time; + cm.depth = rData.depth + 1; + } - header = msg.write(new CommandMessage(type.id, index, seq, key, false, serialized, compressed)); + header = msg.write(cm); dataSize = msg.write(data.length); buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); buf.put(header);
--- a/src/main/java/alice/datasegment/ReceiveData.java Mon Dec 08 20:55:10 2014 +0900 +++ b/src/main/java/alice/datasegment/ReceiveData.java Mon Dec 08 23:11:26 2014 +0900 @@ -18,6 +18,10 @@ private boolean serialized = false; private boolean byteArray = false; + public long time; + public boolean setTime = false; + public int depth = 1; + public ReceiveData(Object obj, boolean cFlag, boolean sFlag){ val = obj; compressed = cFlag;
--- a/src/main/java/alice/topology/manager/keeparive/ListManager.java Mon Dec 08 20:55:10 2014 +0900 +++ b/src/main/java/alice/topology/manager/keeparive/ListManager.java Mon Dec 08 23:11:26 2014 +0900 @@ -85,7 +85,8 @@ } public void deleteAll(String name) { - if (TaskExecuter.getInstance().getNowTask().getManagerKey().equals(name)) + if (TaskExecuter.getInstance().getNowTask().getManagerKey() != null && + TaskExecuter.getInstance().getNowTask().getManagerKey().equals(name)) TaskExecuter.getInstance().skip(); TaskInfo task = new TaskInfo(TaskType.CLOSE); task.setInfo(name, 0);
--- a/src/main/java/alice/topology/manager/keeparive/RespondPing.java Mon Dec 08 20:55:10 2014 +0900 +++ b/src/main/java/alice/topology/manager/keeparive/RespondPing.java Mon Dec 08 23:11:26 2014 +0900 @@ -2,12 +2,11 @@ import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; -import alice.datasegment.DataSegment; import alice.datasegment.Receiver; public class RespondPing extends CodeSegment{ private Receiver respond = ids.create(CommandType.TAKE); - private long pingedTime = System.currentTimeMillis(); +// private long pingedTime = System.currentTimeMillis(); public RespondPing(String key) { respond.setKey(key); @@ -16,13 +15,13 @@ @Override public void run() { RespondData d = respond.asClass(RespondData.class); - System.out.print("ping from "+d.from); - System.out.println(" Recieved time "+(d.time - pingedTime)); - if (d.time - pingedTime > 60 * 1000){ - // need check, this connection is alive. may be close - if (DataSegment.contains(d.from)) - DataSegment.get(d.from).shutdown(); - } else { +// System.out.print("ping from "+d.from); +// System.out.println(" Recieved time "+(d.time - pingedTime)); +// if (d.time - pingedTime > 60 * 1000){ +// // need check, this connection is alive. may be close +// if (DataSegment.contains(d.from)) +// DataSegment.get(d.from).shutdown(); +// } else { // if nowTask close d.from's socket cancel. // if not remove close task in the Queue. TaskExecuter exec = TaskExecuter.getInstance(); @@ -36,6 +35,6 @@ ods.put("_REMOVETASK",task); new RemoveTask(); } - } +// } } }
--- a/src/main/java/alice/topology/node/ConfigurationFinish.java Mon Dec 08 20:55:10 2014 +0900 +++ b/src/main/java/alice/topology/node/ConfigurationFinish.java Mon Dec 08 23:11:26 2014 +0900 @@ -5,6 +5,7 @@ import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.Receiver; +import alice.topology.manager.keeparive.StartKeepAlive; public class ConfigurationFinish extends CodeSegment { @@ -25,6 +26,7 @@ Start cs = new Start(startCS); cs.done.setKey("manager", "start"); + //new StartKeepAlive().execute(); new ReceiveCloseMessage(); DisconnectEventManager.getInstance().register(DeleteConnection.class); DisconnectEventManager.getInstance().setKey();