Mercurial > hg > FederatedLinda
changeset 0:083a0b5e12cc
Apply Debug Interface version start
author | fuchita |
---|---|
date | Thu, 07 Feb 2008 14:21:30 +0900 |
parents | |
children | cdc08d4722ec |
files | .classpath .project .settings/org.eclipse.core.resources.prefs .settings/org.eclipse.jdt.ui.prefs src/fdl/.classpath src/fdl/.project src/fdl/AcceptHandler.java src/fdl/ByteBufferReader.java src/fdl/ComDebug.java src/fdl/FDLindaServ.java src/fdl/FederatedLinda.java src/fdl/Handler.java src/fdl/IOHandler.java src/fdl/IOParam.java src/fdl/PSXCallback.java src/fdl/PSXLinda.java src/fdl/PSXQueue.java src/fdl/PSXQueueInterface.java src/fdl/PSXReply.java src/fdl/TestEtc.java src/fdl/TestPSXLinda.java src/fdl/Tuple.java src/fdl/TupleHandler.java src/fdl/URLKicker.java |
diffstat | 24 files changed, 1766 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.classpath Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,6 @@ +<?xml version="1.0" encoding="UTF-8"?> +<classpath> + <classpathentry kind="src" path="src"/> + <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> + <classpathentry kind="output" path="bin"/> +</classpath>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.project Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,17 @@ +<?xml version="1.0" encoding="UTF-8"?> +<projectDescription> + <name>FederatedLinda-Java</name> + <comment></comment> + <projects> + </projects> + <buildSpec> + <buildCommand> + <name>org.eclipse.jdt.core.javabuilder</name> + <arguments> + </arguments> + </buildCommand> + </buildSpec> + <natures> + <nature>org.eclipse.jdt.core.javanature</nature> + </natures> +</projectDescription>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.settings/org.eclipse.core.resources.prefs Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,3 @@ +#Fri Dec 21 18:09:04 JST 2007 +eclipse.preferences.version=1 +encoding/<project>=UTF-8
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.settings/org.eclipse.jdt.ui.prefs Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,7 @@ +#Wed Jun 27 10:08:20 JST 2007 +eclipse.preferences.version=1 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=java;javax;org;com; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 +org.eclipse.jdt.ui.text.custom_code_templates=<?xml version\="1.0" encoding\="UTF-8"?><templates/>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/.classpath Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,4 @@ +<?xml version="1.0" encoding="UTF-8"?> +<classpath> + <classpathentry kind="output" path="bin"/> +</classpath>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/.project Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,17 @@ +<?xml version="1.0" encoding="UTF-8"?> +<projectDescription> + <name>UML</name> + <comment></comment> + <projects> + </projects> + <buildSpec> + <buildCommand> + <name>org.eclipse.jdt.core.javabuilder</name> + <arguments> + </arguments> + </buildCommand> + </buildSpec> + <natures> + <nature>org.eclipse.jdt.core.javanature</nature> + </natures> +</projectDescription>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/AcceptHandler.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,77 @@ + +package fdl; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +public class AcceptHandler implements TupleHandler, PSXQueueInterface { + //public Hashtable<Integer, Tuple> tuple_space; + public Tuple[] tuple_space; + + public static int user = 0; + public byte userchar[] = new byte[2]; + public final int MAX_TUPLE = TupleHandler.MAX_TUPLE; + public Tuple tmpTuple; + + public AcceptHandler(Tuple[] _tuple_space) { + // 読みこんだデータを格納するためのリストの初期化 + tuple_space = _tuple_space; + } + + public void handle(SelectionKey key) + throws ClosedChannelException, IOException { + ServerSocketChannel serverChannel + = (ServerSocketChannel)key.channel(); + + // アクセプト処理 + SocketChannel channel = serverChannel.accept(); + channel.configureBlocking(false); + System.out.println("Server: accepted "+channel.socket().getInetAddress()); + + + //初期生成 + if((tmpTuple = tuple_space[MAX_TUPLE-1]) == null) { + tmpTuple = tuple_space[MAX_TUPLE-1] = new Tuple(); + tmpTuple.next = null; + } else { + while(tmpTuple.next != null) tmpTuple = tmpTuple.next; + tmpTuple.next = new Tuple(); + tmpTuple = tmpTuple.next; + tmpTuple.next = null; + } + + user++; + + //data set + //ByteBuffer data = ByteBuffer.allocate(SHORT_SIZE); + //data.rewind(); + //data.putShort((short) (user)); + + ByteBuffer data = ByteBuffer.allocate(2); + data.clear(); + userchar[0] = (byte) (user/10 + '0'); + userchar[1] = (byte) (user%10 + '0'); + + data.put(userchar[0]); + data.put(userchar[1]); + + data.rewind(); + tmpTuple.setData(data); + //Tuple + int id = MAX_TUPLE-1; + tmpTuple.setTuple('o', id, 0, data.limit(), data); + + + System.out.println("Server: assign id "+user); + + // 入出力用のハンドラを生成し,アタッチする + // 監視する操作は読み込みのみ + IOParam handler = new IOHandler(tuple_space); + channel.register(key.selector(), + SelectionKey.OP_READ, + handler); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/ByteBufferReader.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,36 @@ + +package fdl; + +import java.io.*; +import java.io.IOException; +import java.nio.*; + +public class ByteBufferReader extends Reader { + public ByteBuffer _buf; + + public ByteBufferReader(ByteBuffer buf) { + _buf = buf; + _buf.reset(); + } + + @Override + public int read(char[] cbuf, int off, int len) throws IOException { + // TODO Auto-generated method stub + for(int i=0;i<len;i++) { + cbuf[i]=_buf.getChar(); + } + return len; + } + + @Override + public void reset() { + _buf.reset(); + } + + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/ComDebug.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,70 @@ +package fdl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.LinkedList; + + +public class ComDebug implements PSXQueueInterface{ + static final boolean debug = true; + public static Hashtable<String, Integer> Com_Hashtable = new Hashtable<String, Integer>(); + public static LinkedList<SocketChannel> Report_Channellist = new LinkedList<SocketChannel>(); + + ComDebug(){ + + } + + public static void Report(LinkedList<SocketChannel> reportCh_list, ByteBuffer command, String report_txt) throws IOException { + //レポートするチャンネルが0ならreturn + if(reportCh_list.isEmpty()) { + return; + } + //(ByteBuffer)dataに(String)report_txtを入れる + byte[] txt = report_txt.getBytes(); + ByteBuffer data = ByteBuffer.wrap(txt); + data.rewind(); + + //commandをセット + command.order(ByteOrder.BIG_ENDIAN); + command.putInt(LINDA_PACKET_LENGTH_OFFSET,txt.length+LINDA_HEADER_SIZE-INT_SIZE); + command.put(LINDA_MODE_OFFSET,(byte)'D'); + command.putShort(LINDA_ID_OFFSET,(short) 0); + command.putInt(LINDA_SEQ_OFFSET,0); + command.putInt(LINDA_DATA_LENGTH_OFFSET,txt.length); + command.rewind(); + + //送信 + IOParam io = new IOParam(); + Iterator <SocketChannel> it = reportCh_list.iterator(); + while(it.hasNext()) { + io.send(it.next(), command, data); + } + } + + public static void Com_inc(SelectionKey key, Hashtable<String, Integer> comlist, int mode) { + //通信ログ Hostname:port 'mode' =number 形式でインクリメント + int cnt = 0; + SocketChannel ch = (SocketChannel) key.channel(); + String socketString = ch.socket().getRemoteSocketAddress().toString(); + String[] split = socketString.split("/"); + int length = split.length; + String ComKey = split[length-1]; + ComKey += " "+(char)mode+ " "; + if(comlist.containsKey(ComKey)){ + cnt = comlist.get(ComKey); + } + cnt++; + comlist.put(ComKey, cnt); + } + + public static void addChannel(SelectionKey key, LinkedList<SocketChannel> reportCh_list) { + SocketChannel repch = (SocketChannel) key.channel(); + reportCh_list.add(repch); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/FDLindaServ.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,87 @@ + +package fdl; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.util.Iterator; + +public class FDLindaServ implements PSXQueueInterface { + static final int MAX_REQ = 1; + static final int FAIL = (-1); + static final int MAX_UAER = 4; + static final int MAX_TUPLE = 65536; + static final int DEF_PORT = 10000; + static final String PATHNAME = "/tmp/ldserv"; + + public static Tuple[] tuple_space; + + public static void main(final String[] args) throws IOException { + @SuppressWarnings("unused") + final String usages = "usage: FDLindaServ [-p port]"; + int port = DEF_PORT; + + tuple_space = new Tuple[MAX_TUPLE]; + + //引数判定 + try { + for (int i=0; i<args.length; ++i) { + if("-p".equals(args[i])) { + port = Integer.parseInt(args[++i]); + } else { + System.err.println(usages); + } + } + } catch (NumberFormatException e) { + e.printStackTrace(); + } + + //セレクタを生成 + Selector selector = Selector.open(); + + try { + //ソケット・チャネルを生成・設定 + ServerSocketChannel ssChannel = ServerSocketChannel.open(); + ssChannel.configureBlocking(false); + ssChannel.socket().setReuseAddress(true); + InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port); + ssChannel.socket().bind(address); + System.out.println("Server: litening at "+ssChannel); + + //セレクタにチャンネルを登録 + ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tuple_space)); + + // セレクタによる監視 + while (selector.select() > 0) { + + // Iteratorを用意 + Iterator <SelectionKey>it = selector.selectedKeys().iterator(); + + while (it.hasNext()) { + // SelectionKeyを取り出す + SelectionKey selKey = (SelectionKey)it.next(); + + // 操作に対する処理が行われていると認識させるためにremoveする + it.remove(); + + TupleHandler handler = (TupleHandler)selKey.attachment(); + handler.handle(selKey); + } + + } + } catch (IOException exc) { + exc.printStackTrace(); + } finally { + try { + for (SelectionKey key: selector.keys()) { + key.channel().close(); + } + } catch(IOException ex) { + ex.printStackTrace(); + } + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/FederatedLinda.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,242 @@ + +/* + * @(#)FederatedLinda.java 1.1 06/04/01 + * + * Copyright 2006 Shinji KONO + * + + PSX Lidna + Trasport layer of PSX Linda library + + */ + +package fdl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.ClosedSelectorException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Hashtable; +import java.util.Set; + + +/** + FederatedLinda + * + * @author Shinji Kono + * + * @param mytsid Tuple Space ID + + Initialize connection channel for a tuple space + + one instance for each Tuple space connection + + */ + +public class FederatedLinda implements PSXQueueInterface { + + static FederatedLinda fdl; + static int MAX_SEQUENCE = 2048; + static final boolean debug = true; + + public int tid; + public int seq; + public int qsize; + public PSXLinda linda; + + public Selector selector; + + public PSXQueue q_top,q_end; + public PSXReply r_top,r_end; + public Hashtable<Integer,PSXReply> seqHash; + + static FederatedLinda init() + throws IOException { + if (fdl==null) { + fdl = new FederatedLinda(); + } + return fdl; + } + + private FederatedLinda() + throws IOException { + selector = Selector.open(); + seqHash = new Hashtable<Integer, PSXReply>(); + } + + public PSXLinda open(String _host,int _port) + throws IOException { + tid++; + // System.out.print("Tid = "); + // System.out.println(tid); + PSXLinda newlinda = new PSXLinda(this,tid,_host,_port); + linda = newlinda.add(linda); + return linda; + } + +/** + psx_queue (unsigned int tspace_id, unsigned int id, + unsigned int size, unsigned char *data, char mode, + void(*callback)(char*,void*), void * obj): + */ + + public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) { + PSXQueue c = new PSXQueue(linda,id,mode,s,length,callback); + + if (q_top == null) { + c = q_end = q_top = c; + } else { + q_end.next = c; + q_end = c; + } + qsize++; + + if (mode != PSX_OUT) { + PSXReply p = new PSXReply(PSX_REPLY,callback); + p.seq = seq(p); + c.setSeq(p.seq); + if (debug) { + System.out.print("Integer compare="); + System.out.println((new Integer(2)).equals(new Integer(2))); + System.out.print("Seding seq="); + System.out.println(p.seq); + } + if (r_top == null){ + r_end = r_top = p; + } else { + r_end.next = p; + r_end = p; + } + return p; + } + return null; + } + + public int seq(PSXReply reply) { + Integer s; + do { + seq++; + if (seq>MAX_SEQUENCE) { + seq = 0; + } + s = new Integer(seq); + } while (seqHash.containsKey(s)); + if (debug) { + System.out.print("hash value = "); + System.out.println(s.hashCode()); + } + seqHash.put(s,reply); + seq++; + return seq-1; + } + + public Selector selector() { + return selector; + } + + public int sync() throws IOException { + return sync(0); + } + + public int sync(long mtimeout) + throws IOException { + int key_num = 0; + Set<SelectionKey> keys; + + while (q_top != null){ + PSXQueue c = q_top; + c.Send(); + q_top = c.next; + // psx_free(c); + // q_top = c = t; + qsize--; + } + + try { + key_num = selector.select(mtimeout); + keys = selector.selectedKeys(); + for (SelectionKey key : keys) { + // System.out.println("selecting"); + SocketChannel sock = (SocketChannel)key.channel(); + chkServe(sock); + } + } catch (IOException e) { + e.printStackTrace(); + } catch (ClosedSelectorException e) { + e.printStackTrace(); + } + + return key_num; + } + +// should be in PSXLinda, but sock->linda is unknown here + + private void chkServe(SocketChannel sock) + throws IOException { + int length; + ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + command.order(ByteOrder.BIG_ENDIAN); + + sock.read(command); + command.rewind(); + length = command.getInt(LINDA_DATA_LENGTH_OFFSET); + if (length>0) { + ByteBuffer data = ByteBuffer.allocate(length); + int read = length; + if (debug) { + System.out.print("reading:"); + System.out.println(length); + } + + data.order(ByteOrder.BIG_ENDIAN); + while(read>0) { + read -= sock.read(data); + } + data.rewind(); + if (debug) { + System.out.print("header:"); + for(int i=0;i<LINDA_HEADER_SIZE;i++) { + System.out.println(command.get(i)); + } + System.out.print("data:"); + for(int i=0;i<length;i++) { + System.out.println(data.get(i)); + } + data.rewind(); + } + int rseq = command.getInt(LINDA_SEQ_OFFSET); + int mode = command.get(LINDA_MODE_OFFSET); + Integer a; + if (debug) { + System.out.print("mode = "); + System.out.println(mode); + System.out.print("seq = "); + System.out.println(rseq); + } + try { + PSXReply r = seqHash.get((a = new Integer(rseq))); + if (debug) { + System.out.print("hash value = "); + System.out.println(a.hashCode()); + } + + r.setAnswer(mode,command,data); + + if (r.callback != null ) { + r.callback.callback(data); + } + } catch (NullPointerException e ) { + if (debug) { + System.out.println("hashed reply not found"); + } + // can't happen + return ; + } + } + } +} + +/* end */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/Handler.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,9 @@ + +package fdl; +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; + +public interface Handler { + public void handle(SelectionKey key) throws ClosedChannelException, IOException; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/IOHandler.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,166 @@ + +package fdl; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.Hashtable; +import java.util.LinkedList; + + +public class IOHandler extends IOParam { + static final boolean debug = false; + public Tuple[] tuple_space; + public ComDebug com_debug; + //public static Hashtable<String, Integer> com_Loggingtable = new Hashtable<String, Integer>(); + //public static LinkedList<SocketChannel> reportCh_list = new LinkedList<SocketChannel>(); + + public IOHandler(Tuple[] _tuple_space) { + super(_tuple_space); + } + + public void handle(SelectionKey key) + throws ClosedChannelException, IOException { + // 書き込み可であれば,読み込みを行う + if (key.isReadable()) { + read(key); + } + + // 書き込み可であれば,書き込みを行う + /*if (key.isWritable() && key.isValid()) { + write(key); + }*/ + } + + private void read(SelectionKey key) + throws ClosedChannelException, IOException { + SocketChannel channel = (SocketChannel)key.channel(); + + // 読み込み用のバッファの生成 + ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + command.order(ByteOrder.BIG_ENDIAN); + command.clear(); + + int readsize = LINDA_HEADER_SIZE; + int count = 0; + + // 読み込み + while(readsize>0) { + if(debug){ + System.out.print("reading command..."); + } + count = channel.read(command); + + if(count < 0) { + System.out.println("Connection closed by "+key.channel()); + //System.out.println("Close channel on EOF; channel: "+channel); + key.cancel(); + channel.close(); + readsize = -1; + return; + } + readsize -= count; + } + command.rewind(); + + int len = command.getInt(LINDA_PACKET_LENGTH_OFFSET); + int datalen = command.getInt(LINDA_DATA_LENGTH_OFFSET); + + ByteBuffer data = ByteBuffer.allocate(datalen); + int read = datalen; + + if (debug) { + System.out.println("reading: " +datalen); + } + + data.order(ByteOrder.BIG_ENDIAN); + data.clear(); + while(read>0) { + //System.out.print("reading2..."); + read -= channel.read(data); + } + data.rewind(); + + /* + static final int LINDA_PACKET_LENGTH_OFFSET =0; + static final int LINDA_MODE_OFFSET =0+4; + static final int LINDA_ID_OFFSET =1+4; + static final int LINDA_SEQ_OFFSET =3+4; + static final int LINDA_DATA_LENGTH_OFFSET =7+4; + static final int LINDA_HEADER_SIZE =12+4; + */ + command.order(ByteOrder.BIG_ENDIAN); + command.rewind(); + + /*** print data ***/ + char id = (char)command.getShort(LINDA_ID_OFFSET); + System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ + "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ + "ID:"+(int)id+" "+ + "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "); + //"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ + //"DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" "); + //System.out.println("DATA:"+data); + //データ処理 + command.rewind(); + manager_run(key, command, data, len); + + key.interestOps(key.interestOps() | SelectionKey.OP_READ ); + } + + + public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data, int len) throws IOException { + command.order(ByteOrder.BIG_ENDIAN); + int mode = command.get(LINDA_MODE_OFFSET); + int id = command.get(LINDA_ID_OFFSET); + command.rewind(); + + com_debug = new ComDebug(); + Hashtable<String, Integer> com_Loggingtable = ComDebug.Com_Hashtable; + LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist; + + if (debug) { + System.out.println("data from : "+key.channel()); + } + if((mode == '!') || (len == 0)) { + Connection_Close(key); + }else if(id > PRIVILEGED_ID && id < MAX_TUPLE-1){ + ComDebug.addChannel(key, reportCh_list); + } + else if(mode == PSX_CHECK) { + Check(key, command); + } + else if(mode == PSX_IN || mode == PSX_RD){ + In_Rd(key, command, mode); + } else if (mode == PSX_WAIT_RD) { + Wait_Rd(key, command, mode); + } else if(mode == PSX_OUT) { + Out(command, data); + } else { + System.out.println("Uncorrect buffer"); + System.exit(1); + } + //DEBUG用カウンタ + ComDebug.Com_inc(key, com_Loggingtable, mode); + System.out.println("Com_Debug:"); + System.out.println(com_Loggingtable.toString()); + //DEBUG用レポート + ComDebug.Report(reportCh_list, command, com_Loggingtable.toString()); + + if (key.interestOps() + != (SelectionKey.OP_READ)) { + // 読み込み操作に対する監視を行う + key.interestOps(SelectionKey.OP_READ ); + } + + } + + private void Connection_Close(SelectionKey key) throws IOException { + System.out.println("Connection closed by "+key.channel()); + SocketChannel channel = (SocketChannel)key.channel(); + key.cancel(); + channel.close(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/IOParam.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,309 @@ +package fdl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +public class IOParam implements TupleHandler, PSXQueueInterface{ + static final boolean debug = true; + static final int CAPSIZE = 4194304; + public Tuple[] tuple_space; + + public IOParam(Tuple[] _tuple_space) { + super(); + // 読みこんだデータを格納するためのリストの初期化 + tuple_space = _tuple_space; + } + + public IOParam() { + // TODO Auto-generated constructor stub + } + + protected void Out(ByteBuffer command, ByteBuffer data) throws IOException { + Tuple tmpTuple; + int id; + int datasize; + char idc = (char)command.getShort(LINDA_ID_OFFSET); + command.rewind(); + id = (int)idc; + + datasize = command.getInt(LINDA_DATA_LENGTH_OFFSET); + command.rewind(); + + System.out.println("*** out command : id = "+id +" ***"); + + while((tuple_space[id] != null) && + ((tuple_space[id].mode == PSX_WAIT_RD)||(tuple_space[id].mode == PSX_RD))) { + command.put(LINDA_MODE_OFFSET, (byte)'a'); + command.rewind(); + command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); + command.rewind(); + //if(debug){ + //int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; + //System.out.println("send size "+sendsize+" : mode = "+(char)mode); + //} + //ByteBuffer sendcommand = tmpTuple.getCommand(); + //ByteBuffer senddata = tmpTuple.getData(); + send(tuple_space[id].ch, command, data); + + //後処理 + tmpTuple = tuple_space[id]; + tuple_space[id] = tmpTuple.next; + tmpTuple = null; + } + if(tuple_space[id] != null && tuple_space[id].mode == PSX_IN) { + command.put(LINDA_MODE_OFFSET, (byte)'a'); + command.rewind(); + command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); + command.rewind(); + + if(debug){ + int sendsize = datasize+LINDA_HEADER_SIZE; + System.out.println("send size "+sendsize+" : mode = "+(char)'a'); + } + //ByteBuffer sendcommand = tmpTuple.getCommand(); + //ByteBuffer senddata = tmpTuple.getData(); + send(tuple_space[id].ch, command, data); + + //後処理 + tmpTuple = tuple_space[id]; + tuple_space[id] = tmpTuple.next; + tmpTuple = null; + } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX_OUT)) { + if((tmpTuple = tuple_space[id]) == null) { + tmpTuple = tuple_space[id] = new Tuple(); + tmpTuple.next = null; + } + else { + while(tmpTuple.next != null) tmpTuple = tmpTuple.next; + tmpTuple.next = new Tuple(); + tmpTuple = tmpTuple.next; + tmpTuple.next = null; + } + + tmpTuple.setMode('o'); + int seq = command.getInt(LINDA_SEQ_OFFSET); + command.rewind(); + tmpTuple.setSeq(seq); + tmpTuple.setData(data); + tmpTuple.setDataLength(datasize); + if(debug){ + System.out.println("data inserted len = "+tmpTuple.getdataLength()+" : id = "+id); + } + } + else { + System.out.println("Uncorrect mode :"+(char)tuple_space[id].getMode()); + command.clear(); + data.clear(); + System.exit(1); + } + } + + protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) { + Tuple tmpTuple; + int id; + + char idc = (char)command.getShort(LINDA_ID_OFFSET); + command.rewind(); + id = (int)idc; + + System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); + + tmpTuple = new Tuple(); + tmpTuple.setMode(mode); + int seq = command.getInt(LINDA_SEQ_OFFSET); + command.rewind(); + tmpTuple.setSeq(seq); + tmpTuple.ch = (SocketChannel) key.channel(); + tmpTuple.setDataLength(0); + ByteBuffer buff = ByteBuffer.allocate(0); + tmpTuple.setData(buff); + tmpTuple.next = tuple_space[id]; + tuple_space[id] = tmpTuple; + if(debug){ + System.out.println("data inserted insert seq = "+seq +", id = "+id); + } + } + + protected void In_Rd(SelectionKey key, ByteBuffer command, int mode) + throws IOException { + Tuple tmpTuple; + int id; + //id = command.getInt(LINDA_ID_OFFSET); + //int mode = command.getInt(LINDA_MODE_OFFSET); + Tuple temp = null; + + char idc = (char)command.getShort(LINDA_ID_OFFSET); + command.rewind(); + id = (int)idc; + + System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); + + tmpTuple = tuple_space[id]; + + //wを無視 + while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ + temp = tmpTuple; + tmpTuple = tmpTuple.next; + } + + if (tmpTuple != null && (tmpTuple.mode == 'o')){ + //tmpTuple = new Tuple((SocketChannel)key.channel()); + int seq = command.getInt(LINDA_SEQ_OFFSET); + command.rewind(); + tmpTuple.setCommand('a', seq); + + if(debug){ + int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; + System.out.println("send size "+sendsize+" : mode = "+(char)tmpTuple.getMode()); + } + + //send + ByteBuffer sendcommand = tmpTuple.getCommand(); + ByteBuffer senddata = tmpTuple.getData(); + send(key,sendcommand, senddata); + + //INの場合はremoveする + if(mode == PSX_IN) { + if(tmpTuple.data != null){ + //ByteBuffer buff = ByteBuffer.allocate(0); + //tmpTuple.setData(buff); + tmpTuple.data = null; + } + if(temp != null){ + temp.next = tmpTuple.next; + } + else { + tuple_space[id] = tmpTuple.next; + } + tmpTuple = null; + } + } else { + if(tmpTuple == null) { + //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); + tmpTuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); + tmpTuple.next = null; + }else { + while(tmpTuple.next !=null) tmpTuple =tmpTuple.next; + tmpTuple.next= new Tuple((SocketChannel)key.channel()); + tmpTuple = tmpTuple.next; + tmpTuple.next = null; + } + + tmpTuple.setMode(mode); + int seq2 = command.getInt(LINDA_SEQ_OFFSET); + command.rewind(); + tmpTuple.setSeq(seq2); + tmpTuple.ch = (SocketChannel) key.channel(); + tmpTuple.setDataLength(0); + ByteBuffer buff = ByteBuffer.allocate(0); + buff.rewind(); + tmpTuple.setData(buff); + + if(debug){ + System.out.println("data inserted insert seq = "+seq2 +", id = "+id); + } + } + + //} else if (command.getInt(LINDA_MODE_OFFSET) == PSX_WAIT_RD) { + } + + protected void Check(SelectionKey key, ByteBuffer command) throws IOException { + ByteBuffer data; + Tuple tmpTuple; + int id; + char idc = (char)command.getShort(LINDA_ID_OFFSET); + command.rewind(); + id = (int)idc; + tmpTuple = tuple_space[id]; + while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ + tmpTuple = tmpTuple.next; + } + if (tmpTuple != null && (tmpTuple.mode == 'o')) { + command.putInt(LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen); + command.rewind(); + data = tmpTuple.getData(); + }else { + //means no out tuple + command.putInt(LINDA_DATA_LENGTH_OFFSET, 0); + command.rewind(); + data = null; + data = ByteBuffer.allocate(0); + } + send(key, command, data); + } + + public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) + throws IOException { + if (debug) { + if (command == null) { + System.out.println("Manager_run: command is null"); + } + if (data == null) { + System.out.println("Manager_run: data is null"); + } + } + int send_size = LINDA_HEADER_SIZE; + int count = 0; + + //command Send + command.rewind(); + while(send_size > 0){ + count = ch.write(command); + if(count < 0){ + System.out.println("Write Falied! close ch:"+ch); + ch.close(); + return; + } + send_size -= count; + } + + //data Send + data.rewind(); + if(data != null) { + data.rewind(); + ch.write(data); + } + } + + public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) + throws IOException { + SocketChannel ch = (SocketChannel)key.channel(); + if (debug) { + if (command == null) { + System.out.println("Manager_run: command is null"); + } + if (data == null) { + System.out.println("Manager_run: data is null"); + } + } + int send_size = LINDA_HEADER_SIZE; + int count = 0; + + //command Send + command.rewind(); + while(send_size > 0){ + count = ch.write(command); + if(count < 0){ + System.out.println("Write Falied! close ch:"+ch); + ch.close(); + return; + } + send_size -= count; + } + + //data Send + data.rewind(); + if(data != null) { + data.rewind(); + ch.write(data); + } + } + + public void handle(SelectionKey key) throws ClosedChannelException, + IOException { + + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/PSXCallback.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,20 @@ + +/** + PSXCallback + * + * @author Shinji Kono + * + * @param reply retruned String + + call back interface + + */ + +package fdl; +import java.nio.ByteBuffer; + +public interface PSXCallback { + public void callback(ByteBuffer reply) ; +} + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/PSXLinda.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,171 @@ + +/* + * @(#)PSXLinda.java 1.1 06/04/01 + * + * Copyright 2006 Shinji KONO + * + + PSX Lidna + Trasport layer of PSX Linda library + + */ + +package fdl; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + + +/** + PSXLinda + * + * @author Shinji Kono + * + * @param mytsid Tuple Space ID + + Initialize connection channel for a tuple space + + one instance for each Tuple space connection + + */ + +public class PSXLinda implements PSXQueueInterface { + private FederatedLinda fdl; + private SocketChannel socketChannel; + public String host; + public int port; + public int mytsid; + public PSXLinda next; + static final boolean debug = false; + + public PSXLinda(FederatedLinda _fdl,int _mytsid,String _host,int _port) + throws IOException { + Socket socket; + host = _host; + port = _port; + mytsid = _mytsid; + fdl = _fdl; + + socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + + socket = socketChannel.socket(); + // socket.setReuseAddress(true); + socket.setTcpNoDelay(true); + + // can be blocked (thread required?) + socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), _port)); + while (! socketChannel.finishConnect()) { + if (debug) { + System.out.println("waiting for connect"); + } +if (false) { + try { + wait(2000); + } catch (InterruptedException e) { + } +} + } + + socketChannel.register(fdl.selector(), SelectionKey.OP_READ); + + + checkConnect("PSXLinda"); + } + + protected void finalize() { + if (socketChannel != null) { + try { + socketChannel.close(); + } catch (IOException e) { + } + } + } + + private void checkConnect(String s) { + System.out.print("Connected:"); + System.out.print(s); + System.out.print(": "); + System.out.println(socketChannel.isConnected()); + } + + public PSXReply in(int id) { + PSXReply r = fdl.psx_queue(this, id, null, 0, PSX_IN, (PSXCallback)null); + return r; + } + + public void in(int id, PSXCallback callback) { + fdl.psx_queue(this, id, null, 0, PSX_IN, callback); + } + + public PSXReply ck(int id) { + PSXReply r = fdl.psx_queue(this, id, null, 0, PSX_IN, null); + return r; + } + + public void ck(int id, PSXCallback callback) { + fdl.psx_queue(this, id, null, 0, PSX_IN, callback); + } + + public PSXReply out(int id, ByteBuffer data,int size) { + PSXReply r = fdl.psx_queue(this, id, data, size, PSX_OUT, null); + return r; + } + + public PSXReply update(int id, ByteBuffer data,int size) { + PSXReply r = fdl.psx_queue(this, id, data, size, PSX_UPDATE, null); + return r; + } + + public void update(int id, ByteBuffer data,int size,PSXCallback callback) { + fdl.psx_queue(this, id, data, size, PSX_UPDATE, callback); + } + + public PSXReply rd(int id) { + PSXReply r = fdl.psx_queue(this, id, null, 0, PSX_RD, null); + return r; + } + + public void rd(int id, PSXCallback callback) { + fdl.psx_queue(this, id, null, 0, PSX_RD, callback); + } + + public PSXLinda add(PSXLinda linda) { + next = linda; + return this; + } + + public void send(ByteBuffer command,ByteBuffer data) + throws IOException { + if (debug) { + checkConnect("send"); + if (command == null) { + System.out.println("PSXLinda:command is null"); + } + if (data == null) { + System.out.println("PSXLinda:data is null"); + } + } + socketChannel.write(command); + if (data != null) + socketChannel.write(data); + } + + public int sync() + throws IOException { + return fdl.sync(); + } + + public int sync(long mtime) + throws IOException { + return fdl.sync(mtime); + } +} + + +/* end */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/PSXQueue.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,73 @@ + +/* + * @(#)PSXQueue.java 1.1 06/04/01 + * + * Copyright 2006 Shinji KONO + * + + PSX Lidna + Trasport layer of PSX Linda library + + */ + +package fdl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +// import java.nio.channels.*; + +/** + PSXQueue + + Iterator + */ + +public class PSXQueue implements PSXQueueInterface { + public int tspace_id; + public int id; + public int mode; + public int size; + public ByteBuffer data; + public ByteBuffer command; + public int seq; + public PSXCallback callback; + public PSXQueue next; + public PSXLinda linda; + + public PSXQueue( PSXLinda _linda,int _id,int _mode,ByteBuffer _data,int _size,PSXCallback _callback) { + linda = _linda; + id = _id; + data = _data; + size = _size; + mode = _mode; + callback = _callback; + setCommand(); + } + + private void setCommand() { + command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + command.order(ByteOrder.BIG_ENDIAN); + + command.putInt(LINDA_PACKET_LENGTH_OFFSET, + size+LINDA_HEADER_SIZE-INT_SIZE); + command.put(LINDA_MODE_OFFSET,(byte)mode); + command.putShort(LINDA_ID_OFFSET,(short)id); + command.putInt(LINDA_SEQ_OFFSET,seq); + command.putInt(LINDA_DATA_LENGTH_OFFSET,size); + } + + public void setSeq(int _seq) { + seq = _seq; + command.putInt(LINDA_SEQ_OFFSET,seq); + } + + public void Send() + throws IOException { + if (command!=null) command.rewind(); + if (data!=null) data.rewind(); + linda.send(command,data); + } +} + +/* end */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/PSXQueueInterface.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,48 @@ + +/* + * @(#)PSXQueueInterface.java 1.1 06/04/01 + * + * Copyright 2006 Shinji KONO + * + + PSX Lidna + Trasport layer of PSX Linda library + + */ + +package fdl; + + +/** + PSXQueueInterface + + Iterator + */ + +public interface PSXQueueInterface { + static final int PSX_IN = 'i'; + static final int PSX_OUT = 'o'; + static final int PSX_UPDATE = 'u'; + static final int PSX_RD = 'r'; + static final int PSX_CHECK = 'c'; + static final int PSX_REPLY = '?'; + static final int PSX_WAIT_RD = 'w'; + static final int PSX_ANSWER = 'a'; + static final int PSX_HTTP_ANSWER = 'P'; // Put + static final int PSX_HTTP_REQUEST = 'G'; // Get + static final int PSX_COM_DEBUG = 'D'; //Communication DEBUG + + static final int LINDA_PACKET_LENGTH_OFFSET =0; + static final int LINDA_MODE_OFFSET =0+4; + static final int LINDA_ID_OFFSET =1+4; + static final int LINDA_SEQ_OFFSET =3+4; + static final int LINDA_DATA_LENGTH_OFFSET =7+4; + static final int LINDA_HEADER_SIZE =12+4; + static final int INT_SIZE =4; + static final int SHORT_SIZE =2; + + static final int PRIVILEGED_ID = 32767; + +} + +/* end */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/PSXReply.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,75 @@ + +/** + * PSXReply + * + * @author Shinji Kono + * + + psx.in(),psxrd.() return this. + + method for check answer is ready or not. + + (call back interface can be used instead of this) + + unique sequence number */ + +package fdl; +import java.nio.ByteBuffer; + +public class PSXReply implements PSXQueueInterface { + public ByteBuffer command; + public ByteBuffer data; + public int seq; + public PSXReply next; + public int mode; + public PSXCallback callback; + static final boolean debug = false; + + public PSXReply(int _mode,PSXCallback _callback) { + mode = _mode; + callback = _callback; + } + + public PSXReply() { + } + + public void setAnswer(int _mode, ByteBuffer _command,ByteBuffer _data) { + mode = _mode; + data = _data; + command = _command; + if (debug) { +System.out.print("setAnswer mode:"); +System.out.println(mode); +System.out.print("setAnswer bool:"); +System.out.println(mode==PSX_ANSWER); + } + } + + public int getMode() { + return command.get(LINDA_MODE_OFFSET); + } + + public int getId() { + return command.getShort(LINDA_ID_OFFSET); + } + + public int getSeq() { + return command.getInt(LINDA_SEQ_OFFSET); + } + + public int getLength() { + return command.getInt(LINDA_DATA_LENGTH_OFFSET); + } + + public ByteBuffer getData() { + data.rewind(); + return data; + } + + public boolean ready() { + return mode==PSX_ANSWER; + } +} + + +/* end */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/TestEtc.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,58 @@ + +package fdl; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class TestEtc implements PSXQueueInterface { + static final int MAX_REQ = 1; + static final int FAIL = (-1); + static final int MAX_UAER = 4; + static final int MAX_TUPLE = 65536; + static final int MAX_BUFFSIZE = 2048; + static final int DEF_PORT = 10000; + static final String PATHNAME = "/tmp/ldserv"; + + + static final int LINDA_MODE_OFFSET = 0; + static final int LINDA_ID_OFFSET = 1; + static final int LINDA_SEQ_OFFSET = 3; + static final int LINDA_DATA_LENGTH_OFFSET = 7; + static final int LINDA_HEADER_SIZE = 12; + + static final boolean debug = true; + public PSXQueue tuple_space; + public PSXQueue tp; + + public static void main(String[] args) throws IOException { + Tuple[] testTS = new Tuple[MAX_TUPLE]; + ByteBuffer buf = ByteBuffer.allocate(4); + buf.putInt(10); + buf.rewind(); + + Tuple tmpTuple; + + tmpTuple = testTS[MAX_TUPLE-1] = new Tuple(); + + if(tmpTuple != null) { + tmpTuple.setTuple('o', 1, 1, 1, buf); + } + + if(testTS[1] == null) { + System.out.println("testTS[1] == null"); + } + tmpTuple.next = testTS[1] = new Tuple(); + + tmpTuple = tmpTuple.next; + if(tmpTuple != null){ + tmpTuple.setTuple('o', 2, 2, 2, buf); + } + for(Tuple element: testTS){ + if(element != null){ + System.out.println(element.toString()); + } + } +// System.out.println("tmpTuple "+tmpTuple.toString()); +// System.out.println("testTS "+testTS[MAX_TUPLE-1].toString()); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/TestPSXLinda.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,81 @@ + +/* + * @(#)TestPSXLinda.java 1.1 06/04/01 + * + * Copyright 2006 Shinji KONO + * + + Test PSX Lidna + + */ + +package fdl; + +import java.io.IOException; +import java.nio.ByteBuffer; + + +/** + * PSXLinda stream + * + * @author Shinji Kono + * + * @param host The host to connect to + * @param port The port to connect to at the host + + */ + + +class TestPSXLinda { + static int id; + public static void main (String args[]) { + + FederatedLinda fdl; + PSXLinda psx; + String host = "localhost"; + int port = 10000; + PSXReply r; + + + // try { + // port = Integer.parseInt(args[1]); + // } catch (NumberFormatException nfex) { } + try { + fdl = FederatedLinda.init(); + psx = fdl.open(host,port); + r = psx.in(65535); + //for(int i=0;i<100;i++) { + //if (1==0) { + //} + fdl.sync(1); + //} + System.out.println("Connected."); + + ByteBuffer data = ByteBuffer.allocate(10); + data.putInt(10); + + psx.out(1,data,4); + + while(!r.ready()) { + psx.sync(1000); + // System.out.println("Waiting...."); + } + } catch (IOException nfex) { + nfex.printStackTrace(); + System.out.println("Faild."); + return; + } + + print_id(r); + + + } + + public static void print_id (PSXReply ans) { + ByteBuffer r = ans.getData(); + id = r.getShort(); + System.out.print("ID = "); + System.out.println(id); + } +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/Tuple.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,120 @@ + +package fdl; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.SocketChannel; + + +public class Tuple implements PSXQueueInterface { + public int mode; + public int id; + public int seq; + public int datalen; + public ByteBuffer command; + public ByteBuffer data; + public Tuple next; + public SocketChannel ch; + + static final boolean debug = false; + + public Tuple(SocketChannel _ch) { + ch = _ch; + } + + public Tuple() { + } + + public void setCommand(int _mode, int _seq) { + command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + command.order(ByteOrder.BIG_ENDIAN); + + command.putInt(LINDA_PACKET_LENGTH_OFFSET, + this.datalen+LINDA_HEADER_SIZE-INT_SIZE); + command.put(LINDA_MODE_OFFSET,(byte)_mode); + command.putShort(LINDA_ID_OFFSET,(short)this.id); + command.putInt(LINDA_SEQ_OFFSET,_seq); + command.putInt(LINDA_DATA_LENGTH_OFFSET,this.datalen); + command.rewind(); + } + + + public void setCommand(int _mode, int _id, int _seq, int _datalen) { + command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + command.order(ByteOrder.BIG_ENDIAN); + + command.putInt(LINDA_PACKET_LENGTH_OFFSET, + _datalen+LINDA_HEADER_SIZE-INT_SIZE); + command.put(LINDA_MODE_OFFSET,(byte)_mode); + command.putShort(LINDA_ID_OFFSET,(short)_id); + command.putInt(LINDA_SEQ_OFFSET,_seq); + command.putInt(LINDA_DATA_LENGTH_OFFSET,_datalen); + command.rewind(); + } + + public void setTuple(int _mode,int _id, int _seq, int _datalen, ByteBuffer _data) { + mode = _mode; + id = _id; + seq = _seq; + datalen = _datalen; + data = _data; + + if (debug) { + System.out.print("setTuple mode:"); + System.out.println(mode); + } + //setCommand(); + } + + public void setSeq(int _seq) { + seq = _seq; + } + + public void setMode(int _mode) { + mode = _mode; + } + + public void setDataLength(int _datalength) { + datalen = _datalength; + } + + public void setData(ByteBuffer _data) { + _data.rewind(); + data = _data; + } + + public int getMode() { + return mode; + } + + /*public int getId() { + return command.getShort(LINDA_ID_OFFSET); + }*/ + + public int getSeq() { + return seq; + } + + public int getdataLength() { + return datalen; + } + public ByteBuffer getData() { + data.rewind(); + return data; + } + + public ByteBuffer getCommand() { + return this.command; + } + /*public ByteBuffer getCommand() { + command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + command.order(ByteOrder.BIG_ENDIAN); + command.putInt(LINDA_PACKET_LENGTH_OFFSET, LINDA_HEADER_SIZE+datalen-INT_SIZE); + command.put(LINDA_MODE_OFFSET, (byte)mode); + command.putShort(LINDA_ID_OFFSET,(short)id); + command.putInt(LINDA_SEQ_OFFSET, seq); + command.putInt(LINDA_DATA_LENGTH_OFFSET, datalen); + command.rewind(); + return command; + }*/ +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/TupleHandler.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,14 @@ + +package fdl; +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; + +public interface TupleHandler { + static final int MAX_TUPLE = 65536; + static final int MAX_USER = 4; + static final int BUFSIZE = 65535; + public int user = 0; + public void handle(SelectionKey key) throws ClosedChannelException, IOException; + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/URLKicker.java Thu Feb 07 14:21:30 2008 +0900 @@ -0,0 +1,56 @@ + +/* + * @(#)PSXLindaURLKicker.java 1.1 06/04/01 + * + * Copyright 2006 Shinji KONO + * + + Kick some url + + */ + +package fdl; + +import java.net.*; +import java.io.*; +// mport java.nio.*; + + +/** + * URLKicker + * + * @author Shinji Kono + * + * @param args[] The URL to connect to + + */ + +class URLKicker { + public static char buf[]; + + public static void main (String args[]) { + InputStreamReader is; + buf = new char[1024]; + try { + URL a = new URL(args[0]); + URLConnection conn = a.openConnection(); + is = new InputStreamReader(conn.getInputStream()); + int ret = 0; + while ((ret = is.read(buf,0,1024)) > 0) { + processBuf(buf,ret); + } + // close the inputstream + is.close(); + } catch (IOException e) { + } + } + public static void processBuf (char buf[],int len) { +// System.out.print("data: "); +// for(int i=0;i<len;i++) { +// System.out.print(buf[i]); +// } + } + +} + +