Mercurial > hg > FederatedLinda
view src/fdl/FederatedLinda.java @ 95:7bf2eeea23a0 fuchita
Merge with ea4ee892baf5df730093bbee7d5eb2f5b5acaf53
author | one |
---|---|
date | Tue, 25 May 2010 23:11:11 +0900 |
parents | a1d796c0e975 c0591636a71a |
children | 96c63bc659d4 |
line wrap: on
line source
/* * @(#)FederatedLinda.java 1.1 06/04/01 * * Copyright 2006 Shinji KONO * PSX Lidna Trasport layer of PSX Linda library */ package fdl; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Hashtable; import java.util.Iterator; import java.util.logging.Level; /** FederatedLinda * * @author Shinji Kono * * @param mytsid Tuple Space ID Initialize connection channel for a tuple space one instance for each Tuple space connection */ public class FederatedLinda { FederatedLinda fdl; static int MAX_SEQUENCE = 2048; static boolean debug = false; public int tid; public int seq; public int qsize; public PSXLinda linda; public Selector selector; public PSXQueue q_top,q_end; public PSXReply r_top,r_end; public Hashtable<Integer,PSXReply> seqHash; public static FederatedLinda init() { FederatedLinda fdl = new FederatedLinda(); return fdl; } private FederatedLinda() { try { selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } seqHash = new Hashtable<Integer, PSXReply>(); } public PSXLinda open(String _host,int _port) throws IOException { tid++; PSXLindaImpl newlinda = new PSXLindaImpl(this,selector,tid,_host,_port); linda = newlinda.add(linda); return linda; } PSXLinda openFromMetaLinda(MetaLinda metaLinda, String _host, int _port) throws IOException { tid++; PSXLindaImpl newlinda = new PSXLindaImpl(this,metaLinda.fds.selector,tid,_host,_port); linda = newlinda.add(linda); return newlinda; } public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) { PSXQueue c = new PSXQueue(linda,id,mode,s,callback); if (q_top == null) { c = q_end = q_top = c; } else { q_end.next = c; q_end = c; } qsize++; if (mode != PSX.PSX_OUT) { PSXReply p = new PSXReply(PSX.PSX_REPLY,callback); p.seq = seq(p); c.setSeq(p.seq); if (r_top == null){ r_end = r_top = p; } else { r_end.next = p; r_end = p; } return p; } return null; } public int seq(PSXReply reply) { Integer s; do { seq++; if (seq>MAX_SEQUENCE) { seq = 0; } s = new Integer(seq); } while (seqHash.containsKey(s)); if (debug) { log(Level.INFO,"hash value = "+s.hashCode()); } seqHash.put(s,reply); seq++; return seq-1; } public Selector selector() { return selector; } public int sync() throws IOException { return sync(0); } public int sync(long mtimeout) throws IOException { int key_num = 0; queueExec(); try { if (selector.select(mtimeout)>0) { for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { SelectionKey s = it.next(); it.remove(); try { if (!s.isReadable()) throw new IOException(); TupleHandler handle = (TupleHandler)s.attachment(); handle.handle(s); } catch (IOException e) { s.cancel(); log(Level.INFO,""+s.channel()+" is closed."); } } } } catch (IOException e) { e.printStackTrace(); } catch (ClosedSelectorException e) { // client should be know } return key_num; } public void queueExec() { while (q_top != null){ PSXQueue c = q_top; c.send(); q_top = c.next; qsize--; } } PSXReply getReply(int rseq) { Integer a; PSXReply r = seqHash.get((a = new Integer(rseq))); seqHash.remove(a); return r; } public void log(Level level,String msg) { System.err.println(msg); if (level==Level.SEVERE) new IOException().setStackTrace(null); } public void wakeup() { selector.wakeup(); } } /* end */