Mercurial > hg > FederatedLinda
changeset 33:64071f8e2e0d
*** empty log message ***
author | kono |
---|---|
date | Sun, 24 Aug 2008 03:23:08 +0900 |
parents | 7e0f6f00763e |
children | e7c5958fd285 |
files | src/fdl/ComDebug_Client.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/MetaLinda.java src/fdl/MetaLogEngine.java src/fdl/MetaReply.java src/fdl/NullMetaEngine.java src/fdl/PSX.java src/fdl/PSXQueue.java src/fdl/Tuple.java src/fdl/TupleSpace.java src/fdl/test/TestMetaLinda.java src/fdl/test/TestMonitor.java |
diffstat | 13 files changed, 97 insertions(+), 85 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/ComDebug_Client.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/ComDebug_Client.java Sun Aug 24 03:23:08 2008 +0900 @@ -13,7 +13,6 @@ static int id; static final boolean debug = false; - ByteBuffer nullBuffer = ByteBuffer.allocate(0); PSXCallback debugCallback ; FederatedLinda fdl; @@ -77,16 +76,16 @@ psx.in(65535, new MyCallBack(psx)); System.err.println("COM_DEBUG Connected.["+host+":"+port+"]"); - psx.out(PSX.META_MONITOR, nullBuffer); + psx.out(PSX.META_MONITOR, null); debugCallback = new PSXCallback() { public void callback(ByteBuffer reply) { System.err.println("COM_DEBUG: "+PSX.getdataString(reply)); - psx.out(PSX.META_MONITOR, nullBuffer); + psx.out(PSX.META_MONITOR, null); psx.in(PSX.META_MONITOR_DATA,debugCallback); } }; - psx.out(PSX.META_MONITOR, nullBuffer); + psx.out(PSX.META_MONITOR, null); psx.in(PSX.META_MONITOR_DATA,debugCallback); connect_num++;
--- a/src/fdl/FederatedLinda.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/FederatedLinda.java Sun Aug 24 03:23:08 2008 +0900 @@ -39,7 +39,7 @@ static FederatedLinda fdl = new FederatedLinda(); static int MAX_SEQUENCE = 2048; - static boolean debug = true; + static boolean debug = false; public int tid; public int seq; @@ -140,8 +140,14 @@ for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { SelectionKey s = it.next(); it.remove(); - chkServe((SocketChannel)s.channel()); - } + try { + if (!s.isReadable()) throw new IOException(); + chkServe((SocketChannel)s.channel()); + } catch (IOException e) { + selector.selectedKeys().remove(s); + System.err.println(""+s.channel()+" is closed."); + } + } } } catch (IOException e) { e.printStackTrace(); @@ -152,26 +158,10 @@ return key_num; } - private void chkServe(SocketChannel sock) - throws IOException { - int length; + private void chkServe(SocketChannel sock) throws IOException { ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); command.order(ByteOrder.BIG_ENDIAN); - PSX.receive(sock, command, PSX.LINDA_HEADER_SIZE); - - length = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); - ByteBuffer data = ByteBuffer.allocate(length); - int read = length; - if (debug) { - System.out.print("client reading:"); - System.out.println(length); - } - - data.order(ByteOrder.BIG_ENDIAN); - while(read>0) { - read -= sock.read(data); - } - data.rewind(); + ByteBuffer data = PSX.receivePacket(sock, command); if (debug) { PSX.printCommand("chkServe:",command, data);
--- a/src/fdl/IOHandler.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/IOHandler.java Sun Aug 24 03:23:08 2008 +0900 @@ -28,8 +28,10 @@ try { read(key); } catch (ClosedChannelException e) { + key.cancel(); tupleSpace.hook.closeHook(key); } catch (IOException e) { + key.cancel(); tupleSpace.hook.closeHook(key); } } @@ -42,26 +44,8 @@ // 読み込み用のバッファの生成 ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); command.order(ByteOrder.BIG_ENDIAN); - command.clear(); - int readsize = PSX.LINDA_HEADER_SIZE; - - // 読み込み - PSX.receive(channel, command, readsize); - - command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET); - int datalen = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); - - ByteBuffer data = ByteBuffer.allocate(datalen); - if (debug) { - System.out.println("reading: " +datalen); - } - - data.order(ByteOrder.BIG_ENDIAN); - PSX.receive(channel, data, datalen); - - command.order(ByteOrder.BIG_ENDIAN); - command.rewind(); + ByteBuffer data = PSX.receivePacket(channel, command); if (debug) { PSX.printData("IOHandler:",command); @@ -71,7 +55,7 @@ } public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data) throws IOException { - command.order(ByteOrder.BIG_ENDIAN); + command.order(ByteOrder.BIG_ENDIAN); int mode = command.get(PSX.LINDA_MODE_OFFSET); command.rewind(); @@ -82,8 +66,7 @@ tupleSpace.hook.closeHook(key); } else if(mode == PSX.PSX_CHECK) { tupleSpace.Check(key, command); - } - else if(mode == PSX.PSX_IN || mode == PSX.PSX_RD){ + } else if(mode == PSX.PSX_IN || mode == PSX.PSX_RD){ tupleSpace.In_Rd(key, command, mode); } else if (mode == PSX.PSX_WAIT_RD) { tupleSpace.Wait_Rd(key, command, mode); @@ -91,7 +74,7 @@ tupleSpace.Out(key, command, data); } else { tupleSpace.hook.closeHook(key); - System.out.println("Incorrect tuple operation"); + System.err.println("Incorrect tuple operation"); System.exit(1); }
--- a/src/fdl/MetaLinda.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/MetaLinda.java Sun Aug 24 03:23:08 2008 +0900 @@ -100,12 +100,14 @@ public int sync(long timeout) { fds.checkTuple(timeout); - // copy replies to avoid insert during r.ready() - LinkedList<MetaReply> list = replies; - replies = new LinkedList<MetaReply>(); - for(MetaReply r:list) { - if (!r.ready()) { - addReply(r); + if (replies.size()>0) { + // copy replies to avoid insert during r.ready() + LinkedList<MetaReply> list = replies; + replies = new LinkedList<MetaReply>(); + for(MetaReply r:list) { + if (!r.ready()) { + addReply(r); + } } } if (fdl!=null) {
--- a/src/fdl/MetaLogEngine.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/MetaLogEngine.java Sun Aug 24 03:23:08 2008 +0900 @@ -19,8 +19,7 @@ PSXCallback monitor_callback_start = new PSXCallback() {public void callback(ByteBuffer reply) { meta.ts.hook = commDebug = new CommDebugHook(); - ByteBuffer data = ByteBuffer.allocate(0) ; - meta.out(PSX.META_MONITOR_DATA, data); + meta.out(PSX.META_MONITOR_DATA, null); meta.in(PSX.META_MONITOR,monitor_callback); }}; PSXCallback monitor_callback =
--- a/src/fdl/MetaReply.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/MetaReply.java Sun Aug 24 03:23:08 2008 +0900 @@ -48,7 +48,7 @@ } break; case PSX.PSX_OUT: - command=PSX.setCommand(PSX.PSX_OUT, id, 0, data.remaining()); + command=PSX.setCommand(PSX.PSX_OUT, id, 0, data); ts.Out(null, command, data); return true; case PSX.PSX_UPDATE:
--- a/src/fdl/NullMetaEngine.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/NullMetaEngine.java Sun Aug 24 03:23:08 2008 +0900 @@ -1,5 +1,7 @@ package fdl; +import java.nio.ByteBuffer; + public class NullMetaEngine implements MetaEngine { public MetaLinda meta; public boolean running=true; @@ -9,7 +11,9 @@ } public void mainLoop() { + meta.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) { + running = false;}}); while(running) - meta.fds.checkTuple(); + meta.sync(); } }
--- a/src/fdl/PSX.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/PSX.java Sun Aug 24 03:23:08 2008 +0900 @@ -65,12 +65,15 @@ static void printCommand(String comment, ByteBuffer command, ByteBuffer data) { char id = (char)command.getShort(LINDA_ID_OFFSET); - System.out.println(comment+" LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ + System.err.println(comment+" LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ "ID:"+(int)id+" "+ "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ "DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" "); - System.out.println("DATA:"+data); + if(data!=null) { + System.err.println("DATA:"+data); + data.rewind(); + } command.rewind(); } @@ -85,7 +88,8 @@ } - static ByteBuffer setCommand(int _mode, int _id, int _seq, int _datalen) { + static ByteBuffer setCommand(int _mode, int _id, int _seq, ByteBuffer data) { + int _datalen = data==null?0:data.remaining(); ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); command.order(ByteOrder.BIG_ENDIAN); @@ -116,7 +120,6 @@ public static String getdataString(ByteBuffer data) { String sendtext; - data.rewind(); //Decode UTF-8 to System Encoding(UTF-16) Charset charset = Charset.forName("UTF-8"); CharsetDecoder decoder = charset.newDecoder(); @@ -124,9 +127,9 @@ try { cb = decoder.decode(data); } catch (CharacterCodingException e) { - e.printStackTrace(); } cb.rewind(); + data.rewind(); sendtext = cb.toString(); return sendtext; @@ -134,28 +137,36 @@ public static void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) { int send_size = LINDA_HEADER_SIZE; - int count = 0; - - if (command.position()!=0||command.limit()!=PSX.LINDA_HEADER_SIZE) - System.err.println("command length erron send"); + // if datalen in the header is different from ByteBuffer remaining(), we lost + // protocol synchronization. Make sure to have correct length now. + if (true && data!=null) { + int datalen = data.limit()-data.position(); + command.putInt(LINDA_DATA_LENGTH_OFFSET,datalen); + command.putInt(LINDA_PACKET_LENGTH_OFFSET,datalen+LINDA_HEADER_SIZE-INT_SIZE); + command.rewind(); + } try { //command Send while(send_size > 0){ - count = ch.write(command); - if(count < 0) throw new IOException(); + int count = ch.write(command); + if(count <= 0) throw new IOException(); send_size -= count; } if (data==null) return; //data Send while(data.remaining() > 0){ - count = ch.write(data); + int count = ch.write(data); + System.err.println("Data out length = "+count); if(count < 0) throw new IOException(); } } catch (IOException e) { System.out.println("Write Falied on:"+ch); return; } + // command or data may be shared among PSX queue + command.rewind(); + data.rewind(); } public static void send(SelectionKey key, ByteBuffer command, ByteBuffer data) { @@ -203,6 +214,9 @@ static void receive(SocketChannel channel, ByteBuffer command, int readsize) throws IOException { int count; + if (command.capacity()!=readsize) { + System.err.println("read size mismatch"+readsize+" and "+command.capacity()); + } while(readsize>0) { if(IOHandler.debug){ System.out.println("reading packet..."+readsize); @@ -214,6 +228,22 @@ } command.rewind(); } + + static ByteBuffer receivePacket(SocketChannel channel, ByteBuffer command) + throws IOException { + /** + * Receive a command and data according to the command. + */ + receive(channel, command,LINDA_HEADER_SIZE); + int datalen = command.getInt(LINDA_DATA_LENGTH_OFFSET); + int packetlen = command.getInt(LINDA_PACKET_LENGTH_OFFSET); + command.rewind(); + if (datalen+LINDA_HEADER_SIZE!=packetlen) return null; + ByteBuffer data = ByteBuffer.allocate(datalen); + data.order(ByteOrder.BIG_ENDIAN); + receive(channel, data, datalen); + return data; + } }
--- a/src/fdl/PSXQueue.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/PSXQueue.java Sun Aug 24 03:23:08 2008 +0900 @@ -46,7 +46,7 @@ } private void setCommand() { - command = PSX.setCommand(mode,id,seq,size); + command = PSX.setCommand(mode,id,seq,data); } public void setSeq(int _seq) {
--- a/src/fdl/Tuple.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/Tuple.java Sun Aug 24 03:23:08 2008 +0900 @@ -24,11 +24,11 @@ } public void setCommand(int _mode, int _seq) { - setCommand( _mode, id, _seq,data.limit()); + setCommand( _mode, id, _seq,data); } - public void setCommand(int _mode, int _id, int _seq, int _datalen) { - command = PSX.setCommand(_mode, _id, _seq, _datalen); + public void setCommand(int _mode, int _id, int _seq, ByteBuffer data) { + command = PSX.setCommand(_mode, _id, _seq, data); } public void setTuple(int _mode,int _id, int _seq, ByteBuffer _data) { @@ -59,7 +59,7 @@ } public int getdataLength() { - return data.limit(); + return data==null?0:data.remaining(); } public ByteBuffer getData() {
--- a/src/fdl/TupleSpace.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/TupleSpace.java Sun Aug 24 03:23:08 2008 +0900 @@ -77,8 +77,7 @@ PSX.setAnserCommand(command, tuple_space[id].getSeq()); if(debug){ - int sendsize = datasize+PSX.LINDA_HEADER_SIZE; - System.out.println("send size "+sendsize+" : mode = "+(char)'a'); + System.out.println("send size "+datasize+" : mode = "+(char)'a'); } PSX.send(tuple_space[id].ch, command, data); removeTuple(id);
--- a/src/fdl/test/TestMetaLinda.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/test/TestMetaLinda.java Sun Aug 24 03:23:08 2008 +0900 @@ -57,23 +57,25 @@ psx = fdl.open(host,port); System.out.println("Connected."); - ByteBuffer data = ByteBuffer.allocate(10); r = psx.in(1); for(int i=0;i<10;i++) { - data.clear(); + ByteBuffer data = ByteBuffer.allocate(10); data.putInt(i); data.flip(); psx.out(1,data); + } + for(int i=0;i<100;i++) { if (r.ready()) { System.err.println("Get:"+r.data.getInt()); r = psx.in(1); } - psx.sync(100); + // System.out.println("syncing..."+i); + psx.sync(10); } - data.clear(); - psx.out(PSX.META_STOP, data); + System.out.println("Try to stop the server"); + psx.out(PSX.META_STOP, null); psx.sync(); } catch (IOException e) {
--- a/src/fdl/test/TestMonitor.java Fri Aug 22 14:49:52 2008 +0900 +++ b/src/fdl/test/TestMonitor.java Sun Aug 24 03:23:08 2008 +0900 @@ -69,9 +69,13 @@ Server s = new Server(); Client c = new Client(); Monitor m = new Monitor(); - new Thread(s).start(); - new Thread(m).start(); - new Thread(c).start(); + Thread ts = new Thread(s); ts.start(); + Thread tm = new Thread(m); tm.start(); + Thread tc = new Thread(c); tc.start(); + try { + ts.join(); tm.join(); tc.join(); + } catch (InterruptedException e) { + } }