changeset 95:7bf2eeea23a0 fuchita

Merge with ea4ee892baf5df730093bbee7d5eb2f5b5acaf53
author one
date Tue, 25 May 2010 23:11:11 +0900
parents d962eecaf9f5 (diff) ea4ee892baf5 (current diff)
children 96c63bc659d4
files src/fdl/FederatedLinda.java src/fdl/MetaLinda.java src/fdl/PSXLindaImpl.java
diffstat 6 files changed, 181 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/src/fdl/FederatedLinda.java	Thu Apr 22 16:13:03 2010 +0900
+++ b/src/fdl/FederatedLinda.java	Tue May 25 23:11:11 2010 +0900
@@ -184,6 +184,10 @@
 		if (level==Level.SEVERE)
 			new IOException().setStackTrace(null);
 	}
+
+	public void wakeup() {
+		selector.wakeup();		
+	}
 }
 
 /* end */
--- a/src/fdl/MetaLinda.java	Thu Apr 22 16:13:03 2010 +0900
+++ b/src/fdl/MetaLinda.java	Tue May 25 23:11:11 2010 +0900
@@ -132,6 +132,10 @@
 		} while (hasNewReply);
 		return 0;
 	}
+	
+	public void wakeup() {
+		fdl.wakeup();
+	}
 
 	public void send(ByteBuffer command, ByteBuffer data) {
 	}
@@ -140,6 +144,15 @@
 		ts.hook = hook;
 	}
 
+	public void waitRd(int id, PSXCallback callback) {
+		MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts,callback);
+		addReply(r);
+	}
+
+	public PSXReply waitRd(int id) {
+		MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts);
+		return r;
+	}
 }
 
 
--- a/src/fdl/PSXLinda.java	Thu Apr 22 16:13:03 2010 +0900
+++ b/src/fdl/PSXLinda.java	Tue May 25 23:11:11 2010 +0900
@@ -31,4 +31,8 @@
 	public int sync(long mtime) throws IOException ;
 
 	public void send(ByteBuffer command, ByteBuffer data);
+
+	public void waitRd(int i, PSXCallback callback);
+	
+	public PSXReply waitRd(int i);
 }
--- a/src/fdl/PSXLindaImpl.java	Thu Apr 22 16:13:03 2010 +0900
+++ b/src/fdl/PSXLindaImpl.java	Tue May 25 23:11:11 2010 +0900
@@ -186,6 +186,15 @@
 	}
 
 
+	public PSXReply waitRd(int id) {
+		PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_WAIT_RD, null);
+		return r;
+	}
+
+	public void waitRd(int id, PSXCallback callback) {
+		fdl.psx_queue(this, id, null, PSX.PSX_WAIT_RD, callback);
+	}
+
 }
 
 
--- a/src/fdl/test/TestPSXLinda.java	Thu Apr 22 16:13:03 2010 +0900
+++ b/src/fdl/test/TestPSXLinda.java	Tue May 25 23:11:11 2010 +0900
@@ -84,11 +84,10 @@
 		}
 	}
 
-	public static void print_id (PSXReply ans) {
+	public static void print_id (PSXReply ans) throws IOException {
 		ByteBuffer r = ans.getData();
-		id = r.getShort();
 		System.out.print("ID = ");
-		System.out.println(id);
+		System.out.write(r.array());
 	}
 }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/fdl/test/TestWaitRd.java	Tue May 25 23:11:11 2010 +0900
@@ -0,0 +1,149 @@
+package fdl.test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+
+import fdl.FDLindaServ;
+import fdl.FederatedLinda;
+import fdl.PSX;
+import fdl.PSXCallback;
+import fdl.PSXLinda;
+import fdl.PSXReply;
+
+
+public class TestWaitRd {
+
+	public FederatedLinda fdl;
+	public FDLindaServ fds;
+	public static final int PORT = 10000;
+
+	class Server implements Runnable {
+		public void run() {
+			String[] args = {"-p",Integer.toString(PORT)};
+			FDLindaServ.main(args);
+		}
+	}
+
+	class Client implements Runnable {
+		public void run() {
+			String[] args = {};
+			sleep(2000);
+			main(args);
+		}
+		public synchronized void sleep(int time) {
+			try {
+				wait(time);
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			}
+		}
+		
+		public void main(String[] arg) {
+			try {
+				PSXLinda psx = openLinda();
+				ByteBuffer data = sendData(psx,1,0);
+
+				psx.waitRd(1, new PSXCallback() {	public void callback(ByteBuffer reply) {read_wait(reply,"First read");} });
+				psx.sync(1);
+				sendData(psx,1,1);
+				psx.sync(1);
+				
+
+				sleep(1000);
+
+				psx.waitRd(1, new PSXCallback() {	public void callback(ByteBuffer reply) {read_wait(reply,"2nd read");} });
+				psx.sync(1);
+				sendData(psx,1,2);
+				psx.sync(1);
+				sleep(1000);
+
+
+				for(int i=3;i<10;i++) {
+					final int j = i;
+					psx.waitRd(1, new PSXCallback() {public void callback(ByteBuffer reply) {read_wait(reply,""+j+"th read");} });
+					psx.sync(1);
+					sendData(psx,1,i);
+					psx.sync(1);
+				}
+				sleep(1000);
+				
+				data.clear();
+				psx.out(PSX.META_STOP, data);
+				psx.sync(1);
+
+			} catch (IOException e) {
+				System.err.println("Communication failure.");
+			}
+		}
+		
+		public void read_wait(ByteBuffer r, String mesg) {
+			System.out.println(mesg);
+			System.out.println(r.getInt());
+			System.out.println("");
+		}
+		
+		private PSXLinda openLinda() throws IOException {
+			FederatedLinda fdl;
+			PSXLinda psx;
+			PSXReply r;
+			String host;
+			InetSocketAddress localAddress;
+			int port = PORT;
+			try {
+				localAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);
+				host = localAddress.getHostName();
+			} catch (UnknownHostException e) {
+				host = "localhost";
+			}
+			fdl = FederatedLinda.init();
+			psx = fdl.open(host,port);
+			r = psx.in(65535);
+			fdl.sync(1);
+			System.out.println("Connected.");
+			int cnt=0;
+			while(!r.ready()) {
+				// psx.sync(1000);
+				psx.sync();
+				System.out.println("Waiting...."+(cnt++));
+			}
+			print_id(r);
+			return psx;
+		}
+		
+		private ByteBuffer sendData(PSXLinda psx,int id, int n) {
+			ByteBuffer data = ByteBuffer.allocate(10);
+			data.putInt(n);
+			data.flip();
+			psx.out(id,data);
+			return data;
+		}
+
+
+		public void print_id (PSXReply ans) throws IOException {
+			ByteBuffer r = ans.getData();
+			System.out.print("ID = ");
+			System.out.write(r.array());
+			System.out.println("");
+		}
+	}
+
+	public static void main(String[] arg) throws InterruptedException {
+		TestWaitRd me = new TestWaitRd();
+		me.test1();
+	}
+
+	public void test1() throws InterruptedException {
+		Server s = new Server();
+		Client c = new Client();
+		Thread s1 = new Thread(s);
+		Thread c1 = new Thread(c);
+		s1.start();
+		c1.start();
+		s1.join();
+		c1.join();
+		
+	}
+}