Mercurial > hg > FederatedLinda
changeset 103:7da06001aeb5 fuchita
add TestTree.java
author | one |
---|---|
date | Wed, 26 May 2010 17:55:40 +0900 (2010-05-26) |
parents | 3b000c4a4d31 |
children | 25f7fc05be71 |
files | src/fdl/test/TestTree.java |
diffstat | 1 files changed, 373 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/TestTree.java Wed May 26 17:55:40 2010 +0900 @@ -0,0 +1,373 @@ +package fdl.test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; + +import fdl.FDLindaServ; +import fdl.FederatedLinda; +import fdl.MetaEngine; +import fdl.MetaLinda; +import fdl.PSX; +import fdl.PSXCallback; +import fdl.PSXLinda; +import fdl.PSXReply; + + +public class TestTree { + + public static final int PORT = 10000; + public static final int ConnectReuest = 100; + private static final int Down = 101; + public static final int Up = 102; + private static final int NO_HOST = -1; + private static final int HOST_LIST_END_MARK = -2; + private static final int CONNECT_LIST_END_MARK = -3; + public LinkedList<Thread> lindas = new LinkedList<Thread>(); + public int id = 0; + public boolean debug = true; + + class TreeNode implements Runnable { + public int id, port; + public TreeNode(int id, int port) { this.id = id; this.port = port; } + public void run() { + String[] args = { + // "-d", + "-p",Integer.toString(port)}; + FDLindaServ.main(new TreeMetaProtocolEngine(id,port), args); + } + } + + class TreeMetaProtocolEngine implements MetaEngine { + boolean running = true; + LinkedList<ByteBuffer> leftWaiter = new LinkedList<ByteBuffer>(); + LinkedList<ByteBuffer> rightWaiter = new LinkedList<ByteBuffer>(); + PSXLinda parent, left, right; + String parentHost, leftHost,rightHost; + int parentPort=0, leftPort=0, rightPort=0; + public int nodeId = 0; + public int nodePort = 0; + + public TreeMetaProtocolEngine(int id, int port) { + nodeId = id; + nodePort = port; + } + + public void mainLoop(final MetaLinda ml) { + ml.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) { + running = false;}}); + ml.in(ConnectReuest, new PSXCallback() {public void callback(ByteBuffer reply) { + connectChildren(ml, reply); + } + }); + while(running) { + ml.sync(0); + } + } + + private void setHostPort(ByteBuffer reply) { + parentPort = reply.getInt(); + leftPort = reply.getInt(); + rightPort = reply.getInt(); + if (parentPort!=0) parentHost = getString(reply); + if (leftPort!=0) leftHost = getString(reply); + if (rightPort!=0) rightHost = getString(reply); + } + + /** + * parent + * | + * self + * / \ + * left right + * + * @param ml + * @param reply + */ + private void connectChildren(final MetaLinda ml, ByteBuffer reply) { + setHostPort(reply); + try { + if (parentPort!=0) parent = ml.fdl.open(parentHost, parentPort); + if (leftPort!=0) left = ml.fdl.open(leftHost, leftPort); + if (rightPort!=0) right = ml.fdl.open(rightHost, rightPort); + + if (debug) System.out.println("Connect this port="+nodePort+" parent="+parentPort+" left="+leftPort+" right="+rightPort); + ml.in(Down, new PSXCallback() {public void callback(ByteBuffer reply) { + ml.in(Down,this); + if (left==null) { + if (debug) System.out.println("Reached in Leaf "+nodeId+" value = "+parentPort); + // Leaf case + ByteBuffer answer = ByteBuffer.allocate(10); + answer.putInt(parentPort); + answer.flip(); + ml.out(Up, answer); + return; + } + if (debug) System.out.println("Pass it to the children from "+nodeId + " to "+ leftPort + " and " + rightPort + "."); + ByteBuffer copy = reply.duplicate(); + left.out(Down, reply); + right.out(Down, copy); + } + + }); + if (leftPort!=0) + left.in(Up, new PSXCallback() {public void callback(ByteBuffer reply) { + if (debug) System.out.println("Up from left at"+nodeId); + left.in(Up,this); + leftWaiter.add(reply); + checkSend(ml); + } + + }); + if (rightPort!=0) + right.in(Up, new PSXCallback() {public void callback(ByteBuffer reply) { + if (debug) System.out.println("Up from right at"+nodeId); + right.in(Up,this); + rightWaiter.add(reply); + checkSend(ml); + } + }); + } catch (IOException e) { + } + } + + private void checkSend(MetaLinda ml) { + if (leftWaiter.isEmpty()||rightWaiter.isEmpty()) return; + ByteBuffer out = ByteBuffer.allocate(10); + int value = leftWaiter.poll().getInt()+rightWaiter.poll().getInt(); + if (parent!=null) { + if (debug) System.out.println("Up the vluae "+value+" from "+nodeId); + out.putInt(value); + out.flip(); + ml.out(Up, out); + } else { + // Top node case + System.out.println("Top level node gets "+value); + } + } + } + + + class Congigure implements Runnable { + public String id; + public ByteBuffer config; + public FederatedLinda fdl; + public LinkedList<PSXLinda> psxs = new LinkedList<PSXLinda>(); + private LinkedList<String> hosts = new LinkedList<String>(); + private LinkedList<Integer> ports = new LinkedList<Integer>(); + public Congigure(int id,ByteBuffer config) { this.id = "Configure"+id; + this.config = config; + } + public void run() { + String[] args = {id}; + main(args); + } + + public void main(String[] arg) { + final String name = arg[0]; + try { + fdl = FederatedLinda.init(); + readConfigure(); + fdl.sync(1000); + startTest(); + stop(); + } catch (IOException e) { + System.err.println(name+": Communication failure."); + } + } + + + public void startTest() { + System.out.println("StartTest"); + try { + PSXLinda psx = psxs.get(0); + for(int i=3;i<10;i++) { + sendData(psx,Down,i); + psx.sync(1000); + } + sleep(1000); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void stop() throws IOException { + ByteBuffer data = ByteBuffer.allocate(10); + for(PSXLinda psx:psxs) { + psx.out(PSX.META_STOP, data.duplicate()); + psx.sync(1); + } + } + + public void openLinda(String host, int port) throws IOException { + FederatedLinda fdl; + PSXLinda psx; + PSXReply r; + fdl = FederatedLinda.init(); + psx = fdl.open(host,port); + r = psx.in(65535); + fdl.sync(1); + System.out.println(host+" port "+port +": Connected."); + int cnt=0; + while(!r.ready()) { + // psx.sync(1000); + psx.sync(10); + System.out.println(host+" port "+port +": Waiting...."+(cnt++)); + } + print_id(r); + + return ; + } + + public void readConfigure() throws IOException { + int mode = 0; + int host = 0; + while(config.hasRemaining()) { + int port = config.getInt(); + if (mode==0) { + if (port==HOST_LIST_END_MARK) { + mode = 1; + continue; + } + String hostname = getString(config); + + psxs.add(fdl.open(hostname,port)); + hosts.add(hostname); + ports.add(port); + + } else { + if (port == CONNECT_LIST_END_MARK) return; + connect(host++, port,config.getInt(),config.getInt()); + } + } + } + + private void connect(int host, int parent, int left, int right) { + PSXLinda p = psxs.get(host); + ByteBuffer out = ByteBuffer.allocate(4096); + + out.putInt(parent!=NO_HOST? ports.get(parent):0); + out.putInt(left!=NO_HOST? ports.get(left):0); + out.putInt(right!=NO_HOST? ports.get(right):0); + if (parent!=NO_HOST) putString(out,hosts.get(parent)); + if (left!=NO_HOST) putString(out,hosts.get(left)); + if (right!=NO_HOST) putString(out,hosts.get(right)); + out.flip(); + p.out(ConnectReuest, out); + } + + } + + public synchronized void sleep(int time) { + try { + wait(time); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void read_wait(PSXLinda psx, PSXReply reply, String mesg) throws IOException { + while(!reply.ready()) psx.sync(10); + System.out.println(mesg); + System.out.println(reply.getData().getInt()); + System.out.println(""); + } + + public ByteBuffer sendData(PSXLinda psx,int id, int n) { + ByteBuffer data = ByteBuffer.allocate(10); + data.putInt(n); + data.flip(); + psx.out(id,data); + return data; + } + + public void in_wait(PSXLinda psx, int i) throws IOException { + PSXReply r = psx.in(i); + while(! r.ready()) { + psx.sync(10); + } + return; + } + + public void print_id (PSXReply ans) throws IOException { + ByteBuffer r = ans.getData(); + System.out.print("ID = "); + System.out.write(r.array()); + System.out.println(""); + } + + + public String getString(ByteBuffer reply) { + char c; + String s = ""; + while(reply.hasRemaining()) { + c = reply.getChar(); + if (c== 0) break; + s += c; + } + return s; + } + + public void putString(ByteBuffer reply,String s) { + for(int i=0; i<s.length(); i++) { + char c= s.charAt(i); + reply.putChar(c); + } + reply.putChar((char) 0); + } + + public static void main(String[] arg) throws InterruptedException { + TestTree me = new TestTree(); + me.test1(); + } + + public void test1() throws InterruptedException { + ByteBuffer config = makeConfig(); + sleep(2000); + System.out.println("Start Configure"); + Thread r1 = new Thread(new Congigure(1,config)); + r1.start(); + r1.join(); + } + + private ByteBuffer makeConfig() { + ByteBuffer config = ByteBuffer.allocate(4096); + putHostRun(config,"localhost",PORT); + putHostRun(config,"localhost",PORT+1); + putHostRun(config,"localhost",PORT+2); + putHostRun(config,"localhost",PORT+3); + putHostRun(config,"localhost",PORT+4); + putHostRun(config,"localhost",PORT+5); + putHostRun(config,"localhost",PORT+6); + config.putInt(HOST_LIST_END_MARK); + putTree(config,NO_HOST,1,2); + putTree(config,0,3,4); + putTree(config,0,5,6); + putTree(config,1,NO_HOST,NO_HOST); + putTree(config,1,NO_HOST,NO_HOST); + putTree(config,2,NO_HOST,NO_HOST); + putTree(config,2,NO_HOST,NO_HOST); + config.putInt(CONNECT_LIST_END_MARK); + config.flip(); + return config; + } + + private void putHostRun(ByteBuffer config, String host, int port2) { + putHost(config,host,port2); + // should run on specified host using ssh + Thread s = new Thread(new TreeNode(id ++, port2)); + s.start(); + lindas.add(s); + } + + private void putHost(ByteBuffer config, String s, int port2) { + config.putInt(port2); + putString(config, s); + } + + private void putTree(ByteBuffer config, int p, int l, int r) { + config.putInt(p); + config.putInt(l); + config.putInt(r); + } +}