Mercurial > hg > FederatedLinda
changeset 34:e7c5958fd285
*** empty log message ***
author | kono |
---|---|
date | Sun, 24 Aug 2008 17:36:14 +0900 |
parents | 64071f8e2e0d |
children | fe338d497c72 |
files | src/fdl/CommDebugHook.java src/fdl/FederatedLinda.java src/fdl/PSX.java src/fdl/TupleSpace.java src/fdl/test/TestMetaLinda.java |
diffstat | 5 files changed, 36 insertions(+), 36 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/CommDebugHook.java Sun Aug 24 03:23:08 2008 +0900 +++ b/src/fdl/CommDebugHook.java Sun Aug 24 17:36:14 2008 +0900 @@ -14,24 +14,27 @@ public void checkHook(SelectionKey key, int id, int seq, char mode) { - if (key!=null) logs.add(log(key, id, seq, mode, null)); + if (key==null || id<PSX.PRIVILEGED_ID_START) return; + logs.add(log(key, id, seq, mode, null)); } public void inHook(SelectionKey key, int id, int seq, char mode) { - if (key!=null) logs.add(log(key, id, seq, mode, null)); + if (key==null || id<PSX.PRIVILEGED_ID_START) return; + logs.add(log(key, id, seq, mode, null)); } public void outHook(SelectionKey key, int id, int seq, char mode, ByteBuffer data) { - if (key==null) return; + if (key==null || id<PSX.PRIVILEGED_ID_START) return; String sendtext = PSX.getdataString(data); logs.add(log(key, id, seq, mode, sendtext)); } public void waitReadHook(SelectionKey key, int id, int seq, char mode) { - if (key!=null) logs.add(log(key, id, seq, mode, null)); + if (key==null || id<PSX.PRIVILEGED_ID_START) return; + logs.add(log(key, id, seq, mode, null)); } public String log(SelectionKey key,int id, int seq,char mode, String sendtext) {
--- a/src/fdl/FederatedLinda.java Sun Aug 24 03:23:08 2008 +0900 +++ b/src/fdl/FederatedLinda.java Sun Aug 24 17:36:14 2008 +0900 @@ -141,10 +141,11 @@ SelectionKey s = it.next(); it.remove(); try { - if (!s.isReadable()) throw new IOException(); + if (!s.isReadable()) + throw new IOException(); chkServe((SocketChannel)s.channel()); } catch (IOException e) { - selector.selectedKeys().remove(s); + s.cancel(); System.err.println(""+s.channel()+" is closed."); } }
--- a/src/fdl/PSX.java Sun Aug 24 03:23:08 2008 +0900 +++ b/src/fdl/PSX.java Sun Aug 24 17:36:14 2008 +0900 @@ -136,28 +136,29 @@ } public static void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) { - int send_size = LINDA_HEADER_SIZE; // if datalen in the header is different from ByteBuffer remaining(), we lost // protocol synchronization. Make sure to have correct length now. - if (true && data!=null) { - int datalen = data.limit()-data.position(); + if (true) { + int datalen = data==null?0:(data.limit()-data.position()); + if (command.getInt(LINDA_DATA_LENGTH_OFFSET)>0 && (data==null||data.remaining()!=command.getInt(LINDA_DATA_LENGTH_OFFSET))) { + System.err.println("Missing data."); + } command.putInt(LINDA_DATA_LENGTH_OFFSET,datalen); command.putInt(LINDA_PACKET_LENGTH_OFFSET,datalen+LINDA_HEADER_SIZE-INT_SIZE); command.rewind(); } try { //command Send - while(send_size > 0){ + while(command.remaining() > 0){ int count = ch.write(command); if(count <= 0) throw new IOException(); - send_size -= count; } + command.rewind(); if (data==null) return; //data Send while(data.remaining() > 0){ int count = ch.write(data); - System.err.println("Data out length = "+count); if(count < 0) throw new IOException(); } } catch (IOException e) { @@ -165,7 +166,6 @@ return; } // command or data may be shared among PSX queue - command.rewind(); data.rewind(); } @@ -218,12 +218,12 @@ System.err.println("read size mismatch"+readsize+" and "+command.capacity()); } while(readsize>0) { - if(IOHandler.debug){ + if(false && IOHandler.debug){ System.out.println("reading packet..."+readsize); } count = channel.read(command); - if(count==0) throw new IOException(); - if(count < 0) throw new IOException(); + if(count<0) + throw new IOException(); readsize -= count; } command.rewind(); @@ -238,7 +238,10 @@ int datalen = command.getInt(LINDA_DATA_LENGTH_OFFSET); int packetlen = command.getInt(LINDA_PACKET_LENGTH_OFFSET); command.rewind(); - if (datalen+LINDA_HEADER_SIZE!=packetlen) return null; + if (datalen+LINDA_HEADER_SIZE-INT_SIZE!=packetlen) { + System.err.println("Bad packet received. "+(datalen+LINDA_HEADER_SIZE-INT_SIZE)+"!="+packetlen); + throw new IOException(); + } ByteBuffer data = ByteBuffer.allocate(datalen); data.order(ByteOrder.BIG_ENDIAN); receive(channel, data, datalen);
--- a/src/fdl/TupleSpace.java Sun Aug 24 03:23:08 2008 +0900 +++ b/src/fdl/TupleSpace.java Sun Aug 24 17:36:14 2008 +0900 @@ -20,16 +20,16 @@ public void newUser() { - Tuple tmpTuple; + Tuple tuple; //初期生成 - if((tmpTuple = tuple_space[TupleSpace.MAX_TUPLE_ID-1]) == null) { - tmpTuple = tuple_space[TupleSpace.MAX_TUPLE_ID-1] = new Tuple(); - tmpTuple.next = null; + if((tuple = tuple_space[TupleSpace.MAX_TUPLE_ID-1]) == null) { + tuple = tuple_space[TupleSpace.MAX_TUPLE_ID-1] = new Tuple(); + tuple.next = null; } else { - while(tmpTuple.next != null) tmpTuple = tmpTuple.next; - tmpTuple.next = new Tuple(); - tmpTuple = tmpTuple.next; - tmpTuple.next = null; + while(tuple.next != null) tuple = tuple.next; + tuple.next = new Tuple(); + tuple = tuple.next; + tuple.next = null; } user++; @@ -41,11 +41,9 @@ data.put(userchar[1]); data.flip(); - tmpTuple.setData(data); - //Tuple int id = TupleSpace.MAX_TUPLE_ID-1; int seq = 0; - tmpTuple.setTuple('o', id, seq, data); + tuple.setTuple('o', id, seq, data); System.out.println("Server: assign id "+user); } @@ -82,6 +80,7 @@ PSX.send(tuple_space[id].ch, command, data); removeTuple(id); tuple = null; + // Incoming Out tuple is consumed here, and wating IN tuple is also removed. } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX.PSX_OUT)) { if((tuple = tuple_space[id]) == null) { tuple = tuple_space[id] = new Tuple(); @@ -100,8 +99,7 @@ if(debug){ System.out.println("data inserted len = "+tuple.getdataLength()+" : id = "+id); } - } - else { + } else { System.out.println("Incorrect mode :"+(char)tuple_space[id].getMode()); command.clear(); data.clear(); @@ -152,9 +150,6 @@ if (tuple!=null) { //send ByteBuffer sendcommand = tuple.getCommand(); - if (tuple.getSeq()==0) { - System.err.println("Illegal sequence in answer."); - } ByteBuffer senddata = tuple.getData(); PSX.send(key,sendcommand, senddata); } @@ -168,7 +163,6 @@ Tuple temp = null; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); - command.rewind(); id = (int)idc; int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); @@ -248,7 +242,6 @@ } tuple.setMode(mode); - command.rewind(); tuple.setSeq(seq); tuple.ch = (SocketChannel) key.channel(); ByteBuffer buff = ByteBuffer.allocate(0);