Mercurial > hg > FederatedLinda
view src/fdl/IOParam.java @ 15:aced4bfc15af
add Meta Linda Interface for debugger.
author | kono |
---|---|
date | Sun, 17 Aug 2008 20:24:24 +0900 |
parents | 4391c9fac885 |
children |
line wrap: on
line source
package fdl; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; public class IOParam implements TupleHandler, PSXQueueInterface{ static final boolean debug = true; static final int CAPSIZE = 4194304; public Tuple[] tuple_space; public IOParam(Tuple[] _tuple_space) { super(); // 読みこんだデータを格納するためのリストの初期化 tuple_space = _tuple_space; } public IOParam() { // TODO Auto-generated constructor stub } protected String Out(ByteBuffer command, ByteBuffer data) throws IOException { Tuple tmpTuple; int id; int datasize; char idc = (char)command.getShort(LINDA_ID_OFFSET); command.rewind(); id = (int)idc; String sendtext = "none"; datasize = command.getInt(LINDA_DATA_LENGTH_OFFSET); command.rewind(); System.out.println("*** out command : id = "+id +" ***"); while((tuple_space[id] != null) && ((tuple_space[id].mode == PSX_WAIT_RD)||(tuple_space[id].mode == PSX_RD))) { command.put(LINDA_MODE_OFFSET, (byte)'a'); command.rewind(); command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); command.rewind(); //if(debug){ //int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; //System.out.println("send size "+sendsize+" : mode = "+(char)mode); //} //ByteBuffer sendcommand = tmpTuple.getCommand(); //ByteBuffer senddata = tmpTuple.getData(); send(tuple_space[id].ch, command, data); sendtext = getdataString(data); //後処理 tmpTuple = tuple_space[id]; tuple_space[id] = tmpTuple.next; tmpTuple = null; } if(tuple_space[id] != null && tuple_space[id].mode == PSX_IN) { command.put(LINDA_MODE_OFFSET, (byte)'a'); command.rewind(); command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); command.rewind(); if(debug){ int sendsize = datasize+LINDA_HEADER_SIZE; System.out.println("send size "+sendsize+" : mode = "+(char)'a'); } //ByteBuffer sendcommand = tmpTuple.getCommand(); //ByteBuffer senddata = tmpTuple.getData(); send(tuple_space[id].ch, command, data); sendtext = getdataString(data); //後処理 tmpTuple = tuple_space[id]; tuple_space[id] = tmpTuple.next; tmpTuple = null; } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX_OUT)) { if((tmpTuple = tuple_space[id]) == null) { tmpTuple = tuple_space[id] = new Tuple(); tmpTuple.next = null; } else { while(tmpTuple.next != null) tmpTuple = tmpTuple.next; tmpTuple.next = new Tuple(); tmpTuple = tmpTuple.next; tmpTuple.next = null; } tmpTuple.setMode('o'); int seq = command.getInt(LINDA_SEQ_OFFSET); command.rewind(); tmpTuple.setSeq(seq); tmpTuple.setData(data); tmpTuple.setDataLength(datasize); if(debug){ System.out.println("data inserted len = "+tmpTuple.getdataLength()+" : id = "+id); } } else { System.out.println("Uncorrect mode :"+(char)tuple_space[id].getMode()); command.clear(); data.clear(); System.exit(1); } return sendtext; } protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) { Tuple tmpTuple; int id; char idc = (char)command.getShort(LINDA_ID_OFFSET); command.rewind(); id = (int)idc; System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); tmpTuple = new Tuple(); tmpTuple.setMode(mode); int seq = command.getInt(LINDA_SEQ_OFFSET); command.rewind(); tmpTuple.setSeq(seq); tmpTuple.ch = (SocketChannel) key.channel(); tmpTuple.setDataLength(0); ByteBuffer buff = ByteBuffer.allocate(0); tmpTuple.setData(buff); tmpTuple.next = tuple_space[id]; tuple_space[id] = tmpTuple; if(debug){ System.out.println("data inserted insert seq = "+seq +", id = "+id); } } protected String In_Rd(SelectionKey key, ByteBuffer command, int mode) throws IOException { Tuple tmpTuple; int id; //id = command.getInt(LINDA_ID_OFFSET); //int mode = command.getInt(LINDA_MODE_OFFSET); Tuple temp = null; char idc = (char)command.getShort(LINDA_ID_OFFSET); command.rewind(); id = (int)idc; String sendtext = "none"; System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); tmpTuple = tuple_space[id]; //wを無視 while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ temp = tmpTuple; tmpTuple = tmpTuple.next; } if (tmpTuple != null && (tmpTuple.mode == 'o')){ //tmpTuple = new Tuple((SocketChannel)key.channel()); int seq = command.getInt(LINDA_SEQ_OFFSET); command.rewind(); tmpTuple.setCommand('a', seq); if(debug){ int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; System.out.println("send size "+sendsize+" : mode = "+(char)tmpTuple.getMode()); } //send ByteBuffer sendcommand = tmpTuple.getCommand(); ByteBuffer senddata = tmpTuple.getData(); send(key,sendcommand, senddata); sendtext = getdataString(senddata); //INの場合はremoveする if(mode == PSX_IN) { if(tmpTuple.data != null){ //ByteBuffer buff = ByteBuffer.allocate(0); //tmpTuple.setData(buff); tmpTuple.data = null; } if(temp != null){ temp.next = tmpTuple.next; } else { tuple_space[id] = tmpTuple.next; } tmpTuple = null; } } else { if(tmpTuple == null) { //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); tmpTuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); tmpTuple.next = null; }else { while(tmpTuple.next !=null) tmpTuple =tmpTuple.next; tmpTuple.next= new Tuple((SocketChannel)key.channel()); tmpTuple = tmpTuple.next; tmpTuple.next = null; } tmpTuple.setMode(mode); int seq2 = command.getInt(LINDA_SEQ_OFFSET); command.rewind(); tmpTuple.setSeq(seq2); tmpTuple.ch = (SocketChannel) key.channel(); tmpTuple.setDataLength(0); ByteBuffer buff = ByteBuffer.allocate(0); buff.rewind(); tmpTuple.setData(buff); if(debug){ System.out.println("data inserted insert seq = "+seq2 +", id = "+id); } } //} else if (command.getInt(LINDA_MODE_OFFSET) == PSX_WAIT_RD) { return sendtext; } protected String Check(SelectionKey key, ByteBuffer command) throws IOException { ByteBuffer data; Tuple tmpTuple; int id; char idc = (char)command.getShort(LINDA_ID_OFFSET); command.rewind(); id = (int)idc; String sendtext = "none"; tmpTuple = tuple_space[id]; while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ tmpTuple = tmpTuple.next; } if (tmpTuple != null && (tmpTuple.mode == 'o')) { command.putInt(LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen); command.rewind(); data = tmpTuple.getData(); }else { //means no out tuple command.putInt(LINDA_DATA_LENGTH_OFFSET, 0); command.rewind(); data = null; data = ByteBuffer.allocate(0); } send(key, command, data); sendtext = getdataString(data); return sendtext; } public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) throws IOException { if (debug) { if (command == null) { System.out.println("Manager_run: command is null"); } if (data == null) { System.out.println("Manager_run: data is null"); } } int send_size = LINDA_HEADER_SIZE; int count = 0; //command Send command.rewind(); while(send_size > 0){ count = ch.write(command); if(count < 0){ System.out.println("Write Falied! close ch:"+ch); ch.close(); return; } send_size -= count; } //data Send data.rewind(); if(data != null) { data.rewind(); ch.write(data); } } public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) throws IOException { SocketChannel ch = (SocketChannel)key.channel(); send(ch,command,data); } private String getdataString(ByteBuffer data) { String sendtext; data.rewind(); //set sendtext //CharBuffer chardata = data.asCharBuffer(); //Decode UTF-8 to System Encoding(UTF-16) Charset charset = Charset.forName("UTF-8"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer cb = null; try { cb = decoder.decode(data); } catch (CharacterCodingException e) { e.printStackTrace(); } cb.rewind(); sendtext = cb.toString(); return sendtext; } public void handle(SelectionKey key) throws ClosedChannelException, IOException { } }