Mercurial > hg > FederatedLinda
view src/fdl/TupleSpace.java @ 25:330fa49bc4fd
*** empty log message ***
author | kono |
---|---|
date | Wed, 20 Aug 2008 14:12:15 +0900 |
parents | 35375016b2f0 |
children | 846c6c14cf04 |
line wrap: on
line source
package fdl; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; public class TupleSpace { static final boolean debug = true; public static int user = 0; public byte userchar[] = new byte[2]; public Tuple[] tuple_space; public IOHandlerHook hook = new NullIOHandlerHook(); public static final int MAX_TUPLE_ID = 65536; public TupleSpace() { // 読みこんだデータを格納するためのリストの初期化 tuple_space = new Tuple[TupleSpace.MAX_TUPLE_ID]; } public void newUser() { Tuple tmpTuple; //初期生成 if((tmpTuple = tuple_space[TupleSpace.MAX_TUPLE_ID-1]) == null) { tmpTuple = tuple_space[TupleSpace.MAX_TUPLE_ID-1] = new Tuple(); tmpTuple.next = null; } else { while(tmpTuple.next != null) tmpTuple = tmpTuple.next; tmpTuple.next = new Tuple(); tmpTuple = tmpTuple.next; tmpTuple.next = null; } user++; ByteBuffer data = ByteBuffer.allocate(2); userchar[0] = (byte) (user/10 + '0'); userchar[1] = (byte) (user%10 + '0'); data.put(userchar[0]); data.put(userchar[1]); data.flip(); tmpTuple.setData(data); //Tuple int id = TupleSpace.MAX_TUPLE_ID-1; int seq = 0; tmpTuple.setTuple('o', id, seq, data); System.out.println("Server: assign id "+user); } protected void Out(SelectionKey key,ByteBuffer command, ByteBuffer data) { Tuple tuple; int id; int datasize; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); command.rewind(); if (debug) System.out.println("*** out command : id = "+id +" ***"); int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); hook.outHook(key,id,seq,'o',data); while((tuple_space[id] != null) && ((tuple_space[id].mode == PSX.PSX_WAIT_RD)||(tuple_space[id].mode == PSX.PSX_RD))) { PSX.setAnserCommand(command, tuple_space[id].getSeq()); PSX.send(tuple_space[id].ch, command, data); removeTuple(id); tuple = null; } if(tuple_space[id] != null && tuple_space[id].mode == PSX.PSX_IN) { PSX.setAnserCommand(command, tuple_space[id].getSeq()); if(debug){ int sendsize = datasize+PSX.LINDA_HEADER_SIZE; System.out.println("send size "+sendsize+" : mode = "+(char)'a'); } PSX.send(tuple_space[id].ch, command, data); removeTuple(id); tuple = null; } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX.PSX_OUT)) { if((tuple = tuple_space[id]) == null) { tuple = tuple_space[id] = new Tuple(); tuple.next = null; } else { while(tuple.next != null) tuple = tuple.next; tuple.next = new Tuple(); tuple = tuple.next; tuple.next = null; } tuple.setMode('o'); tuple.setSeq(seq); tuple.setData(data); if(debug){ System.out.println("data inserted len = "+tuple.getdataLength()+" : id = "+id); } } else { System.out.println("Incorrect mode :"+(char)tuple_space[id].getMode()); command.clear(); data.clear(); System.exit(1); } } private void removeTuple(int id) { Tuple tuple; //後処理 tuple = tuple_space[id]; tuple_space[id] = tuple.next; } protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) { Tuple tuple; int id; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; if (debug) System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); tuple = new Tuple(); tuple.setMode(mode); int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); tuple.setSeq(seq); hook.waitReadHook(key,id,seq,(char)mode); tuple.ch = (SocketChannel) key.channel(); ByteBuffer buff = ByteBuffer.allocate(0); tuple.setData(buff); tuple.next = tuple_space[id]; tuple_space[id] = tuple; if(debug){ System.out.println("data inserted insert seq = "+seq +", id = "+id); } } protected void In_Rd(SelectionKey key, ByteBuffer command, int mode) throws IOException { Tuple tuple = read_in_1(key, command, mode); if (tuple!=null) { //send ByteBuffer sendcommand = tuple.getCommand(); ByteBuffer senddata = tuple.getData(); PSX.send(key,sendcommand, senddata); } } private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) { Tuple tuple; int id; //id = command.getInt(PSX.LINDA_ID_OFFSET); //int mode = command.getInt(PSX.LINDA_MODE_OFFSET); Tuple temp = null; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); if (debug) System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); hook.inHook(key,id,seq,(char)mode); tuple = tuple_space[id]; //wを無視 while(tuple != null && tuple.next != null && (tuple.mode == 'w')){ temp = tuple; tuple = tuple.next; } if (tuple != null && (tuple.mode == 'o')){ tuple = tupleIsAvailable(command, mode, tuple, id, temp); } else { tuple = setupWait(key, command, mode, tuple, id); } return tuple; } public ByteBuffer IN(int id,int mode, ByteBuffer command) { Tuple tuple,temp=null; tuple = tuple_space[id]; //wを無視 while(tuple != null && tuple.next != null && (tuple.mode == 'w')){ temp = tuple; tuple = tuple.next; } if (tuple != null && (tuple.mode == 'o')){ ByteBuffer data = tuple.data; tupleIsAvailable(command, mode, tuple, id, temp); return data; } return null; } private Tuple tupleIsAvailable(ByteBuffer command, int mode, Tuple tuple, int id, Tuple temp) { int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); tuple.setCommand('a', seq); if(debug){ int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE; System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode()); } //INの場合はremoveする if(mode == PSX.PSX_IN) { if(temp != null){ temp.next = tuple.next; } else { tuple_space[id] = tuple.next; } } return tuple; } private Tuple setupWait(SelectionKey key, ByteBuffer command, int mode, Tuple tuple, int id) { if(tuple == null) { //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); tuple.next = null; }else { while(tuple.next !=null) tuple =tuple.next; tuple.next= new Tuple((SocketChannel)key.channel()); tuple = tuple.next; tuple.next = null; } tuple.setMode(mode); int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); tuple.setSeq(seq2); tuple.ch = (SocketChannel) key.channel(); ByteBuffer buff = ByteBuffer.allocate(0); tuple.setData(buff); tuple = null; if(debug){ System.out.println("data inserted insert seq = "+seq2 +", id = "+id); } return tuple; } protected void Check(SelectionKey key, ByteBuffer command) throws IOException { ByteBuffer data = check1(key,command); PSX.send(key, command, data); } public ByteBuffer check1(SelectionKey key,ByteBuffer command) { ByteBuffer data; Tuple tmpTuple; int id; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); hook.checkHook(key,id,seq,'c'); tmpTuple = tuple_space[id]; while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ tmpTuple = tmpTuple.next; } if (tmpTuple != null && (tmpTuple.mode == 'o')) { command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, tmpTuple.getdataLength()); command.rewind(); data = tmpTuple.getData(); }else { //means no out tuple command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, 0); command.rewind(); data = ByteBuffer.allocate(0); } return data; } }