Mercurial > hg > FederatedLinda
changeset 16:cccf34386cad
*** empty log message ***
author | kono |
---|---|
date | Mon, 18 Aug 2008 06:17:54 +0900 |
parents | aced4bfc15af |
children | 609b288f47f9 |
files | src/fdl/AcceptHandler.java src/fdl/ComDebug.java src/fdl/IOHandler.java src/fdl/IOParam.java src/fdl/TupleSpace.java |
diffstat | 5 files changed, 333 insertions(+), 328 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/AcceptHandler.java Sun Aug 17 20:24:24 2008 +0900 +++ b/src/fdl/AcceptHandler.java Mon Aug 18 06:17:54 2008 +0900 @@ -65,7 +65,7 @@ // 入出力用のハンドラを生成し,アタッチする // 監視する操作は読み込みのみ - IOParam handler = new IOHandler(tuple_space); + TupleSpace handler = new IOHandler(tuple_space); channel.register(key.selector(), SelectionKey.OP_READ, handler);
--- a/src/fdl/ComDebug.java Sun Aug 17 20:24:24 2008 +0900 +++ b/src/fdl/ComDebug.java Mon Aug 18 06:17:54 2008 +0900 @@ -54,7 +54,7 @@ command.rewind(); //送信 - IOParam io = new IOParam(); + TupleSpace io = new TupleSpace(); Iterator <SocketChannel> it = reportCh_list.iterator(); while(it.hasNext()) { io.send(it.next(), command, data); @@ -101,6 +101,7 @@ //通信ログ Hostname:port 'mode' =number 形式でインクリメント int cnt = 0; SocketChannel ch = (SocketChannel) key.channel(); + if (sendtext==null) sendtext="none"; String remoteString = getRemoteHostAndPort(ch); String localString = getLocalHostAndPort(ch);
--- a/src/fdl/IOHandler.java Sun Aug 17 20:24:24 2008 +0900 +++ b/src/fdl/IOHandler.java Mon Aug 18 06:17:54 2008 +0900 @@ -10,7 +10,7 @@ import java.util.LinkedList; -public class IOHandler extends IOParam { +public class IOHandler extends TupleSpace { static final boolean debug = false; public Tuple[] tuple_space; public ComDebug com_debug;
--- a/src/fdl/IOParam.java Sun Aug 17 20:24:24 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,325 +0,0 @@ -package fdl; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; - -public class IOParam implements TupleHandler, PSXQueueInterface{ - static final boolean debug = true; - static final int CAPSIZE = 4194304; - public Tuple[] tuple_space; - - public IOParam(Tuple[] _tuple_space) { - super(); - // 読みこんだデータを格納するためのリストの初期化 - tuple_space = _tuple_space; - } - - public IOParam() { - // TODO Auto-generated constructor stub - } - - protected String Out(ByteBuffer command, ByteBuffer data) throws IOException { - Tuple tmpTuple; - int id; - int datasize; - char idc = (char)command.getShort(LINDA_ID_OFFSET); - command.rewind(); - id = (int)idc; - String sendtext = "none"; - - datasize = command.getInt(LINDA_DATA_LENGTH_OFFSET); - command.rewind(); - - System.out.println("*** out command : id = "+id +" ***"); - - while((tuple_space[id] != null) && - ((tuple_space[id].mode == PSX_WAIT_RD)||(tuple_space[id].mode == PSX_RD))) { - command.put(LINDA_MODE_OFFSET, (byte)'a'); - command.rewind(); - command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); - command.rewind(); - //if(debug){ - //int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; - //System.out.println("send size "+sendsize+" : mode = "+(char)mode); - //} - //ByteBuffer sendcommand = tmpTuple.getCommand(); - //ByteBuffer senddata = tmpTuple.getData(); - send(tuple_space[id].ch, command, data); - - sendtext = getdataString(data); - - - //後処理 - tmpTuple = tuple_space[id]; - tuple_space[id] = tmpTuple.next; - tmpTuple = null; - } - if(tuple_space[id] != null && tuple_space[id].mode == PSX_IN) { - command.put(LINDA_MODE_OFFSET, (byte)'a'); - command.rewind(); - command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); - command.rewind(); - - if(debug){ - int sendsize = datasize+LINDA_HEADER_SIZE; - System.out.println("send size "+sendsize+" : mode = "+(char)'a'); - } - //ByteBuffer sendcommand = tmpTuple.getCommand(); - //ByteBuffer senddata = tmpTuple.getData(); - send(tuple_space[id].ch, command, data); - - sendtext = getdataString(data); - - //後処理 - tmpTuple = tuple_space[id]; - tuple_space[id] = tmpTuple.next; - tmpTuple = null; - } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX_OUT)) { - if((tmpTuple = tuple_space[id]) == null) { - tmpTuple = tuple_space[id] = new Tuple(); - tmpTuple.next = null; - } - else { - while(tmpTuple.next != null) tmpTuple = tmpTuple.next; - tmpTuple.next = new Tuple(); - tmpTuple = tmpTuple.next; - tmpTuple.next = null; - } - - tmpTuple.setMode('o'); - int seq = command.getInt(LINDA_SEQ_OFFSET); - command.rewind(); - tmpTuple.setSeq(seq); - tmpTuple.setData(data); - tmpTuple.setDataLength(datasize); - if(debug){ - System.out.println("data inserted len = "+tmpTuple.getdataLength()+" : id = "+id); - } - } - else { - System.out.println("Uncorrect mode :"+(char)tuple_space[id].getMode()); - command.clear(); - data.clear(); - System.exit(1); - } - return sendtext; - } - - protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) { - Tuple tmpTuple; - int id; - - char idc = (char)command.getShort(LINDA_ID_OFFSET); - command.rewind(); - id = (int)idc; - - System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); - - tmpTuple = new Tuple(); - tmpTuple.setMode(mode); - int seq = command.getInt(LINDA_SEQ_OFFSET); - command.rewind(); - tmpTuple.setSeq(seq); - tmpTuple.ch = (SocketChannel) key.channel(); - tmpTuple.setDataLength(0); - ByteBuffer buff = ByteBuffer.allocate(0); - tmpTuple.setData(buff); - tmpTuple.next = tuple_space[id]; - tuple_space[id] = tmpTuple; - if(debug){ - System.out.println("data inserted insert seq = "+seq +", id = "+id); - } - } - - protected String In_Rd(SelectionKey key, ByteBuffer command, int mode) - throws IOException { - Tuple tmpTuple; - int id; - //id = command.getInt(LINDA_ID_OFFSET); - //int mode = command.getInt(LINDA_MODE_OFFSET); - Tuple temp = null; - - char idc = (char)command.getShort(LINDA_ID_OFFSET); - command.rewind(); - id = (int)idc; - - String sendtext = "none"; - - System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); - - tmpTuple = tuple_space[id]; - - //wを無視 - while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ - temp = tmpTuple; - tmpTuple = tmpTuple.next; - } - - if (tmpTuple != null && (tmpTuple.mode == 'o')){ - //tmpTuple = new Tuple((SocketChannel)key.channel()); - int seq = command.getInt(LINDA_SEQ_OFFSET); - command.rewind(); - tmpTuple.setCommand('a', seq); - - if(debug){ - int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; - System.out.println("send size "+sendsize+" : mode = "+(char)tmpTuple.getMode()); - } - - //send - ByteBuffer sendcommand = tmpTuple.getCommand(); - ByteBuffer senddata = tmpTuple.getData(); - send(key,sendcommand, senddata); - - sendtext = getdataString(senddata); - - - //INの場合はremoveする - if(mode == PSX_IN) { - if(tmpTuple.data != null){ - //ByteBuffer buff = ByteBuffer.allocate(0); - //tmpTuple.setData(buff); - tmpTuple.data = null; - } - if(temp != null){ - temp.next = tmpTuple.next; - } - else { - tuple_space[id] = tmpTuple.next; - } - tmpTuple = null; - } - } else { - if(tmpTuple == null) { - //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); - tmpTuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); - tmpTuple.next = null; - }else { - while(tmpTuple.next !=null) tmpTuple =tmpTuple.next; - tmpTuple.next= new Tuple((SocketChannel)key.channel()); - tmpTuple = tmpTuple.next; - tmpTuple.next = null; - } - - tmpTuple.setMode(mode); - int seq2 = command.getInt(LINDA_SEQ_OFFSET); - command.rewind(); - tmpTuple.setSeq(seq2); - tmpTuple.ch = (SocketChannel) key.channel(); - tmpTuple.setDataLength(0); - ByteBuffer buff = ByteBuffer.allocate(0); - buff.rewind(); - tmpTuple.setData(buff); - - if(debug){ - System.out.println("data inserted insert seq = "+seq2 +", id = "+id); - } - } - - //} else if (command.getInt(LINDA_MODE_OFFSET) == PSX_WAIT_RD) { - return sendtext; - } - - protected String Check(SelectionKey key, ByteBuffer command) throws IOException { - ByteBuffer data; - Tuple tmpTuple; - int id; - char idc = (char)command.getShort(LINDA_ID_OFFSET); - command.rewind(); - id = (int)idc; - String sendtext = "none"; - - tmpTuple = tuple_space[id]; - while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ - tmpTuple = tmpTuple.next; - } - if (tmpTuple != null && (tmpTuple.mode == 'o')) { - command.putInt(LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen); - command.rewind(); - data = tmpTuple.getData(); - }else { - //means no out tuple - command.putInt(LINDA_DATA_LENGTH_OFFSET, 0); - command.rewind(); - data = null; - data = ByteBuffer.allocate(0); - } - send(key, command, data); - - sendtext = getdataString(data); - - return sendtext; - } - - public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) - throws IOException { - if (debug) { - if (command == null) { - System.out.println("Manager_run: command is null"); - } - if (data == null) { - System.out.println("Manager_run: data is null"); - } - } - int send_size = LINDA_HEADER_SIZE; - int count = 0; - - //command Send - command.rewind(); - while(send_size > 0){ - count = ch.write(command); - if(count < 0){ - System.out.println("Write Falied! close ch:"+ch); - ch.close(); - return; - } - send_size -= count; - } - - //data Send - data.rewind(); - if(data != null) { - data.rewind(); - ch.write(data); - } - } - - public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) - throws IOException { - SocketChannel ch = (SocketChannel)key.channel(); - send(ch,command,data); - } - - private String getdataString(ByteBuffer data) { - String sendtext; - data.rewind(); - //set sendtext - //CharBuffer chardata = data.asCharBuffer(); - - //Decode UTF-8 to System Encoding(UTF-16) - Charset charset = Charset.forName("UTF-8"); - CharsetDecoder decoder = charset.newDecoder(); - CharBuffer cb = null; - try { - cb = decoder.decode(data); - } catch (CharacterCodingException e) { - e.printStackTrace(); - } - cb.rewind(); - - sendtext = cb.toString(); - return sendtext; - } - - public void handle(SelectionKey key) throws ClosedChannelException, - IOException { - - } -} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/TupleSpace.java Mon Aug 18 06:17:54 2008 +0900 @@ -0,0 +1,329 @@ +package fdl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; + +public class TupleSpace implements PSXQueueInterface{ + static final boolean debug = true; + static final int CAPSIZE = 4194304; + public Tuple[] tuple_space; + + public TupleSpace(Tuple[] _tuple_space) { + super(); + // 読みこんだデータを格納するためのリストの初期化 + tuple_space = _tuple_space; + } + + public TupleSpace() { + // TODO Auto-generated constructor stub + } + + protected String Out(ByteBuffer command, ByteBuffer data) throws IOException { + Tuple tmpTuple; + int id; + int datasize; + char idc = (char)command.getShort(LINDA_ID_OFFSET); + command.rewind(); + id = (int)idc; + String sendtext = "none"; + + datasize = command.getInt(LINDA_DATA_LENGTH_OFFSET); + command.rewind(); + + System.out.println("*** out command : id = "+id +" ***"); + + while((tuple_space[id] != null) && + ((tuple_space[id].mode == PSX_WAIT_RD)||(tuple_space[id].mode == PSX_RD))) { + command.put(LINDA_MODE_OFFSET, (byte)'a'); + command.rewind(); + command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); + command.rewind(); + //if(debug){ + //int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; + //System.out.println("send size "+sendsize+" : mode = "+(char)mode); + //} + //ByteBuffer sendcommand = tmpTuple.getCommand(); + //ByteBuffer senddata = tmpTuple.getData(); + send(tuple_space[id].ch, command, data); + + sendtext = getdataString(data); + + + //後処理 + tmpTuple = tuple_space[id]; + tuple_space[id] = tmpTuple.next; + tmpTuple = null; + } + if(tuple_space[id] != null && tuple_space[id].mode == PSX_IN) { + command.put(LINDA_MODE_OFFSET, (byte)'a'); + command.rewind(); + command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); + command.rewind(); + + if(debug){ + int sendsize = datasize+LINDA_HEADER_SIZE; + System.out.println("send size "+sendsize+" : mode = "+(char)'a'); + } + //ByteBuffer sendcommand = tmpTuple.getCommand(); + //ByteBuffer senddata = tmpTuple.getData(); + send(tuple_space[id].ch, command, data); + + sendtext = getdataString(data); + + //後処理 + tmpTuple = tuple_space[id]; + tuple_space[id] = tmpTuple.next; + tmpTuple = null; + } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX_OUT)) { + if((tmpTuple = tuple_space[id]) == null) { + tmpTuple = tuple_space[id] = new Tuple(); + tmpTuple.next = null; + } + else { + while(tmpTuple.next != null) tmpTuple = tmpTuple.next; + tmpTuple.next = new Tuple(); + tmpTuple = tmpTuple.next; + tmpTuple.next = null; + } + + tmpTuple.setMode('o'); + int seq = command.getInt(LINDA_SEQ_OFFSET); + command.rewind(); + tmpTuple.setSeq(seq); + tmpTuple.setData(data); + tmpTuple.setDataLength(datasize); + if(debug){ + System.out.println("data inserted len = "+tmpTuple.getdataLength()+" : id = "+id); + } + } + else { + System.out.println("Incorrect mode :"+(char)tuple_space[id].getMode()); + command.clear(); + data.clear(); + System.exit(1); + } + return sendtext; + } + + protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) { + Tuple tmpTuple; + int id; + + char idc = (char)command.getShort(LINDA_ID_OFFSET); + command.rewind(); + id = (int)idc; + + if (debug) + System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); + + tmpTuple = new Tuple(); + tmpTuple.setMode(mode); + int seq = command.getInt(LINDA_SEQ_OFFSET); + command.rewind(); + tmpTuple.setSeq(seq); + tmpTuple.ch = (SocketChannel) key.channel(); + tmpTuple.setDataLength(0); + ByteBuffer buff = ByteBuffer.allocate(0); + tmpTuple.setData(buff); + tmpTuple.next = tuple_space[id]; + tuple_space[id] = tmpTuple; + if(debug){ + System.out.println("data inserted insert seq = "+seq +", id = "+id); + } + } + + protected String In_Rd(SelectionKey key, ByteBuffer command, int mode) + throws IOException { + Tuple tmpTuple = read_in_1(key, command, mode); + + if (tmpTuple!=null) { + //send + ByteBuffer sendcommand = tmpTuple.getCommand(); + ByteBuffer senddata = tmpTuple.getData(); + send(key,sendcommand, senddata); + } + String sendtext = getdataString(tmpTuple.getData()); + return sendtext; + } + + private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) { + Tuple tmpTuple; + int id; + //id = command.getInt(LINDA_ID_OFFSET); + //int mode = command.getInt(LINDA_MODE_OFFSET); + Tuple temp = null; + + char idc = (char)command.getShort(LINDA_ID_OFFSET); + command.rewind(); + id = (int)idc; + + + System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); + + tmpTuple = tuple_space[id]; + + //wを無視 + while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ + temp = tmpTuple; + tmpTuple = tmpTuple.next; + } + + if (tmpTuple != null && (tmpTuple.mode == 'o')){ + //tmpTuple = new Tuple((SocketChannel)key.channel()); + int seq = command.getInt(LINDA_SEQ_OFFSET); + command.rewind(); + tmpTuple.setCommand('a', seq); + + if(debug){ + int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; + System.out.println("send size "+sendsize+" : mode = "+(char)tmpTuple.getMode()); + } + + + + //INの場合はremoveする + if(mode == PSX_IN) { + if(tmpTuple.data != null){ + //ByteBuffer buff = ByteBuffer.allocate(0); + //tmpTuple.setData(buff); + tmpTuple.data = null; + } + if(temp != null){ + temp.next = tmpTuple.next; + } + else { + tuple_space[id] = tmpTuple.next; + } + } + } else { + if(tmpTuple == null) { + //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); + tmpTuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); + tmpTuple.next = null; + }else { + while(tmpTuple.next !=null) tmpTuple =tmpTuple.next; + tmpTuple.next= new Tuple((SocketChannel)key.channel()); + tmpTuple = tmpTuple.next; + tmpTuple.next = null; + } + + tmpTuple.setMode(mode); + int seq2 = command.getInt(LINDA_SEQ_OFFSET); + command.rewind(); + tmpTuple.setSeq(seq2); + tmpTuple.ch = (SocketChannel) key.channel(); + tmpTuple.setDataLength(0); + ByteBuffer buff = ByteBuffer.allocate(0); + buff.rewind(); + tmpTuple.setData(buff); + tmpTuple = null; + + if(debug){ + System.out.println("data inserted insert seq = "+seq2 +", id = "+id); + } + } + return tmpTuple; + } + + protected String Check(SelectionKey key, ByteBuffer command) throws IOException { + String sendtext; + ByteBuffer data = check1(command); + send(key, command, data); + + sendtext = getdataString(data); + + return sendtext; + } + + private ByteBuffer check1(ByteBuffer command) { + ByteBuffer data; + Tuple tmpTuple; + int id; + char idc = (char)command.getShort(LINDA_ID_OFFSET); + command.rewind(); + id = (int)idc; + + tmpTuple = tuple_space[id]; + while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ + tmpTuple = tmpTuple.next; + } + if (tmpTuple != null && (tmpTuple.mode == 'o')) { + command.putInt(LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen); + command.rewind(); + data = tmpTuple.getData(); + }else { + //means no out tuple + command.putInt(LINDA_DATA_LENGTH_OFFSET, 0); + command.rewind(); + data = ByteBuffer.allocate(0); + } + return data; + } + + public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) + throws IOException { + if (debug) { + if (command == null) { + System.out.println("Manager_run: command is null"); + } + if (data == null) { + System.out.println("Manager_run: data is null"); + } + } + int send_size = LINDA_HEADER_SIZE; + int count = 0; + + //command Send + command.rewind(); + while(send_size > 0){ + count = ch.write(command); + if(count < 0){ + System.out.println("Write Falied! close ch:"+ch); + ch.close(); + return; + } + send_size -= count; + } + + //data Send + data.rewind(); + if(data != null) { + data.rewind(); + ch.write(data); + } + } + + public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) + throws IOException { + SocketChannel ch = (SocketChannel)key.channel(); + send(ch,command,data); + } + + private String getdataString(ByteBuffer data) { + String sendtext; + data.rewind(); + //set sendtext + //CharBuffer chardata = data.asCharBuffer(); + + //Decode UTF-8 to System Encoding(UTF-16) + Charset charset = Charset.forName("UTF-8"); + CharsetDecoder decoder = charset.newDecoder(); + CharBuffer cb = null; + try { + cb = decoder.decode(data); + } catch (CharacterCodingException e) { + e.printStackTrace(); + } + cb.rewind(); + + sendtext = cb.toString(); + return sendtext; + } + +} \ No newline at end of file