Mercurial > hg > FederatedLinda
changeset 104:25f7fc05be71 fuchita
(no commit message)
author | one |
---|---|
date | Wed, 26 May 2010 19:01:51 +0900 |
parents | 7da06001aeb5 |
children | be9b84a77b15 |
files | src/fdl/test/TestTree.java |
diffstat | 1 files changed, 107 insertions(+), 10 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/test/TestTree.java Wed May 26 17:55:40 2010 +0900 +++ b/src/fdl/test/TestTree.java Wed May 26 19:01:51 2010 +0900 @@ -15,30 +15,58 @@ public class TestTree { - + /** + * Tree Walk Example + * configure tree structured federated Linda + * run message from tree root + * sum up data from the leaves + */ public static final int PORT = 10000; + /** + * Tuple id for the Protocol + */ public static final int ConnectReuest = 100; private static final int Down = 101; public static final int Up = 102; + /** + * Configuration binary format separator + */ private static final int NO_HOST = -1; private static final int HOST_LIST_END_MARK = -2; private static final int CONNECT_LIST_END_MARK = -3; + /** + * Global + */ public LinkedList<Thread> lindas = new LinkedList<Thread>(); public int id = 0; public boolean debug = true; class TreeNode implements Runnable { + /** + * Linda Server Thread + */ public int id, port; public TreeNode(int id, int port) { this.id = id; this.port = port; } public void run() { String[] args = { - // "-d", + // "-d", // Debug Message Flag "-p",Integer.toString(port)}; FDLindaServ.main(new TreeMetaProtocolEngine(id,port), args); } } + /** + * @author kono + * + */ + /** + * @author kono + * + */ class TreeMetaProtocolEngine implements MetaEngine { + /** + * Meta Engine for Configuration and Tree Walk + */ boolean running = true; LinkedList<ByteBuffer> leftWaiter = new LinkedList<ByteBuffer>(); LinkedList<ByteBuffer> rightWaiter = new LinkedList<ByteBuffer>(); @@ -53,18 +81,27 @@ nodePort = port; } + /** + * Meta Engine Main Loop + */ public void mainLoop(final MetaLinda ml) { + /* handler of Server STOP command */ ml.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) { running = false;}}); + /* handler of Configuration */ ml.in(ConnectReuest, new PSXCallback() {public void callback(ByteBuffer reply) { connectChildren(ml, reply); } }); while(running) { - ml.sync(0); + ml.sync(0); // Wait forever } } + /** + * Configuration Packet Decoder for Tree Node + * @param reply + */ private void setHostPort(ByteBuffer reply) { parentPort = reply.getInt(); leftPort = reply.getInt(); @@ -77,21 +114,27 @@ /** * parent * | - * self + * self = ml (Meta Linda Server) * / \ * left right * - * @param ml - * @param reply + * @param ml Meda Linda Server + * @param reply a contents of configuration packaet */ private void connectChildren(final MetaLinda ml, ByteBuffer reply) { setHostPort(reply); try { + /** + * connect to neighbors + */ if (parentPort!=0) parent = ml.fdl.open(parentHost, parentPort); if (leftPort!=0) left = ml.fdl.open(leftHost, leftPort); if (rightPort!=0) right = ml.fdl.open(rightHost, rightPort); if (debug) System.out.println("Connect this port="+nodePort+" parent="+parentPort+" left="+leftPort+" right="+rightPort); + /* + * Handle Downward tree walk + */ ml.in(Down, new PSXCallback() {public void callback(ByteBuffer reply) { ml.in(Down,this); if (left==null) { @@ -103,6 +146,7 @@ ml.out(Up, answer); return; } + // Intermideate node if (debug) System.out.println("Pass it to the children from "+nodeId + " to "+ leftPort + " and " + rightPort + "."); ByteBuffer copy = reply.duplicate(); left.out(Down, reply); @@ -110,6 +154,9 @@ } }); + /** + * Handle upward tree walk + */ if (leftPort!=0) left.in(Up, new PSXCallback() {public void callback(ByteBuffer reply) { if (debug) System.out.println("Up from left at"+nodeId); @@ -131,11 +178,17 @@ } } + /** + * Wait for all message from the sub node and + * calc sum and pass it to the parent. + * @param ml + */ private void checkSend(MetaLinda ml) { if (leftWaiter.isEmpty()||rightWaiter.isEmpty()) return; ByteBuffer out = ByteBuffer.allocate(10); int value = leftWaiter.poll().getInt()+rightWaiter.poll().getInt(); if (parent!=null) { + // Intermedate node if (debug) System.out.println("Up the vluae "+value+" from "+nodeId); out.putInt(value); out.flip(); @@ -148,14 +201,23 @@ } - class Congigure implements Runnable { + /** + * @author kono + * Configuration Manager + * Read Configuration + * start linda servers + * send connect message to the servers + * then send start message + * stop all the server with Meta stop message + */ + class Configure implements Runnable { public String id; public ByteBuffer config; public FederatedLinda fdl; public LinkedList<PSXLinda> psxs = new LinkedList<PSXLinda>(); private LinkedList<String> hosts = new LinkedList<String>(); private LinkedList<Integer> ports = new LinkedList<Integer>(); - public Congigure(int id,ByteBuffer config) { this.id = "Configure"+id; + public Configure(int id,ByteBuffer config) { this.id = "Configure"+id; this.config = config; } public void run() { @@ -163,6 +225,10 @@ main(args); } + /** + * Configuration main + * @param arg + */ public void main(String[] arg) { final String name = arg[0]; try { @@ -177,6 +243,9 @@ } + /** + * Send start messages to configured servers + */ public void startTest() { System.out.println("StartTest"); try { @@ -191,6 +260,10 @@ } } + /** + * Send Server Stop message + * @throws IOException + */ public void stop() throws IOException { ByteBuffer data = ByteBuffer.allocate(10); for(PSXLinda psx:psxs) { @@ -199,6 +272,12 @@ } } + /** + * This is not used. Open with connection id + * @param host + * @param port + * @throws IOException + */ public void openLinda(String host, int port) throws IOException { FederatedLinda fdl; PSXLinda psx; @@ -219,12 +298,18 @@ return ; } + /** + * Read connection p configuation in a ByteBuffer + * @throws IOException + */ public void readConfigure() throws IOException { int mode = 0; int host = 0; while(config.hasRemaining()) { int port = config.getInt(); if (mode==0) { + // Formar part + // List of host and server name if (port==HOST_LIST_END_MARK) { mode = 1; continue; @@ -236,7 +321,11 @@ ports.add(port); } else { + // Later part + // create interconnection configuration packet + // send it to the server if (port == CONNECT_LIST_END_MARK) return; + // connect this, it's parent, left and right connect(host++, port,config.getInt(),config.getInt()); } } @@ -322,14 +411,22 @@ } public void test1() throws InterruptedException { + // Make Configuration First ByteBuffer config = makeConfig(); - sleep(2000); + sleep(2000); // Wait for servers' start System.out.println("Start Configure"); - Thread r1 = new Thread(new Congigure(1,config)); + // Start configuration server and start test + Thread r1 = new Thread(new Configure(1,config)); r1.start(); r1.join(); } + /** + * Make Configuration + * start servers now + * No connections yet + * @return + */ private ByteBuffer makeConfig() { ByteBuffer config = ByteBuffer.allocate(4096); putHostRun(config,"localhost",PORT);