Mercurial > hg > FederatedLinda
changeset 11:1355eb28e41d
fix seqHash removal on checkServe.
author | kono |
---|---|
date | Sat, 09 Aug 2008 19:06:32 +0900 |
parents | abd8cd62b4c6 |
children | 34821c03b206 |
files | src/fdl/FederatedLinda.java |
diffstat | 1 files changed, 262 insertions(+), 260 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/FederatedLinda.java Tue Jun 03 19:58:21 2008 +0900 +++ b/src/fdl/FederatedLinda.java Sat Aug 09 19:06:32 2008 +0900 @@ -39,215 +39,215 @@ public class FederatedLinda implements PSXQueueInterface { - static FederatedLinda fdl; - static int MAX_SEQUENCE = 2048; - static boolean debug = true; + static FederatedLinda fdl; + static int MAX_SEQUENCE = 2048; + static boolean debug = true; - public int tid; - public int seq; - public int qsize; - public PSXLinda linda; + public int tid; + public int seq; + public int qsize; + public PSXLinda linda; + + public Selector selector; - public Selector selector; + public PSXQueue q_top,q_end; + public PSXReply r_top,r_end; + public Hashtable<Integer,PSXReply> seqHash; - public PSXQueue q_top,q_end; - public PSXReply r_top,r_end; - public Hashtable<Integer,PSXReply> seqHash; - - static FederatedLinda init() - throws IOException { - if (fdl==null) { - fdl = new FederatedLinda(); + static FederatedLinda init() + throws IOException { + if (fdl==null) { + fdl = new FederatedLinda(); + } + return fdl; } - return fdl; - } - private FederatedLinda() - throws IOException { - selector = Selector.open(); - seqHash = new Hashtable<Integer, PSXReply>(); - } + private FederatedLinda() + throws IOException { + selector = Selector.open(); + seqHash = new Hashtable<Integer, PSXReply>(); + } - public PSXLinda open(String _host,int _port) - throws IOException { - tid++; - // System.out.print("Tid = "); - // System.out.println(tid); - PSXLinda newlinda = new PSXLinda(this,tid,_host,_port); - linda = newlinda.add(linda); - return linda; - } + public PSXLinda open(String _host,int _port) + throws IOException { + tid++; + // System.out.print("Tid = "); + // System.out.println(tid); + PSXLinda newlinda = new PSXLinda(this,tid,_host,_port); + linda = newlinda.add(linda); + return linda; + } -/** + /** psx_queue (unsigned int tspace_id, unsigned int id, unsigned int size, unsigned char *data, char mode, void(*callback)(char*,void*), void * obj): - */ - - public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) { - PSXQueue c = new PSXQueue(linda,id,mode,s,length,callback); + */ + + public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) { + PSXQueue c = new PSXQueue(linda,id,mode,s,length,callback); - if (q_top == null) { - c = q_end = q_top = c; - } else { - q_end.next = c; - q_end = c; - } - qsize++; - - if (mode != PSX_OUT) { - PSXReply p = new PSXReply(PSX_REPLY,callback); - p.seq = seq(p); - c.setSeq(p.seq); - if (debug) { - System.out.print("Integer compare="); - System.out.println((new Integer(2)).equals(new Integer(2))); - System.out.print("Seding seq="); - System.out.println(p.seq); - } - if (r_top == null){ - r_end = r_top = p; - } else { - r_end.next = p; - r_end = p; - } - return p; - } - return null; - } + if (q_top == null) { + c = q_end = q_top = c; + } else { + q_end.next = c; + q_end = c; + } + qsize++; - public int seq(PSXReply reply) { - Integer s; - do { - seq++; - if (seq>MAX_SEQUENCE) { - seq = 0; - } - s = new Integer(seq); - } while (seqHash.containsKey(s)); - if (debug) { - System.out.print("hash value = "); - System.out.println(s.hashCode()); + if (mode != PSX_OUT) { + PSXReply p = new PSXReply(PSX_REPLY,callback); + p.seq = seq(p); + c.setSeq(p.seq); + if (debug) { + System.out.print("Integer compare="); + System.out.println((new Integer(2)).equals(new Integer(2))); + System.out.print("Seding seq="); + System.out.println(p.seq); + } + if (r_top == null){ + r_end = r_top = p; + } else { + r_end.next = p; + r_end = p; + } + return p; + } + return null; } - seqHash.put(s,reply); - seq++; - return seq-1; - } - public Selector selector() { - return selector; - } - - public int sync() throws IOException { - return sync(0); - } + public int seq(PSXReply reply) { + Integer s; + do { + seq++; + if (seq>MAX_SEQUENCE) { + seq = 0; + } + s = new Integer(seq); + } while (seqHash.containsKey(s)); + if (debug) { + System.out.print("hash value = "); + System.out.println(s.hashCode()); + } + seqHash.put(s,reply); + seq++; + return seq-1; + } - public int sync(long mtimeout) - throws IOException { - int key_num = 0; - Set<SelectionKey> keys; + public Selector selector() { + return selector; + } - while (q_top != null){ - PSXQueue c = q_top; - c.Send(); - q_top = c.next; - // psx_free(c); - // q_top = c = t; - qsize--; + public int sync() throws IOException { + return sync(0); } - try { - key_num = selector.select(mtimeout); - keys = selector.selectedKeys(); - for (SelectionKey key : keys) { - // System.out.println("selecting"); - SocketChannel sock = (SocketChannel)key.channel(); - chkServe(sock); - } - } catch (IOException e) { - e.printStackTrace(); - } catch (ClosedSelectorException e) { - e.printStackTrace(); - } + public int sync(long mtimeout) + throws IOException { + int key_num = 0; + Set<SelectionKey> keys; - return key_num; - } - - public int sync_com(long mtimeout) - throws IOException { - int key_num = 0; - Set<SelectionKey> keys; + while (q_top != null){ + PSXQueue c = q_top; + c.Send(); + q_top = c.next; + // psx_free(c); + // q_top = c = t; + qsize--; + } - while (q_top != null){ - PSXQueue c = q_top; - c.Send(); - q_top = c.next; - // psx_free(c); - // q_top = c = t; - qsize--; - } + try { + key_num = selector.select(mtimeout); + keys = selector.selectedKeys(); + for (SelectionKey key : keys) { + // System.out.println("selecting"); + SocketChannel sock = (SocketChannel)key.channel(); + chkServe(sock); + } + } catch (IOException e) { + e.printStackTrace(); + } catch (ClosedSelectorException e) { + e.printStackTrace(); + } - try { - key_num = selector.select(mtimeout); - keys = selector.selectedKeys(); - for (SelectionKey key : keys) { - SocketChannel sock = (SocketChannel)key.channel(); - chkCom(sock); - } - } catch (IOException e) { - e.printStackTrace(); - } catch (ClosedSelectorException e) { - e.printStackTrace(); - } + return key_num; + } + + public int sync_com(long mtimeout) + throws IOException { + int key_num = 0; + Set<SelectionKey> keys; + + while (q_top != null){ + PSXQueue c = q_top; + c.Send(); + q_top = c.next; + // psx_free(c); + // q_top = c = t; + qsize--; + } - return key_num; - } - -// should be in PSXLinda, but sock->linda is unknown here - - private void chkCom(SocketChannel sock) throws IOException { + try { + key_num = selector.select(mtimeout); + keys = selector.selectedKeys(); + for (SelectionKey key : keys) { + SocketChannel sock = (SocketChannel)key.channel(); + chkCom(sock); + } + } catch (IOException e) { + e.printStackTrace(); + } catch (ClosedSelectorException e) { + e.printStackTrace(); + } - int length; - ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); - command.order(ByteOrder.BIG_ENDIAN); - debug = false; + return key_num; + } + +// should be in PSXLinda, but sock->linda is unknown here - sock.read(command); - command.rewind(); - length = command.getInt(LINDA_DATA_LENGTH_OFFSET); - if (length>0) { - ByteBuffer data = ByteBuffer.allocate(length); - int read = length; - if (debug) { - System.out.print("reading:"); - System.out.println(length); - } + private void chkCom(SocketChannel sock) throws IOException { + + int length; + ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + command.order(ByteOrder.BIG_ENDIAN); + debug = false; - data.order(ByteOrder.BIG_ENDIAN); - while(read>0) { - read -= sock.read(data); - } - data.rewind(); + sock.read(command); + command.rewind(); + length = command.getInt(LINDA_DATA_LENGTH_OFFSET); + if (length>0) { + ByteBuffer data = ByteBuffer.allocate(length); + int read = length; + if (debug) { + System.out.print("reading:"); + System.out.println(length); + } + + data.order(ByteOrder.BIG_ENDIAN); + while(read>0) { + read -= sock.read(data); + } + data.rewind(); - if (debug) { - char id = (char)command.getShort(LINDA_ID_OFFSET); - System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ - "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ - "ID:"+(int)id+" "+ - "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ - "DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" "); - System.out.println("DATA:"+data); - command.rewind(); - } - //if (debug_com) { - String comdata =""; - CharBuffer chardata = data.asCharBuffer(); - comdata = chardata.toString(); + if (debug) { + char id = (char)command.getShort(LINDA_ID_OFFSET); + System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ + "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ + "ID:"+(int)id+" "+ + "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ + "DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" "); + System.out.println("DATA:"+data); + command.rewind(); + } + //if (debug_com) { + String comdata =""; + CharBuffer chardata = data.asCharBuffer(); + comdata = chardata.toString(); - //System.out.println("Com_data =>"); - System.out.println(comdata); - //} - /***if (debug) { + //System.out.println("Com_data =>"); + System.out.println(comdata); + //} + /***if (debug) { System.out.print("header:"); for(int i=0;i<LINDA_HEADER_SIZE;i++) { System.out.println(command.get(i)); @@ -258,73 +258,74 @@ } data.rewind(); }***/ - - int rseq = command.getInt(LINDA_SEQ_OFFSET); - int mode = command.get(LINDA_MODE_OFFSET); - Integer a; - /*** + + int rseq = command.getInt(LINDA_SEQ_OFFSET); + int mode = command.get(LINDA_MODE_OFFSET); + Integer a; + /*** if (debug) { System.out.print("mode = "); System.out.println(mode); System.out.print("seq = "); System.out.println(rseq); }***/ - try { - PSXReply r = seqHash.get((a = new Integer(rseq))); - if (debug) { - System.out.print("hash value = "); - System.out.println(a.hashCode()); - } - - r.setAnswer(mode,command,data); + try { + PSXReply r = seqHash.get((a = new Integer(rseq))); + seqHash.put(a, null); + if (debug) { + System.out.print("hash value = "); + System.out.println(a.hashCode()); + } + + r.setAnswer(mode,command,data); - if (r.callback != null ) { - r.callback.callback(data); - } - } catch (NullPointerException e ) { - if (debug) { - System.out.println("hashed reply not found"); - } - // can't happen - return ; - } - } + if (r.callback != null ) { + r.callback.callback(data); + } + } catch (NullPointerException e ) { + if (debug) { + System.out.println("hashed reply not found"); + } + // can't happen + return ; + } + } } private void chkServe(SocketChannel sock) - throws IOException { - int length; - ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); - command.order(ByteOrder.BIG_ENDIAN); + throws IOException { + int length; + ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + command.order(ByteOrder.BIG_ENDIAN); - sock.read(command); - command.rewind(); - length = command.getInt(LINDA_DATA_LENGTH_OFFSET); - if (length>0) { - ByteBuffer data = ByteBuffer.allocate(length); - int read = length; - if (debug) { - System.out.print("reading:"); - System.out.println(length); - } + sock.read(command); + command.rewind(); + length = command.getInt(LINDA_DATA_LENGTH_OFFSET); + if (length>0) { + ByteBuffer data = ByteBuffer.allocate(length); + int read = length; + if (debug) { + System.out.print("reading:"); + System.out.println(length); + } - data.order(ByteOrder.BIG_ENDIAN); - while(read>0) { - read -= sock.read(data); - } - data.rewind(); + data.order(ByteOrder.BIG_ENDIAN); + while(read>0) { + read -= sock.read(data); + } + data.rewind(); - if (debug) { - char id = (char)command.getShort(LINDA_ID_OFFSET); - System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ - "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ - "ID:"+(int)id+" "+ - "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ - "DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" "); - System.out.println("DATA:"+data); - command.rewind(); - } - /***if (debug) { + if (debug) { + char id = (char)command.getShort(LINDA_ID_OFFSET); + System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ + "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ + "ID:"+(int)id+" "+ + "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ + "DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" "); + System.out.println("DATA:"+data); + command.rewind(); + } + /***if (debug) { System.out.print("header:"); for(int i=0;i<LINDA_HEADER_SIZE;i++) { System.out.println(command.get(i)); @@ -335,38 +336,39 @@ } data.rewind(); }***/ - - int rseq = command.getInt(LINDA_SEQ_OFFSET); - int mode = command.get(LINDA_MODE_OFFSET); - Integer a; - /*** + + int rseq = command.getInt(LINDA_SEQ_OFFSET); + int mode = command.get(LINDA_MODE_OFFSET); + Integer a; + /*** if (debug) { System.out.print("mode = "); System.out.println(mode); System.out.print("seq = "); System.out.println(rseq); }***/ - try { - PSXReply r = seqHash.get((a = new Integer(rseq))); - if (debug) { - System.out.print("hash value = "); - System.out.println(a.hashCode()); - } - - r.setAnswer(mode,command,data); + try { + PSXReply r = seqHash.get((a = new Integer(rseq))); + seqHash.put(a, null); + if (debug) { + System.out.print("hash value = "); + System.out.println(a.hashCode()); + } + + r.setAnswer(mode,command,data); - if (r.callback != null ) { - r.callback.callback(data); + if (r.callback != null ) { + r.callback.callback(data); + } + } catch (NullPointerException e ) { + if (debug) { + System.out.println("hashed reply not found"); + } + // can't happen + return ; + } } - } catch (NullPointerException e ) { - if (debug) { - System.out.println("hashed reply not found"); - } - // can't happen - return ; - } - } - } + } } /* end */