Mercurial > hg > FederatedLinda
changeset 19:0243987383b7
Meta Protocol Engine and sample implementation of event logger.
ComDebug_Client needs fixes.
author | kono |
---|---|
date | Tue, 19 Aug 2008 05:33:32 +0900 |
parents | 2cbd98257d61 |
children | a0fd653d1121 |
files | src/fdl/AcceptHandler.java src/fdl/ComDebug_Client.java src/fdl/CommDebugHook.java src/fdl/FDLindaServ.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/IOHandlerHook.java src/fdl/MetaEngine.java src/fdl/MetaLinda.java src/fdl/MetaReply.java src/fdl/NullIOHandlerHook.java src/fdl/PSX.java src/fdl/PSXLinda.java src/fdl/PSXLindaInterface.java src/fdl/TestPSXLinda.java src/fdl/TupleSpace.java |
diffstat | 16 files changed, 587 insertions(+), 429 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/AcceptHandler.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/AcceptHandler.java Tue Aug 19 05:33:32 2008 +0900 @@ -1,73 +1,37 @@ package fdl; import java.io.IOException; -//import java.net.InetAddress; -//import java.net.InetSocketAddress; -//import java.net.UnknownHostException; -import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; public class AcceptHandler implements TupleHandler { - public Tuple[] tuple_space; + + public TupleSpace tupleSpace; - public static int user = 0; - public byte userchar[] = new byte[2]; - public final int MAX_TUPLE = TupleHandler.MAX_TUPLE; - public Tuple tmpTuple; - - public AcceptHandler(Tuple[] _tuple_space) { + public AcceptHandler(TupleSpace tupleSpace) { // 読みこんだデータを格納するためのリストの初期化 - tuple_space = _tuple_space; + this.tupleSpace = tupleSpace; } public void handle(SelectionKey key) throws ClosedChannelException, IOException { ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel(); - + // アクセプト処理 SocketChannel channel = serverChannel.accept(); channel.configureBlocking(false); System.out.println("Server: accepted "+channel.socket()); - //初期生成 - if((tmpTuple = tuple_space[MAX_TUPLE-1]) == null) { - tmpTuple = tuple_space[MAX_TUPLE-1] = new Tuple(); - tmpTuple.next = null; - } else { - while(tmpTuple.next != null) tmpTuple = tmpTuple.next; - tmpTuple.next = new Tuple(); - tmpTuple = tmpTuple.next; - tmpTuple.next = null; - } - - user++; - - ByteBuffer data = ByteBuffer.allocate(2); - data.clear(); - userchar[0] = (byte) (user/10 + '0'); - userchar[1] = (byte) (user%10 + '0'); - - data.put(userchar[0]); - data.put(userchar[1]); - - data.rewind(); - tmpTuple.setData(data); - //Tuple - int id = MAX_TUPLE-1; - tmpTuple.setTuple('o', id, 0, data.limit(), data); - - - System.out.println("Server: assign id "+user); + tupleSpace.newUser(); // 入出力用のハンドラを生成し,アタッチする // 監視する操作は読み込みのみ - TupleSpace handler = new IOHandler(tuple_space); channel.register(key.selector(), SelectionKey.OP_READ, - handler); + new IOHandler(tupleSpace,key)); } + }
--- a/src/fdl/ComDebug_Client.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/ComDebug_Client.java Tue Aug 19 05:33:32 2008 +0900 @@ -2,11 +2,11 @@ import java.io.IOException; -//import java.nio.ByteBuffer; -//import java.nio.CharBuffer; -import java.nio.CharBuffer; - +/* + * それぞれのLinda Serverのmeta protocolに接続して、順々にモニタデータを + * 受け取って表示する。 + */ public class ComDebug_Client { static int id; @@ -14,7 +14,7 @@ public static void main(String[] args) { FederatedLinda fdl; - PSXLinda psx; + PSXLindaInterface psx; String host = "localhost"; int port = 10000; int connect_num = 1; @@ -59,7 +59,7 @@ psx.in(PSX.PRIVILEGED_ID_START+connect_num); connect_num++; while(true) { - fdl.sync_com(1000); + fdl.sync(1000); } } catch (IOException nfex) { nfex.printStackTrace();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/CommDebugHook.java Tue Aug 19 05:33:32 2008 +0900 @@ -0,0 +1,72 @@ +package fdl; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.util.LinkedList; + +public class CommDebugHook implements IOHandlerHook { + + public LinkedList<String> logs = new LinkedList<String>(); + + public void closeHook(SelectionKey key) { + if (key!=null) logs.add(closeLog(key)); + } + + + @Override + public void checkHook(SelectionKey key, int id, int seq, char mode) { + if (key!=null) logs.add(log(key, id, seq, mode, null)); + } + + + @Override + public void inHook(SelectionKey key, int id, int seq, char mode) { + if (key!=null) logs.add(log(key, id, seq, mode, null)); + } + + + @Override + public void outHook(SelectionKey key, int id, int seq, char mode, ByteBuffer data) { + if (key==null) return; + String sendtext = PSX.getdataString(data); + logs.add(log(key, id, seq, mode, sendtext)); + } + + + @Override + public void waitReadHook(SelectionKey key, int id, int seq, char mode) { + if (key!=null) logs.add(log(key, id, seq, mode, null)); + } + + public String log(SelectionKey key,int id, int seq,char mode, String sendtext) { + //通信ログ Hostname:port 'mode' =number 形式でインクリメント + int cnt = 0; + if (sendtext==null) sendtext="none"; + + IOHandler io = (IOHandler) key.attachment(); + String ComKey = io.localString + "--" + io.remoteString + " " + mode; + + io.cnt++; + long time = System.currentTimeMillis(); + return (time+" "+ComKey+"="+cnt+" id="+id+" seq="+seq+" data="+sendtext); + } + + private String closeLog(SelectionKey key) { + String close_Channel = key.channel().toString(); + String[] split = close_Channel.split("/"); + String local[] = split[1].split(" "); + String remote[] = split[2].split("]"); + + String localAddress = local[0]; + String remoteAddress = remote[0]; + + return "CloseInfo >"+localAddress+"--"+remoteAddress; + } + + public ByteBuffer getLog() { + String log = logs.poll(); + if (log==null) return ByteBuffer.allocate(0); + return PSX.string2ByteBuffer(log); + } + +}
--- a/src/fdl/FDLindaServ.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/FDLindaServ.java Tue Aug 19 05:33:32 2008 +0900 @@ -4,10 +4,8 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; -//import java.nio.ByteOrder; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.spi.AbstractSelector; import java.nio.channels.spi.SelectorProvider; @@ -15,11 +13,8 @@ public class FDLindaServ { static final int MAX_REQ = 1; static final int FAIL = (-1); - static final int MAX_UAER = 4; - static final int MAX_TUPLE = 65536; static final int DEF_PORT = 10000; //public static final int TIMEOUT = 5*1000; - public Tuple[] tuple_space; public int port = DEF_PORT; private AbstractSelector selector; private ServerSocketChannel ssChannel; @@ -52,32 +47,29 @@ private void mainLoop() { while(true) { - checkTuple(selector); + checkTuple(); } } public FDLindaServ(int port) throws IOException { this.port = port; - tuple_space = new Tuple[MAX_TUPLE]; - //セレクタを生成 - selector = SelectorProvider.provider().openSelector(); - //ソケット・チャネルを生成・設定 - ssChannel = SelectorProvider.provider().openServerSocketChannel(); - InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port); - //ssChannel.socket().bind(new InetSocketAddress(port)); - ssChannel.socket().bind(address); - ssChannel.configureBlocking(false); - //ssChannel.socket().setReuseAddress(true); - System.out.println("Server: litening at "+ssChannel); - //セレクタにチャンネルを登録 - //ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tuple_space)); - //ssChannel.register(selector, ssChannel.validOps(), new AcceptHandler(tuple_space)); - ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tuple_space)); - + //セレクタを生成 + selector = SelectorProvider.provider().openSelector(); + //ソケット・チャネルを生成・設定 + ssChannel = SelectorProvider.provider().openServerSocketChannel(); + InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port); + ssChannel.socket().bind(address); + ssChannel.configureBlocking(false); + //ssChannel.socket().setReuseAddress(true); + System.out.println("Server: litening at "+ssChannel); + //セレクタにチャンネルを登録 + TupleSpace tupleSpace = new TupleSpace(); + ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tupleSpace)); + } - public void checkTuple(Selector selector) { + public void checkTuple() { // セレクタによる監視 try { if (selector.select()>0) { @@ -91,4 +83,19 @@ } catch (IOException e) { } } + + public void checkTuple(long timeout) { + // セレクタによる監視 + try { + if (selector.select(timeout)>0) { + for(SelectionKey s:selector.selectedKeys()) { + TupleHandler handler = (TupleHandler)s.attachment(); + handler.handle(s); + } + } + } catch (ClosedChannelException e) { + // we have to do something... + } catch (IOException e) { + } + } }
--- a/src/fdl/FederatedLinda.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/FederatedLinda.java Tue Aug 19 05:33:32 2008 +0900 @@ -15,7 +15,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.CharBuffer; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -46,7 +45,7 @@ public int tid; public int seq; public int qsize; - public PSXLinda linda; + public PSXLindaInterface linda; public Selector selector; @@ -68,12 +67,12 @@ seqHash = new Hashtable<Integer, PSXReply>(); } - public PSXLinda open(String _host,int _port) + public PSXLindaInterface open(String _host,int _port) throws IOException { tid++; // System.out.print("Tid = "); // System.out.println(tid); - PSXLinda newlinda = new PSXLinda(this,tid,_host,_port); + PSXLindaInterface newlinda = new PSXLinda(this,tid,_host,_port); linda = newlinda.add(linda); return linda; } @@ -173,118 +172,6 @@ return key_num; } - public int sync_com(long mtimeout) - throws IOException { - int key_num = 0; - Set<SelectionKey> keys; - - while (q_top != null){ - PSXQueue c = q_top; - c.Send(); - q_top = c.next; - // psx_free(c); - // q_top = c = t; - qsize--; - } - - try { - key_num = selector.select(mtimeout); - keys = selector.selectedKeys(); - for (SelectionKey key : keys) { - SocketChannel sock = (SocketChannel)key.channel(); - chkCom(sock); - } - } catch (IOException e) { - e.printStackTrace(); - } catch (ClosedSelectorException e) { - e.printStackTrace(); - } - - return key_num; - } - -// should be in PSXLinda, but sock->linda is unknown here - - private void chkCom(SocketChannel sock) throws IOException { - - int length; - ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); - command.order(ByteOrder.BIG_ENDIAN); - debug = false; - - sock.read(command); - command.rewind(); - length = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); - if (length>0) { - ByteBuffer data = ByteBuffer.allocate(length); - int read = length; - if (debug) { - System.out.print("reading:"); - System.out.println(length); - } - - data.order(ByteOrder.BIG_ENDIAN); - while(read>0) { - read -= sock.read(data); - } - data.rewind(); - - if (debug) { - PSX.printCommand(command, data); - } - //if (debug_com) { - String comdata =""; - CharBuffer chardata = data.asCharBuffer(); - comdata = chardata.toString(); - - //System.out.println("Com_data =>"); - System.out.println(comdata); - //} - /***if (debug) { - System.out.print("header:"); - for(int i=0;i<LINDA_HEADER_SIZE;i++) { - System.out.println(command.get(i)); - } - System.out.print("data:"); - for(int i=0;i<length;i++) { - System.out.println(data.get(i)); - } - data.rewind(); - }***/ - - int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET); - int mode = command.get(PSX.LINDA_MODE_OFFSET); - Integer a; - /*** - if (debug) { - System.out.print("mode = "); - System.out.println(mode); - System.out.print("seq = "); - System.out.println(rseq); - }***/ - try { - PSXReply r = seqHash.get((a = new Integer(rseq))); - seqHash.put(a, null); - if (debug) { - System.out.print("hash value = "); - System.out.println(a.hashCode()); - } - - r.setAnswer(mode,command,data); - - if (r.callback != null ) { - r.callback.callback(data); - } - } catch (NullPointerException e ) { - if (debug) { - System.out.println("hashed reply not found"); - } - // can't happen - return ; - } - } - } - private void chkServe(SocketChannel sock) throws IOException { int length; @@ -335,7 +222,7 @@ }***/ try { PSXReply r = seqHash.get((a = new Integer(rseq))); - seqHash.put(a, null); // should be clear or delete + seqHash.remove(a); if (debug) { System.out.print("hash value = "); System.out.println(a.hashCode());
--- a/src/fdl/IOHandler.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/IOHandler.java Tue Aug 19 05:33:32 2008 +0900 @@ -1,33 +1,45 @@ package fdl; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -import java.util.Hashtable; -import java.util.LinkedList; - -public class IOHandler extends TupleSpace { +public class IOHandler { static final boolean debug = false; - public Tuple[] tuple_space; - public ComDebug com_debug; + public TupleSpace tupleSpace; - public IOHandler(Tuple[] _tuple_space) { - super(_tuple_space); - } + String remoteString; + String localString; + public int cnt = 0; - public void handle(SelectionKey key) - throws ClosedChannelException, IOException { + public IOHandler(TupleSpace tupleSpace,SelectionKey key) { + this.tupleSpace = tupleSpace; + + SocketChannel ch = (SocketChannel) key.channel(); + remoteString = getRemoteHostAndPort(ch); + localString = getLocalHostAndPort(ch); + } + + public void handle(SelectionKey key) { // 書き込み可であれば,読み込みを行う if (key.isReadable()) { - read(key); + try { + read(key); + } catch (ClosedChannelException e) { + tupleSpace.hook.closeHook(key); + } catch (IOException e) { + tupleSpace.hook.closeHook(key); + } } } - private void read(SelectionKey key) + void read(SelectionKey key) throws ClosedChannelException, IOException { SocketChannel channel = (SocketChannel)key.channel(); @@ -45,18 +57,12 @@ System.out.print("reading command..."); } count = channel.read(command); - - if(count < 0) { - LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist; - ClosewishComDebug(key, command, reportCh_list); - readsize = -1; - return; - } + if(count < 0) throw new IOException(); readsize -= count; } command.rewind(); - int len = command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET); + command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET); int datalen = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); ByteBuffer data = ByteBuffer.allocate(datalen); @@ -79,82 +85,81 @@ if (debug) { PSX.printData(command); - } - manager_run(key, command, data, len); - - key.interestOps(key.interestOps() | SelectionKey.OP_READ ); + } + // I believe we don't need this + //key.interestOps(key.interestOps() | SelectionKey.OP_READ ); + assert((key.interestOps()& SelectionKey.OP_READ) !=0); } public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data, int len) throws IOException { command.order(ByteOrder.BIG_ENDIAN); int mode = command.get(PSX.LINDA_MODE_OFFSET); - char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); - int id = (int)idc; - int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - String sendtext = "none"; - - - com_debug = new ComDebug(); - Hashtable<String, Integer> com_Loggingtable = ComDebug.Com_Hashtable; - LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist; - + if (debug) { System.out.println("data from : "+key.channel()); } if((mode == '!') || (len == 0)) { - ClosewishComDebug(key, command, reportCh_list); - } - else if(mode == PSX.PSX_CHECK) { - sendtext = Check(key, command); + tupleSpace.hook.closeHook(key); + } else if(mode == PSX.PSX_CHECK) { + tupleSpace.Check(key, command); } else if(mode == PSX.PSX_IN || mode == PSX.PSX_RD){ - sendtext = In_Rd(key, command, mode); + tupleSpace.In_Rd(key, command, mode); } else if (mode == PSX.PSX_WAIT_RD) { - Wait_Rd(key, command, mode); + tupleSpace.Wait_Rd(key, command, mode); } else if(mode == PSX.PSX_OUT) { - sendtext = Out(command, data); + tupleSpace.Out(key, command, data); } else { - System.out.println("Uncorrect buffer"); + tupleSpace.hook.closeHook(key); + System.out.println("Incorrect tuple operation"); System.exit(1); } - - //COM_DEBUG - if(id > PSX.PRIVILEGED_ID_START && id < PSX.PRIVILEGED_ID_END){ - ComDebug.addChannel(key, reportCh_list); - } - //DEBUG用カウンタ - String debug_rep = ComDebug.Com_inc(key, com_Loggingtable, mode, id, seq, sendtext); - //DEBUG用レポート - ComDebug.Report(reportCh_list, command, debug_rep); - - if (key.interestOps() - != (SelectionKey.OP_READ)) { - // 読み込み操作に対する監視を行う - key.interestOps(SelectionKey.OP_READ ); - } } - private void ClosewishComDebug(SelectionKey key, ByteBuffer command, - LinkedList<SocketChannel> reportCh_list) throws IOException { - String close_Channel = key.channel().toString(); - String[] split = close_Channel.split("/"); - String local[] = split[1].split(" "); - String remote[] = split[2].split("]"); - - String localAddress = local[0]; - String remoteAddress = remote[0]; - - Connection_Close(key); - com_debug.reportCh_remove(key, reportCh_list); - ComDebug.Report(reportCh_list, command, "CloseInfo >"+localAddress+"--"+remoteAddress); - } - - private void Connection_Close(SelectionKey key) throws IOException { + void Connection_Close(SelectionKey key) throws IOException { System.out.println("Connection closed by "+key.channel()); SocketChannel channel = (SocketChannel)key.channel(); key.cancel(); channel.close(); } + + private static String getRemoteHostAndPort(SocketChannel channel) { + String socketString = channel.socket().getRemoteSocketAddress().toString(); + String[] split = socketString.split("/"); + int length = split.length; + String hostAndPort = split[length-1]; + split = hostAndPort.split(":"); + String host = split[0]; + String port = split[1]; + int portnum = Integer.parseInt(port); + try { + InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), portnum); + host = address.getHostName().toString(); + return (host +":"+port); + } + catch( UnknownHostException e ){ + return hostAndPort; + } + } + + private static String getLocalHostAndPort(SocketChannel channel) { + String socketString = channel.socket().getLocalSocketAddress().toString(); + String[] split = socketString.split("/"); + int length = split.length; + String hostAndPort = split[length-1]; + split = hostAndPort.split(":"); + String host = split[0]; + String port = split[1]; + try { + InetAddress localhost = InetAddress.getLocalHost(); + host = localhost.getHostName(); + return (host +":"+port); + } + catch( UnknownHostException e ){ + return (host +":"+port); + } + } + }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/IOHandlerHook.java Tue Aug 19 05:33:32 2008 +0900 @@ -0,0 +1,16 @@ +package fdl; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; + +public interface IOHandlerHook { + void closeHook(SelectionKey key); + + void inHook(SelectionKey key, int id, int seq, char mode); + + void outHook(SelectionKey key, int id, int seq, char mode, ByteBuffer data); + + void waitReadHook(SelectionKey key, int id, int seq, char mode); + + void checkHook(SelectionKey key, int id, int seq, char mode); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/MetaEngine.java Tue Aug 19 05:33:32 2008 +0900 @@ -0,0 +1,35 @@ +package fdl; + +import java.nio.ByteBuffer; + +/** + * @author kono + * Meta Protocol Engine for each Linda Server + */ + +public class MetaEngine { + MetaLinda meta; + boolean running = true; + CommDebugHook commDebug; + + PSXCallback monitor_callback_start = + new PSXCallback() {public void callback(ByteBuffer reply) { + meta.ts.hook = commDebug = new CommDebugHook(); + ByteBuffer data = ByteBuffer.allocate(0) ; + meta.out(PSX.META_MONITOR_DATA, data, 0); + meta.in(PSX.META_MONITOR,monitor_callback); + }}; + PSXCallback monitor_callback = + new PSXCallback() {public void callback(ByteBuffer reply) { + ByteBuffer data = commDebug.getLog(); + meta.out(PSX.META_MONITOR_DATA, data, data.limit()); + meta.in(PSX.META_MONITOR,monitor_callback); + }}; + + void mainLoop() { + meta.in(PSX.META_MONITOR,monitor_callback_start); + meta.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) { running = false;}}); + while(running) + meta.sync(); + } +}
--- a/src/fdl/MetaLinda.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/MetaLinda.java Tue Aug 19 05:33:32 2008 +0900 @@ -12,10 +12,12 @@ package fdl; +import java.io.IOException; import java.nio.ByteBuffer; +import java.util.LinkedList; /** - PSXLinda + MetaLinda * * @author Shinji Kono * @@ -26,11 +28,15 @@ public class MetaLinda implements PSXLindaInterface { - - public PSXLinda next; + public TupleSpace ts; + public FDLindaServ fds; + public FederatedLinda fdl; + public PSXLindaInterface next; + public LinkedList<MetaReply> replies; - public MetaLinda(FederatedLinda _fdl,int _mytsid,String _host,int _port) { - + public MetaLinda(TupleSpace ts,FDLindaServ fdl) { + this.ts = ts; + this.fds = fdl; } public PSXReply in(int id) { @@ -38,47 +44,69 @@ } public void in(int id, PSXCallback callback) { + replies.add(new MetaReply(PSX.PSX_IN,id,ts, callback)); } public PSXReply ck(int id) { - PSXReply r = null; + MetaReply r = new MetaReply(PSX.PSX_CHECK,id,ts); return r; } public void ck(int id, PSXCallback callback) { + replies.add(new MetaReply(PSX.PSX_CHECK,id,ts,callback)); } public PSXReply out(int id, ByteBuffer data,int size) { - return null; + MetaReply r = new MetaReply(PSX.PSX_OUT,id,ts,data,null); + replies.add(r); + return r; } public PSXReply update(int id, ByteBuffer data,int size) { - return null; + MetaReply r = new MetaReply(PSX.PSX_UPDATE,id,ts,data,null); + return r; } public void update(int id, ByteBuffer data,int size,PSXCallback callback) { + MetaReply r = new MetaReply(PSX.PSX_UPDATE,id,ts,data,callback); + replies.add(r); } public PSXReply rd(int id) { - return null; + MetaReply r = new MetaReply(PSX.PSX_RD,id,ts); + return r; } public void rd(int id, PSXCallback callback) { + replies.add(new MetaReply(PSX.PSX_RD,id,ts,callback)); } - public PSXLinda add(PSXLinda linda) { + public PSXLindaInterface add(PSXLindaInterface linda) { next = linda; - return null; + return this; } public void send(ByteBuffer command,ByteBuffer data) { } public int sync() { - return 0; + return sync(0); } - public int sync(long mtime) { + public int sync(long timeout) { + fds.checkTuple(timeout); + for(MetaReply r: replies) { + if (r.ready()) { + replies.remove(); + } + } + if (fdl!=null) { + try { + fdl.sync(timeout); + } catch (IOException e) { + e.printStackTrace(); + } + } return 0; } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/MetaReply.java Tue Aug 19 05:33:32 2008 +0900 @@ -0,0 +1,59 @@ +package fdl; + +import java.nio.ByteBuffer; + +public class MetaReply extends PSXReply { + + public int id; + public TupleSpace ts; + + public MetaReply(int mode, int id,TupleSpace ts) { + this.mode = mode; + this.id = id; + this.ts = ts; + this.command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); + } + + public MetaReply(int mode, int id, TupleSpace ts,PSXCallback callback) { + this(mode,id,ts); + this.callback = callback; + } + + public MetaReply(int mode, int id, TupleSpace ts,ByteBuffer data, + PSXCallback callback) { + this(mode,id,ts); + this.data = data; + this.callback = callback; + } + + void checkTuple() { + ByteBuffer data = ts.IN(id, mode, command); + if (data!=null) { + this.data = data; + mode = PSX.PSX_ANSWER; + } + } + + public boolean ready() { + switch(mode) { + case PSX.PSX_IN: + case PSX.PSX_RD: + checkTuple(); + break; + case PSX.PSX_CHECK: + data = ts.check1(null, command); + if (data!=null) { + mode = PSX.PSX_ANSWER; + } + break; + case PSX.PSX_OUT: + ts.Out(null, command, data); + return true; + case PSX.PSX_UPDATE: + // not implemented + break; + } + return mode==PSX.PSX_ANSWER; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/NullIOHandlerHook.java Tue Aug 19 05:33:32 2008 +0900 @@ -0,0 +1,30 @@ +package fdl; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; + +public class NullIOHandlerHook implements IOHandlerHook { + + @Override + public void checkHook(SelectionKey key, int id, int seq, char mode) { + } + + @Override + public void closeHook(SelectionKey key) { + } + + @Override + public void inHook(SelectionKey key, int id, int seq, char mode) { + } + + @Override + public void outHook(SelectionKey key, int id, int seq, char mode, + ByteBuffer data) { + } + + @Override + public void waitReadHook(SelectionKey key, int id, int seq, char mode) { + } + + +}
--- a/src/fdl/PSX.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/PSX.java Tue Aug 19 05:33:32 2008 +0900 @@ -14,6 +14,10 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; @@ -47,6 +51,9 @@ static final int PRIVILEGED_ID_START = 32768; static final int PRIVILEGED_ID_END = 36864; + static final int META_STOP = PRIVILEGED_ID_START; + static final int META_MONITOR = PRIVILEGED_ID_START+1; + static final int META_MONITOR_DATA = PRIVILEGED_ID_START+2; // this method should be removed static void setReportCommand(ByteBuffer command, String report_txt) { @@ -100,6 +107,33 @@ command.putInt(LINDA_SEQ_OFFSET, seq); command.rewind(); } + + public static ByteBuffer string2ByteBuffer(String log) { + ByteBuffer blog = ByteBuffer.allocate(log.length()*2); // this is incorrect... + for(int i=0;i<log.length();i++) { + blog.putChar(log.charAt(i)); + } + blog.flip(); + return blog; + } + + public static String getdataString(ByteBuffer data) { + String sendtext; + data.rewind(); + //Decode UTF-8 to System Encoding(UTF-16) + Charset charset = Charset.forName("UTF-8"); + CharsetDecoder decoder = charset.newDecoder(); + CharBuffer cb = null; + try { + cb = decoder.decode(data); + } catch (CharacterCodingException e) { + e.printStackTrace(); + } + cb.rewind(); + + sendtext = cb.toString(); + return sendtext; + } }
--- a/src/fdl/PSXLinda.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/PSXLinda.java Tue Aug 19 05:33:32 2008 +0900 @@ -40,7 +40,7 @@ public String host; public int port; public int mytsid; - public PSXLinda next; + public PSXLindaInterface next; static final boolean debug = false; public PSXLinda(FederatedLinda _fdl,int _mytsid,String _host,int _port) @@ -136,7 +136,7 @@ fdl.psx_queue(this, id, null, 0, PSX.PSX_RD, callback); } - public PSXLinda add(PSXLinda linda) { + public PSXLindaInterface add(PSXLindaInterface linda) { next = linda; return this; }
--- a/src/fdl/PSXLindaInterface.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/PSXLindaInterface.java Tue Aug 19 05:33:32 2008 +0900 @@ -24,7 +24,7 @@ public void rd(int id, PSXCallback callback) ; - public PSXLinda add(PSXLinda linda) ; + public PSXLindaInterface add(PSXLindaInterface linda) ; public void send(ByteBuffer command,ByteBuffer data) throws IOException ;
--- a/src/fdl/TestPSXLinda.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/TestPSXLinda.java Tue Aug 19 05:33:32 2008 +0900 @@ -31,7 +31,7 @@ public static void main (String args[]) { FederatedLinda fdl; - PSXLinda psx; + PSXLindaInterface psx; String host = "localhost"; int port = 10000; PSXReply r;
--- a/src/fdl/TupleSpace.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/TupleSpace.java Tue Aug 19 05:33:32 2008 +0900 @@ -2,56 +2,75 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; public class TupleSpace { static final boolean debug = true; static final int CAPSIZE = 4194304; + public static int user = 0; + public byte userchar[] = new byte[2]; public Tuple[] tuple_space; + public IOHandlerHook hook = new NullIOHandlerHook(); - public TupleSpace(Tuple[] _tuple_space) { - super(); + public TupleSpace() { // 読みこんだデータを格納するためのリストの初期化 - tuple_space = _tuple_space; + tuple_space = new Tuple[TupleHandler.MAX_TUPLE]; } - public TupleSpace() { - // TODO Auto-generated constructor stub + + public void newUser() { + Tuple tmpTuple; + //初期生成 + if((tmpTuple = tuple_space[TupleHandler.MAX_TUPLE-1]) == null) { + tmpTuple = tuple_space[TupleHandler.MAX_TUPLE-1] = new Tuple(); + tmpTuple.next = null; + } else { + while(tmpTuple.next != null) tmpTuple = tmpTuple.next; + tmpTuple.next = new Tuple(); + tmpTuple = tmpTuple.next; + tmpTuple.next = null; + } + + user++; + + ByteBuffer data = ByteBuffer.allocate(2); + data.clear(); + userchar[0] = (byte) (user/10 + '0'); + userchar[1] = (byte) (user%10 + '0'); + + data.put(userchar[0]); + data.put(userchar[1]); + + data.rewind(); + tmpTuple.setData(data); + //Tuple + int id = TupleHandler.MAX_TUPLE-1; + tmpTuple.setTuple('o', id, 0, data.limit(), data); + System.out.println("Server: assign id "+user); } - - protected String Out(ByteBuffer command, ByteBuffer data) throws IOException { + + protected void Out(SelectionKey key,ByteBuffer command, ByteBuffer data) { Tuple tuple; int id; int datasize; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; - String sendtext = "none"; - + datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); command.rewind(); System.out.println("*** out command : id = "+id +" ***"); + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); + command.rewind(); + hook.outHook(key,id,seq,'o',data); while((tuple_space[id] != null) && ((tuple_space[id].mode == PSX.PSX_WAIT_RD)||(tuple_space[id].mode == PSX.PSX_RD))) { PSX.setAnserCommand(command, tuple_space[id].getSeq()); - //if(debug){ - //int sendsize = tmpTuple.getdataLength()+PSX.LINDA_HEADER_SIZE; - //System.out.println("send size "+sendsize+" : mode = "+(char)mode); - //} - //ByteBuffer sendcommand = tmpTuple.getCommand(); - //ByteBuffer senddata = tmpTuple.getData(); send(tuple_space[id].ch, command, data); - sendtext = getdataString(data); - - removeTuple(id); tuple = null; } @@ -62,12 +81,7 @@ int sendsize = datasize+PSX.LINDA_HEADER_SIZE; System.out.println("send size "+sendsize+" : mode = "+(char)'a'); } - //ByteBuffer sendcommand = tmpTuple.getCommand(); - //ByteBuffer senddata = tmpTuple.getData(); send(tuple_space[id].ch, command, data); - - sendtext = getdataString(data); - removeTuple(id); tuple = null; } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX.PSX_OUT)) { @@ -83,8 +97,6 @@ } tuple.setMode('o'); - int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); - command.rewind(); tuple.setSeq(seq); tuple.setData(data); tuple.setDataLength(datasize); @@ -98,7 +110,6 @@ data.clear(); System.exit(1); } - return sendtext; } private void removeTuple(int id) { @@ -124,6 +135,9 @@ int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); tuple.setSeq(seq); + + hook.waitReadHook(key,id,seq,(char)mode); + tuple.ch = (SocketChannel) key.channel(); tuple.setDataLength(0); ByteBuffer buff = ByteBuffer.allocate(0); @@ -135,7 +149,7 @@ } } - protected String In_Rd(SelectionKey key, ByteBuffer command, int mode) + protected void In_Rd(SelectionKey key, ByteBuffer command, int mode) throws IOException { Tuple tuple = read_in_1(key, command, mode); @@ -145,8 +159,6 @@ ByteBuffer senddata = tuple.getData(); send(key,sendcommand, senddata); } - String sendtext = getdataString(tuple.getData()); - return sendtext; } private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) { @@ -159,10 +171,12 @@ char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; - + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); + command.rewind(); System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); - + hook.inHook(key,id,seq,(char)mode); + tuple = tuple_space[id]; //wを無視 @@ -172,79 +186,106 @@ } if (tuple != null && (tuple.mode == 'o')){ - //tmpTuple = new Tuple((SocketChannel)key.channel()); - int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); - command.rewind(); - tuple.setCommand('a', seq); - - if(debug){ - int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE; - System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode()); - } - - - - //INの場合はremoveする - if(mode == PSX.PSX_IN) { - if(tuple.data != null){ - //ByteBuffer buff = ByteBuffer.allocate(0); - //tmpTuple.setData(buff); - tuple.data = null; - } - if(temp != null){ - temp.next = tuple.next; - } - else { - tuple_space[id] = tuple.next; - } - } + tupleIsAvailable(command, mode, tuple, id, temp); } else { - if(tuple == null) { - //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); - tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); - tuple.next = null; - }else { - while(tuple.next !=null) tuple =tuple.next; - tuple.next= new Tuple((SocketChannel)key.channel()); - tuple = tuple.next; - tuple.next = null; - } - - tuple.setMode(mode); - int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET); - command.rewind(); - tuple.setSeq(seq2); - tuple.ch = (SocketChannel) key.channel(); - tuple.setDataLength(0); - ByteBuffer buff = ByteBuffer.allocate(0); - buff.rewind(); - tuple.setData(buff); - tuple = null; - - if(debug){ - System.out.println("data inserted insert seq = "+seq2 +", id = "+id); - } + tuple = setupWait(key, command, mode, tuple, id); } return tuple; } - protected String Check(SelectionKey key, ByteBuffer command) throws IOException { - String sendtext; - ByteBuffer data = check1(command); - send(key, command, data); + public ByteBuffer IN(int id,int mode, ByteBuffer command) { + Tuple tuple,temp=null; + tuple = tuple_space[id]; + + //wを無視 + while(tuple != null && tuple.next != null && (tuple.mode == 'w')){ + temp = tuple; + tuple = tuple.next; + } + + if (tuple != null && (tuple.mode == 'o')){ + ByteBuffer data = tuple.data; + tupleIsAvailable(command, mode, tuple, id, temp); + return data; + } + return null; + } + + private void tupleIsAvailable(ByteBuffer command, int mode, Tuple tuple, + int id, Tuple temp) { + //tmpTuple = new Tuple((SocketChannel)key.channel()); + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); + command.rewind(); + tuple.setCommand('a', seq); + + if(debug){ + int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE; + System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode()); + } + + - sendtext = getdataString(data); - - return sendtext; + //INの場合はremoveする + if(mode == PSX.PSX_IN) { + if(tuple.data != null){ + //ByteBuffer buff = ByteBuffer.allocate(0); + //tmpTuple.setData(buff); + tuple.data = null; + } + if(temp != null){ + temp.next = tuple.next; + } + else { + tuple_space[id] = tuple.next; + } + } } - private ByteBuffer check1(ByteBuffer command) { + private Tuple setupWait(SelectionKey key, ByteBuffer command, int mode, + Tuple tuple, int id) { + if(tuple == null) { + //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); + tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); + tuple.next = null; + }else { + while(tuple.next !=null) tuple =tuple.next; + tuple.next= new Tuple((SocketChannel)key.channel()); + tuple = tuple.next; + tuple.next = null; + } + + tuple.setMode(mode); + int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET); + command.rewind(); + tuple.setSeq(seq2); + tuple.ch = (SocketChannel) key.channel(); + tuple.setDataLength(0); + ByteBuffer buff = ByteBuffer.allocate(0); + buff.rewind(); + tuple.setData(buff); + tuple = null; + + if(debug){ + System.out.println("data inserted insert seq = "+seq2 +", id = "+id); + } + return tuple; + } + + protected void Check(SelectionKey key, ByteBuffer command) throws IOException { + ByteBuffer data = check1(key,command); + send(key, command, data); + } + + public ByteBuffer check1(SelectionKey key,ByteBuffer command) { ByteBuffer data; Tuple tmpTuple; int id; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); + command.rewind(); + hook.checkHook(key,id,seq,'c'); tmpTuple = tuple_space[id]; while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ @@ -263,64 +304,44 @@ return data; } - public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) - throws IOException { - if (debug) { - if (command == null) { - System.out.println("Manager_run: command is null"); - } - if (data == null) { - System.out.println("Manager_run: data is null"); - } - } - int send_size = PSX.LINDA_HEADER_SIZE; - int count = 0; - - //command Send - command.rewind(); - while(send_size > 0){ - count = ch.write(command); - if(count < 0){ - System.out.println("Write Falied! close ch:"+ch); - ch.close(); - return; - } - send_size -= count; - } - - //data Send - data.rewind(); - if(data != null) { - data.rewind(); - ch.write(data); - } + public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) { + if (debug) { + if (command == null) { + System.out.println("Manager_run: command is null"); + } + if (data == null) { + System.out.println("Manager_run: data is null"); + } + } + int send_size = PSX.LINDA_HEADER_SIZE; + int count = 0; + + try { + //command Send + command.rewind(); + while(send_size > 0){ + count = ch.write(command); + if(count < 0) throw new IOException(); + send_size -= count; } - public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) - throws IOException { - SocketChannel ch = (SocketChannel)key.channel(); - send(ch,command,data); + if (data==null) return; + //data Send + data.rewind(); + while(data.remaining() > 0){ + count = ch.write(data); + if(count < 0) throw new IOException(); } - - private String getdataString(ByteBuffer data) { - String sendtext; - data.rewind(); - //set sendtext - //CharBuffer chardata = data.asCharBuffer(); - - //Decode UTF-8 to System Encoding(UTF-16) - Charset charset = Charset.forName("UTF-8"); - CharsetDecoder decoder = charset.newDecoder(); - CharBuffer cb = null; - try { - cb = decoder.decode(data); - } catch (CharacterCodingException e) { - e.printStackTrace(); + } catch (IOException e) { + System.out.println("Write Falied on:"+ch); + return; } - cb.rewind(); - - sendtext = cb.toString(); - return sendtext; } + public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) { + SocketChannel ch = (SocketChannel)key.channel(); + send(ch,command,data); + } + + } \ No newline at end of file