Mercurial > hg > FederatedLinda
changeset 95:7bf2eeea23a0 fuchita
Merge with ea4ee892baf5df730093bbee7d5eb2f5b5acaf53
author | one |
---|---|
date | Tue, 25 May 2010 23:11:11 +0900 |
parents | d962eecaf9f5 (diff) ea4ee892baf5 (current diff) |
children | 96c63bc659d4 |
files | src/fdl/FederatedLinda.java src/fdl/MetaLinda.java src/fdl/PSXLindaImpl.java |
diffstat | 6 files changed, 181 insertions(+), 3 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/FederatedLinda.java Thu Apr 22 16:13:03 2010 +0900 +++ b/src/fdl/FederatedLinda.java Tue May 25 23:11:11 2010 +0900 @@ -184,6 +184,10 @@ if (level==Level.SEVERE) new IOException().setStackTrace(null); } + + public void wakeup() { + selector.wakeup(); + } } /* end */
--- a/src/fdl/MetaLinda.java Thu Apr 22 16:13:03 2010 +0900 +++ b/src/fdl/MetaLinda.java Tue May 25 23:11:11 2010 +0900 @@ -132,6 +132,10 @@ } while (hasNewReply); return 0; } + + public void wakeup() { + fdl.wakeup(); + } public void send(ByteBuffer command, ByteBuffer data) { } @@ -140,6 +144,15 @@ ts.hook = hook; } + public void waitRd(int id, PSXCallback callback) { + MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts,callback); + addReply(r); + } + + public PSXReply waitRd(int id) { + MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts); + return r; + } }
--- a/src/fdl/PSXLinda.java Thu Apr 22 16:13:03 2010 +0900 +++ b/src/fdl/PSXLinda.java Tue May 25 23:11:11 2010 +0900 @@ -31,4 +31,8 @@ public int sync(long mtime) throws IOException ; public void send(ByteBuffer command, ByteBuffer data); + + public void waitRd(int i, PSXCallback callback); + + public PSXReply waitRd(int i); }
--- a/src/fdl/PSXLindaImpl.java Thu Apr 22 16:13:03 2010 +0900 +++ b/src/fdl/PSXLindaImpl.java Tue May 25 23:11:11 2010 +0900 @@ -186,6 +186,15 @@ } + public PSXReply waitRd(int id) { + PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_WAIT_RD, null); + return r; + } + + public void waitRd(int id, PSXCallback callback) { + fdl.psx_queue(this, id, null, PSX.PSX_WAIT_RD, callback); + } + }
--- a/src/fdl/test/TestPSXLinda.java Thu Apr 22 16:13:03 2010 +0900 +++ b/src/fdl/test/TestPSXLinda.java Tue May 25 23:11:11 2010 +0900 @@ -84,11 +84,10 @@ } } - public static void print_id (PSXReply ans) { + public static void print_id (PSXReply ans) throws IOException { ByteBuffer r = ans.getData(); - id = r.getShort(); System.out.print("ID = "); - System.out.println(id); + System.out.write(r.array()); } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/TestWaitRd.java Tue May 25 23:11:11 2010 +0900 @@ -0,0 +1,149 @@ +package fdl.test; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; + +import fdl.FDLindaServ; +import fdl.FederatedLinda; +import fdl.PSX; +import fdl.PSXCallback; +import fdl.PSXLinda; +import fdl.PSXReply; + + +public class TestWaitRd { + + public FederatedLinda fdl; + public FDLindaServ fds; + public static final int PORT = 10000; + + class Server implements Runnable { + public void run() { + String[] args = {"-p",Integer.toString(PORT)}; + FDLindaServ.main(args); + } + } + + class Client implements Runnable { + public void run() { + String[] args = {}; + sleep(2000); + main(args); + } + public synchronized void sleep(int time) { + try { + wait(time); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void main(String[] arg) { + try { + PSXLinda psx = openLinda(); + ByteBuffer data = sendData(psx,1,0); + + psx.waitRd(1, new PSXCallback() { public void callback(ByteBuffer reply) {read_wait(reply,"First read");} }); + psx.sync(1); + sendData(psx,1,1); + psx.sync(1); + + + sleep(1000); + + psx.waitRd(1, new PSXCallback() { public void callback(ByteBuffer reply) {read_wait(reply,"2nd read");} }); + psx.sync(1); + sendData(psx,1,2); + psx.sync(1); + sleep(1000); + + + for(int i=3;i<10;i++) { + final int j = i; + psx.waitRd(1, new PSXCallback() {public void callback(ByteBuffer reply) {read_wait(reply,""+j+"th read");} }); + psx.sync(1); + sendData(psx,1,i); + psx.sync(1); + } + sleep(1000); + + data.clear(); + psx.out(PSX.META_STOP, data); + psx.sync(1); + + } catch (IOException e) { + System.err.println("Communication failure."); + } + } + + public void read_wait(ByteBuffer r, String mesg) { + System.out.println(mesg); + System.out.println(r.getInt()); + System.out.println(""); + } + + private PSXLinda openLinda() throws IOException { + FederatedLinda fdl; + PSXLinda psx; + PSXReply r; + String host; + InetSocketAddress localAddress; + int port = PORT; + try { + localAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); + host = localAddress.getHostName(); + } catch (UnknownHostException e) { + host = "localhost"; + } + fdl = FederatedLinda.init(); + psx = fdl.open(host,port); + r = psx.in(65535); + fdl.sync(1); + System.out.println("Connected."); + int cnt=0; + while(!r.ready()) { + // psx.sync(1000); + psx.sync(); + System.out.println("Waiting...."+(cnt++)); + } + print_id(r); + return psx; + } + + private ByteBuffer sendData(PSXLinda psx,int id, int n) { + ByteBuffer data = ByteBuffer.allocate(10); + data.putInt(n); + data.flip(); + psx.out(id,data); + return data; + } + + + public void print_id (PSXReply ans) throws IOException { + ByteBuffer r = ans.getData(); + System.out.print("ID = "); + System.out.write(r.array()); + System.out.println(""); + } + } + + public static void main(String[] arg) throws InterruptedException { + TestWaitRd me = new TestWaitRd(); + me.test1(); + } + + public void test1() throws InterruptedException { + Server s = new Server(); + Client c = new Client(); + Thread s1 = new Thread(s); + Thread c1 = new Thread(c); + s1.start(); + c1.start(); + s1.join(); + c1.join(); + + } +}