Mercurial > hg > RemoteEditor > Eclipse
changeset 205:540d7a8a9e33
add sessionmanager's file
author | one |
---|---|
date | Sat, 18 Dec 2010 17:35:25 +0900 |
parents | aaab17635d0c |
children | 563057fe244e |
files | src/remoteeditor/command/REPCommand.java src/rep/REP.java src/rep/REPCommand.java src/rep/REPCommandPacker.java src/rep/channel/ChannelSimulator.java src/rep/channel/NetworkSimulator.java src/rep/channel/NullSelector.java src/rep/channel/REPLogger.java src/rep/channel/REPPack.java src/rep/channel/REPSelectionKey.java src/rep/channel/REPSelector.java src/rep/channel/REPServerSocket.java src/rep/channel/REPServerSocketChannel.java src/rep/channel/REPSocketChannel.java src/rep/channel/SelectionKeySimulator.java src/rep/channel/SelectorSimulator.java src/rep/channel/ServerChannelSimulator.java |
diffstat | 17 files changed, 1742 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/remoteeditor/command/REPCommand.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,76 @@ +package remoteeditor.command; + +import rep.REP; + +public class REPCommand { + public REP cmd; + public int sid; + public int eid; + public int seq; + public int len; + public int lineno; + + public String string; + // if this command is an undo command, it has it's original + public REPCommand original; + + public REPCommand(REP cmd,int sid,int eid, int seq, int lineno, String string) { + this.cmd = cmd; + this.sid = sid; + this.eid = eid; + this.seq = seq; + this.lineno = lineno; + this.string = string; + this.original = null; + } + + public REPCommand(REPCommand cmd){ + this(cmd.cmd, cmd.sid, cmd.eid, cmd.seq, cmd.lineno, cmd.string); + } + + public REPCommand() { + } + + public REPCommand(int cmd, int sid, int eid, int seq, int lineno, + int textsiz, String string) { + this.cmd = REP.newREP(cmd); + this.sid = sid; + this.eid = eid; + this.seq = seq; + this.lineno = lineno; + this.string = string; + this.original = null; + } + + public String toString(){ + String repCmdString = cmd + ",sid=" + sid + ",eid=" + eid + ",seq=" + seq + + ",lineno=" + lineno ; + if (string!=null) repCmdString += ",sz=" + string.length() +"," + string; + return repCmdString; + } + + public void setEID(int eid2) { + this.eid = eid2; + } + + public void setCMD(REP cmd2) { + this.cmd = cmd2; + } + + public void setSID(int sessionID) { + this.sid = sessionID; + } + + public void setString(String string2) { + string = string2; + } + + public void setSEQID(int i) { + seq = i; + } + + public boolean isSameSeq(REPCommand commit) { + return seq==commit.seq && sid==commit.sid && eid==commit.eid; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/REP.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,68 @@ +package rep; + +public enum REP { + REPCMD_INSERT_USER ( 5), + REPCMD_INSERT ( 6), + REPCMD_INSERT_ACK ( 7), + REPCMD_DELETE_USER ( 8), + REPCMD_DELETE ( 9), + REPCMD_DELETE_ACK ( 10), + REPCMD_CLOSE ( 11), + REPCMD_CLOSE_2 ( 12), + REPCMD_MERGE_MARK(13), + REPCMD_NOP ( 15), + SMCMD_JOIN ( 41), + SMCMD_JOIN_ACK ( 42), + SMCMD_PUT ( 45), + SMCMD_PUT_ACK ( 46), + SMCMD_SELECT ( 47), + SMCMD_SELECT_ACK ( 48), + SMCMD_SELECT0(49), + SMCMD_QUIT ( 53), + SMCMD_QUIT_ACK ( 54), + SMCMD_SM_JOIN ( 62), + SMCMD_SM_JOIN_ACK ( 63), + SMCMD_UPDATE ( 65), + SMCMD_UPDATE_ACK ( 66), + SMCMD_START_MERGE ( 75), + SMCMD_START_MERGE_ACK ( 76), + SMCMD_END_MERGE ( 77), + SMCMD_QUIT_2 ( 67), + SMCMD_QUIT_2_ACK ( 68), + + + SM_EID ( -1), + MERGE_EID ( -2), + + SMCMD_SYNC ( 83), + SMCMD_SYNC_ACK ( 84), + SMCMD_SLEEP(85); + + public final int id; + + REP(int id) { + this.id = id; + } + + static int max = 0; + static int min = 100; + + static REP rep[] ; + static { + // Certainly this is ridiculous... + for (REP r : REP.values()) { + if (max<r.id) max = r.id; + if (min>r.id) min = r.id; + } + rep = new REP[max-min+1]; + for (REP r : REP.values()) { + rep[r.id-min] = r; + } + } + + public static REP newREP(int id) { + // return new REP(id); this does not work... + return rep[id-min]; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/REPCommand.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,76 @@ +package rep; + +import rep.REP; + +public class REPCommand { + public REP cmd; + public int sid; + public int eid; + public int seq; + public int len; + public int lineno; + + public String string; + // if this command is an undo command, it has it's original + public REPCommand original; + + public REPCommand(REP cmd,int sid,int eid, int seq, int lineno, String string) { + this.cmd = cmd; + this.sid = sid; + this.eid = eid; + this.seq = seq; + this.lineno = lineno; + this.string = string; + this.original = null; + } + + public REPCommand(REPCommand cmd){ + this(cmd.cmd, cmd.sid, cmd.eid, cmd.seq, cmd.lineno, cmd.string); + } + + public REPCommand() { + } + + public REPCommand(int cmd, int sid, int eid, int seq, int lineno, + int textsiz, String string) { + this.cmd = REP.newREP(cmd); + this.sid = sid; + this.eid = eid; + this.seq = seq; + this.lineno = lineno; + this.string = string; + this.original = null; + } + + public String toString(){ + String repCmdString = cmd + ",sid=" + sid + ",eid=" + eid + ",seq=" + seq + + ",lineno=" + lineno ; + if (string!=null) repCmdString += ",sz=" + string.length() +"," + string; + return repCmdString; + } + + public void setEID(int eid2) { + this.eid = eid2; + } + + public void setCMD(REP cmd2) { + this.cmd = cmd2; + } + + public void setSID(int sessionID) { + this.sid = sessionID; + } + + public void setString(String string2) { + string = string2; + } + + public void setSEQID(int i) { + seq = i; + } + + public boolean isSameSeq(REPCommand commit) { + return seq==commit.seq && sid==commit.sid && eid==commit.eid; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/REPCommandPacker.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,128 @@ +package rep; + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.channels.SocketChannel; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; + +import rep.channel.REPPack; + +/* +//+-------+--------+--------+-------+--------+---------+------+ +//| cmd | session| editor | seqid | lineno | textsiz | text | +//| | id | id | | | | | +//+-------+--------+--------+-------+--------+---------+------+ +//o-------header section (network order)-------------o +int cmd; // command +int sid; // session ID : uniqu to editing file +int eid; // editor ID : owner editor ID = 1。Session に対して unique +int seqno; // Sequence number : sequence number はエディタごとに管理 +int lineno; // line number +int textsize; // textsize : bytesize +byte[] text; +*/ + + +public class REPCommandPacker implements REPPack<REPCommand> { + private static final int TEXTSIZELIMIT = 0x7000000; + private static final int HEADER_SIZE = 24; + // JIS/S-JIS = 2, UTF-8 = 3, UTF-?? = 5 + private static final int CHARSIZE = 5; + + Charset charset = Charset.forName("UTF-8"); + CharsetEncoder encoder = charset.newEncoder(); + CharsetDecoder decoder = charset.newDecoder(); + + /* (non-Javadoc) + * @see rep.REPPack#packUConv(rep.REPCommand) + */ + public ByteBuffer packUConv(REPCommand command){ + int size = 0; + if (command.string!=null) size = command.string.length()*CHARSIZE; + ByteBuffer buffer = ByteBuffer.allocateDirect(HEADER_SIZE+size); + buffer.clear(); // position = 0 + buffer.putInt(command.cmd.id); + buffer.putInt(command.sid); + buffer.putInt(command.eid); + buffer.putInt(command.seq); + buffer.putInt(command.lineno); + + int pos = buffer.position(); + buffer.putInt(0); + int pos1 = buffer.position(); + + if (command.string!=null) { + //Encode to UTF8 + CharBuffer cb = CharBuffer.wrap(command.string); + try { + encoder.encode(cb, buffer, true); + } catch (IllegalStateException e) { + buffer.position(pos1); + } + } + + //Encoded string length set + int length = buffer.position() -pos1 ; + buffer.putInt(pos, length); + buffer.limit(HEADER_SIZE+length); + buffer.rewind(); + + return buffer; + } + + + public REPCommand unpackUConv(SocketChannel sc) throws IOException { + ByteBuffer header = ByteBuffer.allocateDirect(HEADER_SIZE); + header.clear(); + + while(header.remaining()>0){ + if (sc.read(header)<0) throw new IOException(); + } + + header.rewind(); // position = 0 + + int cmd = header.getInt(); + int sid = header.getInt(); + int eid = header.getInt(); + int seqid = header.getInt(); + int lineno = header.getInt(); + int textsiz = header.getInt(); + + /** + * We should avoid large reading here. Large command should be + * broke in smaller one. It should be easy. + */ + if (textsiz>TEXTSIZELIMIT||textsiz<0) { + // corrupted packet + throw new IOException(); + } + ByteBuffer textBuffer = ByteBuffer.allocateDirect(textsiz); + + while(textBuffer.remaining()>0){ + if (sc.read(textBuffer)<0) throw new IOException(); + } + textBuffer.rewind(); + + //Decode UTF-8 to System Encoding(UTF-16) + String string; + try { + CharBuffer cb; + cb = decoder.decode(textBuffer); + cb.rewind(); + string = cb.toString(); + } catch (CharacterCodingException e) { + string = ""; + } + textsiz = string.length(); + REPCommand repcommand = new REPCommand(cmd, sid, eid, seqid, lineno, textsiz, string); + //if (isLogging) + //System.out.println("Reading: "+repcommand); + return repcommand; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/ChannelSimulator.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,186 @@ +package rep.channel; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.NotYetConnectedException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.LinkedList; +import java.util.Queue; + +public class ChannelSimulator<P> extends REPSocketChannel<P>{ + protected NetworkSimulator ns; + protected Queue<P> readQ; + protected Selector selector = new NullSelector(); + protected boolean isBlocking; + protected ChannelSimulator<P> otherEnd; + protected SelectionKeySimulator<P> key; + private String ownerName; + + /** Constructors. */ + public ChannelSimulator(){ + super(null, null); + ns = NetworkSimulator.singleton(); + readQ = new LinkedList<P>(); + ownerName = Thread.currentThread().getName(); + } + + /** read from Queue. + * @throws IOException */ + public synchronized P read() throws IOException { + // We may lock selector instead of this, but it reduces + // concurrency, but be careful of dead lock. + P tmp; + while ( (tmp=readQ.poll())==null && isBlocking ) { + try { + wait(); // queue is empty + } catch (InterruptedException e) { + throw new IOException(); + } + } + // for write wait (we don't need this) + //otherEnd.wakeup(); selector.wakeup(); + return tmp; + } + + //private synchronized void wakeup() { + // notify(); + //} + + /** write packet to other end. */ + public boolean write(P p){ + if (otherEnd==null) throw new NotYetConnectedException(); + return otherEnd.enQ(p); + } + + /** otherEnd Channel enqueue p to own queue using this method. */ + + boolean enQ(P p) { + if (enQ1(p)) { + // don't lock this channel while calling selector.wakeup(). + // selector may lock this channel, which may cause dead lock. + selector.wakeup(); + return true; + } + return false; + } + + protected synchronized boolean enQ1(P p){ + while(true) { + if (readQ.offer(p)) { + notify(); // other end my wait() + break; + } else { // this can't happen + assert(false); + try { + wait(); // queue is full, we have to wait here + } catch (InterruptedException e) { + } + } + } + return true; + } + + /** Connecting methods */ + // for clients. + public boolean connect(InetSocketAddress ip){ + return ns.connect(ip, this); // ns.connectはotherEndを返した方がよいか? + } + + public boolean connect(SocketAddress ip){ + return ns.connect((InetSocketAddress)ip, this); + } + + void setOtherEnd(ChannelSimulator<P> other){ + otherEnd = other; + } + + public ChannelSimulator<P> accept(){ + return null; + } + + public boolean isAcceptable() { + return false; + } + public synchronized boolean isReadable() { + return !readQ.isEmpty(); + } + + public boolean isWritable() { + return true; + } + + @Override + public SelectableChannel configureBlocking(boolean block) throws IOException { + isBlocking = block; + return this; + } + + + @Override + public SelectionKey keyFor(Selector selector2) { + return ((SelectorSimulator<?>) selector2).getKey(this); + } + + @Override + public SelectionKey keyFor(REPSelector<?> sel) { + return (SelectionKey)((SelectorSimulator<?>) sel).getKey(this); + } + + @SuppressWarnings("unchecked") + @Override + public REPSelectionKey<P> keyFor1(REPSelector<P> sel) { + return (REPSelectionKey<P>)((SelectorSimulator<P>) sel).getKey(this); + } + + @SuppressWarnings("unchecked") + @Override + public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { + SelectorSimulator<P> sel1 = (SelectorSimulator<P>) sel; + selector = sel1; + key = sel1.register(this, ops, att); + return key; + } + + @SuppressWarnings("unchecked") + public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException { + selector = sel; + key = (SelectionKeySimulator<P>) sel.register(this, ops, att); + return key; + } + + + public boolean finishConnect() throws IOException { + return otherEnd!=null; + } + + public Socket socket() { + assert(false); + return null; + } + + @Override + public String getLocalHostName() { + return "localhost"; // always... + } + + public String toString(){ + return "ChSim("+ownerName+")"; + } + + public ChannelSimulator<P> newChannel() { + return new ChannelSimulator<P>(); + } + + @SuppressWarnings("unchecked") + public void setOtherEnd1(ChannelSimulator<?> other) { + otherEnd = (ChannelSimulator<P>) other; + } + + public void close1() throws IOException { + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/NetworkSimulator.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,112 @@ +package rep.channel; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.LinkedList; + +public class NetworkSimulator { + // we don't use <P> because we need singleton. + public static NetworkSimulator ns = new NetworkSimulator(); + + public HashMap<SocketAddress,Integer>namedb = new HashMap<SocketAddress,Integer>(); + public int ipcount = 1; + public REPLogger logger; + + public static NetworkSimulator singleton() { + // singleton pattern may used here, but it has a little cost. + return ns; + } + + int logLevel=5; + /** Listening Servers. */ + private LinkedList<ServerData> serverList; + + /** Constructor. */ + public NetworkSimulator(){ + serverList = new LinkedList<ServerData>(); + logger = REPLogger.singleton(); + logger.writeLog("construct Networksimulator", 1); + // printAllState(); + } + + /* */ + synchronized public void listen(InetSocketAddress ip, ServerChannelSimulator<?> scs) { + serverList.add(new ServerData(ip, scs)); + logger.writeLog("listen", 1); + printAllState(); + } + + synchronized public boolean connect(InetSocketAddress ip, ChannelSimulator<?> CHclient) { + logger.writeLog("connecting..", 1); + for (ServerData sd0: serverList){ + // ANY address (0.0.0.0/0.0.0.0) should be considered. + if (sd0.IP.getAddress().isAnyLocalAddress()) { + if (sd0.IP.getPort() != ip.getPort()) continue; + // we have to check, ip is really reachable to sd0 server, + // but this simulator has no notion of host. To distinguish, + // use different port address. + } else if (!sd0.IP.equals(ip)) continue; + + //ChannelSimulator<?> CHserver = new ChannelSimulator<?>(); + ChannelSimulator<?> CHserver = CHclient.newChannel(); + CHserver.setOtherEnd1(CHclient); + CHclient.setOtherEnd1(CHserver); + + sd0.connectedListS.add(CHserver); + sd0.connectedListC.add(CHclient); + sd0.scs.enQ(CHserver); + logger.writeLog("connected", 1); + //printAllState(); + return true; + } + return false; + } + + /** for DEBUG methods. */ + void printAllState(){ + String log = "NetworkSimulator State:"; + for (ServerData sd: serverList){ + log += "\tSessionManager(ip="+sd.IP.toString()+"): "; + log += channelList(sd.connectedListC); + } + logger.writeLog(log); + } + + private String channelList(LinkedList<ChannelSimulator<?>> list){ + String tmp = ""; + for (ChannelSimulator<?> ch: list){ + tmp += ch.toString()+" "; + } + return "\t"+tmp; + } + + + + public synchronized int nslookup(SocketAddress semaIP) { + Integer ip; + if ((ip=namedb.get(semaIP))==null) { + namedb.put(semaIP, (ip=ipcount++)); + } + return ip; + } + + +} + +class ServerData { + //int virtualIP; + InetSocketAddress IP; + //SelectorSimulator<P> selector; + ServerChannelSimulator<?> scs; + LinkedList<ChannelSimulator<?>> connectedListS; + LinkedList<ChannelSimulator<?>> connectedListC; + + ServerData(InetSocketAddress ip, ServerChannelSimulator<?> _scs){ + IP = ip; + //selector = _selector; + scs = _scs; + connectedListS = new LinkedList<ChannelSimulator<?>>(); + connectedListC = new LinkedList<ChannelSimulator<?>>(); + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/NullSelector.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,56 @@ +package rep.channel; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.Set; + +public class NullSelector extends Selector { + + @Override + public void close() throws IOException { + + } + + @Override + public boolean isOpen() { + return false; + } + + @Override + public Set<SelectionKey> keys() { + return null; + } + + @Override + public SelectorProvider provider() { + return null; + } + + @Override + public int select() throws IOException { + return 0; + } + + @Override + public int select(long timeout) throws IOException { + return 0; + } + + @Override + public int selectNow() throws IOException { + return 0; + } + + @Override + public Set<SelectionKey> selectedKeys() { + return null; + } + + @Override + public Selector wakeup() { + return this; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/REPLogger.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,37 @@ +package rep.channel; + +public class REPLogger { + static REPLogger single = new REPLogger(); + + public static REPLogger singleton(){ +// if(single==null){ +// synchronized(REPLogger.class){ +// if(single==null) +// single = new REPLogger(); +// } +// } + return single; + } + protected REPLogger(){ + } + + private int logLevel=3; + /** simulation log command */ + public void writeLog(String log, int level){ + if ( level<=logLevel ) + System.out.println(Thread.currentThread().getName()+": "+log); + //System.out.flush(); + } + public void writeLog(String log){ + writeLog(log, 2); + } + + public void setLogLevel(int logLevel) { + this.logLevel = logLevel; + } + + public int logLevel() { + return logLevel; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/REPPack.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,14 @@ +package rep.channel; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +public interface REPPack<P> { + + public ByteBuffer packUConv(P command); + + public P unpackUConv(SocketChannel sc) throws IOException ; + + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/REPSelectionKey.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,91 @@ +package rep.channel; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + + +public class REPSelectionKey<P> extends SelectionKey { + SelectionKey key; + private REPSelector<P> selector; + + public REPSelectionKey() { + + } + + public REPSelectionKey(SelectionKey key,REPSelector<P>s) { + this.key = key; + this.selector = s; + attach(key.attachment()); + } + + @Override + public void cancel() { + key.cancel(); + } + + @Override + public SelectableChannel channel() { + if (REPServerSocketChannel.isSimulation) return key.channel(); + SelectableChannel sc = key.channel(); + SelectableChannel rsc = REPSocketChannel.channels.get(sc); + return rsc; + } + + @SuppressWarnings("unchecked") + public REPSocketChannel<P> channel1() { + assert (!REPServerSocketChannel.isSimulation); + SelectableChannel sc = key.channel(); + REPSocketChannel<P> rsc = (REPSocketChannel<P>) REPSocketChannel.channels.get(sc); + return rsc; + } + + @SuppressWarnings("unchecked") + public REPServerSocketChannel<P> serverSocketChannel() { + assert (!REPServerSocketChannel.isSimulation); + SelectableChannel sc = key.channel(); + REPServerSocketChannel<P> rsc = (REPServerSocketChannel<P>) REPSocketChannel.channels.get(sc); + return rsc; + } + + @Override + public int interestOps() { + return key.interestOps(); + } + + @Override + public SelectionKey interestOps(int ops) { + return key.interestOps(ops); + } + + @Override + public boolean isValid() { + return key.isValid(); + } + + @Override + public int readyOps() { + return key.readyOps(); + } + + @Override + public Selector selector() { + return selector; + } + + public REPSocketChannel<P> accept(REPPack<P> pack) throws IOException { + assert(!REPServerSocketChannel.isSimulation); + if (!key.isAcceptable()) throw new IOException(); + ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); + if (ssc==null) return null; + SocketChannel ss = (SocketChannel)ssc.accept(); + //System.err.println("Accept in SelectionKey "+ss); + if (ss==null) return null; + return new REPSocketChannel<P>(ss, pack); + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/REPSelector.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,94 @@ +package rep.channel; + +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.AbstractSelector; +import java.nio.channels.spi.SelectorProvider; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + + +public class REPSelector<P> extends Selector{ + + Selector selector; + + public REPSelector(AbstractSelector selector) { + this.selector = selector; + } + + public static <T>REPSelector<T> create() throws IOException{ + if(REPServerSocketChannel.isSimulation){ + return new SelectorSimulator<T>(); + } + return new REPSelector<T>(SelectorProvider.provider().openSelector()); + } + + @Override + public void close() throws IOException { + selector.close(); + } + + @Override + public boolean isOpen() { + return selector.isOpen(); + } + + @Override + public Set<SelectionKey> keys() { + return selector.keys(); + } + + @Override + public SelectorProvider provider() { + return selector.provider(); + } + + @Override + public int select() throws IOException { + return selector.select(); + } + + @Override + public int select(long timeout) throws IOException { + return selector.select(timeout); + } + + @Override + public int selectNow() throws IOException { + return selector.selectNow(); + } + + @Override + public Set<SelectionKey> selectedKeys() { + return selector.selectedKeys(); + } + + @Override + public Selector wakeup() { + return selector.wakeup(); + } + + public SelectionKey register(SelectableChannel ch, int ops, Object att) throws ClosedChannelException{ + return ch.register(selector, ops, att); + } + + public Set<REPSelectionKey<P>> selectedKeys1() { + Set<SelectionKey> keys = selector.selectedKeys(); + //System.err.println("Selected keys = "+keys); + HashSet<REPSelectionKey<P>> newKeys = new HashSet<REPSelectionKey<P>>(); + + for (Iterator<SelectionKey> it = keys.iterator();it.hasNext(); ) { + SelectionKey k = it.next(); + newKeys.add(new REPSelectionKey<P>(k,this)); + it.remove(); + } + return newKeys; + } + + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/REPServerSocket.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,62 @@ +package rep.channel; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketAddress; +import java.nio.channels.ServerSocketChannel; + + +/* こいつはシミュレーションの時しか生成され無いゾ */ +public class REPServerSocket extends ServerSocket{ + ServerChannelSimulator<?> scs; + + public REPServerSocket(ServerChannelSimulator<?> channel) throws IOException { + scs = channel; + } + + public void bind(SocketAddress ip){ + scs.bind((InetSocketAddress)ip); + } + + @Override + public void close(){ + } + @Override + public ServerSocketChannel getChannel(){ + return null; + } + @Override + public InetAddress getInetAddress(){ + return null; + } + @Override + public int getLocalPort(){ + return 0; + } + @Override + public SocketAddress getLocalSocketAddress(){ + return null; + } + @Override + public int getReceiveBufferSize(){ + return 0; + } + @Override + public boolean getReuseAddress(){ + return false; + } + @Override + public int getSoTimeout(){ + return 0; + } + /* + @Override + public (){ + } + @Override + public (){ + }*/ + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/REPServerSocketChannel.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,134 @@ +package rep.channel; + +import java.io.IOException; +import java.net.ServerSocket; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.SelectorProvider; + +/* + * シミュレーションでは inheritance のServerChannelSimulator を生成、 + * リアルコミュニケーションでは 自身を生成、内部にもつ ServerSocketChannelを扱う + */ +public class REPServerSocketChannel<P> extends SelectableChannel { + + public static boolean isSimulation=false; + private ServerSocketChannel ssc; + private REPPack<P> packer; + + public REPServerSocketChannel() { + + } + + public static <T> REPServerSocketChannel<T> open(REPPack<T> packer) throws IOException{ + if(isSimulation){ + return new ServerChannelSimulator<T>(); + }else{ + return new REPServerSocketChannel<T>(ServerSocketChannel.open(), packer); + } + } + + public static <T> REPServerSocketChannel<T> open(SelectableChannel c,REPPack<T> packer) throws IOException{ + assert(!isSimulation); + return new REPServerSocketChannel<T>((ServerSocketChannel)c, packer); + } + + public REPServerSocketChannel(ServerSocketChannel open, REPPack<P> packer) { + ssc = open; + this.packer = packer; + REPSocketChannel.addChannel(ssc,this); + } + + public void close1() throws IOException { + REPSocketChannel.removeChannel(ssc); + ssc.close(); + } + + public REPServerSocketChannel(SelectableChannel channel, REPPack<P> packer) { + this.ssc = (ServerSocketChannel)channel; + this.packer = packer; + REPSocketChannel.addChannel(ssc,this); + } + + public REPSocketChannel<P> accept1() throws IOException { + return new REPSocketChannel<P>(ssc.accept(), packer); + } + + public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException { + assert(!isSimulation); + if(sel!=null) + return sel.register(ssc, ops, att); + else + return null; + } + + public SocketChannel accept() throws IOException { + return accept1().sc; + } + + + public ServerSocket socket() { + return ssc.socket(); + } + + public SelectableChannel configureBlocking(boolean block) throws IOException + { + ssc.configureBlocking(block); + return this; + } + + @Override + public Object blockingLock() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean isBlocking() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isRegistered() { + // TODO Auto-generated method stub + return false; + } + + @Override + public SelectionKey keyFor(Selector sel) { + // TODO Auto-generated method stub + return null; + } + + @Override + public SelectorProvider provider() { + // TODO Auto-generated method stub + return null; + } + + @Override + public SelectionKey register(Selector sel, int ops, Object att) + throws ClosedChannelException { + // TODO Auto-generated method stub + return null; + } + + @Override + public int validOps() { + // TODO Auto-generated method stub + return 0; + } + + @Override + protected void implCloseChannel() throws IOException { + // TODO Auto-generated method stub + + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/REPSocketChannel.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,182 @@ +package rep.channel; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.AbstractSelector; +import java.nio.channels.spi.SelectorProvider; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class REPSocketChannel<P> extends SelectableChannel{ + + public SocketChannel sc; + private REPPack<P> pack; + static public Map<SelectableChannel,SelectableChannel> channels = + Collections.synchronizedMap(new HashMap<SelectableChannel,SelectableChannel>()); + + public REPSocketChannel(SocketChannel channel, REPPack<P> packer) { + sc = channel; + pack = packer; + addChannel(sc,this); + } + + public REPSocketChannel(SelectableChannel channel, REPPack<P> packer) { + sc = (SocketChannel)channel; + pack = packer; + addChannel(sc,this); + } + + public static void addChannel(SelectableChannel sc,SelectableChannel rc) { + channels.put(sc, rc); + } + + public void close1() throws IOException { + removeChannel(sc); + sc.close(); + } + + public static void removeChannel(SelectableChannel sc) throws IOException { + if(channels.containsKey(sc)) channels.remove(sc); + } + + @Override + public Object blockingLock() { + return sc.blockingLock(); + } + + @Override + public SelectableChannel configureBlocking(boolean block) throws IOException { + return sc.configureBlocking(block); + } + + @Override + public boolean isBlocking() { + return sc.isBlocking(); + } + + @Override + public boolean isRegistered() { + return sc.isRegistered(); + } + + @Override + public SelectionKey keyFor(Selector sel) { + return sc.keyFor(sel); + } + + public SelectionKey keyFor(REPSelector<?> sel) { + return sc.keyFor(sel.selector); + } + + public REPSelectionKey<P> keyFor1(REPSelector<P> sel) { + return new REPSelectionKey<P>(sc.keyFor(sel.selector), + new REPSelector<P>((AbstractSelector) sel.selector)); + } + + @Override + public SelectorProvider provider() { + return sc.provider(); + } + + + @Override + public int validOps() { + return sc.validOps(); + } + + @Override + protected void implCloseChannel() throws IOException { + close1(); + } + + + public int read(ByteBuffer header) throws IOException { + return sc.read(header); + } + + public void write(ByteBuffer buffer) throws IOException { + sc.write(buffer); + + } + + public boolean finishConnect() throws IOException { + return sc.finishConnect(); + } + + public Socket socket() { + return sc.socket(); + } + + public P read() throws IOException{ + return pack.unpackUConv(sc); + } + + public boolean write(P p){ + ByteBuffer bb = pack.packUConv(p); + if (bb==null) return true; + try { + while (bb.remaining() > 0 ){ + sc.write(bb); + } + return true; + } catch (IOException e) { + return false; + } + } + + public static <T> REPSocketChannel<T> create(REPPack<T> packer) throws IOException { + if (REPServerSocketChannel.isSimulation) { + return new ChannelSimulator<T>(); + } else { + REPSocketChannel<T> rsc = new REPSocketChannel<T>(SocketChannel.open(), packer); + return rsc; + } + } + + + public boolean connect(SocketAddress semaIP) throws IOException { + return sc.connect(semaIP); + } + + public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException { + return sc.register(sel.selector, ops, att); + } + + + + public SelectionKey register1(REPSelector<P> sel, int ops, Object att) + throws ClosedChannelException { +// if(sel instanceof REPSelector) { // should be always true... +// REPSelector<P> s = (REPSelector<P>)sel; +// return sc.register(s.selector, ops,att); + return sc.register(sel.selector, ops,att); +// } +// return sc.register(sel, ops,att); + } + + @SuppressWarnings("unchecked") + @Override + public SelectionKey register(Selector sel, int ops, Object att) + throws ClosedChannelException { + if(sel instanceof REPSelector) { + REPSelector<P> s = (REPSelector<P>)sel; + return sc.register(s.selector, ops,att); + } + return sc.register(sel, ops,att); + } + + public String getLocalHostName() { + return sc.socket().getLocalAddress().getHostName(); + + } + + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/SelectionKeySimulator.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,119 @@ +package rep.channel; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; + +public class SelectionKeySimulator<P> extends REPSelectionKey<P>{ + + private int interestOpt; + private SelectableChannel channel; + private int ready; + public Selector selector; + public boolean canceled = false; + + public SelectionKeySimulator(SelectableChannel cs, int opt, Selector _selector) { + channel = cs; + interestOpt = opt; + selector = _selector; + } + public SelectionKeySimulator(SelectionKey k) { + channel = k.channel(); + interestOpt = k.interestOps(); + selector = k.selector(); + attach(k.attachment()); + } + + + public boolean isAble() { + if ( (interestOpt&OP_READ)!=0 && isReadable() ) + return true; + else if( (interestOpt&OP_ACCEPT)!=0 && isAcceptable() ) + return true; + else if( (interestOpt&OP_WRITE)!=0 && isWritable() ) + return true; + else + return false; + } + + public void setFlag() { + ChannelSimulator<?> scs = (ChannelSimulator<?>) channel; + ready = 0; + if(scs.isAcceptable()) ready |= OP_ACCEPT; + if(scs.isReadable()) ready |= OP_READ; + if(scs.isWritable()) ready |= OP_WRITE; + } + + public SelectableChannel channel() { + return channel; + } + + @SuppressWarnings("unchecked") + @Override + public REPSocketChannel<P> channel1() { + return (REPSocketChannel<P>)channel; + } + + @SuppressWarnings("unchecked") + @Override + public REPServerSocketChannel<P> serverSocketChannel() { + return (REPServerSocketChannel<P>)channel; + } + + public SelectableChannel channel(REPPack<P> packer) { + return channel; + } + + @SuppressWarnings("unchecked") + public REPSocketChannel<P> accept(REPPack<P> pack) throws IOException { + assert(channel instanceof ServerChannelSimulator); + ServerChannelSimulator<P> scs = (ServerChannelSimulator<P>) channel; + return scs.accept1(); + } + + @Override + public void cancel() { + canceled = true; + SelectorSimulator<?> s = (SelectorSimulator<?>)selector; + s.deregister(channel); + } + + @Override + public int interestOps() { + return interestOpt; + } + + @Override + public SelectionKey interestOps(int ops) { + interestOpt = ops; + return this; + } + + @Override + public boolean isValid() { + return (!canceled) && channel.isOpen() && selector.isOpen(); + } + + + @Override + public Selector selector() { + return selector; + } + + @Override + public int readyOps() { + int ops=0; + if ( channel instanceof ServerChannelSimulator<?> ){ + ServerChannelSimulator<?> scs = (ServerChannelSimulator<?>) channel; + ops = ( OP_ACCEPT * (scs.isAcceptable()? 1:0) ); + } else if ( channel instanceof ChannelSimulator<?> ){ + ChannelSimulator<?> scs = (ChannelSimulator<?>) channel; + ops = ( OP_READ * (scs.isReadable()? 1:0) ) + | ( OP_WRITE * (scs.isWritable()? 1:0) ); + // (OP_READ & true) がつかえないらしい. + } + return ops; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/SelectorSimulator.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,159 @@ +package rep.channel; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class SelectorSimulator<P> extends REPSelector<P>{ + + // This selector cannot be shared among threads. + + private Map<SelectableChannel, SelectionKeySimulator<P>> keyList; + private Set<SelectionKeySimulator<P>> selectedKeys; + private boolean isOpen=true; + + public SelectorSimulator() { + super(null); + keyList = new HashMap<SelectableChannel,SelectionKeySimulator<P>>(); + } + + public int select() throws IOException { + getSelectedKeys(); + if(selectedKeys.isEmpty()) { + try { + synchronized(this) { + wait(); + } + } catch (InterruptedException e) { + throw new IOException(); + } + } + return selectedKeys.size(); + } + + @Override + public int select(long timeout) throws IOException { + getSelectedKeys(); + if(selectedKeys.isEmpty()) { + try { + synchronized(this) { + wait(timeout); + } + // we cannot know if we time outed or not + getSelectedKeys(); + } catch (InterruptedException e) { + throw new IOException(); + } + } + return selectedKeys.size(); + } + + private void getSelectedKeys() { + selectedKeys = new HashSet<SelectionKeySimulator<P>>(); + for(SelectionKeySimulator<P> key : keyList.values()){ + if(key.isAble()) + selectedKeys.add(new SelectionKeySimulator<P>(key)); + } + } + + @Override + public int selectNow() throws IOException { + getSelectedKeys(); + return selectedKeys.size(); + } + + public SelectionKeySimulator<P> register(SelectableChannel cs, int opt){ + return register(cs, opt, null); + } + public SelectionKeySimulator<P> register(SelectableChannel cs, int opt, Object handler){ + SelectionKeySimulator<P> key = keyList.get(cs); + if (key!=null) { + key.attach(handler); + key.interestOps(opt); + return key; + } + key = new SelectionKeySimulator<P>(cs, opt, this); + key.attach(handler); + keyList.put(cs,key); + return key; + } + + public SelectionKeySimulator<P> deregister(SelectableChannel channel) { + SelectionKeySimulator<P> key = keyList.remove(channel); + return key; + } + + + public SelectionKey getKey(SelectableChannel channel){ + return keyList.get(channel); + } + + @Override + public void close() throws IOException { + isOpen = false; + } + + @Override + public boolean isOpen() { + return isOpen; + } + + @Override + public Set<SelectionKey> keys() { + Set<SelectionKey> newKeys = new HashSet<SelectionKey>(); + for(SelectionKey k: keyList.values()) { + newKeys.add(k); + } + return newKeys; + } + + public Set<REPSelectionKey<P>> keys1() { + // we cannot solve cast, we need the same method again with different types + Set<REPSelectionKey<P>> newKeys = new HashSet<REPSelectionKey<P>>(); + for(SelectionKeySimulator<P> k: keyList.values()) { + newKeys.add(k); + } + return newKeys; + } + + @Override + public SelectorProvider provider() { + // should return NetworkSimulator? + return null; + } + + + @Override + public synchronized Selector wakeup() { + notifyAll(); + return this; + } + + + public Set<REPSelectionKey<P>> selectedKeys1() { + Set<REPSelectionKey<P>> newKeys = new HashSet<REPSelectionKey<P>>(); + for(SelectionKeySimulator<P> k: selectedKeys) { + newKeys.add(k); + } + return newKeys; + } + + /* + * type safe copy of selectedKeys1() + */ + @Override + public Set<SelectionKey> selectedKeys() { + Set<SelectionKey> newKeys = new HashSet<SelectionKey>(); + for(SelectionKeySimulator<P> k: selectedKeys) { + newKeys.add(k); + } + return newKeys; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rep/channel/ServerChannelSimulator.java Sat Dec 18 17:35:25 2010 +0900 @@ -0,0 +1,148 @@ +package rep.channel; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.LinkedList; +import java.util.Queue; + +/* シミュレーションの際にコンストラクトされる REPServerSocketChannel の実装 */ +public class ServerChannelSimulator<P>extends REPServerSocketChannel<P> { + protected NetworkSimulator ns; + //public REPServerSocket<REPSocketChannel<P>> socket; + protected InetSocketAddress IP; + protected Queue<ChannelSimulator<P>> acceptQ; + protected Selector selector; + protected boolean isBlocking; + private SelectionKeySimulator<P> key; + + /** Constructors. + * @throws IOException */ + public ServerChannelSimulator() throws IOException { + ns = NetworkSimulator.singleton(); + selector = new NullSelector(); // new Object(); + acceptQ = new LinkedList<ChannelSimulator<P>>(); + } + + public void bind(InetSocketAddress ip){ + IP = ip; + } + + public synchronized REPSocketChannel<P> accept1() throws IOException { + ChannelSimulator<P> channel; + while ( (channel=acceptQ.poll())==null && isBlocking ) { + try { + wait(); + } catch (InterruptedException e) { + throw new IOException(); + } + } + return channel; + } + + protected boolean enQ(ChannelSimulator<?> ch){ + // Don't lock a selector from a locked channel, the selector may + // use channel.isAble() which locks the channel. + @SuppressWarnings("unchecked") + ChannelSimulator<P>ch1 = (ChannelSimulator<P>)ch; + synchronized(this) { + acceptQ.offer(ch1); + notify(); + } + selector.wakeup(); + return true; + } +// +// @SuppressWarnings("unchecked") +// public void enQ(ChannelSimulator<?> hserver) { +// // NetworkSimulator doesn't know P +// ChannelSimulator<P>ch = (ChannelSimulator<P>) hserver; +// enQ(ch); +// } + + public ServerSocket socket() { + try { + return new REPServerSocket(this); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } + + + + @SuppressWarnings("unchecked") + public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException { + selector = sel; + REPSelector<P> selector1 = sel; + ns.listen(IP, this); + key = (SelectionKeySimulator<P>) selector1.register(this, ops, att); + return key; + } + @SuppressWarnings("unchecked") + @Override + public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { + selector = sel; + REPSelector<P> selector1 = (REPSelector<P>)sel; + ns.listen(IP, this); // bindに移動してもいいよ + key = (SelectionKeySimulator<P>) selector1.register(this, ops, att); + return key; + } + + public synchronized boolean isAcceptable() { + return !acceptQ.isEmpty(); + } + + @Override + public Object blockingLock() { + return selector; + } + + public SelectableChannel configureBlocking(boolean block) throws IOException { + isBlocking = block; + return this; + } + + @Override + public boolean isBlocking() { + return isBlocking; + } + + @Override + public boolean isRegistered() { + // TODO Auto-generated method stub + return false; + } + + @Override + public SelectionKey keyFor(Selector sel) { + // TODO Auto-generated method stub + return null; + } + + @Override + public SelectorProvider provider() { + // TODO Auto-generated method stub + return null; + } + + + @Override + public int validOps() { + // TODO Auto-generated method stub + return 0; + } + + @Override + protected void implCloseChannel() throws IOException { + // TODO Auto-generated method stub + + } + + +}