Mercurial > hg > RemoteEditor > Eclipse
changeset 84:5ac3df98f780
merger simulator using BlockingQ
author | kent |
---|---|
date | Mon, 12 Nov 2007 17:57:38 +0900 |
parents | 3db21fae825a |
children | 18ae3b9fe57a |
files | src/pathfinder/BlockingQ/ChannelSimulator.java src/pathfinder/BlockingQ/EditorSimulator.java src/pathfinder/BlockingQ/NetworkSimulator.java src/pathfinder/BlockingQ/SeMaSimulator.java src/pathfinder/BlockingQ/TestMerger.java |
diffstat | 5 files changed, 426 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/pathfinder/BlockingQ/ChannelSimulator.java Mon Nov 12 17:57:38 2007 +0900 @@ -0,0 +1,39 @@ +package pathfinder.BlockingQ; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class ChannelSimulator<P> { + private BlockingQueue<P> q0; + private BlockingQueue<P> q1; + + private NetworkSimulator<P> ns; + + public ChannelSimulator(NetworkSimulator<P> _ns){ + this(new LinkedBlockingQueue<P>(), new LinkedBlockingQueue<P>(), _ns); + } + public ChannelSimulator(BlockingQueue<P> _a, BlockingQueue<P> _b, NetworkSimulator<P> _ns){ + q0 = _a; + q1 = _b; + ns = _ns; + } + + public P read() throws InterruptedException { + return q0.take(); + } + + public void write(P p) throws InterruptedException{ + synchronized (ns){ + q1.put(p); + ns.notifyAll(); + } + } + + public ChannelSimulator<P> getServerChannel() { + return new ChannelSimulator<P>(q1, q0,ns); + } + + public P poll() { + return q0.poll(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/pathfinder/BlockingQ/EditorSimulator.java Mon Nov 12 17:57:38 2007 +0900 @@ -0,0 +1,105 @@ +package pathfinder.BlockingQ; + +import java.util.Queue; + +import remoteeditor.command.REPCommand; +import remoteeditor.network.REP; +import sample.merge.Translater; + +public class EditorSimulator extends Thread{ + private int eid; + private int seq; + private boolean isOwner; + private NetworkSimulator<REPCommand> ns; + private ChannelSimulator<REPCommand> cs; + private Queue<REPCommand> CmdList; + private Translater translater; + private pathfinder.Text text; + private boolean running=true; + + public EditorSimulator(int _eid, NetworkSimulator<REPCommand> _ns, Queue<REPCommand> q, String _name) { + super(_name); + eid = _eid; + ns = _ns; + CmdList = q; + translater = new Translater(_eid); + text = new pathfinder.Text(); + cs = ns.connect(); + } + + public void setOwner(boolean f){ + isOwner = f; + } + synchronized public void finish(){ + running = false; + } + + public void run(){ + System.out.println("Editor"+eid+" start."); + + // Send All Command that is included CmdList. + try { + sendAllCommand(); + } catch (InterruptedException e1) { + e1.printStackTrace(); + running=false; + } + + // MainLoop, + while(running){ + REPCommand cmd; + try { + cmd = cs.read(); + } catch (InterruptedException e) { + e.printStackTrace(); + continue; + } + + //終了条件 + if (checkQuit(cmd)){ + System.out.println("\tEditor"+eid+" catch QUIT command emited by itself."); + translater.transReceiveCmd(cmd); + running=false; break; + } + System.out.println("\tEditor"+eid+" catch command:>> "+cmd.toString()); + cmd = translater.transReceiveCmd(cmd); + if (isOwner&&cmd!=null) cmd.setThroughMaster(true); + if (cmd==null) continue; + + text.edit(cmd); + try { + cs.write(cmd); + } catch (InterruptedException e) { + e.printStackTrace(); + continue; + } + } + + System.out.println("Editor"+eid+" finish."); + } + + private void sendAllCommand() throws InterruptedException { + for (REPCommand cmd: CmdList){ + cmd.seq = seq; + cmd.eid = eid; + cmd.setString("this is inserted or replaced by Editor"+eid+":"+seq); + cmd = translater.transSendCmd(cmd); + if (isOwner) cmd.setThroughMaster(true); + text.edit(cmd); + cs.write(cmd); + seq++; + } + + // Send Quit Command + cs.write( translater.transSendCmd( new REPCommand(REP.SMCMD_QUIT, 0, eid, seq++, 0, 0, "QUIT by Editor"+eid))); + } + + private boolean checkQuit(REPCommand cmd) { + // 最初に全部のコマンドを送信するから、自分のQUITが来るのは最後 + return (cmd.eid==eid && cmd.cmd==REP.SMCMD_QUIT); + } + + public pathfinder.Text getText(){ + return text; + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/pathfinder/BlockingQ/NetworkSimulator.java Mon Nov 12 17:57:38 2007 +0900 @@ -0,0 +1,40 @@ +package pathfinder.BlockingQ; + +import java.util.LinkedList; +import java.util.Queue; + +public class NetworkSimulator<P> { + + /** Waiting connectionRequest to be accepted by SessionManager. */ + private Queue<ChannelSimulator<P>> acceptList; + /** Established connection */ + private Queue<ChannelSimulator<P>> connectedList; + + public NetworkSimulator(){ + acceptList = new LinkedList<ChannelSimulator<P>>(); + connectedList = new LinkedList<ChannelSimulator<P>>(); + } + + /** + * Establish connection. It's called by SeMa. + * @return + */ + public ChannelSimulator<P> accept(){ + ChannelSimulator<P> cs = acceptList.poll(); + if (cs==null) return null; + + connectedList.offer(cs); + return cs.getServerChannel(); + } + + /** + * Request to connect. + * Client editor use this method to connect SeMa. + * @param cs + */ + public ChannelSimulator<P> connect(){ + ChannelSimulator<P> cs = new ChannelSimulator<P>(this); + acceptList.offer(cs); + return cs; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/pathfinder/BlockingQ/SeMaSimulator.java Mon Nov 12 17:57:38 2007 +0900 @@ -0,0 +1,93 @@ +package pathfinder.BlockingQ; + +import java.util.ArrayList; +import java.util.List; + +public class SeMaSimulator<P> extends Thread { + private int MAX_PACKET; + private int MAX_CLIENT; + private boolean running=true; + private NetworkSimulator<P> ns; + private List<ChannelSimulator<P>> csList; + + public SeMaSimulator(NetworkSimulator<P> _ns, int max_client, int max_packet){ + ns = _ns; + MAX_CLIENT = max_client; + MAX_PACKET = max_packet; + csList = new ArrayList<ChannelSimulator<P>>(); + } + public SeMaSimulator(NetworkSimulator<P> _ns){ + this(_ns, 2, 0); + } + + synchronized public void finish(){ + synchronized(ns){ + running = false; + ns.notify(); + } + } + + /** + * Check whether the NetworkSimulator hold waiting connections. + */ + private void checkAccept(){ + ChannelSimulator<P> cs; + while((cs=ns.accept())!=null){ + csList.add(cs); + } + } + + public void run(){ + int i=0; + int count=0; + P packet; + + while(csList.size()<MAX_CLIENT){ checkAccept(); Thread.yield(); } + System.out.println("SessionManager start."); + + /* Main Loop */ + ChannelSimulator<P> cs = csList.get(i); + while(running + && (MAX_PACKET==0 || count<MAX_PACKET)){ + synchronized(ns){ + int prev_i=i; + while((packet=cs.poll())==null && running){ + i = (i+1)%csList.size(); // i++ + cs = csList.get(i); // 次のChennelをゲット + if(i==prev_i) try { ns.wait(); } catch (InterruptedException e) { e.printStackTrace(); } + } + } + if(!running) break; + + System.out.println("SeMa pass packet to "+i+":>> "+packet.toString()); + i = (i+1)%csList.size(); // i++ + cs = csList.get(i); // 次のChennelをゲット + + // readできていたならそれを書き込む + try { + cs.write(packet); + } catch (InterruptedException e) { + System.out.println("SeMa cannot write!!"); + e.printStackTrace(); + } + count++; + } +/* + ChannelSimulator<P> cs = csList.get(i); + while(running + && MAX_PACKET==0 || count<MAX_PACKET){ + packet=cs.poll(); // [i]からread + //if(packet!=null) System.out.println("SeMa catch packet to "+i+":>> "+packet.toString()); + i = (i+1)%csList.size(); // i++ + cs = csList.get(i); // 次のChennelをゲット + if (packet!=null) { + System.out.println("SeMa pass packet to "+i+":>> "+packet.toString()); + cs.write(packet); // readできていたならそれを書き込む + count++; + } + //if (i==0) checkAccept(); //全部回ったらaccept待ちをチェック + } +*/ + System.out.println("SessionManager finish."); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/pathfinder/BlockingQ/TestMerger.java Mon Nov 12 17:57:38 2007 +0900 @@ -0,0 +1,149 @@ +package pathfinder.BlockingQ; + +import java.util.LinkedList; +import remoteeditor.command.REPCommand; +import remoteeditor.network.REP; + +public class TestMerger { + static public int cmdNO[] = { REP.REPCMD_INSERT, REP.REPCMD_REPLACE, REP.REPCMD_DELETE }; + private int N_editor; + + private NetworkSimulator<REPCommand> ns; + private SeMaSimulator<REPCommand> sm; + private LinkedList<EditorSimulator> editors; + private int N_packet; + + public TestMerger(int editor, int packet){ + N_editor = editor; + N_packet = packet; + ns = new NetworkSimulator<REPCommand>(); + sm = new SeMaSimulator<REPCommand>(ns, N_editor, 0); + editors = new LinkedList<EditorSimulator>(); + } + + public static void main(String[] args){ + TestMerger tm; + tm = new TestMerger(2, 3); + //tm.init(); + //tm.test2cmd(); + tm.test1cmd(); + //tm.test0cmd(); + tm.startTest(); + + //tm.printAllTexts(); + //if (!tm.checkCS()) + // System.out.println("Error!! :some ChannelSimulator still have packet!"); + if (!tm.checkEquality()) + System.out.println("Error!! :all Editor's text is NOT mutch!"); + + } + + /* + private void init(){ + for (int i=0; i<N_editor; i++){ + int j; + LinkedList<REPCommand> cmds = new LinkedList<REPCommand>(); + // 各エディタが送信するコマンド列を生成 + for(j=0; j<N_packet; j++) { + REPCommand cmd = new REPCommand(cmdNO[Verify.random(2)], + 0, i, j, + 10, //Verify.random(text.size()-1), //size-1? + 0, null); + cmds.add( cmd); + } + EditorEmulator2 ee = new EditorEmulator2(i, ns, cmds, "Editor"+i); + editors.add(ee); + } + } + */ + private void test2cmd(){ + for (int i=0; i<N_editor; i++){ + int j=0; + LinkedList<REPCommand> cmds = new LinkedList<REPCommand>(); + // 各エディタが送信するコマンド列を生成 + + String str = "created by Editor"+i+":"+j; + REPCommand cmd = new REPCommand(REP.REPCMD_INSERT, + 0, i, j++, + 10, //Verify.random(text.size()-1), //size-1? + str.length(), str); + cmds.add( cmd); + str = "created by Editor"+i+":"+j; + cmd = new REPCommand(REP.REPCMD_INSERT, + 0, i, j++, + 10, //Verify.random(text.size()-1), //size-1? + str.length(), str); + cmds.add( cmd); + + EditorSimulator ee = new EditorSimulator(i, ns, cmds, "Editor"+i); + if(i==0) ee.setOwner(true); + editors.add(ee); + } + } + + + private void test1cmd(){ + for (int i=0; i<N_editor; i++){ + int j=0; + LinkedList<REPCommand> cmds = new LinkedList<REPCommand>(); + //各エディタが送信するコマンド列を生成 + String str = "Editor"+i+":"+j; + REPCommand cmd = new REPCommand(REP.REPCMD_INSERT, + 0, i, j++, + 10, //Verify.random(text.size()-1), //size-1? + str.length(), str); + cmds.add( cmd); + EditorSimulator ee = new EditorSimulator(i, ns, cmds, "Editor"+i); + if(i==0) ee.setOwner(true); + editors.add(ee); + } + } + + private void test0cmd(){ + for (int i=0; i<N_editor; i++){ + int j=0; + LinkedList<REPCommand> cmds = new LinkedList<REPCommand>(); + //各エディタが送信するコマンド列を生成 + EditorSimulator ee = new EditorSimulator(i, ns, cmds, "Editor"+i); + if(i==0) ee.setOwner(true); + editors.add(ee); + } + } + + private void startTest() { + for (EditorSimulator ee: editors){ + ee.start(); + } + sm.start(); + + try { + for (EditorSimulator ee: editors){ + //ee.finish(); + ee.join(); + } + sm.finish(); + sm.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + private void printAllTexts(){ + for(EditorSimulator ee: editors){ + System.out.println("--"+ee.getName()+"------------------------"); + ee.getText().printAllText(); + } + } + + private boolean checkEquality(){ + /* + Text ee0 = editors.remove().getText(); + return editors.remove().getText().equals(ee0); + */ + pathfinder.Text text0 = editors.element().getText(); + for(EditorSimulator ee: editors){ + if (!text0.equals(ee.getText())) return false; + } + return true; + } +}