Mercurial > hg > FederatedLinda
view src/fdl/TupleSpace.java @ 20:a0fd653d1121
Debug Client and Meta Engine for logging.
author | kono |
---|---|
date | Tue, 19 Aug 2008 06:26:20 +0900 |
parents | 0243987383b7 |
children | b4fd7fb9135a |
line wrap: on
line source
package fdl; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; public class TupleSpace { static final boolean debug = true; static final int CAPSIZE = 4194304; public static int user = 0; public byte userchar[] = new byte[2]; public Tuple[] tuple_space; public IOHandlerHook hook = new NullIOHandlerHook(); public TupleSpace() { // 読みこんだデータを格納するためのリストの初期化 tuple_space = new Tuple[TupleHandler.MAX_TUPLE]; } public void newUser() { Tuple tmpTuple; //初期生成 if((tmpTuple = tuple_space[TupleHandler.MAX_TUPLE-1]) == null) { tmpTuple = tuple_space[TupleHandler.MAX_TUPLE-1] = new Tuple(); tmpTuple.next = null; } else { while(tmpTuple.next != null) tmpTuple = tmpTuple.next; tmpTuple.next = new Tuple(); tmpTuple = tmpTuple.next; tmpTuple.next = null; } user++; ByteBuffer data = ByteBuffer.allocate(2); data.clear(); userchar[0] = (byte) (user/10 + '0'); userchar[1] = (byte) (user%10 + '0'); data.put(userchar[0]); data.put(userchar[1]); data.rewind(); tmpTuple.setData(data); //Tuple int id = TupleHandler.MAX_TUPLE-1; tmpTuple.setTuple('o', id, 0, data.limit(), data); System.out.println("Server: assign id "+user); } protected void Out(SelectionKey key,ByteBuffer command, ByteBuffer data) { Tuple tuple; int id; int datasize; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); command.rewind(); System.out.println("*** out command : id = "+id +" ***"); int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); hook.outHook(key,id,seq,'o',data); while((tuple_space[id] != null) && ((tuple_space[id].mode == PSX.PSX_WAIT_RD)||(tuple_space[id].mode == PSX.PSX_RD))) { PSX.setAnserCommand(command, tuple_space[id].getSeq()); send(tuple_space[id].ch, command, data); removeTuple(id); tuple = null; } if(tuple_space[id] != null && tuple_space[id].mode == PSX.PSX_IN) { PSX.setAnserCommand(command, tuple_space[id].getSeq()); if(debug){ int sendsize = datasize+PSX.LINDA_HEADER_SIZE; System.out.println("send size "+sendsize+" : mode = "+(char)'a'); } send(tuple_space[id].ch, command, data); removeTuple(id); tuple = null; } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX.PSX_OUT)) { if((tuple = tuple_space[id]) == null) { tuple = tuple_space[id] = new Tuple(); tuple.next = null; } else { while(tuple.next != null) tuple = tuple.next; tuple.next = new Tuple(); tuple = tuple.next; tuple.next = null; } tuple.setMode('o'); tuple.setSeq(seq); tuple.setData(data); tuple.setDataLength(datasize); if(debug){ System.out.println("data inserted len = "+tuple.getdataLength()+" : id = "+id); } } else { System.out.println("Incorrect mode :"+(char)tuple_space[id].getMode()); command.clear(); data.clear(); System.exit(1); } } private void removeTuple(int id) { Tuple tuple; //後処理 tuple = tuple_space[id]; tuple_space[id] = tuple.next; } protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) { Tuple tuple; int id; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; if (debug) System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); tuple = new Tuple(); tuple.setMode(mode); int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); tuple.setSeq(seq); hook.waitReadHook(key,id,seq,(char)mode); tuple.ch = (SocketChannel) key.channel(); tuple.setDataLength(0); ByteBuffer buff = ByteBuffer.allocate(0); tuple.setData(buff); tuple.next = tuple_space[id]; tuple_space[id] = tuple; if(debug){ System.out.println("data inserted insert seq = "+seq +", id = "+id); } } protected void In_Rd(SelectionKey key, ByteBuffer command, int mode) throws IOException { Tuple tuple = read_in_1(key, command, mode); if (tuple!=null) { //send ByteBuffer sendcommand = tuple.getCommand(); ByteBuffer senddata = tuple.getData(); send(key,sendcommand, senddata); } } private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) { Tuple tuple; int id; //id = command.getInt(PSX.LINDA_ID_OFFSET); //int mode = command.getInt(PSX.LINDA_MODE_OFFSET); Tuple temp = null; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); hook.inHook(key,id,seq,(char)mode); tuple = tuple_space[id]; //wを無視 while(tuple != null && tuple.next != null && (tuple.mode == 'w')){ temp = tuple; tuple = tuple.next; } if (tuple != null && (tuple.mode == 'o')){ tupleIsAvailable(command, mode, tuple, id, temp); } else { tuple = setupWait(key, command, mode, tuple, id); } return tuple; } public ByteBuffer IN(int id,int mode, ByteBuffer command) { Tuple tuple,temp=null; tuple = tuple_space[id]; //wを無視 while(tuple != null && tuple.next != null && (tuple.mode == 'w')){ temp = tuple; tuple = tuple.next; } if (tuple != null && (tuple.mode == 'o')){ ByteBuffer data = tuple.data; tupleIsAvailable(command, mode, tuple, id, temp); return data; } return null; } private void tupleIsAvailable(ByteBuffer command, int mode, Tuple tuple, int id, Tuple temp) { //tmpTuple = new Tuple((SocketChannel)key.channel()); int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); tuple.setCommand('a', seq); if(debug){ int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE; System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode()); } //INの場合はremoveする if(mode == PSX.PSX_IN) { if(tuple.data != null){ //ByteBuffer buff = ByteBuffer.allocate(0); //tmpTuple.setData(buff); tuple.data = null; } if(temp != null){ temp.next = tuple.next; } else { tuple_space[id] = tuple.next; } } } private Tuple setupWait(SelectionKey key, ByteBuffer command, int mode, Tuple tuple, int id) { if(tuple == null) { //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); tuple.next = null; }else { while(tuple.next !=null) tuple =tuple.next; tuple.next= new Tuple((SocketChannel)key.channel()); tuple = tuple.next; tuple.next = null; } tuple.setMode(mode); int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); tuple.setSeq(seq2); tuple.ch = (SocketChannel) key.channel(); tuple.setDataLength(0); ByteBuffer buff = ByteBuffer.allocate(0); buff.rewind(); tuple.setData(buff); tuple = null; if(debug){ System.out.println("data inserted insert seq = "+seq2 +", id = "+id); } return tuple; } protected void Check(SelectionKey key, ByteBuffer command) throws IOException { ByteBuffer data = check1(key,command); send(key, command, data); } public ByteBuffer check1(SelectionKey key,ByteBuffer command) { ByteBuffer data; Tuple tmpTuple; int id; char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); hook.checkHook(key,id,seq,'c'); tmpTuple = tuple_space[id]; while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ tmpTuple = tmpTuple.next; } if (tmpTuple != null && (tmpTuple.mode == 'o')) { command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen); command.rewind(); data = tmpTuple.getData(); }else { //means no out tuple command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, 0); command.rewind(); data = ByteBuffer.allocate(0); } return data; } public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) { if (debug) { if (command == null) { System.out.println("Manager_run: command is null"); } if (data == null) { System.out.println("Manager_run: data is null"); } } int send_size = PSX.LINDA_HEADER_SIZE; int count = 0; try { //command Send command.rewind(); while(send_size > 0){ count = ch.write(command); if(count < 0) throw new IOException(); send_size -= count; } if (data==null) return; //data Send data.rewind(); while(data.remaining() > 0){ count = ch.write(data); if(count < 0) throw new IOException(); } } catch (IOException e) { System.out.println("Write Falied on:"+ch); return; } } public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) { SocketChannel ch = (SocketChannel)key.channel(); send(ch,command,data); } }