Mercurial > hg > FederatedLinda
changeset 15:aced4bfc15af
add Meta Linda Interface for debugger.
author | kono |
---|---|
date | Sun, 17 Aug 2008 20:24:24 +0900 |
parents | 006015077e99 |
children | cccf34386cad |
files | src/fdl/AcceptHandler.java src/fdl/ComDebug.java src/fdl/FDLindaServ.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/IOParam.java src/fdl/MetaLinda.java src/fdl/PSXLinda.java src/fdl/PSXLindaInterface.java src/fdl/PSXQueue.java src/fdl/PSXQueueInterface.java src/fdl/PSXReply.java |
diffstat | 12 files changed, 349 insertions(+), 259 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/AcceptHandler.java Mon Aug 11 22:24:57 2008 +0900 +++ b/src/fdl/AcceptHandler.java Sun Aug 17 20:24:24 2008 +0900 @@ -9,10 +9,8 @@ import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -//import java.util.LinkedList; public class AcceptHandler implements TupleHandler, PSXQueueInterface { - //public Hashtable<Integer, Tuple> tuple_space; public Tuple[] tuple_space; public static int user = 0;
--- a/src/fdl/ComDebug.java Mon Aug 11 22:24:57 2008 +0900 +++ b/src/fdl/ComDebug.java Sun Aug 17 20:24:24 2008 +0900 @@ -12,6 +12,14 @@ import java.util.Iterator; import java.util.LinkedList; +/* + * こういう実装ではなくて、ldserv 中に、メタ + * プロトコル用のエンジンを作った方が良い。そのためには、ldserve 内部 + * から、PSX Linda APIを呼び出せる仕組みが必要。Thread にすると、 + * 同期が面倒なので、Thread にしたくないが、Engine の書き方は、 + * Main Loop 的にしたい。少し変だが、Meta Engine 側の PSX sync から + * ldserve のMain Loop を呼び出すか? + */ public class ComDebug implements PSXQueueInterface{ static final boolean debug = true;
--- a/src/fdl/FDLindaServ.java Mon Aug 11 22:24:57 2008 +0900 +++ b/src/fdl/FDLindaServ.java Sun Aug 17 20:24:24 2008 +0900 @@ -5,11 +5,12 @@ import java.net.InetAddress; import java.net.InetSocketAddress; //import java.nio.ByteOrder; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; +import java.nio.channels.spi.AbstractSelector; import java.nio.channels.spi.SelectorProvider; -import java.util.Iterator; public class FDLindaServ implements PSXQueueInterface { static final int MAX_REQ = 1; @@ -18,35 +19,50 @@ static final int MAX_TUPLE = 65536; static final int DEF_PORT = 10000; //public static final int TIMEOUT = 5*1000; - public static Tuple[] tuple_space; + public Tuple[] tuple_space; + public int port = DEF_PORT; + private AbstractSelector selector; + private ServerSocketChannel ssChannel; - @SuppressWarnings("unchecked") - public static void main(final String[] args) throws IOException { - @SuppressWarnings("unused") + public static void main(final String[] args) { final String usages = "usage: FDLindaServ [-p port]"; - int port = DEF_PORT; //バイトオーダー確認 //System.out.println(ByteOrder.nativeOrder().toString()); - tuple_space = new Tuple[MAX_TUPLE]; - + int port = DEF_PORT; //引数判定 try { for (int i=0; i<args.length; ++i) { if("-p".equals(args[i])) { port = Integer.parseInt(args[++i]); - } else { - System.err.println(usages); - } + } } } catch (NumberFormatException e) { - e.printStackTrace(); + System.err.println(usages); + return; + } + try { + FDLindaServ serv; + serv = new FDLindaServ(port); + serv.mainLoop(); + } catch (IOException e) { + System.err.println("Server Communiation Problem."); } - //セレクタを生成 - Selector selector = SelectorProvider.provider().openSelector(); - try { + } + + private void mainLoop() { + while(true) { + checkTuple(selector); + } + } + + public FDLindaServ(int port) throws IOException { + this.port = port; + tuple_space = new Tuple[MAX_TUPLE]; + //セレクタを生成 + selector = SelectorProvider.provider().openSelector(); //ソケット・チャネルを生成・設定 - ServerSocketChannel ssChannel = SelectorProvider.provider().openServerSocketChannel(); + ssChannel = SelectorProvider.provider().openServerSocketChannel(); InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port); //ssChannel.socket().bind(new InetSocketAddress(port)); ssChannel.socket().bind(address); @@ -58,34 +74,21 @@ //ssChannel.register(selector, ssChannel.validOps(), new AcceptHandler(tuple_space)); ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tuple_space)); - // セレクタによる監視 - while (selector.keys().size() > 0) { - @SuppressWarnings("unused") - int KeyCount = selector.select(); - // Iteratorを用意 - Iterator it = selector.selectedKeys().iterator(); - while (it.hasNext()) { - // SelectionKeyを取り出す - SelectionKey selKey = (SelectionKey)it.next(); - - // 操作に対する処理が行われていると認識させるためにremoveする - it.remove(); + + } - TupleHandler handler = (TupleHandler)selKey.attachment(); - handler.handle(selKey); - } - - } - } catch (IOException exc) { - exc.printStackTrace(); - } /*finally { - try { - for (SelectionKey key: selector.keys()) { - key.channel().close(); - } - } catch(IOException ex) { - ex.printStackTrace(); - }*/ - //} + public void checkTuple(Selector selector) { + // セレクタによる監視 + try { + if (selector.select()>0) { + for(SelectionKey s:selector.selectedKeys()) { + TupleHandler handler = (TupleHandler)s.attachment(); + handler.handle(s); + } + } + } catch (ClosedChannelException e) { + // we have to do something... + } catch (IOException e) { + } } }
--- a/src/fdl/FederatedLinda.java Mon Aug 11 22:24:57 2008 +0900 +++ b/src/fdl/FederatedLinda.java Sun Aug 17 20:24:24 2008 +0900 @@ -349,7 +349,7 @@ }***/ try { PSXReply r = seqHash.get((a = new Integer(rseq))); - seqHash.put(a, null); + seqHash.put(a, null); // should be clear or delete if (debug) { System.out.print("hash value = "); System.out.println(a.hashCode());
--- a/src/fdl/IOHandler.java Mon Aug 11 22:24:57 2008 +0900 +++ b/src/fdl/IOHandler.java Sun Aug 17 20:24:24 2008 +0900 @@ -25,11 +25,6 @@ if (key.isReadable()) { read(key); } - - // 書き込み可であれば,書き込みを行う - /*if (key.isWritable() && key.isValid()) { - write(key); - }*/ } private void read(SelectionKey key) @@ -79,28 +74,18 @@ } data.rewind(); - /* - static final int LINDA_PACKET_LENGTH_OFFSET =0; - static final int LINDA_MODE_OFFSET =0+4; - static final int LINDA_ID_OFFSET =1+4; - static final int LINDA_SEQ_OFFSET =3+4; - static final int LINDA_DATA_LENGTH_OFFSET =7+4; - static final int LINDA_HEADER_SIZE =12+4; - */ command.order(ByteOrder.BIG_ENDIAN); command.rewind(); - - /*** print data ***/ - char id = (char)command.getShort(LINDA_ID_OFFSET); - System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ - "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ - "ID:"+(int)id+" "+ - "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "); - //"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ - //"DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" "); - //System.out.println("DATA:"+data); - //データ処理 - command.rewind(); + + if (debug) { + /*** print data ***/ + char id = (char)command.getShort(LINDA_ID_OFFSET); + System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ + "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ + "ID:"+(int)id+" "+ + "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "); + command.rewind(); + } manager_run(key, command, data, len); key.interestOps(key.interestOps() | SelectionKey.OP_READ );
--- a/src/fdl/IOParam.java Mon Aug 11 22:24:57 2008 +0900 +++ b/src/fdl/IOParam.java Sun Aug 17 20:24:24 2008 +0900 @@ -294,35 +294,7 @@ public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) throws IOException { SocketChannel ch = (SocketChannel)key.channel(); - if (debug) { - if (command == null) { - System.out.println("Manager_run: command is null"); - } - if (data == null) { - System.out.println("Manager_run: data is null"); - } - } - int send_size = LINDA_HEADER_SIZE; - int count = 0; - - //command Send - command.rewind(); - while(send_size > 0){ - count = ch.write(command); - if(count < 0){ - System.out.println("Write Falied! close ch:"+ch); - ch.close(); - return; - } - send_size -= count; - } - - //data Send - data.rewind(); - if(data != null) { - data.rewind(); - ch.write(data); - } + send(ch,command,data); } private String getdataString(ByteBuffer data) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/MetaLinda.java Sun Aug 17 20:24:24 2008 +0900 @@ -0,0 +1,87 @@ + +/* + * @(#)MetaLinda.java 1.1 06/04/01 + * + * Copyright 2008 Shinji KONO + * + + Meta Lidna + Trasport layer of Meta Linda API + + */ + +package fdl; + +import java.nio.ByteBuffer; + +/** + PSXLinda + * + * @author Shinji Kono + * + + meta tuple interface in Linda Server + + */ + +public class MetaLinda implements PSXQueueInterface,PSXLindaInterface { + + + public PSXLinda next; + + public MetaLinda(FederatedLinda _fdl,int _mytsid,String _host,int _port) { + + } + + public PSXReply in(int id) { + return null; + } + + public void in(int id, PSXCallback callback) { + } + + public PSXReply ck(int id) { + PSXReply r = null; + return r; + } + + public void ck(int id, PSXCallback callback) { + } + + public PSXReply out(int id, ByteBuffer data,int size) { + return null; + } + + public PSXReply update(int id, ByteBuffer data,int size) { + return null; + } + + public void update(int id, ByteBuffer data,int size,PSXCallback callback) { + } + + public PSXReply rd(int id) { + return null; + } + + public void rd(int id, PSXCallback callback) { + } + + public PSXLinda add(PSXLinda linda) { + next = linda; + return null; + } + + public void send(ByteBuffer command,ByteBuffer data) { + } + + public int sync() { + return 0; + } + + public int sync(long mtime) { + return 0; + } +} + + +/* end */
--- a/src/fdl/PSXLinda.java Mon Aug 11 22:24:57 2008 +0900 +++ b/src/fdl/PSXLinda.java Sun Aug 17 20:24:24 2008 +0900 @@ -31,141 +31,141 @@ Initialize connection channel for a tuple space one instance for each Tuple space connection - + */ -public class PSXLinda implements PSXQueueInterface { - private FederatedLinda fdl; - private SocketChannel socketChannel; - public String host; - public int port; - public int mytsid; - public PSXLinda next; - static final boolean debug = false; - - public PSXLinda(FederatedLinda _fdl,int _mytsid,String _host,int _port) - throws IOException { - Socket socket; - host = _host; - port = _port; - mytsid = _mytsid; - fdl = _fdl; +public class PSXLinda implements PSXQueueInterface,PSXLindaInterface { + private FederatedLinda fdl; + private SocketChannel socketChannel; + public String host; + public int port; + public int mytsid; + public PSXLinda next; + static final boolean debug = false; - socketChannel = SocketChannel.open(); - socketChannel.configureBlocking(false); + public PSXLinda(FederatedLinda _fdl,int _mytsid,String _host,int _port) + throws IOException { + Socket socket; + host = _host; + port = _port; + mytsid = _mytsid; + fdl = _fdl; + + socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); - socket = socketChannel.socket(); - // socket.setReuseAddress(true); - socket.setTcpNoDelay(true); + socket = socketChannel.socket(); + // socket.setReuseAddress(true); + socket.setTcpNoDelay(true); - // can be blocked (thread required?) - //socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), _port)); - socketChannel.connect(new InetSocketAddress(_host, _port)); - while (! socketChannel.finishConnect()) { - if (debug) { - System.out.println("waiting for connect"); + // can be blocked (thread required?) + //socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), _port)); + socketChannel.connect(new InetSocketAddress(_host, _port)); + while (! socketChannel.finishConnect()) { + if (debug) { + System.out.println("waiting for connect"); + } + if (false) { + try { + wait(2000); + } catch (InterruptedException e) { + } + } } -if (false) { - try { - wait(2000); - } catch (InterruptedException e) { - } -} - } - socketChannel.register(fdl.selector(), SelectionKey.OP_READ); + socketChannel.register(fdl.selector(), SelectionKey.OP_READ); - checkConnect("PSXLinda"); - } + checkConnect("PSXLinda"); + } - protected void finalize() { - if (socketChannel != null) { - try { - socketChannel.close(); - } catch (IOException e) { - } + protected void finalize() { + if (socketChannel != null) { + try { + socketChannel.close(); + } catch (IOException e) { + } + } } - } - private void checkConnect(String s) { - System.out.print("Connected:"); - System.out.print(s); - System.out.print(": "); - System.out.println(socketChannel.isConnected()); - } + private void checkConnect(String s) { + System.out.print("Connected:"); + System.out.print(s); + System.out.print(": "); + System.out.println(socketChannel.isConnected()); + } - public PSXReply in(int id) { - PSXReply r = fdl.psx_queue(this, id, null, 0, PSX_IN, (PSXCallback)null); - return r; - } + public PSXReply in(int id) { + PSXReply r = fdl.psx_queue(this, id, null, 0, PSX_IN, (PSXCallback)null); + return r; + } - public void in(int id, PSXCallback callback) { - fdl.psx_queue(this, id, null, 0, PSX_IN, callback); - } + public void in(int id, PSXCallback callback) { + fdl.psx_queue(this, id, null, 0, PSX_IN, callback); + } - public PSXReply ck(int id) { - PSXReply r = fdl.psx_queue(this, id, null, 0, PSX_IN, null); - return r; - } + public PSXReply ck(int id) { + PSXReply r = fdl.psx_queue(this, id, null, 0, PSX_IN, null); + return r; + } - public void ck(int id, PSXCallback callback) { - fdl.psx_queue(this, id, null, 0, PSX_IN, callback); - } + public void ck(int id, PSXCallback callback) { + fdl.psx_queue(this, id, null, 0, PSX_IN, callback); + } - public PSXReply out(int id, ByteBuffer data,int size) { - PSXReply r = fdl.psx_queue(this, id, data, size, PSX_OUT, null); - return r; - } + public PSXReply out(int id, ByteBuffer data,int size) { + PSXReply r = fdl.psx_queue(this, id, data, size, PSX_OUT, null); + return r; + } - public PSXReply update(int id, ByteBuffer data,int size) { - PSXReply r = fdl.psx_queue(this, id, data, size, PSX_UPDATE, null); - return r; - } + public PSXReply update(int id, ByteBuffer data,int size) { + PSXReply r = fdl.psx_queue(this, id, data, size, PSX_UPDATE, null); + return r; + } - public void update(int id, ByteBuffer data,int size,PSXCallback callback) { - fdl.psx_queue(this, id, data, size, PSX_UPDATE, callback); - } + public void update(int id, ByteBuffer data,int size,PSXCallback callback) { + fdl.psx_queue(this, id, data, size, PSX_UPDATE, callback); + } - public PSXReply rd(int id) { - PSXReply r = fdl.psx_queue(this, id, null, 0, PSX_RD, null); - return r; - } + public PSXReply rd(int id) { + PSXReply r = fdl.psx_queue(this, id, null, 0, PSX_RD, null); + return r; + } - public void rd(int id, PSXCallback callback) { - fdl.psx_queue(this, id, null, 0, PSX_RD, callback); - } + public void rd(int id, PSXCallback callback) { + fdl.psx_queue(this, id, null, 0, PSX_RD, callback); + } - public PSXLinda add(PSXLinda linda) { - next = linda; - return this; - } + public PSXLinda add(PSXLinda linda) { + next = linda; + return this; + } - public void send(ByteBuffer command,ByteBuffer data) + public void send(ByteBuffer command,ByteBuffer data) throws IOException { - if (debug) { - checkConnect("send"); - if (command == null) { - System.out.println("PSXLinda:command is null"); - } - if (data == null) { - System.out.println("PSXLinda:data is null"); - } - } - socketChannel.write(command); - if (data != null) - socketChannel.write(data); - } + if (debug) { + checkConnect("send"); + if (command == null) { + System.out.println("PSXLinda:command is null"); + } + if (data == null) { + System.out.println("PSXLinda:data is null"); + } + } + socketChannel.write(command); + if (data != null) + socketChannel.write(data); + } - public int sync() - throws IOException { - return fdl.sync(); - } + public int sync() + throws IOException { + return fdl.sync(); + } - public int sync(long mtime) - throws IOException { - return fdl.sync(mtime); - } + public int sync(long mtime) + throws IOException { + return fdl.sync(mtime); + } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/PSXLindaInterface.java Sun Aug 17 20:24:24 2008 +0900 @@ -0,0 +1,34 @@ +package fdl; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface PSXLindaInterface { + + + public PSXReply in(int id) ; + + public void in(int id, PSXCallback callback); + + public PSXReply ck(int id) ; + + public void ck(int id, PSXCallback callback) ; + + public PSXReply out(int id, ByteBuffer data,int size) ; + + public PSXReply update(int id, ByteBuffer data,int size) ; + + public void update(int id, ByteBuffer data,int size,PSXCallback callback) ; + + public PSXReply rd(int id) ; + + public void rd(int id, PSXCallback callback) ; + + public PSXLinda add(PSXLinda linda) ; + + public void send(ByteBuffer command,ByteBuffer data) throws IOException ; + + public int sync() throws IOException ; + + public int sync(long mtime) throws IOException ; +}
--- a/src/fdl/PSXQueue.java Mon Aug 11 22:24:57 2008 +0900 +++ b/src/fdl/PSXQueue.java Sun Aug 17 20:24:24 2008 +0900 @@ -15,7 +15,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -// import java.nio.channels.*; +//import java.nio.channels.*; /** PSXQueue @@ -24,50 +24,50 @@ */ public class PSXQueue implements PSXQueueInterface { - public int tspace_id; - public int id; - public int mode; - public int size; - public ByteBuffer data; - public ByteBuffer command; - public int seq; - public PSXCallback callback; - public PSXQueue next; - public PSXLinda linda; + public int tspace_id; + public int id; + public int mode; + public int size; + public ByteBuffer data; + public ByteBuffer command; + public int seq; + public PSXCallback callback; + public PSXQueue next; + public PSXLinda linda; - public PSXQueue( PSXLinda _linda,int _id,int _mode,ByteBuffer _data,int _size,PSXCallback _callback) { - linda = _linda; - id = _id; - data = _data; - size = _size; - mode = _mode; - callback = _callback; - setCommand(); - } + public PSXQueue( PSXLinda _linda,int _id,int _mode,ByteBuffer _data,int _size,PSXCallback _callback) { + linda = _linda; + id = _id; + data = _data; + size = _size; + mode = _mode; + callback = _callback; + setCommand(); + } - private void setCommand() { - command = ByteBuffer.allocate(LINDA_HEADER_SIZE); - command.order(ByteOrder.BIG_ENDIAN); + private void setCommand() { + command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + command.order(ByteOrder.BIG_ENDIAN); - command.putInt(LINDA_PACKET_LENGTH_OFFSET, + command.putInt(LINDA_PACKET_LENGTH_OFFSET, size+LINDA_HEADER_SIZE-INT_SIZE); - command.put(LINDA_MODE_OFFSET,(byte)mode); - command.putShort(LINDA_ID_OFFSET,(short)id); - command.putInt(LINDA_SEQ_OFFSET,seq); - command.putInt(LINDA_DATA_LENGTH_OFFSET,size); - } + command.put(LINDA_MODE_OFFSET,(byte)mode); + command.putShort(LINDA_ID_OFFSET,(short)id); + command.putInt(LINDA_SEQ_OFFSET,seq); + command.putInt(LINDA_DATA_LENGTH_OFFSET,size); + } - public void setSeq(int _seq) { - seq = _seq; - command.putInt(LINDA_SEQ_OFFSET,seq); - } + public void setSeq(int _seq) { + seq = _seq; + command.putInt(LINDA_SEQ_OFFSET,seq); + } - public void Send() - throws IOException { - if (command!=null) command.rewind(); - if (data!=null) data.rewind(); - linda.send(command,data); - } + public void Send() + throws IOException { + if (command!=null) command.rewind(); + if (data!=null) data.rewind(); + linda.send(command,data); + } } /* end */
--- a/src/fdl/PSXQueueInterface.java Mon Aug 11 22:24:57 2008 +0900 +++ b/src/fdl/PSXQueueInterface.java Sun Aug 17 20:24:24 2008 +0900 @@ -12,11 +12,12 @@ package fdl; +import java.nio.ByteBuffer; + /** PSXQueueInterface - Iterator */ public interface PSXQueueInterface { @@ -43,6 +44,8 @@ static final int PRIVILEGED_ID_START = 32768; static final int PRIVILEGED_ID_END = 36864; + + } /* end */
--- a/src/fdl/PSXReply.java Mon Aug 11 22:24:57 2008 +0900 +++ b/src/fdl/PSXReply.java Sun Aug 17 20:24:24 2008 +0900 @@ -34,15 +34,15 @@ } public void setAnswer(int _mode, ByteBuffer _command,ByteBuffer _data) { - mode = _mode; - data = _data; - command = _command; - if (debug) { -System.out.print("setAnswer mode:"); -System.out.println(mode); -System.out.print("setAnswer bool:"); -System.out.println(mode==PSX_ANSWER); - } + mode = _mode; + data = _data; + command = _command; + if (debug) { + System.out.print("setAnswer mode:"); + System.out.println(mode); + System.out.print("setAnswer bool:"); + System.out.println(mode==PSX_ANSWER); + } } public int getMode() {