Mercurial > hg > FederatedLinda
changeset 35:fe338d497c72 meta-comdebug-wokred
FederatedLinda was static singleton. It does not work on Thread based test.
author | kono |
---|---|
date | Sun, 24 Aug 2008 19:07:28 +0900 |
parents | e7c5958fd285 |
children | d5bca4b5ee95 |
files | src/fdl/AcceptHandler.java src/fdl/CommDebugHook.java src/fdl/FDLindaServ.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/PSX.java src/fdl/PSXLindaImpl.java src/fdl/test/TestMetaLinda.java src/fdl/test/TestMonitor.java |
diffstat | 9 files changed, 34 insertions(+), 14 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/AcceptHandler.java Sun Aug 24 17:36:14 2008 +0900 +++ b/src/fdl/AcceptHandler.java Sun Aug 24 19:07:28 2008 +0900 @@ -9,10 +9,12 @@ public class AcceptHandler implements TupleHandler { public TupleSpace tupleSpace; + public ServerSocketChannel ss; - public AcceptHandler(TupleSpace tupleSpace) { + public AcceptHandler(ServerSocketChannel ss, TupleSpace tupleSpace) { // 読みこんだデータを格納するためのリストの初期化 - this.tupleSpace = tupleSpace; + this.tupleSpace = tupleSpace; + this.ss = ss; } public void handle(SelectionKey key) @@ -20,6 +22,9 @@ ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel(); + if (ss!=serverChannel) { + System.err.println("Wrong server socket channel."); + } // アクセプト処理 SocketChannel channel = serverChannel.accept(); channel.configureBlocking(false);
--- a/src/fdl/CommDebugHook.java Sun Aug 24 17:36:14 2008 +0900 +++ b/src/fdl/CommDebugHook.java Sun Aug 24 19:07:28 2008 +0900 @@ -14,26 +14,31 @@ public void checkHook(SelectionKey key, int id, int seq, char mode) { - if (key==null || id<PSX.PRIVILEGED_ID_START) return; + if (isPrivilege(key, id)) return; logs.add(log(key, id, seq, mode, null)); } public void inHook(SelectionKey key, int id, int seq, char mode) { - if (key==null || id<PSX.PRIVILEGED_ID_START) return; + if (isPrivilege(key, id)) return; logs.add(log(key, id, seq, mode, null)); } + private boolean isPrivilege(SelectionKey key, int id) { + return key==null || id>=PSX.PRIVILEGED_ID_START; + } + + public void outHook(SelectionKey key, int id, int seq, char mode, ByteBuffer data) { - if (key==null || id<PSX.PRIVILEGED_ID_START) return; + if (isPrivilege(key, id)) return; String sendtext = PSX.getdataString(data); logs.add(log(key, id, seq, mode, sendtext)); } public void waitReadHook(SelectionKey key, int id, int seq, char mode) { - if (key==null || id<PSX.PRIVILEGED_ID_START) return; + if (isPrivilege(key, id)) return; logs.add(log(key, id, seq, mode, null)); }
--- a/src/fdl/FDLindaServ.java Sun Aug 24 17:36:14 2008 +0900 +++ b/src/fdl/FDLindaServ.java Sun Aug 24 19:07:28 2008 +0900 @@ -70,7 +70,7 @@ System.out.println("Server: litening at "+ssChannel); //セレクタにチャンネルを登録 tupleSpace = new TupleSpace(); - ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tupleSpace)); + ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(ssChannel,tupleSpace)); }
--- a/src/fdl/FederatedLinda.java Sun Aug 24 17:36:14 2008 +0900 +++ b/src/fdl/FederatedLinda.java Sun Aug 24 19:07:28 2008 +0900 @@ -37,7 +37,7 @@ public class FederatedLinda { - static FederatedLinda fdl = new FederatedLinda(); + FederatedLinda fdl; static int MAX_SEQUENCE = 2048; static boolean debug = false; @@ -53,6 +53,7 @@ public Hashtable<Integer,PSXReply> seqHash; public static FederatedLinda init() { + FederatedLinda fdl = new FederatedLinda(); return fdl; }
--- a/src/fdl/IOHandler.java Sun Aug 24 17:36:14 2008 +0900 +++ b/src/fdl/IOHandler.java Sun Aug 24 19:07:28 2008 +0900 @@ -10,6 +10,7 @@ public class IOHandler implements TupleHandler { static final boolean debug = true; public TupleSpace tupleSpace; + public SocketChannel ch; String remoteString; String localString; @@ -20,6 +21,7 @@ remoteString = PSX.getRemoteHostAndPort(ch); localString = PSX.getLocalHostAndPort(ch); + this.ch = ch; } public void handle(SelectionKey key) { @@ -40,6 +42,9 @@ void read(SelectionKey key) throws ClosedChannelException, IOException { SocketChannel channel = (SocketChannel)key.channel(); + if (ch!=channel) { + System.err.println("Wrong socket on IOHandler"); + } // 読み込み用のバッファの生成 ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
--- a/src/fdl/PSX.java Sun Aug 24 17:36:14 2008 +0900 +++ b/src/fdl/PSX.java Sun Aug 24 19:07:28 2008 +0900 @@ -139,6 +139,9 @@ // if datalen in the header is different from ByteBuffer remaining(), we lost // protocol synchronization. Make sure to have correct length now. if (true) { + int seq = command.getInt(LINDA_SEQ_OFFSET); +// if (seq>500) +// System.err.println("Illegal seq sent."); int datalen = data==null?0:(data.limit()-data.position()); if (command.getInt(LINDA_DATA_LENGTH_OFFSET)>0 && (data==null||data.remaining()!=command.getInt(LINDA_DATA_LENGTH_OFFSET))) { System.err.println("Missing data.");
--- a/src/fdl/PSXLindaImpl.java Sun Aug 24 17:36:14 2008 +0900 +++ b/src/fdl/PSXLindaImpl.java Sun Aug 24 19:07:28 2008 +0900 @@ -45,7 +45,6 @@ public PSXLindaImpl(FederatedLinda _fdl,int _mytsid,String _host,int _port) throws IOException { - Socket socket; host = _host; port = _port; mytsid = _mytsid; @@ -54,9 +53,9 @@ socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); - socket = socketChannel.socket(); + // Socket socket = socketChannel.socket(); // socket.setReuseAddress(true); Client side don't need this. - socket.setTcpNoDelay(true); + // socket.setTcpNoDelay(true); socketChannel.connect(new InetSocketAddress(_host, _port)); while (! socketChannel.finishConnect()) { @@ -70,7 +69,8 @@ } } } - + System.err.println("Linda client connect to "+socketChannel); + socketChannel.register(fdl.selector(), SelectionKey.OP_READ);
--- a/src/fdl/test/TestMetaLinda.java Sun Aug 24 17:36:14 2008 +0900 +++ b/src/fdl/test/TestMetaLinda.java Sun Aug 24 19:07:28 2008 +0900 @@ -65,7 +65,7 @@ data.flip(); psx.out(1,data); } - for(int i=0;i<100;i++) { + for(int i=0;i<50;i++) { if (r.ready()) { System.err.println("Get:"+r.data.getInt()); r = psx.in(1);
--- a/src/fdl/test/TestMonitor.java Sun Aug 24 17:36:14 2008 +0900 +++ b/src/fdl/test/TestMonitor.java Sun Aug 24 19:07:28 2008 +0900 @@ -73,7 +73,8 @@ Thread tm = new Thread(m); tm.start(); Thread tc = new Thread(c); tc.start(); try { - ts.join(); tm.join(); tc.join(); + ts.join(); tm.join(); + tc.join(); } catch (InterruptedException e) { } }