Mercurial > hg > FederatedLinda
diff src/fdl/TupleSpace.java @ 19:0243987383b7
Meta Protocol Engine and sample implementation of event logger.
ComDebug_Client needs fixes.
author | kono |
---|---|
date | Tue, 19 Aug 2008 05:33:32 +0900 |
parents | 609b288f47f9 |
children | b4fd7fb9135a |
line wrap: on
line diff
--- a/src/fdl/TupleSpace.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/TupleSpace.java Tue Aug 19 05:33:32 2008 +0900 @@ -2,56 +2,75 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; public class TupleSpace { static final boolean debug = true; static final int CAPSIZE = 4194304; + public static int user = 0; + public byte userchar[] = new byte[2]; public Tuple[] tuple_space; + public IOHandlerHook hook = new NullIOHandlerHook(); - public TupleSpace(Tuple[] _tuple_space) { - super(); + public TupleSpace() { // 読みこんだデータを格納するためのリストの初期化 - tuple_space = _tuple_space; + tuple_space = new Tuple[TupleHandler.MAX_TUPLE]; } - public TupleSpace() { - // TODO Auto-generated constructor stub + + public void newUser() { + Tuple tmpTuple; + //初期生成 + if((tmpTuple = tuple_space[TupleHandler.MAX_TUPLE-1]) == null) { + tmpTuple = tuple_space[TupleHandler.MAX_TUPLE-1] = new Tuple(); + tmpTuple.next = null; + } else { + while(tmpTuple.next != null) tmpTuple = tmpTuple.next; + tmpTuple.next = new Tuple(); + tmpTuple = tmpTuple.next; + tmpTuple.next = null; + } + + user++; + + ByteBuffer data = ByteBuffer.allocate(2); + data.clear(); + userchar[0] = (byte) (user/10 + '0'); + userchar[1] = (byte) (user%10 + '0'); + + data.put(userchar[0]); + data.put(userchar[1]); + + data.rewind(); + tmpTuple.setData(data); + //Tuple + int id = TupleHandler.MAX_TUPLE-1; + tmpTuple.setTuple('o', id, 0, data.limit(), data); + System.out.println("Server: assign id "+user); } - - protected String Out(ByteBuffer command, ByteBuffer data) throws IOException { + + protected void Out(SelectionKey key,ByteBuffer command, ByteBuffer data) { Tuple tuple; int id; int datasize; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; - String sendtext = "none"; - + datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); command.rewind(); System.out.println("*** out command : id = "+id +" ***"); + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); + command.rewind(); + hook.outHook(key,id,seq,'o',data); while((tuple_space[id] != null) && ((tuple_space[id].mode == PSX.PSX_WAIT_RD)||(tuple_space[id].mode == PSX.PSX_RD))) { PSX.setAnserCommand(command, tuple_space[id].getSeq()); - //if(debug){ - //int sendsize = tmpTuple.getdataLength()+PSX.LINDA_HEADER_SIZE; - //System.out.println("send size "+sendsize+" : mode = "+(char)mode); - //} - //ByteBuffer sendcommand = tmpTuple.getCommand(); - //ByteBuffer senddata = tmpTuple.getData(); send(tuple_space[id].ch, command, data); - sendtext = getdataString(data); - - removeTuple(id); tuple = null; } @@ -62,12 +81,7 @@ int sendsize = datasize+PSX.LINDA_HEADER_SIZE; System.out.println("send size "+sendsize+" : mode = "+(char)'a'); } - //ByteBuffer sendcommand = tmpTuple.getCommand(); - //ByteBuffer senddata = tmpTuple.getData(); send(tuple_space[id].ch, command, data); - - sendtext = getdataString(data); - removeTuple(id); tuple = null; } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX.PSX_OUT)) { @@ -83,8 +97,6 @@ } tuple.setMode('o'); - int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); - command.rewind(); tuple.setSeq(seq); tuple.setData(data); tuple.setDataLength(datasize); @@ -98,7 +110,6 @@ data.clear(); System.exit(1); } - return sendtext; } private void removeTuple(int id) { @@ -124,6 +135,9 @@ int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); tuple.setSeq(seq); + + hook.waitReadHook(key,id,seq,(char)mode); + tuple.ch = (SocketChannel) key.channel(); tuple.setDataLength(0); ByteBuffer buff = ByteBuffer.allocate(0); @@ -135,7 +149,7 @@ } } - protected String In_Rd(SelectionKey key, ByteBuffer command, int mode) + protected void In_Rd(SelectionKey key, ByteBuffer command, int mode) throws IOException { Tuple tuple = read_in_1(key, command, mode); @@ -145,8 +159,6 @@ ByteBuffer senddata = tuple.getData(); send(key,sendcommand, senddata); } - String sendtext = getdataString(tuple.getData()); - return sendtext; } private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) { @@ -159,10 +171,12 @@ char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; - + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); + command.rewind(); System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); - + hook.inHook(key,id,seq,(char)mode); + tuple = tuple_space[id]; //wを無視 @@ -172,79 +186,106 @@ } if (tuple != null && (tuple.mode == 'o')){ - //tmpTuple = new Tuple((SocketChannel)key.channel()); - int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); - command.rewind(); - tuple.setCommand('a', seq); - - if(debug){ - int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE; - System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode()); - } - - - - //INの場合はremoveする - if(mode == PSX.PSX_IN) { - if(tuple.data != null){ - //ByteBuffer buff = ByteBuffer.allocate(0); - //tmpTuple.setData(buff); - tuple.data = null; - } - if(temp != null){ - temp.next = tuple.next; - } - else { - tuple_space[id] = tuple.next; - } - } + tupleIsAvailable(command, mode, tuple, id, temp); } else { - if(tuple == null) { - //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); - tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); - tuple.next = null; - }else { - while(tuple.next !=null) tuple =tuple.next; - tuple.next= new Tuple((SocketChannel)key.channel()); - tuple = tuple.next; - tuple.next = null; - } - - tuple.setMode(mode); - int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET); - command.rewind(); - tuple.setSeq(seq2); - tuple.ch = (SocketChannel) key.channel(); - tuple.setDataLength(0); - ByteBuffer buff = ByteBuffer.allocate(0); - buff.rewind(); - tuple.setData(buff); - tuple = null; - - if(debug){ - System.out.println("data inserted insert seq = "+seq2 +", id = "+id); - } + tuple = setupWait(key, command, mode, tuple, id); } return tuple; } - protected String Check(SelectionKey key, ByteBuffer command) throws IOException { - String sendtext; - ByteBuffer data = check1(command); - send(key, command, data); + public ByteBuffer IN(int id,int mode, ByteBuffer command) { + Tuple tuple,temp=null; + tuple = tuple_space[id]; + + //wを無視 + while(tuple != null && tuple.next != null && (tuple.mode == 'w')){ + temp = tuple; + tuple = tuple.next; + } + + if (tuple != null && (tuple.mode == 'o')){ + ByteBuffer data = tuple.data; + tupleIsAvailable(command, mode, tuple, id, temp); + return data; + } + return null; + } + + private void tupleIsAvailable(ByteBuffer command, int mode, Tuple tuple, + int id, Tuple temp) { + //tmpTuple = new Tuple((SocketChannel)key.channel()); + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); + command.rewind(); + tuple.setCommand('a', seq); + + if(debug){ + int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE; + System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode()); + } + + - sendtext = getdataString(data); - - return sendtext; + //INの場合はremoveする + if(mode == PSX.PSX_IN) { + if(tuple.data != null){ + //ByteBuffer buff = ByteBuffer.allocate(0); + //tmpTuple.setData(buff); + tuple.data = null; + } + if(temp != null){ + temp.next = tuple.next; + } + else { + tuple_space[id] = tuple.next; + } + } } - private ByteBuffer check1(ByteBuffer command) { + private Tuple setupWait(SelectionKey key, ByteBuffer command, int mode, + Tuple tuple, int id) { + if(tuple == null) { + //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); + tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); + tuple.next = null; + }else { + while(tuple.next !=null) tuple =tuple.next; + tuple.next= new Tuple((SocketChannel)key.channel()); + tuple = tuple.next; + tuple.next = null; + } + + tuple.setMode(mode); + int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET); + command.rewind(); + tuple.setSeq(seq2); + tuple.ch = (SocketChannel) key.channel(); + tuple.setDataLength(0); + ByteBuffer buff = ByteBuffer.allocate(0); + buff.rewind(); + tuple.setData(buff); + tuple = null; + + if(debug){ + System.out.println("data inserted insert seq = "+seq2 +", id = "+id); + } + return tuple; + } + + protected void Check(SelectionKey key, ByteBuffer command) throws IOException { + ByteBuffer data = check1(key,command); + send(key, command, data); + } + + public ByteBuffer check1(SelectionKey key,ByteBuffer command) { ByteBuffer data; Tuple tmpTuple; int id; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); + command.rewind(); + hook.checkHook(key,id,seq,'c'); tmpTuple = tuple_space[id]; while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ @@ -263,64 +304,44 @@ return data; } - public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) - throws IOException { - if (debug) { - if (command == null) { - System.out.println("Manager_run: command is null"); - } - if (data == null) { - System.out.println("Manager_run: data is null"); - } - } - int send_size = PSX.LINDA_HEADER_SIZE; - int count = 0; - - //command Send - command.rewind(); - while(send_size > 0){ - count = ch.write(command); - if(count < 0){ - System.out.println("Write Falied! close ch:"+ch); - ch.close(); - return; - } - send_size -= count; - } - - //data Send - data.rewind(); - if(data != null) { - data.rewind(); - ch.write(data); - } + public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) { + if (debug) { + if (command == null) { + System.out.println("Manager_run: command is null"); + } + if (data == null) { + System.out.println("Manager_run: data is null"); + } + } + int send_size = PSX.LINDA_HEADER_SIZE; + int count = 0; + + try { + //command Send + command.rewind(); + while(send_size > 0){ + count = ch.write(command); + if(count < 0) throw new IOException(); + send_size -= count; } - public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) - throws IOException { - SocketChannel ch = (SocketChannel)key.channel(); - send(ch,command,data); + if (data==null) return; + //data Send + data.rewind(); + while(data.remaining() > 0){ + count = ch.write(data); + if(count < 0) throw new IOException(); } - - private String getdataString(ByteBuffer data) { - String sendtext; - data.rewind(); - //set sendtext - //CharBuffer chardata = data.asCharBuffer(); - - //Decode UTF-8 to System Encoding(UTF-16) - Charset charset = Charset.forName("UTF-8"); - CharsetDecoder decoder = charset.newDecoder(); - CharBuffer cb = null; - try { - cb = decoder.decode(data); - } catch (CharacterCodingException e) { - e.printStackTrace(); + } catch (IOException e) { + System.out.println("Write Falied on:"+ch); + return; } - cb.rewind(); - - sendtext = cb.toString(); - return sendtext; } + public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) { + SocketChannel ch = (SocketChannel)key.channel(); + send(ch,command,data); + } + + } \ No newline at end of file