Mercurial > hg > FederatedLinda
changeset 39:81abceebc869
*** empty log message ***
author | kono |
---|---|
date | Mon, 25 Aug 2008 14:01:19 +0900 |
parents | 9a0cb612f576 |
children | 046feb56a196 |
files | src/fdl/FDLindaServ.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/MetaLinda.java src/fdl/PSXLindaImpl.java |
diffstat | 5 files changed, 86 insertions(+), 78 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/FDLindaServ.java Sun Aug 24 21:28:04 2008 +0900 +++ b/src/fdl/FDLindaServ.java Mon Aug 25 14:01:19 2008 +0900 @@ -20,7 +20,7 @@ static final int FAIL = (-1); static final int DEF_PORT = 10000; public int port = DEF_PORT; - private AbstractSelector selector; + AbstractSelector selector; private ServerSocketChannel ssChannel; public TupleSpace tupleSpace; public MetaEngine me;
--- a/src/fdl/FederatedLinda.java Sun Aug 24 21:28:04 2008 +0900 +++ b/src/fdl/FederatedLinda.java Mon Aug 25 14:01:19 2008 +0900 @@ -69,11 +69,19 @@ public PSXLinda open(String _host,int _port) throws IOException { tid++; - PSXLindaImpl newlinda = new PSXLindaImpl(this,tid,_host,_port); + PSXLindaImpl newlinda = new PSXLindaImpl(this,selector,tid,_host,_port); linda = newlinda.add(linda); return linda; } + PSXLinda openFromMetaLinda(MetaLinda metaLinda, String _host, int _port) + throws IOException { + tid++; + PSXLindaImpl newlinda = new PSXLindaImpl(this,metaLinda.fds.selector,tid,_host,_port); + linda = newlinda.add(linda); + return newlinda; + } + public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) { PSXQueue c = new PSXQueue(linda,id,mode,s,callback); @@ -129,12 +137,7 @@ public int sync(long mtimeout) throws IOException { int key_num = 0; - while (q_top != null){ - PSXQueue c = q_top; - c.send(); - q_top = c.next; - qsize--; - } + queueExec(); try { if (selector.select(mtimeout)>0) { @@ -144,7 +147,8 @@ try { if (!s.isReadable()) throw new IOException(); - chkServe((SocketChannel)s.channel()); + TupleHandler handle = (TupleHandler)s.attachment(); + handle.handle(s); } catch (IOException e) { s.cancel(); System.err.println(""+s.channel()+" is closed."); @@ -159,39 +163,24 @@ return key_num; } - - // should be IOHandler.handler method. - private void chkServe(SocketChannel sock) throws IOException { - ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); - command.order(ByteOrder.BIG_ENDIAN); - ByteBuffer data = PSX.receivePacket(sock, command); - - if (debug) { - PSX.printCommand("chkServe:",command, data); - } - - int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET); - int mode = command.get(PSX.LINDA_MODE_OFFSET); - PSXReply r = getReply(rseq); - if (r==null) { - System.err.println("Illegal answer sequence."); - return; - } - r.setAnswer(mode,command,data); - - if (r.callback != null ) { - r.callback.callback(data); + private void queueExec() { + while (q_top != null){ + PSXQueue c = q_top; + c.send(); + q_top = c.next; + qsize--; } } - - private PSXReply getReply(int rseq) { + + PSXReply getReply(int rseq) { Integer a; PSXReply r = seqHash.get((a = new Integer(rseq))); seqHash.remove(a); return r; } + } /* end */
--- a/src/fdl/IOHandler.java Sun Aug 24 21:28:04 2008 +0900 +++ b/src/fdl/IOHandler.java Mon Aug 25 14:01:19 2008 +0900 @@ -28,7 +28,15 @@ // 書き込み可であれば,読み込みを行う if (key.isReadable()) { try { - read(key); + SocketChannel channel = (SocketChannel)key.channel(); + if (ch!=channel) { + System.err.println("Wrong socket on IOHandler"); + } + // 読み込み用のバッファの生成 + ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); + command.order(ByteOrder.BIG_ENDIAN); + ByteBuffer data = PSX.receivePacket(channel,command); + manager_run(key, command, data); } catch (ClosedChannelException e) { key.cancel(); tupleSpace.hook.closeHook(key); @@ -39,26 +47,6 @@ } } - void read(SelectionKey key) - throws ClosedChannelException, IOException { - SocketChannel channel = (SocketChannel)key.channel(); - if (ch!=channel) { - System.err.println("Wrong socket on IOHandler"); - } - - // 読み込み用のバッファの生成 - ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); - command.order(ByteOrder.BIG_ENDIAN); - - ByteBuffer data = PSX.receivePacket(channel, command); - - if (debug) { - PSX.printData("IOHandler:",command); - } - manager_run(key, command, data); - // assert((key.interestOps()& SelectionKey.OP_READ) !=0); - } - public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data) throws IOException { command.order(ByteOrder.BIG_ENDIAN); int mode = command.get(PSX.LINDA_MODE_OFFSET);
--- a/src/fdl/MetaLinda.java Sun Aug 24 21:28:04 2008 +0900 +++ b/src/fdl/MetaLinda.java Mon Aug 25 14:01:19 2008 +0900 @@ -30,7 +30,7 @@ public TupleSpace ts; public FDLindaServ fds; - public FederatedLinda fdl=null; + public FederatedLinda fdl=FederatedLinda.init(); public PSXLinda next=null; private LinkedList<MetaReply> replies=new LinkedList<MetaReply>(); @@ -39,6 +39,11 @@ this.fds = fds; } + public PSXLinda open(String _host,int _port) + throws IOException { + return fdl.openFromMetaLinda(this, _host, _port); + } + public PSXReply in(int id) { return null; } @@ -99,31 +104,24 @@ } public int sync(long timeout) { - if (fdl!=null) { - try { - fdl.sync(timeout); - } catch (IOException e) { - e.printStackTrace(); - } - } - fds.checkTuple(timeout); // should have done in fdl selector + fds.checkTuple(timeout); // fdl sync is also handled here /* * r.callback() may call meta.sync() and modifies the * replies queue. Do current size of queue only. The * rest is checked on the next sync call including * the recursive case. */ - int count = replies.size(); - while(count-->0) { - MetaReply r = replies.poll(); - // previous call back may call this sync and make - // replies shorter. - if (r==null) break; - if (r.ready()) { - } else { - addReply(r); - } + int count = replies.size(); + while(count-->0) { + MetaReply r = replies.poll(); + // previous call back may call this sync and make + // replies shorter. + if (r==null) break; + if (r.ready()) { + } else { + addReply(r); } + } return 0; }
--- a/src/fdl/PSXLindaImpl.java Sun Aug 24 21:28:04 2008 +0900 +++ b/src/fdl/PSXLindaImpl.java Mon Aug 25 14:01:19 2008 +0900 @@ -17,7 +17,10 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -34,7 +37,7 @@ */ -public class PSXLindaImpl implements PSXLinda { +public class PSXLindaImpl implements PSXLinda,TupleHandler { private FederatedLinda fdl; SocketChannel socketChannel; public String host; @@ -43,7 +46,7 @@ public PSXLinda next; static final boolean debug = false; - public PSXLindaImpl(FederatedLinda _fdl,int _mytsid,String _host,int _port) + public PSXLindaImpl(FederatedLinda _fdl,Selector selector,int _mytsid,String _host,int _port) throws IOException { host = _host; port = _port; @@ -70,12 +73,40 @@ } } System.err.println("Linda client connect to "+socketChannel); - - socketChannel.register(fdl.selector(), SelectionKey.OP_READ); - + socketChannel.register(selector,SelectionKey.OP_READ,this); checkConnect("PSXLinda"); } + + + public void handle(SelectionKey key) throws ClosedChannelException, + IOException { + SocketChannel sock = (SocketChannel)key.channel(); + if (sock!=socketChannel) { + System.err.println("wrong socket on PSXLindaImple."); + } + ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); + command.order(ByteOrder.BIG_ENDIAN); + ByteBuffer data = PSX.receivePacket(sock, command); + + if (debug) { + PSX.printCommand("chkServe:",command, data); + } + + int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET); + int mode = command.get(PSX.LINDA_MODE_OFFSET); + PSXReply r = fdl.getReply(rseq); + if (r==null) { + System.err.println("Illegal answer sequence."); + return; + } + r.setAnswer(mode,command,data); + + if (r.callback != null ) { + r.callback.callback(data); + } + } + protected void finalize() { if (socketChannel != null) { @@ -152,6 +183,8 @@ public void send(ByteBuffer command, ByteBuffer data) { PSX.send(socketChannel, command, data); } + + }