Mercurial > hg > Database > Alice
changeset 308:a8255a831ade
implement ping api
author | sugi |
---|---|
date | Tue, 19 Nov 2013 17:39:44 +0900 |
parents | 52bb813ed52e |
children | 797267843126 |
files | src/alice/codesegment/OutputDataSegment.java src/alice/daemon/IncomingTcpConnection.java src/alice/datasegment/CommandType.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegmentManager.java src/alice/topology/manager/keeparive/PingScheduler.java src/alice/topology/manager/keeparive/SendPing.java |
diffstat | 9 files changed, 55 insertions(+), 15 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/codesegment/OutputDataSegment.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/codesegment/OutputDataSegment.java Tue Nov 19 17:39:44 2013 +0900 @@ -85,4 +85,15 @@ DataSegment.get(managerKey).close(); } + /** + * "key" is not remote DataSegment's key. + * "Ping Response" return in this "key" + * + * @param managerKey + * @param key + */ + public void ping(String managerKey ,String returnKey) { + DataSegment.get(managerKey).ping(returnKey); + } + }
--- a/src/alice/daemon/IncomingTcpConnection.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/daemon/IncomingTcpConnection.java Tue Nov 19 17:39:44 2013 +0900 @@ -62,6 +62,12 @@ cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); cmd=null; break; + case PING: + DataSegment.get(reverseKey).response(msg.key); + break; + case RESPONSE: + DataSegment.getLocal().put(msg.key, System.currentTimeMillis()); + break; default: break; }
--- a/src/alice/datasegment/CommandType.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/datasegment/CommandType.java Tue Nov 19 17:39:44 2013 +0900 @@ -10,7 +10,9 @@ REMOVE, REPLY, CLOSE, - FINISH; + FINISH, + PING, + RESPONSE; public int id; public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();
--- a/src/alice/datasegment/DataSegmentKey.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Tue Nov 19 17:39:44 2013 +0900 @@ -25,7 +25,7 @@ case PUT: int index = tailIndex; tailIndex++; - DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj,cmd.reverseKey); + DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey); dataList.add(dsv); // Process waiting peek and take commands for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
--- a/src/alice/datasegment/DataSegmentManager.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Tue Nov 19 17:39:44 2013 +0900 @@ -10,7 +10,7 @@ public abstract class DataSegmentManager { - protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); //TODO Over Head + protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); protected AtomicInteger seq = new AtomicInteger(1); // waiting for PUT or UPDATE at unique sequence number // but it doesn't need for Local @@ -49,23 +49,22 @@ e.printStackTrace(); } } - - + public abstract void put(String key, Object val); public abstract void update(String key, Object val); public abstract void take(Receiver receiver, CodeSegment cs); public abstract void peek(Receiver receiver, CodeSegment cs); + + public abstract void quickPut(String key, Object val); + public abstract void quickUpdate(String key, Object val); + public abstract void quickPeek(Receiver receiver, CodeSegment cs); + public abstract void quickTake(Receiver receiver, CodeSegment cs); public abstract void remove(String key); public abstract void close(); public abstract void finish(); - public abstract void quickPut(String key, Object val); - public abstract void quickUpdate(String key, Object val); - - public abstract void quickPeek(Receiver receiver, CodeSegment cs); - public abstract void quickTake(Receiver receiver, CodeSegment cs); - - + public abstract void ping(String returnKey); + public abstract void response(String returnKey); }
--- a/src/alice/datasegment/LocalDataSegmentManager.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Tue Nov 19 17:39:44 2013 +0900 @@ -153,4 +153,14 @@ } + @Override + public void ping(String returnKey) { + + } + + @Override + public void response(String returnKey) { + + } + }
--- a/src/alice/datasegment/RemoteDataSegmentManager.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Tue Nov 19 17:39:44 2013 +0900 @@ -144,5 +144,17 @@ connection.sendCommand(cmd); } + @Override + public void ping(String returnKey) { + Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, null); + connection.write(cmd); + } + + @Override + public void response(String returnKey) { + Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, null); + connection.write(cmd); + } + }
--- a/src/alice/topology/manager/keeparive/PingScheduler.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/topology/manager/keeparive/PingScheduler.java Tue Nov 19 17:39:44 2013 +0900 @@ -59,7 +59,7 @@ if (interruptFlag){ interruptFlag = false; } else { - //ods.put("_SENDPING", nowTask); + ods.put("_SENDPING", nowTask.getManagerKey()); } } } catch (InterruptedException e) {
--- a/src/alice/topology/manager/keeparive/SendPing.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/topology/manager/keeparive/SendPing.java Tue Nov 19 17:39:44 2013 +0900 @@ -13,8 +13,8 @@ @Override public void run(){ - TaskInfo info = taskInfo.asClass(TaskInfo.class); - ods.ping(info.getManagerKey()); + String managerKey = taskInfo.asString(); + ods.ping(managerKey , "UUID"); //new RespondPing(); } }