Mercurial > hg > FederatedLinda
view src/fdl/FederatedLinda.java @ 25:330fa49bc4fd
*** empty log message ***
author | kono |
---|---|
date | Wed, 20 Aug 2008 14:12:15 +0900 |
parents | 35375016b2f0 |
children | 846c6c14cf04 |
line wrap: on
line source
/* * @(#)FederatedLinda.java 1.1 06/04/01 * * Copyright 2006 Shinji KONO * PSX Lidna Trasport layer of PSX Linda library */ package fdl; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Hashtable; import java.util.Set; /** FederatedLinda * * @author Shinji Kono * * @param mytsid Tuple Space ID Initialize connection channel for a tuple space one instance for each Tuple space connection */ public class FederatedLinda { static FederatedLinda fdl = new FederatedLinda(); static int MAX_SEQUENCE = 2048; static boolean debug = true; public int tid; public int seq; public int qsize; public PSXLinda linda; public Selector selector; public PSXQueue q_top,q_end; public PSXReply r_top,r_end; public Hashtable<Integer,PSXReply> seqHash; public static FederatedLinda init() { return fdl; } private FederatedLinda() { try { selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } seqHash = new Hashtable<Integer, PSXReply>(); } public PSXLinda open(String _host,int _port) throws IOException { tid++; PSXLindaImpl newlinda = new PSXLindaImpl(this,tid,_host,_port); linda = newlinda.add(linda); return linda; } public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) { PSXQueue c = new PSXQueue(linda,id,mode,s,callback); if (q_top == null) { c = q_end = q_top = c; } else { q_end.next = c; q_end = c; } qsize++; if (mode != PSX.PSX_OUT) { PSXReply p = new PSXReply(PSX.PSX_REPLY,callback); p.seq = seq(p); c.setSeq(p.seq); if (r_top == null){ r_end = r_top = p; } else { r_end.next = p; r_end = p; } return p; } return null; } public int seq(PSXReply reply) { Integer s; do { seq++; if (seq>MAX_SEQUENCE) { seq = 0; } s = new Integer(seq); } while (seqHash.containsKey(s)); if (debug) { System.out.print("hash value = "); System.out.println(s.hashCode()); } seqHash.put(s,reply); seq++; return seq-1; } public Selector selector() { return selector; } public int sync() throws IOException { return sync(0); } public int sync(long mtimeout) throws IOException { int key_num = 0; Set<SelectionKey> keys; while (q_top != null){ PSXQueue c = q_top; c.send(); q_top = c.next; qsize--; } try { key_num = selector.select(mtimeout); keys = selector.selectedKeys(); for (SelectionKey key : keys) { SocketChannel sock = (SocketChannel)key.channel(); chkServe(sock); } } catch (IOException e) { e.printStackTrace(); } catch (ClosedSelectorException e) { e.printStackTrace(); } return key_num; } private void chkServe(SocketChannel sock) throws IOException { int length; ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); command.order(ByteOrder.BIG_ENDIAN); sock.read(command); command.rewind(); length = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); if (length>0) { ByteBuffer data = ByteBuffer.allocate(length); int read = length; if (debug) { System.out.print("client reading:"); System.out.println(length); } data.order(ByteOrder.BIG_ENDIAN); while(read>0) { read -= sock.read(data); } data.rewind(); if (debug) { PSX.printCommand("chkServe:",command, data); } int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET); int mode = command.get(PSX.LINDA_MODE_OFFSET); PSXReply r = getReply(rseq); r.setAnswer(mode,command,data); if (r.callback != null ) { r.callback.callback(data); } } } private PSXReply getReply(int rseq) { Integer a; PSXReply r = seqHash.get((a = new Integer(rseq))); if (r==null) { System.out.println("hashed reply not found"); } seqHash.remove(a); return r; } } /* end */