Mercurial > hg > FederatedLinda
view src/fdl/TupleSpace.java @ 17:609b288f47f9
*** empty log message ***
author | kono |
---|---|
date | Mon, 18 Aug 2008 07:28:29 +0900 (2008-08-17) |
parents | cccf34386cad |
children | 0243987383b7 |
line wrap: on
line source
package fdl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

/**
 * Array-indexed tuple store used by the Linda server.
 *
 * <p>Each slot of {@link #tuple_space} is the head of a singly linked chain
 * (linked through {@code Tuple.next}). A chain holds stored data tuples
 * (mode {@code 'o'}) as well as pending requests that carry the requesting
 * {@link SocketChannel} in {@code Tuple.ch}.
 *
 * <p>The slot index ("id") of every command is read from the command header
 * as an unsigned 16-bit value at {@code PSX.LINDA_ID_OFFSET}.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to the chains;
 * presumably it is driven from a single thread — confirm against the caller.
 */
public class TupleSpace {
    static final boolean debug = true;
    static final int CAPSIZE = 4194304;

    /** Chain heads, indexed by tuple id. */
    public Tuple[] tuple_space;

    /**
     * Creates a tuple space backed by the given array of chain heads.
     *
     * @param _tuple_space array used (not copied) as the chain-head table
     */
    public TupleSpace(Tuple[] _tuple_space) {
        super();
        // Table that stores the data read from clients.
        tuple_space = _tuple_space;
    }

    /**
     * Creates a tuple space without a backing table. {@link #tuple_space}
     * stays {@code null} until the caller assigns it; every operation of
     * this class dereferences it.
     */
    public TupleSpace() {
        // TODO Auto-generated constructor stub
    }

    /**
     * Handles an "out" command: delivers the data to waiting requests, or
     * stores it when nobody consumes it.
     *
     * <p>All leading rd / wait-rd requests of the chain are answered and
     * removed. If the next entry is an "in" request it is answered and
     * removed as well, and the data is not stored. Otherwise (chain empty or
     * headed by stored data) a new {@code 'o'} tuple is appended to the end
     * of the chain. Any other head mode terminates the JVM.
     *
     * @param command command header; its answer fields are rewritten for
     *                every delivery
     * @param data    payload of the out command
     * @return the payload decoded as text if it was delivered to at least one
     *         request, otherwise {@code "none"}
     * @throws IOException if writing to a requesting channel fails
     */
    protected String Out(ByteBuffer command, ByteBuffer data) throws IOException {
        int id = readId(command);
        String sendtext = "none";
        int datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
        command.rewind();

        System.out.println("*** out command : id = "+id +" ***");

        // Readers do not consume the data, so serve every one at the head.
        while ((tuple_space[id] != null)
                && ((tuple_space[id].mode == PSX.PSX_WAIT_RD)
                    || (tuple_space[id].mode == PSX.PSX_RD))) {
            PSX.setAnserCommand(command, tuple_space[id].getSeq());
            send(tuple_space[id].ch, command, data);
            sendtext = getdataString(data);
            removeTuple(id);
        }

        if (tuple_space[id] != null && tuple_space[id].mode == PSX.PSX_IN) {
            // An "in" request consumes the data: answer it and store nothing.
            PSX.setAnserCommand(command, tuple_space[id].getSeq());
            if (debug) {
                int sendsize = datasize + PSX.LINDA_HEADER_SIZE;
                System.out.println("send size "+sendsize+" : mode = "+(char)'a');
            }
            send(tuple_space[id].ch, command, data);
            sendtext = getdataString(data);
            removeTuple(id);
        } else if ((tuple_space[id] == null)
                || (tuple_space[id].getMode() == PSX.PSX_OUT)) {
            // Nobody is waiting: append the data to the end of the chain.
            Tuple tuple = tuple_space[id];
            if (tuple == null) {
                tuple = tuple_space[id] = new Tuple();
                tuple.next = null;
            } else {
                while (tuple.next != null) {
                    tuple = tuple.next;
                }
                tuple.next = new Tuple();
                tuple = tuple.next;
                tuple.next = null;
            }

            tuple.setMode('o');
            int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
            command.rewind();
            tuple.setSeq(seq);
            tuple.setData(data);
            tuple.setDataLength(datasize);

            if (debug) {
                System.out.println("data inserted len = "+tuple.getdataLength()+" : id = "+id);
            }
        } else {
            // Unexpected head mode: the original design aborts the server.
            System.out.println("Incorrect mode :"+(char)tuple_space[id].getMode());
            command.clear();
            data.clear();
            System.exit(1);
        }
        return sendtext;
    }

    /** Unlinks the head of chain {@code id}; the chain must not be empty. */
    private void removeTuple(int id) {
        Tuple head = tuple_space[id];
        tuple_space[id] = head.next;
    }

    /**
     * Registers a waiting request at the <em>head</em> of the chain selected
     * by the command's id, without looking for matching data.
     *
     * @param key     selection key whose channel receives the later answer
     * @param command command header (id and sequence number are read)
     * @param mode    mode stored in the queued request
     */
    protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) {
        int id = readId(command);

        if (debug) {
            System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n");
        }

        Tuple tuple = new Tuple();
        tuple.setMode(mode);
        int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
        command.rewind();
        tuple.setSeq(seq);
        tuple.ch = (SocketChannel) key.channel();
        tuple.setDataLength(0);
        tuple.setData(ByteBuffer.allocate(0));

        tuple.next = tuple_space[id];
        tuple_space[id] = tuple;

        if (debug) {
            System.out.println("data inserted insert seq = "+seq +", id = "+id);
        }
    }

    /**
     * Handles an "in" or "rd" command: answers immediately when stored data
     * is available, otherwise queues the request.
     *
     * @param key     selection key of the requesting channel
     * @param command command header
     * @param mode    request mode; {@code PSX.PSX_IN} removes the matched tuple
     * @return the delivered payload decoded as text, or {@code "none"} when
     *         the request was queued and nothing was sent
     * @throws IOException if writing the answer fails
     */
    protected String In_Rd(SelectionKey key, ByteBuffer command, int mode) throws IOException {
        Tuple tuple = read_in_1(key, command, mode);
        if (tuple == null) {
            // Request was queued; there is no data to send or to report.
            return "none";
        }
        ByteBuffer sendcommand = tuple.getCommand();
        ByteBuffer senddata = tuple.getData();
        send(key, sendcommand, senddata);
        return getdataString(senddata);
    }

    /**
     * Looks up stored data for an in/rd request.
     *
     * <p>Leading {@code 'w'} entries are skipped. If the entry reached is a
     * stored tuple ({@code 'o'}), its command is set to an answer carrying
     * the request's sequence number and the tuple is returned; for
     * {@code PSX.PSX_IN} it is also unlinked from the chain. Otherwise a new
     * request entry is appended to the end of the chain and {@code null} is
     * returned.
     *
     * @return the matched data tuple, or {@code null} if the request was queued
     */
    private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) {
        int id = readId(command);

        System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n");

        Tuple previous = null;
        Tuple tuple = tuple_space[id];

        // Skip 'w' entries.
        while (tuple != null && tuple.next != null && (tuple.mode == 'w')) {
            previous = tuple;
            tuple = tuple.next;
        }

        if (tuple != null && (tuple.mode == 'o')) {
            int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
            command.rewind();
            tuple.setCommand('a', seq);

            if (debug) {
                int sendsize = tuple.getdataLength() + PSX.LINDA_HEADER_SIZE;
                System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode());
            }

            // IN consumes the tuple: unlink it from the chain.
            if (mode == PSX.PSX_IN) {
                // The payload is deliberately left attached: the caller still
                // has to send it, and the tuple is unreachable from the chain
                // once unlinked.
                // NOTE(review): assumes Tuple.getData() returns the data field.
                if (previous != null) {
                    previous.next = tuple.next;
                } else {
                    tuple_space[id] = tuple.next;
                }
            }
            return tuple;
        }

        // No data available: queue the request at the end of the chain.
        SocketChannel channel = (SocketChannel) key.channel();
        if (tuple == null) {
            tuple = tuple_space[id] = new Tuple(channel);
            tuple.next = null;
        } else {
            while (tuple.next != null) {
                tuple = tuple.next;
            }
            tuple.next = new Tuple(channel);
            tuple = tuple.next;
            tuple.next = null;
        }

        tuple.setMode(mode);
        int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET);
        command.rewind();
        tuple.setSeq(seq2);
        tuple.ch = channel;
        tuple.setDataLength(0);
        tuple.setData(ByteBuffer.allocate(0));

        if (debug) {
            System.out.println("data inserted insert seq = "+seq2 +", id = "+id);
        }
        return null;
    }

    /**
     * Handles a "check" command: reports the stored data for the command's
     * id (or an empty payload) without removing anything.
     *
     * @param key     selection key of the requesting channel
     * @param command command header; its data-length field is overwritten
     * @return the reported payload decoded as text
     * @throws IOException if writing the answer fails
     */
    protected String Check(SelectionKey key, ByteBuffer command) throws IOException {
        ByteBuffer data = check1(command);
        send(key, command, data);
        return getdataString(data);
    }

    /**
     * Finds the payload to report for a check command and writes its length
     * into the command header. Leading {@code 'w'} entries are skipped; if
     * the entry reached is not a stored tuple, length 0 and an empty buffer
     * are used.
     */
    private ByteBuffer check1(ByteBuffer command) {
        int id = readId(command);

        Tuple tmpTuple = tuple_space[id];
        while (tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')) {
            tmpTuple = tmpTuple.next;
        }

        ByteBuffer data;
        if (tmpTuple != null && (tmpTuple.mode == 'o')) {
            command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen);
            command.rewind();
            data = tmpTuple.getData();
        } else {
            // No out tuple is stored for this id.
            command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, 0);
            command.rewind();
            data = ByteBuffer.allocate(0);
        }
        return data;
    }

    /**
     * Writes a command header followed by its payload to a channel.
     *
     * <p>Both buffers are rewound before writing. Writing is repeated until
     * the buffers are drained, because a single {@code write} on a socket
     * channel may transfer only part of a buffer. If a write reports a
     * negative count the channel is closed and the method returns.
     *
     * @param ch      destination channel
     * @param command header buffer; must not be {@code null}
     * @param data    payload buffer; {@code null} sends the header only
     * @throws IOException          if the channel write fails
     * @throws NullPointerException if {@code command} is {@code null}
     */
    public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) throws IOException {
        if (debug) {
            if (command == null) {
                System.out.println("Manager_run: command is null");
            }
            if (data == null) {
                System.out.println("Manager_run: data is null");
            }
        }
        if (command == null) {
            throw new NullPointerException("command is null");
        }

        // Header: stop after LINDA_HEADER_SIZE bytes, or earlier if the
        // buffer is exhausted (otherwise write() would return 0 forever).
        int send_size = PSX.LINDA_HEADER_SIZE;
        command.rewind();
        while (send_size > 0 && command.hasRemaining()) {
            int count = ch.write(command);
            if (count < 0) {
                closeAfterFailedWrite(ch);
                return;
            }
            send_size -= count;
        }

        // Payload: check for null before touching the buffer, then drain it.
        if (data != null) {
            data.rewind();
            while (data.hasRemaining()) {
                if (ch.write(data) < 0) {
                    closeAfterFailedWrite(ch);
                    return;
                }
            }
        }
    }

    /**
     * Convenience overload of {@link #send(SocketChannel, ByteBuffer, ByteBuffer)}
     * that sends to the channel of the given selection key.
     *
     * @throws IOException if the channel write fails
     */
    public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        send(ch, command, data);
    }

    /** Reports a failed write and closes the channel. */
    private void closeAfterFailedWrite(SocketChannel ch) throws IOException {
        System.out.println("Write Falied! close ch:"+ch);
        ch.close();
    }

    /**
     * Reads the tuple id from the command header as an unsigned 16-bit value
     * and rewinds the buffer.
     */
    private int readId(ByteBuffer command) {
        char idc = (char) command.getShort(PSX.LINDA_ID_OFFSET);
        command.rewind();
        return (int) idc;
    }

    /**
     * Decodes a payload as UTF-8 text.
     *
     * <p>The buffer is rewound first; decoding leaves its position at the
     * limit.
     *
     * @param data payload to decode; may be {@code null}
     * @return the decoded text, or an empty string when {@code data} is
     *         {@code null} or is not valid UTF-8
     */
    private String getdataString(ByteBuffer data) {
        if (data == null) {
            return "";
        }
        data.rewind();

        // Decode UTF-8 into Java's internal UTF-16 representation.
        Charset charset = Charset.forName("UTF-8");
        CharsetDecoder decoder = charset.newDecoder();
        try {
            CharBuffer cb = decoder.decode(data);
            cb.rewind();
            return cb.toString();
        } catch (CharacterCodingException e) {
            // Payloads may be binary; report it but keep the server running.
            e.printStackTrace();
            return "";
        }
    }
}