Mercurial > hg > FederatedLinda
changeset 31:846c6c14cf04
worked?
author | kono |
---|---|
date | Fri, 22 Aug 2008 14:48:41 +0900 |
parents | fca6eec8016f |
children | 7e0f6f00763e |
files | src/fdl/ComDebug_Client.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/MetaLinda.java src/fdl/MetaLogEngine.java src/fdl/MetaReply.java src/fdl/PSX.java src/fdl/TupleSpace.java |
diffstat | 8 files changed, 79 insertions(+), 76 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/ComDebug_Client.java Thu Aug 21 18:46:40 2008 +0900 +++ b/src/fdl/ComDebug_Client.java Fri Aug 22 14:48:41 2008 +0900 @@ -76,12 +76,12 @@ } psx.in(65535, new MyCallBack(psx)); - System.out.println("COM_DEBUG Connected.["+host+":"+port+"]"); + System.err.println("COM_DEBUG Connected.["+host+":"+port+"]"); psx.out(PSX.META_MONITOR, nullBuffer); debugCallback = new PSXCallback() { public void callback(ByteBuffer reply) { - System.out.println(PSX.getdataString(reply)); + System.err.println("COM_DEBUG: "+PSX.getdataString(reply)); psx.out(PSX.META_MONITOR, nullBuffer); psx.in(PSX.META_MONITOR_DATA,debugCallback); }
--- a/src/fdl/FederatedLinda.java Thu Aug 21 18:46:40 2008 +0900 +++ b/src/fdl/FederatedLinda.java Fri Aug 22 14:48:41 2008 +0900 @@ -20,6 +20,7 @@ import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Hashtable; +import java.util.Iterator; import java.util.Set; @@ -129,8 +130,6 @@ public int sync(long mtimeout) throws IOException { int key_num = 0; - Set<SelectionKey> keys; - while (q_top != null){ PSXQueue c = q_top; c.send(); @@ -139,11 +138,12 @@ } try { - key_num = selector.select(mtimeout); - keys = selector.selectedKeys(); - for (SelectionKey key : keys) { - SocketChannel sock = (SocketChannel)key.channel(); - chkServe(sock); + if (selector.select(mtimeout)>0) { + for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { + SelectionKey s = it.next(); + it.remove(); + chkServe((SocketChannel)s.channel()); + } } } catch (IOException e) { e.printStackTrace(); @@ -159,36 +159,37 @@ int length; ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); command.order(ByteOrder.BIG_ENDIAN); + PSX.receive(sock, command, PSX.LINDA_HEADER_SIZE); - sock.read(command); - command.rewind(); length = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); - if (length>0) { - ByteBuffer data = ByteBuffer.allocate(length); - int read = length; - if (debug) { - System.out.print("client reading:"); - System.out.println(length); - } + ByteBuffer data = ByteBuffer.allocate(length); + int read = length; + if (debug) { + System.out.print("client reading:"); + System.out.println(length); + } + + data.order(ByteOrder.BIG_ENDIAN); + while(read>0) { + read -= sock.read(data); + } + data.rewind(); - data.order(ByteOrder.BIG_ENDIAN); - while(read>0) { - read -= sock.read(data); - } - data.rewind(); + if (debug) { + PSX.printCommand("chkServe:",command, data); + } - if (debug) { - PSX.printCommand("chkServe:",command, data); - } + int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET); + int mode = command.get(PSX.LINDA_MODE_OFFSET); + PSXReply r = getReply(rseq); + if (r==null) { + System.err.println("Illegal answer sequence."); + return; + } + r.setAnswer(mode,command,data); - int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET); - int mode = command.get(PSX.LINDA_MODE_OFFSET); - PSXReply r = getReply(rseq); - r.setAnswer(mode,command,data); - - if (r.callback != null ) { - r.callback.callback(data); - } + if (r.callback != null ) { + r.callback.callback(data); } } @@ -196,9 +197,6 @@ Integer a; PSXReply r = seqHash.get((a = new Integer(rseq))); - if (r==null) { - System.out.println("hashed reply not found"); - } seqHash.remove(a); return r; }
--- a/src/fdl/IOHandler.java Thu Aug 21 18:46:40 2008 +0900 +++ b/src/fdl/IOHandler.java Fri Aug 22 14:48:41 2008 +0900 @@ -45,36 +45,20 @@ command.clear(); int readsize = PSX.LINDA_HEADER_SIZE; - int count = 0; - + // 読み込み - while(readsize>0) { - if(debug){ - System.out.println("reading command..."+readsize); - } - count = channel.read(command); - if(count==0) throw new IOException(); - if(count < 0) throw new IOException(); - readsize -= count; - } - command.rewind(); + PSX.receive(channel, command, readsize); command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET); int datalen = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); ByteBuffer data = ByteBuffer.allocate(datalen); - int read = datalen; - if (debug) { System.out.println("reading: " +datalen); } data.order(ByteOrder.BIG_ENDIAN); - data.clear(); - while(read>0) { - read -= channel.read(data); - } - data.rewind(); + PSX.receive(channel, data, datalen); command.order(ByteOrder.BIG_ENDIAN); command.rewind();
--- a/src/fdl/MetaLinda.java Thu Aug 21 18:46:40 2008 +0900 +++ b/src/fdl/MetaLinda.java Fri Aug 22 14:48:41 2008 +0900 @@ -14,6 +14,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.LinkedList; /** MetaLinda @@ -31,8 +32,7 @@ public FDLindaServ fds; public FederatedLinda fdl=null; public PSXLinda next=null; - private MetaReply replies=new MetaReply(0, 0, ts); - private MetaReply last=replies; + private LinkedList<MetaReply> replies=new LinkedList<MetaReply>(); public MetaLinda(TupleSpace ts,FDLindaServ fds) { this.ts = ts; @@ -49,7 +49,7 @@ } private void addReply(MetaReply r) { - last.next = r; last = r; + replies.add(r); } public PSXReply ck(int id) { @@ -100,14 +100,14 @@ public int sync(long timeout) { fds.checkTuple(timeout); - PSXReply r; - for(r=replies;r!=null&&r.next!=null;r = r.next) { - if (r.next.ready()) { - // ready() may modify replies list - r.next = r.next.next; + // copy replies to avoid insert during r.ready() + LinkedList<MetaReply> list = replies; + replies = new LinkedList<MetaReply>(); + for(MetaReply r:list) { + if (!r.ready()) { + addReply(r); } } - last = (MetaReply)r; if (fdl!=null) { try { fdl.sync(timeout);
--- a/src/fdl/MetaLogEngine.java Thu Aug 21 18:46:40 2008 +0900 +++ b/src/fdl/MetaLogEngine.java Fri Aug 22 14:48:41 2008 +0900 @@ -37,7 +37,8 @@ public void mainLoop() { meta.in(PSX.META_MONITOR,monitor_callback_start); - meta.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) { running = false;}}); + meta.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) { + running = false;}}); while(running) meta.sync(); }
--- a/src/fdl/MetaReply.java Thu Aug 21 18:46:40 2008 +0900 +++ b/src/fdl/MetaReply.java Fri Aug 22 14:48:41 2008 +0900 @@ -48,6 +48,7 @@ } break; case PSX.PSX_OUT: + command=PSX.setCommand(PSX.PSX_OUT, id, 0, data.remaining()); ts.Out(null, command, data); return true; case PSX.PSX_UPDATE:
--- a/src/fdl/PSX.java Thu Aug 21 18:46:40 2008 +0900 +++ b/src/fdl/PSX.java Fri Aug 22 14:48:41 2008 +0900 @@ -199,6 +199,21 @@ return (host +":"+port); } } + + static void receive(SocketChannel channel, ByteBuffer command, int readsize) + throws IOException { + int count; + while(readsize>0) { + if(IOHandler.debug){ + System.out.println("reading packet..."+readsize); + } + count = channel.read(command); + if(count==0) throw new IOException(); + if(count < 0) throw new IOException(); + readsize -= count; + } + command.rewind(); + } }
--- a/src/fdl/TupleSpace.java Thu Aug 21 18:46:40 2008 +0900 +++ b/src/fdl/TupleSpace.java Fri Aug 22 14:48:41 2008 +0900 @@ -153,6 +153,9 @@ if (tuple!=null) { //send ByteBuffer sendcommand = tuple.getCommand(); + if (tuple.getSeq()==0) { + System.err.println("Illegal sequence in answer."); + } ByteBuffer senddata = tuple.getData(); PSX.send(key,sendcommand, senddata); } @@ -170,7 +173,7 @@ id = (int)idc; int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - + if (debug) System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); hook.inHook(key,id,seq,(char)mode); @@ -183,14 +186,18 @@ } if (tuple != null && (tuple.mode == 'o')){ + tuple.seq = seq; tuple = tupleIsAvailable(command, mode, tuple, id, temp); } else { - tuple = setupWait(key, command, mode, tuple, id); + tuple = setupWait(key, command, mode, tuple, seq, id); } return tuple; } public ByteBuffer IN(int id,int mode, ByteBuffer command) { + /** + * IN for MetaLinda (no wait); + */ Tuple tuple,temp=null; tuple = tuple_space[id]; @@ -202,6 +209,7 @@ if (tuple != null && (tuple.mode == 'o')){ ByteBuffer data = tuple.data; + tuple.seq = 0; tupleIsAvailable(command, mode, tuple, id, temp); return data; } @@ -210,9 +218,7 @@ private Tuple tupleIsAvailable(ByteBuffer command, int mode, Tuple tuple, int id, Tuple temp) { - int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); - command.rewind(); - tuple.setCommand('a', seq); + tuple.setCommand('a', tuple.seq); if(debug){ int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE; @@ -231,9 +237,8 @@ } private Tuple setupWait(SelectionKey key, ByteBuffer command, int mode, - Tuple tuple, int id) { + Tuple tuple, int seq, int id) { if(tuple == null) { - //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); tuple.next = null; }else { @@ -244,16 +249,15 @@ } tuple.setMode(mode); - int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - tuple.setSeq(seq2); + tuple.setSeq(seq); tuple.ch = (SocketChannel) key.channel(); ByteBuffer buff = ByteBuffer.allocate(0); tuple.setData(buff); tuple = null; if(debug){ - System.out.println("data inserted insert seq = "+seq2 +", id = "+id); + System.out.println("wait inserted seq = "+seq +", id = "+id); } return tuple; }