view src/fdl/FederatedLinda.java @ 0:083a0b5e12cc

Apply Debug Interface version start
author fuchita
date Thu, 07 Feb 2008 14:21:30 +0900
parents
children b49e593b2502
line wrap: on
line source


/*
 * @(#)FederatedLinda.java       1.1 06/04/01
 *
 * Copyright 2006  Shinji KONO
 * 

   PSX Linda
     Transport layer of PSX Linda library

 */

package fdl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Hashtable;
import java.util.Set;


/**
 * Transport layer shared by the PSX Linda connections of one process.
 *
 * <p>The single instance (see {@link #init()}) owns the NIO {@link Selector},
 * a linked list of outgoing requests ({@code q_top}/{@code q_end}), a linked
 * list of the replies handed out to callers ({@code r_top}/{@code r_end}) and
 * a table mapping sequence numbers to the replies still waiting for an answer.
 * Requests are only queued by {@link #psx_queue}; they are written to the
 * network, and answers are read back, when {@link #sync(long)} is called.
 *
 * <p>This class does no locking of its own.
 *
 * @author Shinji Kono
 */
public class FederatedLinda implements PSXQueueInterface {

    /** Process-wide instance, created lazily by {@link #init()}. */
    static FederatedLinda fdl;
    /** Largest sequence number handed out; numbers cycle through 0..MAX_SEQUENCE. */
    static int MAX_SEQUENCE = 2048;
    /** When true, protocol traces are printed to standard output. */
    static final boolean debug = true;

    /** Id given to the most recently opened connection; incremented by {@link #open}. */
    public int tid;
    /** Sequence number most recently allocated by {@link #seq(PSXReply)}. */
    public int seq;
    /** Number of requests currently waiting in the outgoing queue. */
    public int qsize;
    /** Value returned by the latest {@code PSXLinda.add} call in {@link #open}. */
    public PSXLinda linda;

    /** Selector used by {@link #sync(long)} to wait for readable connections. */
    public Selector selector;

    /** Head and tail of the outgoing request queue. */
    public PSXQueue q_top, q_end;
    /**
     * Head and tail of the list of replies created by {@link #psx_queue}.
     * NOTE(review): nothing in this class ever unlinks entries from this list.
     */
    public PSXReply r_top, r_end;
    /** Replies still waiting for an answer, keyed by sequence number. */
    public Hashtable<Integer, PSXReply> seqHash;

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the process-wide {@code FederatedLinda}
     * @throws IOException if the selector cannot be opened
     */
    static FederatedLinda init() throws IOException {
        if (fdl == null) {
            fdl = new FederatedLinda();
        }
        return fdl;
    }

    private FederatedLinda() throws IOException {
        selector = Selector.open();
        seqHash = new Hashtable<Integer, PSXReply>();
    }

    /**
     * Creates a new {@code PSXLinda} for the given host and port, with a fresh
     * connection id.
     *
     * @param _host host name passed to the {@code PSXLinda} constructor
     * @param _port port passed to the {@code PSXLinda} constructor
     * @return the result of {@code newlinda.add(linda)}, which also becomes the
     *         new value of {@link #linda} (presumably the head of the chain of
     *         open connections -- confirm against {@code PSXLinda.add})
     * @throws IOException if the {@code PSXLinda} constructor fails
     */
    public PSXLinda open(String _host, int _port) throws IOException {
        tid++;
        PSXLinda newlinda = new PSXLinda(this, tid, _host, _port);
        linda = newlinda.add(linda);
        return linda;
    }

    /**
     * Appends a request to the outgoing queue. Nothing is sent until
     * {@link #sync(long)} runs.
     *
     * <p>For every mode other than {@code PSX_OUT} a {@code PSXReply} is
     * created, given a fresh sequence number, registered in {@link #seqHash}
     * and appended to the reply list.
     *
     * @param linda    connection the request belongs to
     * @param id       tuple id
     * @param s        request payload
     * @param length   payload length passed to the {@code PSXQueue} constructor
     * @param mode     request mode; {@code PSX_OUT} requests get no reply object
     * @param callback callback stored in the reply; may be {@code null}
     * @return the reply that will receive the answer, or {@code null} for
     *         {@code PSX_OUT}
     * @throws IllegalStateException if every sequence number is in use
     */
    public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) {
        PSXQueue c = new PSXQueue(linda, id, mode, s, length, callback);

        if (q_top == null) {
            q_top = c;
        } else {
            q_end.next = c;
        }
        q_end = c;
        qsize++;

        if (mode == PSX_OUT) {
            return null;
        }

        PSXReply p = new PSXReply(PSX_REPLY, callback);
        p.seq = seq(p);
        c.setSeq(p.seq);
        if (debug) {
            System.out.print("Integer compare=");
            System.out.println(Integer.valueOf(2).equals(Integer.valueOf(2)));
            System.out.print("Seding seq=");
            System.out.println(p.seq);
        }
        if (r_top == null) {
            r_top = p;
        } else {
            r_end.next = p;
        }
        r_end = p;
        return p;
    }

    /**
     * Allocates an unused sequence number in 0..MAX_SEQUENCE and registers
     * {@code reply} under it in {@link #seqHash}.
     *
     * @param reply reply to associate with the new sequence number
     * @return the allocated sequence number (the key stored in {@code seqHash})
     * @throws IllegalStateException if all sequence numbers are already taken;
     *         without this guard the search below would never terminate
     */
    public int seq(PSXReply reply) {
        int attempts = 0;
        Integer s;
        do {
            // One attempt per possible value is enough to prove exhaustion.
            if (attempts++ > MAX_SEQUENCE) {
                throw new IllegalStateException(
                        "all " + (MAX_SEQUENCE + 1) + " sequence numbers are in use");
            }
            seq++;
            if (seq > MAX_SEQUENCE) {
                seq = 0;
            }
            s = Integer.valueOf(seq);
        } while (seqHash.containsKey(s));
        if (debug) {
            System.out.print("hash value = ");
            System.out.println(s.hashCode());
        }
        seqHash.put(s, reply);
        return seq;
    }

    /** @return the selector connections are registered with */
    public Selector selector() {
        return selector;
    }

    /**
     * Same as {@code sync(0)}; a timeout of 0 makes {@link Selector#select(long)}
     * wait without limit.
     *
     * @return the value returned by {@link #sync(long)}
     * @throws IOException if sending a queued request fails
     */
    public int sync() throws IOException {
        return sync(0);
    }

    /**
     * Sends every queued request, then waits for readable connections and
     * processes one incoming message on each of them.
     *
     * <p>I/O errors raised while selecting or reading are printed and not
     * propagated; errors raised while sending are propagated.
     *
     * @param mtimeout timeout in milliseconds passed to
     *                 {@link Selector#select(long)}
     * @return the number of keys reported by the select call
     * @throws IOException if sending a queued request fails
     */
    public int sync(long mtimeout) throws IOException {
        int key_num = 0;

        while (q_top != null) {
            PSXQueue c = q_top;
            c.Send();
            q_top = c.next;
            qsize--;
        }
        // Do not keep the last sent request reachable through the tail pointer.
        q_end = null;

        try {
            key_num = selector.select(mtimeout);
            Set<SelectionKey> keys = selector.selectedKeys();
            try {
                for (SelectionKey key : keys) {
                    if (!key.isValid()) {
                        continue;
                    }
                    SocketChannel sock = (SocketChannel) key.channel();
                    chkServe(sock);
                }
            } finally {
                // The selector never removes keys from the selected set by
                // itself; leaving them would make the next call serve channels
                // that are no longer ready.
                keys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClosedSelectorException e) {
            e.printStackTrace();
        }

        return key_num;
    }

    // should be in PSXLinda, but sock->linda is unknown here

    /**
     * Reads one message (header, then payload) from {@code sock} and delivers
     * it to the reply registered under the message's sequence number. The reply
     * is removed from {@link #seqHash} so its sequence number can be reused.
     * Messages whose data length is not positive are ignored.
     *
     * <p>If the peer has closed the connection the channel is closed, which
     * also cancels its selection key.
     *
     * @param sock channel reported ready by the selector
     * @throws IOException on a read error or when the connection ends in the
     *         middle of a message
     */
    private void chkServe(SocketChannel sock) throws IOException {
        ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
        command.order(ByteOrder.BIG_ENDIAN);

        if (!readFully(sock, command, true)) {
            return;
        }
        command.rewind();

        int length = command.getInt(LINDA_DATA_LENGTH_OFFSET);
        if (length <= 0) {
            return;
        }

        ByteBuffer data = ByteBuffer.allocate(length);
        if (debug) {
            System.out.print("reading:");
            System.out.println(length);
        }
        data.order(ByteOrder.BIG_ENDIAN);
        readFully(sock, data, false);
        data.rewind();

        if (debug) {
            System.out.print("header:");
            for (int i = 0; i < LINDA_HEADER_SIZE; i++) {
                System.out.println(command.get(i));
            }
            System.out.print("data:");
            for (int i = 0; i < length; i++) {
                System.out.println(data.get(i));
            }
            data.rewind();
        }

        int rseq = command.getInt(LINDA_SEQ_OFFSET);
        int mode = command.get(LINDA_MODE_OFFSET);
        if (debug) {
            System.out.print("mode = ");
            System.out.println(mode);
            System.out.print("seq = ");
            System.out.println(rseq);
        }

        Integer key = Integer.valueOf(rseq);
        PSXReply r = seqHash.remove(key);
        if (debug) {
            System.out.print("hash value = ");
            System.out.println(key.hashCode());
        }
        if (r == null) {
            // Explicit check instead of catching NullPointerException, so that
            // a failure inside a callback is not mistaken for a missing reply.
            if (debug) {
                System.out.println("hashed reply not found");
            }
            return;
        }

        r.setAnswer(mode, command, data);
        if (r.callback != null) {
            r.callback.callback(data);
        }
    }

    /**
     * Reads from {@code sock} until {@code buf} is full.
     *
     * <p>Once part of a message has arrived, a read that returns 0 bytes is
     * simply retried, so this busy-waits on a non-blocking channel until the
     * rest arrives.
     *
     * @param sock       channel to read from
     * @param buf        buffer to fill
     * @param allowEmpty if true, finding no bytes at all (nothing available
     *                   yet, or a connection closed at a message boundary)
     *                   is reported by returning {@code false} instead of
     *                   waiting or failing
     * @return {@code true} if {@code buf} was filled, {@code false} if nothing
     *         was read and {@code allowEmpty} is set
     * @throws IOException on a read error, or if the connection ends after
     *         only part of the requested bytes were received
     */
    private boolean readFully(SocketChannel sock, ByteBuffer buf, boolean allowEmpty) throws IOException {
        while (buf.hasRemaining()) {
            int n = sock.read(buf);
            if (n < 0) {
                // End of stream: this channel will never deliver more data.
                sock.close();
                if (allowEmpty && buf.position() == 0) {
                    return false;
                }
                throw new IOException("connection closed in the middle of a message");
            }
            if (n == 0 && allowEmpty && buf.position() == 0) {
                return false;
            }
        }
        return true;
    }
}

/* end */