Mercurial > hg > FederatedLinda
changeset 17:609b288f47f9
*** empty log message ***
author | kono |
---|---|
date | Mon, 18 Aug 2008 07:28:29 +0900 |
parents | cccf34386cad |
children | 2cbd98257d61 |
files | src/fdl/AcceptHandler.java src/fdl/ComDebug.java src/fdl/ComDebug_Client.java src/fdl/FDLindaServ.java src/fdl/FederatedLinda.java src/fdl/Handler.java src/fdl/IOHandler.java src/fdl/MetaLinda.java src/fdl/PSX.java src/fdl/PSXLinda.java src/fdl/PSXQueue.java src/fdl/PSXQueueInterface.java src/fdl/PSXReply.java src/fdl/TestEtc.java src/fdl/Tuple.java src/fdl/TupleSpace.java |
diffstat | 16 files changed, 257 insertions(+), 271 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/AcceptHandler.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/AcceptHandler.java Mon Aug 18 07:28:29 2008 +0900 @@ -10,7 +10,7 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -public class AcceptHandler implements TupleHandler, PSXQueueInterface { +public class AcceptHandler implements TupleHandler { public Tuple[] tuple_space; public static int user = 0;
--- a/src/fdl/ComDebug.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/ComDebug.java Mon Aug 18 07:28:29 2008 +0900 @@ -5,7 +5,6 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Hashtable; @@ -21,7 +20,7 @@ * ldserve のMain Loop を呼び出すか? */ -public class ComDebug implements PSXQueueInterface{ +public class ComDebug { static final boolean debug = true; //public static int seq = 0; public static Hashtable<String, Integer> Com_Hashtable = new Hashtable<String, Integer>(); @@ -45,13 +44,7 @@ data.flip(); // limit = current position, position = 0 //commandをセット - command.order(ByteOrder.BIG_ENDIAN); - command.putInt(LINDA_PACKET_LENGTH_OFFSET,(report_txt).length()*2+LINDA_HEADER_SIZE-INT_SIZE); - command.put(LINDA_MODE_OFFSET,(byte)'D'); - command.putShort(LINDA_ID_OFFSET,(short) 0); - command.putInt(LINDA_SEQ_OFFSET,0); - command.putInt(LINDA_DATA_LENGTH_OFFSET,(report_txt).length()*2); - command.rewind(); + PSX.setReportCommand(command, report_txt); //送信 TupleSpace io = new TupleSpace(); @@ -60,6 +53,7 @@ io.send(it.next(), command, data); } } + private static String getRemoteHostAndPort(SocketChannel channel) { String socketString = channel.socket().getRemoteSocketAddress().toString(); String[] split = socketString.split("/");
--- a/src/fdl/ComDebug_Client.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/ComDebug_Client.java Mon Aug 18 07:28:29 2008 +0900 @@ -7,7 +7,7 @@ import java.nio.CharBuffer; -public class ComDebug_Client implements PSXQueueInterface{ +public class ComDebug_Client { static int id; static final boolean debug = false; @@ -56,7 +56,7 @@ } System.out.println("COM_DEBUG Connected.["+host+":"+port+"]"); - psx.in(PRIVILEGED_ID_START+connect_num); + psx.in(PSX.PRIVILEGED_ID_START+connect_num); connect_num++; while(true) { fdl.sync_com(1000);
--- a/src/fdl/FDLindaServ.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/FDLindaServ.java Mon Aug 18 07:28:29 2008 +0900 @@ -12,7 +12,7 @@ import java.nio.channels.spi.AbstractSelector; import java.nio.channels.spi.SelectorProvider; -public class FDLindaServ implements PSXQueueInterface { +public class FDLindaServ { static final int MAX_REQ = 1; static final int FAIL = (-1); static final int MAX_UAER = 4;
--- a/src/fdl/FederatedLinda.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/FederatedLinda.java Mon Aug 18 07:28:29 2008 +0900 @@ -37,7 +37,7 @@ */ -public class FederatedLinda implements PSXQueueInterface { +public class FederatedLinda { static FederatedLinda fdl; static int MAX_SEQUENCE = 2048; @@ -95,8 +95,8 @@ } qsize++; - if (mode != PSX_OUT) { - PSXReply p = new PSXReply(PSX_REPLY,callback); + if (mode != PSX.PSX_OUT) { + PSXReply p = new PSXReply(PSX.PSX_REPLY,callback); p.seq = seq(p); c.setSeq(p.seq); if (debug) { @@ -208,13 +208,13 @@ private void chkCom(SocketChannel sock) throws IOException { int length; - ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); command.order(ByteOrder.BIG_ENDIAN); debug = false; sock.read(command); command.rewind(); - length = command.getInt(LINDA_DATA_LENGTH_OFFSET); + length = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); if (length>0) { ByteBuffer data = ByteBuffer.allocate(length); int read = length; @@ -230,14 +230,7 @@ data.rewind(); if (debug) { - char id = (char)command.getShort(LINDA_ID_OFFSET); - System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ - "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ - "ID:"+(int)id+" "+ - "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ - "DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" "); - System.out.println("DATA:"+data); - command.rewind(); + PSX.printCommand(command, data); } //if (debug_com) { String comdata =""; @@ -259,8 +252,8 @@ data.rewind(); }***/ - int rseq = command.getInt(LINDA_SEQ_OFFSET); - int mode = command.get(LINDA_MODE_OFFSET); + int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET); + int mode = command.get(PSX.LINDA_MODE_OFFSET); Integer a; /*** if (debug) { @@ -295,12 +288,12 @@ private void chkServe(SocketChannel sock) throws IOException { int length; - ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); command.order(ByteOrder.BIG_ENDIAN); sock.read(command); command.rewind(); - length = command.getInt(LINDA_DATA_LENGTH_OFFSET); + length = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); if (length>0) { ByteBuffer data = ByteBuffer.allocate(length); int read = length; @@ -316,14 +309,7 @@ data.rewind(); if (debug) { - char id = (char)command.getShort(LINDA_ID_OFFSET); - System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ - "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ - "ID:"+(int)id+" "+ - "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ - "DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" "); - System.out.println("DATA:"+data); - command.rewind(); + PSX.printCommand(command, data); } /***if (debug) { System.out.print("header:"); @@ -337,8 +323,8 @@ data.rewind(); }***/ - int rseq = command.getInt(LINDA_SEQ_OFFSET); - int mode = command.get(LINDA_MODE_OFFSET); + int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET); + int mode = command.get(PSX.LINDA_MODE_OFFSET); Integer a; /*** if (debug) {
--- a/src/fdl/Handler.java Mon Aug 18 06:17:54 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,9 +0,0 @@ - -package fdl; -import java.io.IOException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; - -public interface Handler { - public void handle(SelectionKey key) throws ClosedChannelException, IOException; -}
--- a/src/fdl/IOHandler.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/IOHandler.java Mon Aug 18 07:28:29 2008 +0900 @@ -32,11 +32,11 @@ SocketChannel channel = (SocketChannel)key.channel(); // 読み込み用のバッファの生成 - ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); command.order(ByteOrder.BIG_ENDIAN); command.clear(); - int readsize = LINDA_HEADER_SIZE; + int readsize = PSX.LINDA_HEADER_SIZE; int count = 0; // 読み込み @@ -56,8 +56,8 @@ } command.rewind(); - int len = command.getInt(LINDA_PACKET_LENGTH_OFFSET); - int datalen = command.getInt(LINDA_DATA_LENGTH_OFFSET); + int len = command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET); + int datalen = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); ByteBuffer data = ByteBuffer.allocate(datalen); int read = datalen; @@ -78,26 +78,19 @@ command.rewind(); if (debug) { - /*** print data ***/ - char id = (char)command.getShort(LINDA_ID_OFFSET); - System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ - "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ - "ID:"+(int)id+" "+ - "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "); - command.rewind(); + PSX.printData(command); } manager_run(key, command, data, len); key.interestOps(key.interestOps() | SelectionKey.OP_READ ); } - - public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data, int len) throws IOException { + public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data, int len) throws IOException { command.order(ByteOrder.BIG_ENDIAN); - int mode = command.get(LINDA_MODE_OFFSET); - char idc = (char)command.getShort(LINDA_ID_OFFSET); + int mode = command.get(PSX.LINDA_MODE_OFFSET); + char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); int id = (int)idc; - int seq = command.getInt(LINDA_SEQ_OFFSET); + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); String sendtext = "none"; @@ -112,14 +105,14 @@ if((mode == '!') || (len == 0)) { ClosewishComDebug(key, command, reportCh_list); } - else if(mode == PSX_CHECK) { + else if(mode == PSX.PSX_CHECK) { sendtext = Check(key, command); } - else if(mode == PSX_IN || mode == PSX_RD){ + else if(mode == PSX.PSX_IN || mode == PSX.PSX_RD){ sendtext = In_Rd(key, command, mode); - } else if (mode == PSX_WAIT_RD) { + } else if (mode == PSX.PSX_WAIT_RD) { Wait_Rd(key, command, mode); - } else if(mode == PSX_OUT) { + } else if(mode == PSX.PSX_OUT) { sendtext = Out(command, data); } else { System.out.println("Uncorrect buffer"); @@ -127,7 +120,7 @@ } //COM_DEBUG - if(id > PRIVILEGED_ID_START && id < PRIVILEGED_ID_END){ + if(id > PSX.PRIVILEGED_ID_START && id < PSX.PRIVILEGED_ID_END){ ComDebug.addChannel(key, reportCh_list); } //DEBUG用カウンタ
--- a/src/fdl/MetaLinda.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/MetaLinda.java Mon Aug 18 07:28:29 2008 +0900 @@ -24,7 +24,7 @@ */ -public class MetaLinda implements PSXQueueInterface,PSXLindaInterface { +public class MetaLinda implements PSXLindaInterface { public PSXLinda next;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/PSX.java Mon Aug 18 07:28:29 2008 +0900 @@ -0,0 +1,106 @@ + +/* + * @(#)PSXQueueInterface.java 1.1 06/04/01 + * + * Copyright 2006 Shinji KONO + * + + PSX Lidna + Trasport layer of PSX Linda library + + */ + +package fdl; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + + + +/** + PSXQueueInterface + + */ + +public class PSX { + static final int PSX_IN = 'i'; + static final int PSX_OUT = 'o'; + static final int PSX_UPDATE = 'u'; + static final int PSX_RD = 'r'; + static final int PSX_CHECK = 'c'; + static final int PSX_REPLY = '?'; + static final int PSX_WAIT_RD = 'w'; + static final int PSX_ANSWER = 'a'; + static final int PSX_HTTP_ANSWER = 'P'; // Put + static final int PSX_HTTP_REQUEST = 'G'; // Get + static final int PSX_COM_DEBUG = 'D'; //Communication DEBUG + + static final int LINDA_PACKET_LENGTH_OFFSET =0; + static final int LINDA_MODE_OFFSET =0+4; + static final int LINDA_ID_OFFSET =1+4; + static final int LINDA_SEQ_OFFSET =3+4; + static final int LINDA_DATA_LENGTH_OFFSET =7+4; + static final int LINDA_HEADER_SIZE =12+4; + static final int INT_SIZE =4; + static final int SHORT_SIZE =2; + + static final int PRIVILEGED_ID_START = 32768; + static final int PRIVILEGED_ID_END = 36864; + + // this method should be removed + static void setReportCommand(ByteBuffer command, String report_txt) { + command.order(ByteOrder.BIG_ENDIAN); + command.putInt(LINDA_PACKET_LENGTH_OFFSET,(report_txt).length()*2+LINDA_HEADER_SIZE-INT_SIZE); + command.put(LINDA_MODE_OFFSET,(byte)'D'); + command.putShort(LINDA_ID_OFFSET,(short) 0); + command.putInt(LINDA_SEQ_OFFSET,0); + command.putInt(LINDA_DATA_LENGTH_OFFSET,(report_txt).length()*2); + command.rewind(); + } + + static void printCommand(ByteBuffer command, ByteBuffer data) { + char id = (char)command.getShort(LINDA_ID_OFFSET); + System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ + "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ + "ID:"+(int)id+" "+ + "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ + "DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" "); + System.out.println("DATA:"+data); + command.rewind(); + } + + static void printData(ByteBuffer command) { + /*** print data ***/ + char id = (char)command.getShort(LINDA_ID_OFFSET); + System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ + "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ + "ID:"+(int)id+" "+ + "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "); + command.rewind(); + } + + + static void setCommand(ByteBuffer command, int _mode, int _id, int _seq, int _datalen) { + command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + command.order(ByteOrder.BIG_ENDIAN); + + command.putInt(LINDA_PACKET_LENGTH_OFFSET, + _datalen+LINDA_HEADER_SIZE-INT_SIZE); + command.put(LINDA_MODE_OFFSET,(byte)_mode); + command.putShort(LINDA_ID_OFFSET,(short)_id); + command.putInt(LINDA_SEQ_OFFSET,_seq); + command.putInt(LINDA_DATA_LENGTH_OFFSET,_datalen); + command.rewind(); + } + + static void setAnserCommand(ByteBuffer command, int seq) { + command.put(LINDA_MODE_OFFSET, (byte)'a'); + command.rewind(); + command.putInt(LINDA_SEQ_OFFSET, seq); + command.rewind(); + } + + +} + +/* end */
--- a/src/fdl/PSXLinda.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/PSXLinda.java Mon Aug 18 07:28:29 2008 +0900 @@ -34,7 +34,7 @@ */ -public class PSXLinda implements PSXQueueInterface,PSXLindaInterface { +public class PSXLinda implements PSXLindaInterface { private FederatedLinda fdl; private SocketChannel socketChannel; public String host; @@ -96,44 +96,44 @@ } public PSXReply in(int id) { - PSXReply r = fdl.psx_queue(this, id, null, 0, PSX_IN, (PSXCallback)null); + PSXReply r = fdl.psx_queue(this, id, null, 0, PSX.PSX_IN, (PSXCallback)null); return r; } public void in(int id, PSXCallback callback) { - fdl.psx_queue(this, id, null, 0, PSX_IN, callback); + fdl.psx_queue(this, id, null, 0, PSX.PSX_IN, callback); } public PSXReply ck(int id) { - PSXReply r = fdl.psx_queue(this, id, null, 0, PSX_IN, null); + PSXReply r = fdl.psx_queue(this, id, null, 0, PSX.PSX_IN, null); return r; } public void ck(int id, PSXCallback callback) { - fdl.psx_queue(this, id, null, 0, PSX_IN, callback); + fdl.psx_queue(this, id, null, 0, PSX.PSX_IN, callback); } public PSXReply out(int id, ByteBuffer data,int size) { - PSXReply r = fdl.psx_queue(this, id, data, size, PSX_OUT, null); + PSXReply r = fdl.psx_queue(this, id, data, size, PSX.PSX_OUT, null); return r; } public PSXReply update(int id, ByteBuffer data,int size) { - PSXReply r = fdl.psx_queue(this, id, data, size, PSX_UPDATE, null); + PSXReply r = fdl.psx_queue(this, id, data, size, PSX.PSX_UPDATE, null); return r; } public void update(int id, ByteBuffer data,int size,PSXCallback callback) { - fdl.psx_queue(this, id, data, size, PSX_UPDATE, callback); + fdl.psx_queue(this, id, data, size, PSX.PSX_UPDATE, callback); } public PSXReply rd(int id) { - PSXReply r = fdl.psx_queue(this, id, null, 0, PSX_RD, null); + PSXReply r = fdl.psx_queue(this, id, null, 0, PSX.PSX_RD, null); return r; } public void rd(int id, PSXCallback callback) { - fdl.psx_queue(this, id, null, 0, PSX_RD, callback); + fdl.psx_queue(this, id, null, 0, PSX.PSX_RD, callback); } public PSXLinda add(PSXLinda linda) {
--- a/src/fdl/PSXQueue.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/PSXQueue.java Mon Aug 18 07:28:29 2008 +0900 @@ -14,8 +14,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.ByteOrder; -//import java.nio.channels.*; /** PSXQueue @@ -23,7 +21,7 @@ Iterator */ -public class PSXQueue implements PSXQueueInterface { +public class PSXQueue { public int tspace_id; public int id; public int mode; @@ -46,20 +44,12 @@ } private void setCommand() { - command = ByteBuffer.allocate(LINDA_HEADER_SIZE); - command.order(ByteOrder.BIG_ENDIAN); - - command.putInt(LINDA_PACKET_LENGTH_OFFSET, - size+LINDA_HEADER_SIZE-INT_SIZE); - command.put(LINDA_MODE_OFFSET,(byte)mode); - command.putShort(LINDA_ID_OFFSET,(short)id); - command.putInt(LINDA_SEQ_OFFSET,seq); - command.putInt(LINDA_DATA_LENGTH_OFFSET,size); + PSX.setCommand(command,mode,id,size,seq); } public void setSeq(int _seq) { seq = _seq; - command.putInt(LINDA_SEQ_OFFSET,seq); + command.putInt(PSX.LINDA_SEQ_OFFSET,seq); } public void Send()
--- a/src/fdl/PSXQueueInterface.java Mon Aug 18 06:17:54 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,51 +0,0 @@ - -/* - * @(#)PSXQueueInterface.java 1.1 06/04/01 - * - * Copyright 2006 Shinji KONO - * - - PSX Lidna - Trasport layer of PSX Linda library - - */ - -package fdl; - -import java.nio.ByteBuffer; - - -/** - PSXQueueInterface - - */ - -public interface PSXQueueInterface { - static final int PSX_IN = 'i'; - static final int PSX_OUT = 'o'; - static final int PSX_UPDATE = 'u'; - static final int PSX_RD = 'r'; - static final int PSX_CHECK = 'c'; - static final int PSX_REPLY = '?'; - static final int PSX_WAIT_RD = 'w'; - static final int PSX_ANSWER = 'a'; - static final int PSX_HTTP_ANSWER = 'P'; // Put - static final int PSX_HTTP_REQUEST = 'G'; // Get - static final int PSX_COM_DEBUG = 'D'; //Communication DEBUG - - static final int LINDA_PACKET_LENGTH_OFFSET =0; - static final int LINDA_MODE_OFFSET =0+4; - static final int LINDA_ID_OFFSET =1+4; - static final int LINDA_SEQ_OFFSET =3+4; - static final int LINDA_DATA_LENGTH_OFFSET =7+4; - static final int LINDA_HEADER_SIZE =12+4; - static final int INT_SIZE =4; - static final int SHORT_SIZE =2; - - static final int PRIVILEGED_ID_START = 32768; - static final int PRIVILEGED_ID_END = 36864; - - -} - -/* end */
--- a/src/fdl/PSXReply.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/PSXReply.java Mon Aug 18 07:28:29 2008 +0900 @@ -16,7 +16,7 @@ package fdl; import java.nio.ByteBuffer; -public class PSXReply implements PSXQueueInterface { +public class PSXReply { public ByteBuffer command; public ByteBuffer data; public int seq; @@ -41,24 +41,24 @@ System.out.print("setAnswer mode:"); System.out.println(mode); System.out.print("setAnswer bool:"); - System.out.println(mode==PSX_ANSWER); + System.out.println(mode==PSX.PSX_ANSWER); } } public int getMode() { - return command.get(LINDA_MODE_OFFSET); + return command.get(PSX.LINDA_MODE_OFFSET); } public int getId() { - return command.getShort(LINDA_ID_OFFSET); + return command.getShort(PSX.LINDA_ID_OFFSET); } public int getSeq() { - return command.getInt(LINDA_SEQ_OFFSET); + return command.getInt(PSX.LINDA_SEQ_OFFSET); } public int getLength() { - return command.getInt(LINDA_DATA_LENGTH_OFFSET); + return command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); } public ByteBuffer getData() { @@ -67,7 +67,7 @@ } public boolean ready() { - return mode==PSX_ANSWER; + return mode==PSX.PSX_ANSWER; } }
--- a/src/fdl/TestEtc.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/TestEtc.java Mon Aug 18 07:28:29 2008 +0900 @@ -4,7 +4,7 @@ import java.io.IOException; //import java.nio.ByteBuffer; -public class TestEtc implements PSXQueueInterface { +public class TestEtc { static final int MAX_REQ = 1; static final int FAIL = (-1); static final int MAX_UAER = 4;
--- a/src/fdl/Tuple.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/Tuple.java Mon Aug 18 07:28:29 2008 +0900 @@ -2,11 +2,10 @@ package fdl; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.nio.channels.SocketChannel; -public class Tuple implements PSXQueueInterface { +public class Tuple { public int mode; public int id; public int seq; @@ -26,33 +25,14 @@ } public void setCommand(int _mode, int _seq) { - command = ByteBuffer.allocate(LINDA_HEADER_SIZE); - command.order(ByteOrder.BIG_ENDIAN); - - command.putInt(LINDA_PACKET_LENGTH_OFFSET, - this.datalen+LINDA_HEADER_SIZE-INT_SIZE); - command.put(LINDA_MODE_OFFSET,(byte)_mode); - command.putShort(LINDA_ID_OFFSET,(short)this.id); - command.putInt(LINDA_SEQ_OFFSET,_seq); - command.putInt(LINDA_DATA_LENGTH_OFFSET,this.datalen); - command.rewind(); - } - - - public void setCommand(int _mode, int _id, int _seq, int _datalen) { - command = ByteBuffer.allocate(LINDA_HEADER_SIZE); - command.order(ByteOrder.BIG_ENDIAN); - - command.putInt(LINDA_PACKET_LENGTH_OFFSET, - _datalen+LINDA_HEADER_SIZE-INT_SIZE); - command.put(LINDA_MODE_OFFSET,(byte)_mode); - command.putShort(LINDA_ID_OFFSET,(short)_id); - command.putInt(LINDA_SEQ_OFFSET,_seq); - command.putInt(LINDA_DATA_LENGTH_OFFSET,_datalen); - command.rewind(); + PSX.setCommand(command, _mode, id, _seq,datalen); } - public void setTuple(int _mode,int _id, int _seq, int _datalen, ByteBuffer _data) { + public void setCommand(int _mode, int _id, int _seq, int _datalen) { + PSX.setCommand(command, _mode, _id, _seq, _datalen); + } + + public void setTuple(int _mode,int _id, int _seq, int _datalen, ByteBuffer _data) { mode = _mode; id = _id; seq = _seq;
--- a/src/fdl/TupleSpace.java Mon Aug 18 06:17:54 2008 +0900 +++ b/src/fdl/TupleSpace.java Mon Aug 18 07:28:29 2008 +0900 @@ -9,7 +9,7 @@ import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; -public class TupleSpace implements PSXQueueInterface{ +public class TupleSpace { static final boolean debug = true; static final int CAPSIZE = 4194304; public Tuple[] tuple_space; @@ -25,27 +25,24 @@ } protected String Out(ByteBuffer command, ByteBuffer data) throws IOException { - Tuple tmpTuple; + Tuple tuple; int id; int datasize; - char idc = (char)command.getShort(LINDA_ID_OFFSET); + char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; String sendtext = "none"; - datasize = command.getInt(LINDA_DATA_LENGTH_OFFSET); + datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); command.rewind(); System.out.println("*** out command : id = "+id +" ***"); while((tuple_space[id] != null) && - ((tuple_space[id].mode == PSX_WAIT_RD)||(tuple_space[id].mode == PSX_RD))) { - command.put(LINDA_MODE_OFFSET, (byte)'a'); - command.rewind(); - command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); - command.rewind(); + ((tuple_space[id].mode == PSX.PSX_WAIT_RD)||(tuple_space[id].mode == PSX.PSX_RD))) { + PSX.setAnserCommand(command, tuple_space[id].getSeq()); //if(debug){ - //int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; + //int sendsize = tmpTuple.getdataLength()+PSX.LINDA_HEADER_SIZE; //System.out.println("send size "+sendsize+" : mode = "+(char)mode); //} //ByteBuffer sendcommand = tmpTuple.getCommand(); @@ -55,19 +52,14 @@ sendtext = getdataString(data); - //後処理 - tmpTuple = tuple_space[id]; - tuple_space[id] = tmpTuple.next; - tmpTuple = null; + removeTuple(id); + tuple = null; } - if(tuple_space[id] != null && tuple_space[id].mode == PSX_IN) { - command.put(LINDA_MODE_OFFSET, (byte)'a'); - command.rewind(); - command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); - command.rewind(); + if(tuple_space[id] != null && tuple_space[id].mode == PSX.PSX_IN) { + PSX.setAnserCommand(command, tuple_space[id].getSeq()); if(debug){ - int sendsize = datasize+LINDA_HEADER_SIZE; + int sendsize = datasize+PSX.LINDA_HEADER_SIZE; System.out.println("send size "+sendsize+" : mode = "+(char)'a'); } //ByteBuffer sendcommand = tmpTuple.getCommand(); @@ -76,30 +68,28 @@ sendtext = getdataString(data); - //後処理 - tmpTuple = tuple_space[id]; - tuple_space[id] = tmpTuple.next; - tmpTuple = null; - } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX_OUT)) { - if((tmpTuple = tuple_space[id]) == null) { - tmpTuple = tuple_space[id] = new Tuple(); - tmpTuple.next = null; + removeTuple(id); + tuple = null; + } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX.PSX_OUT)) { + if((tuple = tuple_space[id]) == null) { + tuple = tuple_space[id] = new Tuple(); + tuple.next = null; } else { - while(tmpTuple.next != null) tmpTuple = tmpTuple.next; - tmpTuple.next = new Tuple(); - tmpTuple = tmpTuple.next; - tmpTuple.next = null; + while(tuple.next != null) tuple = tuple.next; + tuple.next = new Tuple(); + tuple = tuple.next; + tuple.next = null; } - tmpTuple.setMode('o'); - int seq = command.getInt(LINDA_SEQ_OFFSET); + tuple.setMode('o'); + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - tmpTuple.setSeq(seq); - tmpTuple.setData(data); - tmpTuple.setDataLength(datasize); + tuple.setSeq(seq); + tuple.setData(data); + tuple.setDataLength(datasize); if(debug){ - System.out.println("data inserted len = "+tmpTuple.getdataLength()+" : id = "+id); + System.out.println("data inserted len = "+tuple.getdataLength()+" : id = "+id); } } else { @@ -111,28 +101,35 @@ return sendtext; } + private void removeTuple(int id) { + Tuple tuple; + //後処理 + tuple = tuple_space[id]; + tuple_space[id] = tuple.next; + } + protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) { - Tuple tmpTuple; + Tuple tuple; int id; - char idc = (char)command.getShort(LINDA_ID_OFFSET); + char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; if (debug) System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); - tmpTuple = new Tuple(); - tmpTuple.setMode(mode); - int seq = command.getInt(LINDA_SEQ_OFFSET); + tuple = new Tuple(); + tuple.setMode(mode); + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - tmpTuple.setSeq(seq); - tmpTuple.ch = (SocketChannel) key.channel(); - tmpTuple.setDataLength(0); + tuple.setSeq(seq); + tuple.ch = (SocketChannel) key.channel(); + tuple.setDataLength(0); ByteBuffer buff = ByteBuffer.allocate(0); - tmpTuple.setData(buff); - tmpTuple.next = tuple_space[id]; - tuple_space[id] = tmpTuple; + tuple.setData(buff); + tuple.next = tuple_space[id]; + tuple_space[id] = tuple; if(debug){ System.out.println("data inserted insert seq = "+seq +", id = "+id); } @@ -140,95 +137,95 @@ protected String In_Rd(SelectionKey key, ByteBuffer command, int mode) throws IOException { - Tuple tmpTuple = read_in_1(key, command, mode); + Tuple tuple = read_in_1(key, command, mode); - if (tmpTuple!=null) { + if (tuple!=null) { //send - ByteBuffer sendcommand = tmpTuple.getCommand(); - ByteBuffer senddata = tmpTuple.getData(); + ByteBuffer sendcommand = tuple.getCommand(); + ByteBuffer senddata = tuple.getData(); send(key,sendcommand, senddata); } - String sendtext = getdataString(tmpTuple.getData()); + String sendtext = getdataString(tuple.getData()); return sendtext; } private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) { - Tuple tmpTuple; + Tuple tuple; int id; - //id = command.getInt(LINDA_ID_OFFSET); - //int mode = command.getInt(LINDA_MODE_OFFSET); + //id = command.getInt(PSX.LINDA_ID_OFFSET); + //int mode = command.getInt(PSX.LINDA_MODE_OFFSET); Tuple temp = null; - char idc = (char)command.getShort(LINDA_ID_OFFSET); + char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); - tmpTuple = tuple_space[id]; + tuple = tuple_space[id]; //wを無視 - while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ - temp = tmpTuple; - tmpTuple = tmpTuple.next; + while(tuple != null && tuple.next != null && (tuple.mode == 'w')){ + temp = tuple; + tuple = tuple.next; } - if (tmpTuple != null && (tmpTuple.mode == 'o')){ + if (tuple != null && (tuple.mode == 'o')){ //tmpTuple = new Tuple((SocketChannel)key.channel()); - int seq = command.getInt(LINDA_SEQ_OFFSET); + int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - tmpTuple.setCommand('a', seq); + tuple.setCommand('a', seq); if(debug){ - int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; - System.out.println("send size "+sendsize+" : mode = "+(char)tmpTuple.getMode()); + int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE; + System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode()); } //INの場合はremoveする - if(mode == PSX_IN) { - if(tmpTuple.data != null){ + if(mode == PSX.PSX_IN) { + if(tuple.data != null){ //ByteBuffer buff = ByteBuffer.allocate(0); //tmpTuple.setData(buff); - tmpTuple.data = null; + tuple.data = null; } if(temp != null){ - temp.next = tmpTuple.next; + temp.next = tuple.next; } else { - tuple_space[id] = tmpTuple.next; + tuple_space[id] = tuple.next; } } } else { - if(tmpTuple == null) { + if(tuple == null) { //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); - tmpTuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); - tmpTuple.next = null; + tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); + tuple.next = null; }else { - while(tmpTuple.next !=null) tmpTuple =tmpTuple.next; - tmpTuple.next= new Tuple((SocketChannel)key.channel()); - tmpTuple = tmpTuple.next; - tmpTuple.next = null; + while(tuple.next !=null) tuple =tuple.next; + tuple.next= new Tuple((SocketChannel)key.channel()); + tuple = tuple.next; + tuple.next = null; } - tmpTuple.setMode(mode); - int seq2 = command.getInt(LINDA_SEQ_OFFSET); + tuple.setMode(mode); + int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - tmpTuple.setSeq(seq2); - tmpTuple.ch = (SocketChannel) key.channel(); - tmpTuple.setDataLength(0); + tuple.setSeq(seq2); + tuple.ch = (SocketChannel) key.channel(); + tuple.setDataLength(0); ByteBuffer buff = ByteBuffer.allocate(0); buff.rewind(); - tmpTuple.setData(buff); - tmpTuple = null; + tuple.setData(buff); + tuple = null; if(debug){ System.out.println("data inserted insert seq = "+seq2 +", id = "+id); } } - return tmpTuple; + return tuple; } protected String Check(SelectionKey key, ByteBuffer command) throws IOException { @@ -245,7 +242,7 @@ ByteBuffer data; Tuple tmpTuple; int id; - char idc = (char)command.getShort(LINDA_ID_OFFSET); + char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); command.rewind(); id = (int)idc; @@ -254,12 +251,12 @@ tmpTuple = tmpTuple.next; } if (tmpTuple != null && (tmpTuple.mode == 'o')) { - command.putInt(LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen); + command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen); command.rewind(); data = tmpTuple.getData(); }else { //means no out tuple - command.putInt(LINDA_DATA_LENGTH_OFFSET, 0); + command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, 0); command.rewind(); data = ByteBuffer.allocate(0); } @@ -276,7 +273,7 @@ System.out.println("Manager_run: data is null"); } } - int send_size = LINDA_HEADER_SIZE; + int send_size = PSX.LINDA_HEADER_SIZE; int count = 0; //command Send