Mercurial > hg > FederatedLinda
changeset 36:d5bca4b5ee95
meta.sync() recursive call fix.
author | kono |
---|---|
date | Sun, 24 Aug 2008 21:06:39 +0900 |
parents | fe338d497c72 |
children | 2a366abc3f1f |
files | src/fdl/FederatedLinda.java src/fdl/MetaLinda.java src/fdl/MetaLogEngine.java src/fdl/PSX.java src/fdl/PSXLindaImpl.java src/fdl/test/TestMetaLinda.java |
diffstat | 6 files changed, 32 insertions(+), 26 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/FederatedLinda.java Sun Aug 24 19:07:28 2008 +0900 +++ b/src/fdl/FederatedLinda.java Sun Aug 24 21:06:39 2008 +0900 @@ -154,7 +154,7 @@ } catch (IOException e) { e.printStackTrace(); } catch (ClosedSelectorException e) { - e.printStackTrace(); + // client should be know } return key_num;
--- a/src/fdl/MetaLinda.java Sun Aug 24 19:07:28 2008 +0900 +++ b/src/fdl/MetaLinda.java Sun Aug 24 21:06:39 2008 +0900 @@ -49,7 +49,7 @@ } private void addReply(MetaReply r) { - replies.add(r); + replies.addLast(r); } public PSXReply ck(int id) { @@ -99,17 +99,6 @@ } public int sync(long timeout) { - fds.checkTuple(timeout); - if (replies.size()>0) { - // copy replies to avoid insert during r.ready() - LinkedList<MetaReply> list = replies; - replies = new LinkedList<MetaReply>(); - for(MetaReply r:list) { - if (!r.ready()) { - addReply(r); - } - } - } if (fdl!=null) { try { fdl.sync(timeout); @@ -117,6 +106,24 @@ e.printStackTrace(); } } + fds.checkTuple(timeout); + /* + * r.callback() may call meta.sync() and modifies the + * replies queue. Do current size of queue only. The + * rest is checked on the next sync call including + * the recursive case. + */ + int count = replies.size(); + while(count-->0) { + MetaReply r = replies.poll(); + // previous call back may call this sync and make + // replies shorter. + if (r==null) break; + if (r.ready()) { + } else { + addReply(r); + } + } return 0; }
--- a/src/fdl/MetaLogEngine.java Sun Aug 24 19:07:28 2008 +0900 +++ b/src/fdl/MetaLogEngine.java Sun Aug 24 21:06:39 2008 +0900 @@ -29,9 +29,11 @@ data = commDebug.getLog(); if (data!=null) break; meta.sync(); - } while (true); - meta.out(PSX.META_MONITOR_DATA, data); - meta.in(PSX.META_MONITOR,monitor_callback); + } while (running); + if (running) { + meta.out(PSX.META_MONITOR_DATA, data); + meta.in(PSX.META_MONITOR,monitor_callback); + } }}; public void mainLoop() {
--- a/src/fdl/PSX.java Sun Aug 24 19:07:28 2008 +0900 +++ b/src/fdl/PSX.java Sun Aug 24 21:06:39 2008 +0900 @@ -139,13 +139,10 @@ // if datalen in the header is different from ByteBuffer remaining(), we lost // protocol synchronization. Make sure to have correct length now. if (true) { - int seq = command.getInt(LINDA_SEQ_OFFSET); -// if (seq>500) -// System.err.println("Illegal seq sent."); int datalen = data==null?0:(data.limit()-data.position()); - if (command.getInt(LINDA_DATA_LENGTH_OFFSET)>0 && (data==null||data.remaining()!=command.getInt(LINDA_DATA_LENGTH_OFFSET))) { - System.err.println("Missing data."); - } +// if (command.getInt(LINDA_DATA_LENGTH_OFFSET)>0 && (data==null||data.remaining()!=command.getInt(LINDA_DATA_LENGTH_OFFSET))) { +// System.err.println("Missing data."); +// } command.putInt(LINDA_DATA_LENGTH_OFFSET,datalen); command.putInt(LINDA_PACKET_LENGTH_OFFSET,datalen+LINDA_HEADER_SIZE-INT_SIZE); command.rewind();
--- a/src/fdl/PSXLindaImpl.java Sun Aug 24 19:07:28 2008 +0900 +++ b/src/fdl/PSXLindaImpl.java Sun Aug 24 21:06:39 2008 +0900 @@ -51,11 +51,11 @@ fdl = _fdl; socketChannel = SocketChannel.open(); - socketChannel.configureBlocking(false); + socketChannel.configureBlocking(false); // Selector needs this - // Socket socket = socketChannel.socket(); + Socket socket = socketChannel.socket(); // socket.setReuseAddress(true); Client side don't need this. - // socket.setTcpNoDelay(true); + socket.setTcpNoDelay(true); socketChannel.connect(new InetSocketAddress(_host, _port)); while (! socketChannel.finishConnect()) {
--- a/src/fdl/test/TestMetaLinda.java Sun Aug 24 19:07:28 2008 +0900 +++ b/src/fdl/test/TestMetaLinda.java Sun Aug 24 21:06:39 2008 +0900 @@ -76,7 +76,7 @@ System.out.println("Try to stop the server"); psx.out(PSX.META_STOP, null); - psx.sync(); + psx.sync(100); } catch (IOException e) { System.err.println("Communication failure.");