Mercurial > hg > FederatedLinda
diff src/fdl/TupleSpace.java @ 17:609b288f47f9
*** empty log message ***
author | kono |
---|---|
date | Mon, 18 Aug 2008 07:28:29 +0900 |
parents | cccf34386cad |
children | 0243987383b7 |
line wrap: on
line diff
--- a/src/fdl/TupleSpace.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/TupleSpace.java Mon Aug 18 07:28:29 2008 +0900 @@ -9,7 +9,7 @@ import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; -public class TupleSpace implements PSXQueueInterface{ +public class TupleSpace { static final boolean debug = true; static final int CAPSIZE = 4194304; public Tuple[] tuple_space; @@ -25,27 +25,24 @@ } protected String Out(ByteBuffer command, ByteBuffer data) throws IOException { - Tuple tmpTuple; + Tuple tuple; int id; int datasize; - char idc = (char)command.getShort(LINDA_ID_OFFSET); + char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; String sendtext = "none"; - datasize = command.getInt(LINDA_DATA_LENGTH_OFFSET); + datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); command.rewind(); System.out.println("*** out command : id = "+id +" ***"); while((tuple_space[id] != null) && - ((tuple_space[id].mode == PSX_WAIT_RD)||(tuple_space[id].mode == PSX_RD))) { - command.put(LINDA_MODE_OFFSET, (byte)'a'); - command.rewind(); - command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); - command.rewind(); + ((tuple_space[id].mode == PSX.PSX_WAIT_RD)||(tuple_space[id].mode == PSX.PSX_RD))) { + PSX.setAnserCommand(command, tuple_space[id].getSeq()); //if(debug){ - //int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; + //int sendsize = tmpTuple.getdataLength()+PSX.LINDA_HEADER_SIZE; //System.out.println("send size "+sendsize+" : mode = "+(char)mode); //} //ByteBuffer sendcommand = tmpTuple.getCommand(); @@ -55,19 +52,14 @@ sendtext = getdataString(data); - //後処理 - tmpTuple = tuple_space[id]; - tuple_space[id] = tmpTuple.next; - tmpTuple = null; + removeTuple(id); + tuple = null; } - if(tuple_space[id] != null && tuple_space[id].mode == PSX_IN) { - command.put(LINDA_MODE_OFFSET, (byte)'a'); - command.rewind(); - command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); - command.rewind(); + if(tuple_space[id] != null && tuple_space[id].mode == PSX.PSX_IN) { + PSX.setAnserCommand(command, tuple_space[id].getSeq()); if(debug){ - int sendsize = datasize+LINDA_HEADER_SIZE; + int sendsize = datasize+PSX.LINDA_HEADER_SIZE; System.out.println("send size "+sendsize+" : mode = "+(char)'a'); } //ByteBuffer sendcommand = tmpTuple.getCommand(); @@ -76,30 +68,28 @@ sendtext = getdataString(data); - //後処理 - tmpTuple = tuple_space[id]; - tuple_space[id] = tmpTuple.next; - tmpTuple = null; - } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX_OUT)) { - if((tmpTuple = tuple_space[id]) == null) { - tmpTuple = tuple_space[id] = new Tuple(); - tmpTuple.next = null; + removeTuple(id); + tuple = null; + } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX.PSX_OUT)) { + if((tuple = tuple_space[id]) == null) { + tuple = tuple_space[id] = new Tuple(); + tuple.next = null; } else { - while(tmpTuple.next != null) tmpTuple = tmpTuple.next; - tmpTuple.next = new Tuple(); - tmpTuple = tmpTuple.next; - tmpTuple.next = null; + while(tuple.next != null) tuple = tuple.next; + tuple.next = new Tuple(); + tuple = tuple.next; + tuple.next = null; } - tmpTuple.setMode('o'); - int seq = command.getInt(LINDA_SEQ_OFFSET); + tuple.setMode('o'); + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - tmpTuple.setSeq(seq); - tmpTuple.setData(data); - tmpTuple.setDataLength(datasize); + tuple.setSeq(seq); + tuple.setData(data); + tuple.setDataLength(datasize); if(debug){ - System.out.println("data inserted len = "+tmpTuple.getdataLength()+" : id = "+id); + System.out.println("data inserted len = "+tuple.getdataLength()+" : id = "+id); } } else { @@ -111,28 +101,35 @@ return sendtext; } + private void removeTuple(int id) { + Tuple tuple; + //後処理 + tuple = tuple_space[id]; + tuple_space[id] = tuple.next; + } + protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) { - Tuple tmpTuple; + Tuple tuple; int id; - char idc = (char)command.getShort(LINDA_ID_OFFSET); + char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; if (debug) System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); - tmpTuple = new Tuple(); - tmpTuple.setMode(mode); - int seq = command.getInt(LINDA_SEQ_OFFSET); + tuple = new Tuple(); + tuple.setMode(mode); + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - tmpTuple.setSeq(seq); - tmpTuple.ch = (SocketChannel) key.channel(); - tmpTuple.setDataLength(0); + tuple.setSeq(seq); + tuple.ch = (SocketChannel) key.channel(); + tuple.setDataLength(0); ByteBuffer buff = ByteBuffer.allocate(0); - tmpTuple.setData(buff); - tmpTuple.next = tuple_space[id]; - tuple_space[id] = tmpTuple; + tuple.setData(buff); + tuple.next = tuple_space[id]; + tuple_space[id] = tuple; if(debug){ System.out.println("data inserted insert seq = "+seq +", id = "+id); } @@ -140,95 +137,95 @@ protected String In_Rd(SelectionKey key, ByteBuffer command, int mode) throws IOException { - Tuple tmpTuple = read_in_1(key, command, mode); + Tuple tuple = read_in_1(key, command, mode); - if (tmpTuple!=null) { + if (tuple!=null) { //send - ByteBuffer sendcommand = tmpTuple.getCommand(); - ByteBuffer senddata = tmpTuple.getData(); + ByteBuffer sendcommand = tuple.getCommand(); + ByteBuffer senddata = tuple.getData(); send(key,sendcommand, senddata); } - String sendtext = getdataString(tmpTuple.getData()); + String sendtext = getdataString(tuple.getData()); return sendtext; } private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) { - Tuple tmpTuple; + Tuple tuple; int id; - //id = command.getInt(LINDA_ID_OFFSET); - //int mode = command.getInt(LINDA_MODE_OFFSET); + //id = command.getInt(PSX.LINDA_ID_OFFSET); + //int mode = command.getInt(PSX.LINDA_MODE_OFFSET); Tuple temp = null; - char idc = (char)command.getShort(LINDA_ID_OFFSET); + char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); - tmpTuple = tuple_space[id]; + tuple = tuple_space[id]; //wを無視 - while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ - temp = tmpTuple; - tmpTuple = tmpTuple.next; + while(tuple != null && tuple.next != null && (tuple.mode == 'w')){ + temp = tuple; + tuple = tuple.next; } - if (tmpTuple != null && (tmpTuple.mode == 'o')){ + if (tuple != null && (tuple.mode == 'o')){ //tmpTuple = new Tuple((SocketChannel)key.channel()); - int seq = command.getInt(LINDA_SEQ_OFFSET); + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - tmpTuple.setCommand('a', seq); + tuple.setCommand('a', seq); if(debug){ - int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; - System.out.println("send size "+sendsize+" : mode = "+(char)tmpTuple.getMode()); + int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE; + System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode()); } //INの場合はremoveする - if(mode == PSX_IN) { - if(tmpTuple.data != null){ + if(mode == PSX.PSX_IN) { + if(tuple.data != null){ //ByteBuffer buff = ByteBuffer.allocate(0); //tmpTuple.setData(buff); - tmpTuple.data = null; + tuple.data = null; } if(temp != null){ - temp.next = tmpTuple.next; + temp.next = tuple.next; } else { - tuple_space[id] = tmpTuple.next; + tuple_space[id] = tuple.next; } } } else { - if(tmpTuple == null) { + if(tuple == null) { //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); - tmpTuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); - tmpTuple.next = null; + tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); + tuple.next = null; }else { - while(tmpTuple.next !=null) tmpTuple =tmpTuple.next; - tmpTuple.next= new Tuple((SocketChannel)key.channel()); - tmpTuple = tmpTuple.next; - tmpTuple.next = null; + while(tuple.next !=null) tuple =tuple.next; + tuple.next= new Tuple((SocketChannel)key.channel()); + tuple = tuple.next; + tuple.next = null; } - tmpTuple.setMode(mode); - int seq2 = command.getInt(LINDA_SEQ_OFFSET); + tuple.setMode(mode); + int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - tmpTuple.setSeq(seq2); - tmpTuple.ch = (SocketChannel) key.channel(); - tmpTuple.setDataLength(0); + tuple.setSeq(seq2); + tuple.ch = (SocketChannel) key.channel(); + tuple.setDataLength(0); ByteBuffer buff = ByteBuffer.allocate(0); buff.rewind(); - tmpTuple.setData(buff); - tmpTuple = null; + tuple.setData(buff); + tuple = null; if(debug){ System.out.println("data inserted insert seq = "+seq2 +", id = "+id); } } - return tmpTuple; + return tuple; } protected String Check(SelectionKey key, ByteBuffer command) throws IOException { @@ -245,7 +242,7 @@ ByteBuffer data; Tuple tmpTuple; int id; - char idc = (char)command.getShort(LINDA_ID_OFFSET); + char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; @@ -254,12 +251,12 @@ tmpTuple = tmpTuple.next; } if (tmpTuple != null && (tmpTuple.mode == 'o')) { - command.putInt(LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen); + command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen); command.rewind(); data = tmpTuple.getData(); }else { //means no out tuple - command.putInt(LINDA_DATA_LENGTH_OFFSET, 0); + command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, 0); command.rewind(); data = ByteBuffer.allocate(0); } @@ -276,7 +273,7 @@ System.out.println("Manager_run: data is null"); } } - int send_size = LINDA_HEADER_SIZE; + int send_size = PSX.LINDA_HEADER_SIZE; int count = 0; //command Send