Mercurial > hg > FederatedLinda
changeset 23:b4fd7fb9135a
Simple Test run.
author | kono |
---|---|
date | Wed, 20 Aug 2008 03:28:45 +0900 (2008-08-19) |
parents | 56e015e8f5dc |
children | 35375016b2f0 |
files | src/fdl/ComDebug_Client.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/MetaLinda.java src/fdl/MetaLogEngine.java src/fdl/PSX.java src/fdl/PSXLinda.java src/fdl/PSXLindaInterface.java src/fdl/PSXQueue.java src/fdl/Tuple.java src/fdl/TupleSpace.java src/fdl/test/TestPSXLinda.java |
diffstat | 12 files changed, 154 insertions(+), 199 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/ComDebug_Client.java Tue Aug 19 16:02:48 2008 +0900 +++ b/src/fdl/ComDebug_Client.java Wed Aug 20 03:28:45 2008 +0900 @@ -80,16 +80,16 @@ psx.in(65535, new MyCallBack(psx)); System.out.println("COM_DEBUG Connected.["+host+":"+port+"]"); - psx.out(PSX.META_MONITOR, nullBuffer, 0); + psx.out(PSX.META_MONITOR, nullBuffer); debugCallback = new PSXCallback() { public void callback(ByteBuffer reply) { System.out.println(PSX.getdataString(reply)); - psx.out(PSX.META_MONITOR, nullBuffer, 0); + psx.out(PSX.META_MONITOR, nullBuffer); psx.in(PSX.META_MONITOR_DATA,debugCallback); } }; - psx.out(PSX.META_MONITOR, nullBuffer, 0); + psx.out(PSX.META_MONITOR, nullBuffer); psx.in(PSX.META_MONITOR_DATA,debugCallback); connect_num++;
--- a/src/fdl/FederatedLinda.java Tue Aug 19 16:02:48 2008 +0900 +++ b/src/fdl/FederatedLinda.java Wed Aug 20 03:28:45 2008 +0900 @@ -82,8 +82,8 @@ void(*callback)(char*,void*), void * obj): */ - public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) { - PSXQueue c = new PSXQueue(linda,id,mode,s,length,callback); + public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) { + PSXQueue c = new PSXQueue(linda,id,mode,s,callback); if (q_top == null) { c = q_end = q_top = c; @@ -100,7 +100,7 @@ if (debug) { System.out.print("Integer compare="); System.out.println((new Integer(2)).equals(new Integer(2))); - System.out.print("Seding seq="); + System.out.print("Sedning seq="); System.out.println(p.seq); } if (r_top == null){ @@ -147,7 +147,7 @@ while (q_top != null){ PSXQueue c = q_top; - c.Send(); + c.send(); q_top = c.next; // psx_free(c); // q_top = c = t;
--- a/src/fdl/IOHandler.java Tue Aug 19 16:02:48 2008 +0900 +++ b/src/fdl/IOHandler.java Wed Aug 20 03:28:45 2008 +0900 @@ -11,7 +11,7 @@ import java.nio.channels.SocketChannel; public class IOHandler implements TupleHandler { - static final boolean debug = false; + static final boolean debug = true; public TupleSpace tupleSpace; String remoteString; @@ -53,9 +53,10 @@ // 読み込み while(readsize>0) { if(debug){ - System.out.print("reading command..."); + System.out.println("reading command..."+readsize); } count = channel.read(command); + if(count==0) throw new IOException(); if(count < 0) throw new IOException(); readsize -= count; } @@ -85,12 +86,13 @@ if (debug) { PSX.printData(command); } + manager_run(key, command, data); // I believe we don't need this //key.interestOps(key.interestOps() | SelectionKey.OP_READ ); assert((key.interestOps()& SelectionKey.OP_READ) !=0); } - public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data, int len) throws IOException { + public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data) throws IOException { command.order(ByteOrder.BIG_ENDIAN); int mode = command.get(PSX.LINDA_MODE_OFFSET); command.rewind(); @@ -98,7 +100,7 @@ if (debug) { System.out.println("data from : "+key.channel()); } - if((mode == '!') || (len == 0)) { + if(mode == '!') { tupleSpace.hook.closeHook(key); } else if(mode == PSX.PSX_CHECK) { tupleSpace.Check(key, command);
--- a/src/fdl/MetaLinda.java Tue Aug 19 16:02:48 2008 +0900 +++ b/src/fdl/MetaLinda.java Wed Aug 20 03:28:45 2008 +0900 @@ -56,18 +56,18 @@ replies.add(new MetaReply(PSX.PSX_CHECK,id,ts,callback)); } - public PSXReply out(int id, ByteBuffer data,int size) { + public PSXReply out(int id, ByteBuffer data) { MetaReply r = new MetaReply(PSX.PSX_OUT,id,ts,data,null); replies.add(r); return r; } - public PSXReply update(int id, ByteBuffer data,int size) { + public PSXReply update(int id, ByteBuffer data) { MetaReply r = new MetaReply(PSX.PSX_UPDATE,id,ts,data,null); return r; } - public void update(int id, ByteBuffer data,int size,PSXCallback callback) { + public void update(int id, ByteBuffer data,PSXCallback callback) { MetaReply r = new MetaReply(PSX.PSX_UPDATE,id,ts,data,callback); replies.add(r); } @@ -86,8 +86,6 @@ return this; } - public void send(ByteBuffer command,ByteBuffer data) { - } public int sync() { return sync(0);
--- a/src/fdl/MetaLogEngine.java Tue Aug 19 16:02:48 2008 +0900 +++ b/src/fdl/MetaLogEngine.java Wed Aug 20 03:28:45 2008 +0900 @@ -20,7 +20,7 @@ new PSXCallback() {public void callback(ByteBuffer reply) { meta.ts.hook = commDebug = new CommDebugHook(); ByteBuffer data = ByteBuffer.allocate(0) ; - meta.out(PSX.META_MONITOR_DATA, data, 0); + meta.out(PSX.META_MONITOR_DATA, data); meta.in(PSX.META_MONITOR,monitor_callback); }}; PSXCallback monitor_callback = @@ -31,7 +31,7 @@ if (data!=null) break; meta.sync(); } while (true); - meta.out(PSX.META_MONITOR_DATA, data, data.limit()); + meta.out(PSX.META_MONITOR_DATA, data); meta.in(PSX.META_MONITOR,monitor_callback); }};
--- a/src/fdl/PSX.java Tue Aug 19 16:02:48 2008 +0900 +++ b/src/fdl/PSX.java Wed Aug 20 03:28:45 2008 +0900 @@ -12,9 +12,12 @@ package fdl; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.CharBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; @@ -104,7 +107,6 @@ static void setAnserCommand(ByteBuffer command, int seq) { command.put(LINDA_MODE_OFFSET, (byte)'a'); - command.rewind(); command.putInt(LINDA_SEQ_OFFSET, seq); command.rewind(); } @@ -135,6 +137,37 @@ sendtext = cb.toString(); return sendtext; } + + public static void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) { + int send_size = LINDA_HEADER_SIZE; + int count = 0; + + if (command.position()!=0||command.limit()!=PSX.LINDA_HEADER_SIZE) + System.err.println("command length erron send"); + try { + //command Send + while(send_size > 0){ + count = ch.write(command); + if(count < 0) throw new IOException(); + send_size -= count; + } + + if (data==null) return; + //data Send + while(data.remaining() > 0){ + count = ch.write(data); + if(count < 0) throw new IOException(); + } + } catch (IOException e) { + System.out.println("Write Falied on:"+ch); + return; + } + } + + public static void send(SelectionKey key, ByteBuffer command, ByteBuffer data) { + SocketChannel ch = (SocketChannel)key.channel(); + send(ch,command,data); + } }
--- a/src/fdl/PSXLinda.java Tue Aug 19 16:02:48 2008 +0900 +++ b/src/fdl/PSXLinda.java Wed Aug 20 03:28:45 2008 +0900 @@ -36,7 +36,7 @@ public class PSXLinda implements PSXLindaInterface { private FederatedLinda fdl; - private SocketChannel socketChannel; + SocketChannel socketChannel; public String host; public int port; public int mytsid; @@ -96,44 +96,44 @@ } public PSXReply in(int id) { - PSXReply r = fdl.psx_queue(this, id, null, 0, PSX.PSX_IN, (PSXCallback)null); + PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_IN, (PSXCallback)null); return r; } public void in(int id, PSXCallback callback) { - fdl.psx_queue(this, id, null, 0, PSX.PSX_IN, callback); + fdl.psx_queue(this, id, null, PSX.PSX_IN, callback); } public PSXReply ck(int id) { - PSXReply r = fdl.psx_queue(this, id, null, 0, PSX.PSX_IN, null); + PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_IN, null); return r; } public void ck(int id, PSXCallback callback) { - fdl.psx_queue(this, id, null, 0, PSX.PSX_IN, callback); + fdl.psx_queue(this, id, null, PSX.PSX_IN, callback); } - public PSXReply out(int id, ByteBuffer data,int size) { - PSXReply r = fdl.psx_queue(this, id, data, size, PSX.PSX_OUT, null); + public PSXReply out(int id, ByteBuffer data) { + PSXReply r = fdl.psx_queue(this, id, data, PSX.PSX_OUT, null); return r; } - public PSXReply update(int id, ByteBuffer data,int size) { - PSXReply r = fdl.psx_queue(this, id, data, size, PSX.PSX_UPDATE, null); + public PSXReply update(int id, ByteBuffer data) { + PSXReply r = fdl.psx_queue(this, id, data, PSX.PSX_UPDATE, null); return r; } - public void update(int id, ByteBuffer data,int size,PSXCallback callback) { - fdl.psx_queue(this, id, data, size, PSX.PSX_UPDATE, callback); + public void update(int id, ByteBuffer data,PSXCallback callback) { + fdl.psx_queue(this, id, data, PSX.PSX_UPDATE, callback); } public PSXReply rd(int id) { - PSXReply r = fdl.psx_queue(this, id, null, 0, PSX.PSX_RD, null); + PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_RD, null); return r; } public void rd(int id, PSXCallback callback) { - fdl.psx_queue(this, id, null, 0, PSX.PSX_RD, callback); + fdl.psx_queue(this, id, null, PSX.PSX_RD, callback); } public PSXLindaInterface add(PSXLindaInterface linda) { @@ -141,22 +141,6 @@ return this; } - public void send(ByteBuffer command,ByteBuffer data) - throws IOException { - if (debug) { - checkConnect("send"); - if (command == null) { - System.out.println("PSXLinda:command is null"); - } - if (data == null) { - System.out.println("PSXLinda:data is null"); - } - } - socketChannel.write(command); - if (data != null) - socketChannel.write(data); - } - public int sync() throws IOException { return fdl.sync();
--- a/src/fdl/PSXLindaInterface.java Tue Aug 19 16:02:48 2008 +0900 +++ b/src/fdl/PSXLindaInterface.java Wed Aug 20 03:28:45 2008 +0900 @@ -14,11 +14,11 @@ public void ck(int id, PSXCallback callback) ; - public PSXReply out(int id, ByteBuffer data,int size) ; + public PSXReply out(int id, ByteBuffer data) ; - public PSXReply update(int id, ByteBuffer data,int size) ; + public PSXReply update(int id, ByteBuffer data) ; - public void update(int id, ByteBuffer data,int size,PSXCallback callback) ; + public void update(int id, ByteBuffer data,PSXCallback callback) ; public PSXReply rd(int id) ; @@ -26,8 +26,6 @@ public PSXLindaInterface add(PSXLindaInterface linda) ; - public void send(ByteBuffer command,ByteBuffer data) throws IOException ; - public int sync() throws IOException ; public int sync(long mtime) throws IOException ;
--- a/src/fdl/PSXQueue.java Tue Aug 19 16:02:48 2008 +0900 +++ b/src/fdl/PSXQueue.java Wed Aug 20 03:28:45 2008 +0900 @@ -25,38 +25,39 @@ public int tspace_id; public int id; public int mode; + public ByteBuffer data; public int size; - public ByteBuffer data; public ByteBuffer command; public int seq; public PSXCallback callback; public PSXQueue next; public PSXLinda linda; - public PSXQueue( PSXLinda _linda,int _id,int _mode,ByteBuffer _data,int _size,PSXCallback _callback) { + public PSXQueue( PSXLinda _linda,int _id,int _mode,ByteBuffer _data,PSXCallback _callback) { linda = _linda; id = _id; data = _data; - size = _size; + if (data!=null) + size = _data.limit()-_data.position(); + else + size = 0; mode = _mode; callback = _callback; setCommand(); } private void setCommand() { - command = PSX.setCommand(mode,id,size,seq); + command = PSX.setCommand(mode,id,seq,size); } public void setSeq(int _seq) { seq = _seq; command.putInt(PSX.LINDA_SEQ_OFFSET,seq); + command.rewind(); } - public void Send() - throws IOException { - if (command!=null) command.rewind(); - if (data!=null) data.rewind(); - linda.send(command,data); + public void send() { + PSX.send(linda.socketChannel, command, data); } }
--- a/src/fdl/Tuple.java Tue Aug 19 16:02:48 2008 +0900 +++ b/src/fdl/Tuple.java Wed Aug 20 03:28:45 2008 +0900 @@ -4,39 +4,36 @@ import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; - public class Tuple { public int mode; public int id; public int seq; - public int datalen; public ByteBuffer command; public ByteBuffer data; + public SocketChannel ch; public Tuple next; - public SocketChannel ch; static final boolean debug = false; - public Tuple(SocketChannel _ch) { - ch = _ch; - } - public Tuple() { } + public Tuple(SocketChannel ch) { + this.ch = ch; + } + public void setCommand(int _mode, int _seq) { - setCommand( _mode, id, _seq,datalen); + setCommand( _mode, id, _seq,data.limit()); } public void setCommand(int _mode, int _id, int _seq, int _datalen) { command = PSX.setCommand(_mode, _id, _seq, _datalen); } - public void setTuple(int _mode,int _id, int _seq, int _datalen, ByteBuffer _data) { + public void setTuple(int _mode,int _id, int _seq, ByteBuffer _data) { mode = _mode; id = _id; seq = _seq; - datalen = _datalen; data = _data; if (debug) { @@ -53,13 +50,8 @@ public void setMode(int _mode) { mode = _mode; } - - public void setDataLength(int _datalength) { - datalen = _datalength; - } public void setData(ByteBuffer _data) { - _data.rewind(); data = _data; } @@ -67,19 +59,15 @@ return mode; } - /*public int getId() { - return command.getShort(LINDA_ID_OFFSET); - }*/ - public int getSeq() { return seq; } public int getdataLength() { - return datalen; + return data.limit(); } + public ByteBuffer getData() { - data.rewind(); return data; }
--- a/src/fdl/TupleSpace.java Tue Aug 19 16:02:48 2008 +0900 +++ b/src/fdl/TupleSpace.java Wed Aug 20 03:28:45 2008 +0900 @@ -35,18 +35,17 @@ user++; ByteBuffer data = ByteBuffer.allocate(2); - data.clear(); userchar[0] = (byte) (user/10 + '0'); userchar[1] = (byte) (user%10 + '0'); - data.put(userchar[0]); data.put(userchar[1]); + data.flip(); - data.rewind(); tmpTuple.setData(data); //Tuple int id = TupleHandler.MAX_TUPLE-1; - tmpTuple.setTuple('o', id, 0, data.limit(), data); + int seq = 0; + tmpTuple.setTuple('o', id, seq, data); System.out.println("Server: assign id "+user); } @@ -69,7 +68,7 @@ while((tuple_space[id] != null) && ((tuple_space[id].mode == PSX.PSX_WAIT_RD)||(tuple_space[id].mode == PSX.PSX_RD))) { PSX.setAnserCommand(command, tuple_space[id].getSeq()); - send(tuple_space[id].ch, command, data); + PSX.send(tuple_space[id].ch, command, data); removeTuple(id); tuple = null; @@ -81,7 +80,7 @@ int sendsize = datasize+PSX.LINDA_HEADER_SIZE; System.out.println("send size "+sendsize+" : mode = "+(char)'a'); } - send(tuple_space[id].ch, command, data); + PSX.send(tuple_space[id].ch, command, data); removeTuple(id); tuple = null; } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX.PSX_OUT)) { @@ -99,7 +98,6 @@ tuple.setMode('o'); tuple.setSeq(seq); tuple.setData(data); - tuple.setDataLength(datasize); if(debug){ System.out.println("data inserted len = "+tuple.getdataLength()+" : id = "+id); } @@ -139,7 +137,6 @@ hook.waitReadHook(key,id,seq,(char)mode); tuple.ch = (SocketChannel) key.channel(); - tuple.setDataLength(0); ByteBuffer buff = ByteBuffer.allocate(0); tuple.setData(buff); tuple.next = tuple_space[id]; @@ -157,7 +154,7 @@ //send ByteBuffer sendcommand = tuple.getCommand(); ByteBuffer senddata = tuple.getData(); - send(key,sendcommand, senddata); + PSX.send(key,sendcommand, senddata); } } @@ -186,7 +183,7 @@ } if (tuple != null && (tuple.mode == 'o')){ - tupleIsAvailable(command, mode, tuple, id, temp); + tuple = tupleIsAvailable(command, mode, tuple, id, temp); } else { tuple = setupWait(key, command, mode, tuple, id); } @@ -211,9 +208,8 @@ return null; } - private void tupleIsAvailable(ByteBuffer command, int mode, Tuple tuple, + private Tuple tupleIsAvailable(ByteBuffer command, int mode, Tuple tuple, int id, Tuple temp) { - //tmpTuple = new Tuple((SocketChannel)key.channel()); int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); tuple.setCommand('a', seq); @@ -222,16 +218,8 @@ int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE; System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode()); } - - - //INの場合はremoveする if(mode == PSX.PSX_IN) { - if(tuple.data != null){ - //ByteBuffer buff = ByteBuffer.allocate(0); - //tmpTuple.setData(buff); - tuple.data = null; - } if(temp != null){ temp.next = tuple.next; } @@ -239,6 +227,7 @@ tuple_space[id] = tuple.next; } } + return tuple; } private Tuple setupWait(SelectionKey key, ByteBuffer command, int mode, @@ -259,9 +248,7 @@ command.rewind(); tuple.setSeq(seq2); tuple.ch = (SocketChannel) key.channel(); - tuple.setDataLength(0); ByteBuffer buff = ByteBuffer.allocate(0); - buff.rewind(); tuple.setData(buff); tuple = null; @@ -273,7 +260,7 @@ protected void Check(SelectionKey key, ByteBuffer command) throws IOException { ByteBuffer data = check1(key,command); - send(key, command, data); + PSX.send(key, command, data); } public ByteBuffer check1(SelectionKey key,ByteBuffer command) { @@ -292,7 +279,7 @@ tmpTuple = tmpTuple.next; } if (tmpTuple != null && (tmpTuple.mode == 'o')) { - command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen); + command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, tmpTuple.getdataLength()); command.rewind(); data = tmpTuple.getData(); }else { @@ -304,44 +291,5 @@ return data; } - public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) { - if (debug) { - if (command == null) { - System.out.println("Manager_run: command is null"); - } - if (data == null) { - System.out.println("Manager_run: data is null"); - } - } - int send_size = PSX.LINDA_HEADER_SIZE; - int count = 0; - - try { - //command Send - command.rewind(); - while(send_size > 0){ - count = ch.write(command); - if(count < 0) throw new IOException(); - send_size -= count; - } - - if (data==null) return; - //data Send - data.rewind(); - while(data.remaining() > 0){ - count = ch.write(data); - if(count < 0) throw new IOException(); - } - } catch (IOException e) { - System.out.println("Write Falied on:"+ch); - return; - } - } - - public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) { - SocketChannel ch = (SocketChannel)key.channel(); - send(ch,command,data); - } - } \ No newline at end of file
--- a/src/fdl/test/TestPSXLinda.java Tue Aug 19 16:02:48 2008 +0900 +++ b/src/fdl/test/TestPSXLinda.java Wed Aug 20 03:28:45 2008 +0900 @@ -34,65 +34,68 @@ class TestPSXLinda { - static int id; - public static void main (String args[]) { + static int id; + public static void main (String args[]) { + + FederatedLinda fdl; + PSXLindaInterface psx; + String host; + int port = 10000; + PSXReply r; + InetSocketAddress localAddress; - FederatedLinda fdl; - PSXLindaInterface psx; - String host; - int port = 10000; - PSXReply r; - InetSocketAddress localAddress; - - try { - localAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); - host = localAddress.getHostName(); - } catch (UnknownHostException e) { - // TODO Auto-generated catch block - host = "localhost"; - } - + try { + localAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); + host = localAddress.getHostName(); + } catch (UnknownHostException e) { + // TODO Auto-generated catch block + host = "localhost"; + } + - // try { - // port = Integer.parseInt(args[1]); - // } catch (NumberFormatException nfex) { } - try { - fdl = FederatedLinda.init(); - psx = fdl.open(host,port); - r = psx.in(65535); - //for(int i=0;i<100;i++) { - //if (1==0) { - //} - fdl.sync(1); - //} - System.out.println("Connected."); + // try { + // port = Integer.parseInt(args[1]); + // } catch (NumberFormatException nfex) { } + try { + fdl = FederatedLinda.init(); + psx = fdl.open(host,port); + r = psx.in(65535); + //for(int i=0;i<100;i++) { + //if (1==0) { + //} + fdl.sync(1); + //} + System.out.println("Connected."); - ByteBuffer data = ByteBuffer.allocate(10); - data.putInt(10); + ByteBuffer data = ByteBuffer.allocate(10); + data.putInt(10); + data.flip(); + + psx.out(1,data); - psx.out(1,data,4); - - while(!r.ready()) { - psx.sync(1000); - // System.out.println("Waiting...."); - } - } catch (IOException nfex) { - nfex.printStackTrace(); - System.out.println("Faild."); - return; + int cnt=0; + while(!r.ready()) { + // psx.sync(1000); + psx.sync(); + System.out.println("Waiting...."+(cnt++)); + } + } catch (IOException nfex) { + nfex.printStackTrace(); + System.out.println("Faild."); + return; + } + + print_id(r); + + } - - print_id(r); - - } - - public static void print_id (PSXReply ans) { - ByteBuffer r = ans.getData(); - id = r.getShort(); - System.out.print("ID = "); - System.out.println(id); - } + public static void print_id (PSXReply ans) { + ByteBuffer r = ans.getData(); + id = r.getShort(); + System.out.print("ID = "); + System.out.println(id); + } }