view src/fdl/FederatedLinda.java @ 19:0243987383b7

Meta Protocol Engine and sample implementation of event logger. ComDebug_Client needs fixes.
author kono
date Tue, 19 Aug 2008 05:33:32 +0900
parents 609b288f47f9
children fac6e0073b1a
line wrap: on
line source


/*
 * @(#)FederatedLinda.java       1.1 06/04/01
 *
 * Copyright 2006  Shinji KONO
 * 

   PSX Linda
     Transport layer of PSX Linda library

 */

package fdl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Hashtable;
import java.util.Set;


/**
 * FederatedLinda: transport layer of the PSX Linda library.
 *
 * <p>Holds the shared {@link Selector}, the queue of commands waiting to be
 * sent and the table of replies that are still awaited.
 * {@link #open(String, int)} creates one {@code PSXLinda} per tuple space
 * connection and gives each its own tuple space ID ({@code tid}).
 *
 * <p>No method of this class is {@code synchronized}.
 * NOTE(review): presumably the instance is meant to be driven from a single
 * thread -- confirm against the callers.
 *
 * @author Shinji Kono
 */

public class FederatedLinda  {

	/** Shared instance, created by the first call to {@link #init()}. */
	static FederatedLinda fdl;
	/** Largest sequence number {@link #seq(PSXReply)} hands out before wrapping to 0. */
	static int MAX_SEQUENCE = 2048;
	/** When true, diagnostic messages are printed to {@code System.out}. */
	static boolean debug = true;

	/** Tuple space ID given to the most recently opened connection. */
	public int tid;
	/** Sequence counter advanced by {@link #seq(PSXReply)}. */
	public int seq;
	/** Number of commands queued by {@link #psx_queue} and not yet sent by {@link #sync(long)}. */
	public int qsize;
	/** Result of the most recent {@code PSXLindaInterface.add} call made by {@link #open}. */
	public PSXLindaInterface linda;

	/** Selector polled by {@link #sync(long)}. */
	public Selector selector;

	/** Head and tail of the linked list of commands waiting to be sent. */
	public PSXQueue q_top,q_end;
	/**
	 * Head and tail of the linked list of replies created by {@link #psx_queue}.
	 * NOTE(review): nothing in this class ever unlinks an entry from this list;
	 * confirm that another class trims it, otherwise it grows without bound.
	 */
	public PSXReply r_top,r_end;
	/** Replies still awaited, keyed by the sequence number sent with the request. */
	public Hashtable<Integer,PSXReply> seqHash;

	/**
	 * Returns the shared instance, creating it on first use.
	 *
	 * @return the shared FederatedLinda
	 * @throws IOException if the selector cannot be opened
	 */
	static FederatedLinda init()
	throws IOException {
		if (fdl == null) {
			fdl = new FederatedLinda();
		}
		return fdl;
	}

	/**
	 * Opens the selector and creates the empty reply table.
	 *
	 * @throws IOException if {@link Selector#open()} fails
	 */
	private FederatedLinda()
	throws IOException {
		selector = Selector.open();
		seqHash = new Hashtable<Integer, PSXReply>();
	}

	/**
	 * Creates a connection object for one more tuple space.
	 *
	 * <p>Increments {@link #tid}, constructs a {@code PSXLinda} with that ID and
	 * hands the previous value of {@link #linda} to its {@code add} method; the
	 * value {@code add} returns becomes the new {@link #linda}.
	 *
	 * @param _host host passed on to the PSXLinda constructor
	 * @param _port port passed on to the PSXLinda constructor
	 * @return the new value of {@link #linda}
	 * @throws IOException declared by this method; the visible code defers all
	 *         connection work to the PSXLinda constructor
	 */
	public PSXLindaInterface open(String _host,int _port)
	throws IOException {
		tid++;
		PSXLindaInterface newlinda = new PSXLinda(this,tid,_host,_port);
		linda = newlinda.add(linda);
		return linda;
	}

	/**
	 * Queues a command for transmission by the next {@link #sync(long)}.
	 *
	 * <p>Counterpart of the C API
	 * {@code psx_queue(unsigned int tspace_id, unsigned int id, unsigned int size,
	 * unsigned char *data, char mode, void(*callback)(char*,void*), void *obj)}.
	 *
	 * <p>For every mode other than {@code PSX.PSX_OUT} a reply object is created,
	 * registered under a fresh sequence number and appended to the reply list.
	 *
	 * @param linda    connection the command belongs to
	 * @param id       tuple id, passed on to the PSXQueue constructor
	 * @param s        payload buffer, passed on to the PSXQueue constructor
	 * @param length   payload length, passed on to the PSXQueue constructor
	 * @param mode     command mode; {@code PSX.PSX_OUT} expects no reply
	 * @param callback stored in the reply and invoked when the answer arrives; may be null
	 * @return the reply that will receive the answer, or null for {@code PSX.PSX_OUT}
	 */
	public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) {
		PSXQueue c = new PSXQueue(linda,id,mode,s,length,callback);

		// Append to the send queue.
		if (q_top == null) {
			q_top = c;
			q_end = c;
		} else {
			q_end.next = c;
			q_end = c;
		}
		qsize++;

		if (mode == PSX.PSX_OUT) {
			return null;
		}

		PSXReply p = new PSXReply(PSX.PSX_REPLY,callback);
		p.seq = seq(p);
		c.setSeq(p.seq);
		if (debug) {
			// Leftover diagnostic from the original code; always prints "true".
			System.out.print("Integer compare=");
			System.out.println(Integer.valueOf(2).equals(Integer.valueOf(2)));
			System.out.print("Seding seq=");
			System.out.println(p.seq);
		}

		// Append to the reply list.
		if (r_top == null) {
			r_top = p;
			r_end = p;
		} else {
			r_end.next = p;
			r_end = p;
		}
		return p;
	}

	/**
	 * Allocates a sequence number that is not in use and registers
	 * {@code reply} under it in {@link #seqHash}.
	 *
	 * <p>Candidates run from 0 to {@link #MAX_SEQUENCE} and wrap around.
	 *
	 * @param reply the reply to register; must not be null (Hashtable rejects null values)
	 * @return the sequence number under which {@code reply} was registered
	 * @throws IllegalStateException if every sequence number from 0 to
	 *         {@code MAX_SEQUENCE} is already registered (the previous code
	 *         looped forever in that situation)
	 */
	public int seq(PSXReply reply) {
		Integer s;
		int tried = 0;
		do {
			// One full lap over 0..MAX_SEQUENCE without a free slot means there is none.
			if (tried++ > MAX_SEQUENCE) {
				throw new IllegalStateException(
						"all " + (MAX_SEQUENCE + 1) + " sequence numbers are in use");
			}
			seq++;
			if (seq > MAX_SEQUENCE) {
				seq = 0;
			}
			s = Integer.valueOf(seq);
		} while (seqHash.containsKey(s));
		if (debug) {
			System.out.print("hash value = ");
			System.out.println(s.hashCode());
		}
		seqHash.put(s,reply);
		// The counter is advanced once more after allocation, exactly as before,
		// so the numbers that go out on the wire are unchanged by this rewrite.
		int allocated = seq;
		seq++;
		return allocated;
	}

	/**
	 * @return the selector polled by {@link #sync(long)}
	 */
	public Selector selector() {
		return selector;
	}

	/**
	 * Same as {@code sync(0)}; {@link Selector#select(long)} treats a timeout
	 * of 0 as "block until something is ready".
	 *
	 * @return the value returned by {@link #sync(long)}
	 * @throws IOException if sending a queued command fails
	 */
	public int sync() throws IOException {
		return sync(0);
	}

	/**
	 * Sends every queued command, then waits for ready channels and processes
	 * one incoming message on each of them.
	 *
	 * <p>An {@link IOException} or {@link ClosedSelectorException} raised while
	 * selecting or reading is printed and not rethrown.
	 *
	 * @param mtimeout timeout in milliseconds handed to {@link Selector#select(long)}
	 * @return the number of keys reported by {@code select}; 0 if selecting failed
	 * @throws IOException if sending a queued command fails
	 */
	public int sync(long mtimeout)
	throws IOException {
		int key_num = 0;

		// Flush the send queue. The head is advanced only after a successful
		// Send(), so a command whose Send() throws stays queued.
		while (q_top != null) {
			PSXQueue c = q_top;
			c.Send();
			q_top = c.next;
			qsize--;
		}

		try {
			key_num = selector.select(mtimeout);
			Set<SelectionKey> keys = selector.selectedKeys();
			try {
				for (SelectionKey key : keys) {
					chkServe((SocketChannel) key.channel());
				}
			} finally {
				// The selector never removes keys from this set by itself.
				// Without clearing it, already-served keys would be served
				// again on every later call and select() would under-report.
				keys.clear();
			}
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClosedSelectorException e) {
			e.printStackTrace();
		}

		return key_num;
	}

	/**
	 * Reads one message (header plus body) from {@code sock} and delivers it
	 * to the reply registered under the sequence number found in the header.
	 *
	 * <p>Returns without doing anything when no header byte is available, when
	 * the peer has already closed the connection, or when the header announces
	 * a body length of 0 or less.
	 * NOTE(review): a reply without a body therefore stays in {@link #seqHash};
	 * confirm that the protocol never sends one.
	 *
	 * <p>A {@code NullPointerException} thrown by {@code setAnswer} or by the
	 * callback is no longer mistaken for "reply not found"; it propagates.
	 *
	 * @param sock channel to read from
	 * @throws IOException if reading fails or the connection is closed in the
	 *         middle of a message
	 */
	private void chkServe(SocketChannel sock)
	throws IOException {
		ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
		command.order(ByteOrder.BIG_ENDIAN);

		// 0 = nothing pending, -1 = end of stream. The original code parsed the
		// still-zeroed header in both cases, saw length 0 and did nothing.
		if (sock.read(command) <= 0) {
			return;
		}
		// A single read() may deliver only part of the header.
		readFully(sock, command);
		command.rewind();

		int length = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
		if (length <= 0) {
			return;
		}

		if (debug) {
			System.out.print("reading:");
			System.out.println(length);
		}
		ByteBuffer data = ByteBuffer.allocate(length);
		data.order(ByteOrder.BIG_ENDIAN);
		readFully(sock, data);
		data.rewind();

		if (debug) {
			PSX.printCommand(command, data);
		}

		int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		int mode = command.get(PSX.LINDA_MODE_OFFSET);

		Integer key = Integer.valueOf(rseq);
		PSXReply r = seqHash.remove(key);
		if (debug) {
			System.out.print("hash value = ");
			System.out.println(key.hashCode());
		}
		if (r == null) {
			if (debug) {
				System.out.println("hashed reply not found");
			}
			return;
		}

		r.setAnswer(mode,command,data);
		if (r.callback != null) {
			r.callback.callback(data);
		}
	}

	/**
	 * Reads from {@code sock} until {@code buf} has no space remaining.
	 *
	 * <p>When {@code read} returns 0 the loop simply tries again, as the
	 * original body-reading loop did.
	 *
	 * @param sock channel to read from
	 * @param buf  buffer to fill
	 * @throws IOException if reading fails or end of stream is reached before
	 *         the buffer is full
	 */
	private static void readFully(SocketChannel sock, ByteBuffer buf)
	throws IOException {
		while (buf.hasRemaining()) {
			if (sock.read(buf) < 0) {
				throw new IOException("connection closed with "
						+ buf.remaining() + " bytes of a message still missing");
			}
		}
	}
}

/* end */