Mercurial > hg > FederatedLinda
changeset 96:96c63bc659d4 fuchita
change sync() for no wait sync. (sorry for this)
more practical wait read example.
author | one |
---|---|
date | Wed, 26 May 2010 08:56:24 +0900 |
parents | 7bf2eeea23a0 |
children | 0ea086f0e96f |
files | src/fdl/FDLindaServ.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/MetaLinda.java src/fdl/PSX.java src/fdl/test/TestWaitRd.java |
diffstat | 6 files changed, 219 insertions(+), 111 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/FDLindaServ.java Tue May 25 23:11:11 2010 +0900 +++ b/src/fdl/FDLindaServ.java Wed May 26 08:56:24 2010 +0900 @@ -17,6 +17,7 @@ static final int MAX_REQ = 1; static final int FAIL = (-1); static final int DEF_PORT = 10000; + public static boolean debug = false; public int port = DEF_PORT; AbstractSelector selector; private ServerSocketChannel ssChannel; @@ -24,7 +25,7 @@ public MetaEngine me; public static void main(final String[] args) { - final String usages = "usage: FDLindaServ [-p port]"; + final String usages = "usage: FDLindaServ [-d] [-p port]"; int port = DEF_PORT; //引数判定 @@ -32,7 +33,9 @@ for (int i=0; i<args.length; ++i) { if("-p".equals(args[i])) { port = Integer.parseInt(args[++i]); - } + } else if("-d".equals(args[i])) { + debug = true; + } } } catch (NumberFormatException e) { System.err.println(usages); @@ -80,8 +83,25 @@ } public void checkTuple() { - checkTuple(0); - } + // セレクタによる監視 + try { + if (selector.select()>0) { +// this does not work because #it.remove() is not called. +// for(SelectionKey s:selector.selectedKeys()) { +// TupleHandler handler = (TupleHandler)s.attachment(); +// handler.handle(s); +// } + for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { + SelectionKey s = it.next(); + it.remove(); + TupleHandler handler = (TupleHandler)s.attachment(); + handler.handle(s); + } + } + } catch (ClosedChannelException e) { + // we have to do something... + } catch (IOException e) { + }} public void checkTuple(long timeout) { // セレクタによる監視 @@ -107,6 +127,7 @@ public void log(Level level,String msg) { + if (level!=Level.SEVERE && !debug) return; System.err.println(msg); if (level==Level.SEVERE) new IOException().setStackTrace(null);
--- a/src/fdl/FederatedLinda.java Tue May 25 23:11:11 2010 +0900 +++ b/src/fdl/FederatedLinda.java Wed May 26 08:56:24 2010 +0900 @@ -38,7 +38,7 @@ FederatedLinda fdl; static int MAX_SEQUENCE = 2048; - static boolean debug = false; + public static boolean debug = false; public int tid; public int seq; @@ -116,9 +116,7 @@ } s = new Integer(seq); } while (seqHash.containsKey(s)); - if (debug) { - log(Level.INFO,"hash value = "+s.hashCode()); - } + // log(Level.INFO,"hash value = "+s.hashCode()); seqHash.put(s,reply); seq++; return seq-1; @@ -128,10 +126,46 @@ return selector; } + /** + * sync with no wait + * @return 0 + * @throws IOException + */ public int sync() throws IOException { - return sync(0); + int key_num = 0; + queueExec(); + + try { + if (selector.select()>0) { + for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { + SelectionKey s = it.next(); + it.remove(); + try { + if (!s.isReadable()) + throw new IOException(); + TupleHandler handle = (TupleHandler)s.attachment(); + handle.handle(s); + } catch (IOException e) { + s.cancel(); + log(Level.INFO,""+s.channel()+" is closed."); + } + } + } + } catch (IOException e) { + e.printStackTrace(); + } catch (ClosedSelectorException e) { + // client should be know + } + + return key_num; } + /** + * sync with mtimeout msec wait + * @param mtimeout 0 means indifinite wait + * @return 0 + * @throws IOException + */ public int sync(long mtimeout) throws IOException { int key_num = 0; @@ -180,6 +214,7 @@ } public void log(Level level,String msg) { + if (level!=Level.SEVERE && !debug) return; System.err.println(msg); if (level==Level.SEVERE) new IOException().setStackTrace(null);
--- a/src/fdl/IOHandler.java Tue May 25 23:11:11 2010 +0900 +++ b/src/fdl/IOHandler.java Wed May 26 08:56:24 2010 +0900 @@ -9,7 +9,7 @@ import java.util.logging.Level; public class IOHandler implements TupleHandler { - static final boolean debug = true; + static final boolean debug = false; public TupleSpace tupleSpace; public SocketChannel ch; public FDLindaServ fds;
--- a/src/fdl/MetaLinda.java Tue May 25 23:11:11 2010 +0900 +++ b/src/fdl/MetaLinda.java Wed May 26 08:56:24 2010 +0900 @@ -95,19 +95,42 @@ addReply(r); } + + public void waitRd(int id, PSXCallback callback) { + MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts,callback); + addReply(r); + } + + public PSXReply waitRd(int id) { + MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts); + return r; + } + public PSXLinda add(PSXLinda linda) { next = linda; return this; } - + /** + * Meta Sync with no wait + */ public int sync() { - return sync(0); + fdl.queueExec(); + fds.checkTuple(); // fdl sync is also handled here + return metaSync(); } + /** + * Meta Sync with wait + * @param timeout wait timeout msec, if 0 wait indefinitely + */ public int sync(long timeout) { fdl.queueExec(); fds.checkTuple(timeout); // fdl sync is also handled here + return metaSync(); + } + + public int metaSync() { /* * r.callback() may call meta.sync() and modifies the * replies queue. Do current size of queue only. The @@ -143,16 +166,6 @@ public void setTupleSpaceHook(IOHandlerHook hook) { ts.hook = hook; } - - public void waitRd(int id, PSXCallback callback) { - MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts,callback); - addReply(r); - } - - public PSXReply waitRd(int id) { - MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts); - return r; - } }
--- a/src/fdl/PSX.java Tue May 25 23:11:11 2010 +0900 +++ b/src/fdl/PSX.java Wed May 26 08:56:24 2010 +0900 @@ -218,7 +218,7 @@ System.err.println("read size mismatch"+readsize+" and "+command.capacity()); } while(readsize>0) { - if(false && IOHandler.debug){ + if(IOHandler.debug){ System.out.println("reading packet..."+readsize); } count = channel.read(command);
--- a/src/fdl/test/TestWaitRd.java Tue May 25 23:11:11 2010 +0900 +++ b/src/fdl/test/TestWaitRd.java Wed May 26 08:56:24 2010 +0900 @@ -20,130 +20,169 @@ public FDLindaServ fds; public static final int PORT = 10000; - class Server implements Runnable { + class LindaServer implements Runnable { + public int id; + public LindaServer(int id) { this.id = id; } public void run() { - String[] args = {"-p",Integer.toString(PORT)}; + String[] args = {/* "-d",*/"-p",Integer.toString(PORT)}; FDLindaServ.main(args); } } - class Client implements Runnable { + class Sender implements Runnable { + public String id; + public Sender(int id) { this.id = "Sender"+id; } public void run() { - String[] args = {}; + String[] args = {id}; sleep(2000); main(args); } - public synchronized void sleep(int time) { + + public void main(String[] arg) { try { - wait(time); - } catch (InterruptedException e) { + PSXLinda psx = openLinda(id); + sleep(1000); + sendData(psx,1,0); + psx.sync(1000); + waitIn(psx,1); + sendData(psx,1,1); + psx.sync(1000); + waitIn(psx,1); + sendData(psx,1,2); + psx.sync(1000); + waitIn(psx,1); + + for(int i=3;i<10;i++) { + sendData(psx,1,i); + psx.sync(1000); + waitIn(psx,1); + } + sleep(1000); + } catch (IOException e) { e.printStackTrace(); } } - - public void main(String[] arg) { - try { - PSXLinda psx = openLinda(); - ByteBuffer data = sendData(psx,1,0); + } - psx.waitRd(1, new PSXCallback() { public void callback(ByteBuffer reply) {read_wait(reply,"First read");} }); - psx.sync(1); - sendData(psx,1,1); - psx.sync(1); - + class Reader implements Runnable { + public String id; + public Reader(int id) { this.id = "Reader"+id; } + public void run() { + String[] args = {id}; + sleep(2000); + main(args); + } - sleep(1000); + public void main(String[] arg) { + final String name = arg[0]; + try { + PSXLinda psx = openLinda(id); - psx.waitRd(1, new PSXCallback() { public void callback(ByteBuffer reply) {read_wait(reply,"2nd read");} }); - psx.sync(1); - sendData(psx,1,2); - psx.sync(1); - sleep(1000); - + PSXReply reply = psx.waitRd(1); + read_wait(psx, reply,name+": First read"); + PSXReply reply1 = psx.waitRd(1); + read_wait(psx, reply1,name+": 2nd read"); for(int i=3;i<10;i++) { - final int j = i; - psx.waitRd(1, new PSXCallback() {public void callback(ByteBuffer reply) {read_wait(reply,""+j+"th read");} }); - psx.sync(1); - sendData(psx,1,i); - psx.sync(1); + PSXReply replyn = psx.waitRd(1); + read_wait(psx, replyn,name+": "+i+"th read"); } - sleep(1000); - - data.clear(); + + ByteBuffer data = ByteBuffer.allocate(10); psx.out(PSX.META_STOP, data); psx.sync(1); } catch (IOException e) { - System.err.println("Communication failure."); + System.err.println(name+": Communication failure."); } } - - public void read_wait(ByteBuffer r, String mesg) { - System.out.println(mesg); - System.out.println(r.getInt()); - System.out.println(""); - } - - private PSXLinda openLinda() throws IOException { - FederatedLinda fdl; - PSXLinda psx; - PSXReply r; - String host; - InetSocketAddress localAddress; - int port = PORT; - try { - localAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); - host = localAddress.getHostName(); - } catch (UnknownHostException e) { - host = "localhost"; - } - fdl = FederatedLinda.init(); - psx = fdl.open(host,port); - r = psx.in(65535); - fdl.sync(1); - System.out.println("Connected."); - int cnt=0; - while(!r.ready()) { - // psx.sync(1000); - psx.sync(); - System.out.println("Waiting...."+(cnt++)); - } - print_id(r); - return psx; - } - - private ByteBuffer sendData(PSXLinda psx,int id, int n) { - ByteBuffer data = ByteBuffer.allocate(10); - data.putInt(n); - data.flip(); - psx.out(id,data); - return data; - } + } - - public void print_id (PSXReply ans) throws IOException { - ByteBuffer r = ans.getData(); - System.out.print("ID = "); - System.out.write(r.array()); - System.out.println(""); + public synchronized void sleep(int time) { + try { + wait(time); + } catch (InterruptedException e) { + e.printStackTrace(); } } + public void read_wait(PSXLinda psx, PSXReply reply, String mesg) throws IOException { + while(!reply.ready()) psx.sync(10); + System.out.println(mesg); + System.out.println(reply.getData().getInt()); + System.out.println(""); + } + + public PSXLinda openLinda(String name) throws IOException { + FederatedLinda fdl; + PSXLinda psx; + PSXReply r; + String host; + InetSocketAddress localAddress; + int port = PORT; + try { + localAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); + host = localAddress.getHostName(); + } catch (UnknownHostException e) { + host = "localhost"; + } + fdl = FederatedLinda.init(); + psx = fdl.open(host,port); + r = psx.in(65535); + fdl.sync(1); + System.out.println(name+": Connected."); + int cnt=0; + while(!r.ready()) { + // psx.sync(1000); + psx.sync(); + System.out.println(name+": Waiting...."+(cnt++)); + } + print_id(r); + return psx; + } + + public ByteBuffer sendData(PSXLinda psx,int id, int n) { + ByteBuffer data = ByteBuffer.allocate(10); + data.putInt(n); + data.flip(); + psx.out(id,data); + return data; + } + + public void waitIn(PSXLinda psx, int i) throws IOException { + PSXReply r = psx.in(i); + while(! r.ready()) { + psx.sync(10); + } + return; + } + + public void print_id (PSXReply ans) throws IOException { + ByteBuffer r = ans.getData(); + System.out.print("ID = "); + System.out.write(r.array()); + System.out.println(""); + } + + public static void main(String[] arg) throws InterruptedException { TestWaitRd me = new TestWaitRd(); me.test1(); } public void test1() throws InterruptedException { - Server s = new Server(); - Client c = new Client(); - Thread s1 = new Thread(s); - Thread c1 = new Thread(c); + Thread l1 = new Thread(new LindaServer(1)); + Thread r1 = new Thread(new Reader(1)); + Thread r2 = new Thread(new Reader(2)); + Thread s1 = new Thread(new Sender(1)); + l1.start(); s1.start(); - c1.start(); + r1.start(); + r2.start(); s1.join(); - c1.join(); - + r1.join(); + r2.join(); + l1.join(); + } }