Mercurial > hg > FederatedLinda
view src/fdl/FederatedLinda.java @ 0:083a0b5e12cc
Apply Debug Interface version start
author | fuchita |
---|---|
date | Thu, 07 Feb 2008 14:21:30 +0900 |
parents | |
children | b49e593b2502 |
line wrap: on
line source
/* * @(#)FederatedLinda.java 1.1 06/04/01 * * Copyright 2006 Shinji KONO * PSX Lidna Trasport layer of PSX Linda library */ package fdl; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Hashtable; import java.util.Set; /** FederatedLinda * * @author Shinji Kono * * @param mytsid Tuple Space ID Initialize connection channel for a tuple space one instance for each Tuple space connection */ public class FederatedLinda implements PSXQueueInterface { static FederatedLinda fdl; static int MAX_SEQUENCE = 2048; static final boolean debug = true; public int tid; public int seq; public int qsize; public PSXLinda linda; public Selector selector; public PSXQueue q_top,q_end; public PSXReply r_top,r_end; public Hashtable<Integer,PSXReply> seqHash; static FederatedLinda init() throws IOException { if (fdl==null) { fdl = new FederatedLinda(); } return fdl; } private FederatedLinda() throws IOException { selector = Selector.open(); seqHash = new Hashtable<Integer, PSXReply>(); } public PSXLinda open(String _host,int _port) throws IOException { tid++; // System.out.print("Tid = "); // System.out.println(tid); PSXLinda newlinda = new PSXLinda(this,tid,_host,_port); linda = newlinda.add(linda); return linda; } /** psx_queue (unsigned int tspace_id, unsigned int id, unsigned int size, unsigned char *data, char mode, void(*callback)(char*,void*), void * obj): */ public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) { PSXQueue c = new PSXQueue(linda,id,mode,s,length,callback); if (q_top == null) { c = q_end = q_top = c; } else { q_end.next = c; q_end = c; } qsize++; if (mode != PSX_OUT) { PSXReply p = new PSXReply(PSX_REPLY,callback); p.seq = seq(p); c.setSeq(p.seq); if (debug) { System.out.print("Integer compare="); System.out.println((new Integer(2)).equals(new Integer(2))); System.out.print("Seding seq="); System.out.println(p.seq); } if (r_top == null){ r_end = r_top = p; } else { r_end.next = p; r_end = p; } return p; } return null; } public int seq(PSXReply reply) { Integer s; do { seq++; if (seq>MAX_SEQUENCE) { seq = 0; } s = new Integer(seq); } while (seqHash.containsKey(s)); if (debug) { System.out.print("hash value = "); System.out.println(s.hashCode()); } seqHash.put(s,reply); seq++; return seq-1; } public Selector selector() { return selector; } public int sync() throws IOException { return sync(0); } public int sync(long mtimeout) throws IOException { int key_num = 0; Set<SelectionKey> keys; while (q_top != null){ PSXQueue c = q_top; c.Send(); q_top = c.next; // psx_free(c); // q_top = c = t; qsize--; } try { key_num = selector.select(mtimeout); keys = selector.selectedKeys(); for (SelectionKey key : keys) { // System.out.println("selecting"); SocketChannel sock = (SocketChannel)key.channel(); chkServe(sock); } } catch (IOException e) { e.printStackTrace(); } catch (ClosedSelectorException e) { e.printStackTrace(); } return key_num; } // should be in PSXLinda, but sock->linda is unknown here private void chkServe(SocketChannel sock) throws IOException { int length; ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); command.order(ByteOrder.BIG_ENDIAN); sock.read(command); command.rewind(); length = command.getInt(LINDA_DATA_LENGTH_OFFSET); if (length>0) { ByteBuffer data = ByteBuffer.allocate(length); int read = length; if (debug) { System.out.print("reading:"); System.out.println(length); } data.order(ByteOrder.BIG_ENDIAN); while(read>0) { read -= sock.read(data); } data.rewind(); if (debug) { System.out.print("header:"); for(int i=0;i<LINDA_HEADER_SIZE;i++) { System.out.println(command.get(i)); } System.out.print("data:"); for(int i=0;i<length;i++) { System.out.println(data.get(i)); } data.rewind(); } int rseq = command.getInt(LINDA_SEQ_OFFSET); int mode = command.get(LINDA_MODE_OFFSET); Integer a; if (debug) { System.out.print("mode = "); System.out.println(mode); System.out.print("seq = "); System.out.println(rseq); } try { PSXReply r = seqHash.get((a = new Integer(rseq))); if (debug) { System.out.print("hash value = "); System.out.println(a.hashCode()); } r.setAnswer(mode,command,data); if (r.callback != null ) { r.callback.callback(data); } } catch (NullPointerException e ) { if (debug) { System.out.println("hashed reply not found"); } // can't happen return ; } } } } /* end */