changeset 450:21cb16b7f3df

block message in Editor.write()
author one
date Thu, 23 Sep 2010 18:15:37 +0900
parents 89a326696c54
children fa7d9ec2008e
files Todo rep/PacketSet.java rep/ServerMainLoop.java rep/handler/Editor.java rep/handler/FirstConnector.java rep/handler/Forwarder.java rep/handler/REPNode.java rep/handler/Translator.java test/mergertest/EditorSimulator.java test/mergertest/EditorSimulatorImpl.java test/mergertest/TestMerger.java
diffstat 11 files changed, 112 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/Todo	Wed Sep 22 22:11:58 2010 +0900
+++ b/Todo	Thu Sep 23 18:15:37 2010 +0900
@@ -1,3 +1,45 @@
+Thu Sep 23 14:57:57 JST 2010
+
+やっぱり、send が
+
+    Editor Object から Editor へのsend
+    他の Editor Object から Editor へのsend
+
+の二つに使われているのはダメだよ。片方をブロックしたい時があるのだから。
+
+sendNext で分割してブロックはできた。問題は、途中で送られたものをどう
+処理するかだが〜
+
+うーん、すでにSession Manager の送信キューに入っているので、
+blocking が効かないようだ。
+
+そういうわけなので、受け側でなんとかした方が良いみたい。
+可能なの? いや、無理だろうな。
+
+    REPNode.send  他のところからの送信
+    REPNode.write Serverの送信ループ
+
+なので、Editor.write() で捕まえるか。
+
+Thu Sep 23 12:13:19 JST 2010
+
+START_MERGE から START_MERGE_ACK までにEditorから送られたコマンドは、
+sentList に付け加えるべきでは?
+
+しかし、その間、外部からEditorに送るコマンドは止める必要がある。
+
+    Editor.merging     ...  START_MERGE ... START_MERGE_ACK ... END_MERGE
+    Translaotr.merging ...  START_MERGE_ACK ... END_MERGE
+
+と言うように区別するか。
+
+START_MERGE は、 Editor から返って来るタイミングでブロックするので、
+その段階で、Editor へ送られているコマンドをブロックできない。
+もちろん、Editor からのUSER_INPUTもブロックできない。
+
+Editorの undoが正しくなくなるだけでなく、
+Merge phase  のコマンドが二つ続けて送られるのはよろしくない。
+
 Wed Sep 22 19:59:26 JST 2010
 
 NOPを廻す方式とAckを廻す方式は、
--- a/rep/PacketSet.java	Wed Sep 22 22:11:58 2010 +0900
+++ b/rep/PacketSet.java	Thu Sep 23 18:15:37 2010 +0900
@@ -1,26 +1,23 @@
 package rep;
 
-import rep.channel.REPSocketChannel;
 import rep.handler.REPNode;
 
 public class PacketSet {
 
-	public REPSocketChannel<REPCommand> channel;
-	public REPNode editor;
+	public REPNode channel;
 	public REPCommand command;
 
-	public PacketSet(REPSocketChannel<REPCommand> channel, REPNode editor, REPCommand command) {
-		this.channel = channel;
-		this.editor = editor;
+	public PacketSet(REPNode editor, REPCommand command) {
+		this.channel = editor;
 		this.command = command;
 	}
 
 	public REPNode getEditor() {
-		return editor;
+		return channel;
 	}
 	
 	public String toString() {
-		return "PacketSet("+command.toString()+","+editor+")";
+		return "PacketSet("+command.toString()+","+channel+")";
 	}
 
 }
--- a/rep/ServerMainLoop.java	Wed Sep 22 22:11:58 2010 +0900
+++ b/rep/ServerMainLoop.java	Thu Sep 23 18:15:37 2010 +0900
@@ -128,7 +128,7 @@
 	private void sendLog(PacketSet p) {
 		REPNode to;
 		String s;
-		to = manager.editorList.editorByChannel(p.channel);
+		to = manager.editorList.editorByChannel(p.channel.channel);
 		if (to==null)
 			s = p.channel.toString();
 		else
--- a/rep/handler/Editor.java	Wed Sep 22 22:11:58 2010 +0900
+++ b/rep/handler/Editor.java	Thu Sep 23 18:15:37 2010 +0900
@@ -165,10 +165,10 @@
 	private boolean waitingRequired(REPCommand command, REPSocketChannel<REPCommand> channel) {
 		if (hasWaitingCommand()) {
 			// We cannot do this operation before watingCommandQueue.
-			addWaitingCommand(new PacketSet(channel, this, new REPCommand(command)));
+			addWaitingCommand(new PacketSet(this, new REPCommand(command)));
 			return true;
 		} else if (isMerging()) { 
-			addWaitingCommand(new PacketSet(channel, this, new REPCommand(command)));
+			addWaitingCommand(new PacketSet(this, new REPCommand(command)));
 			return true;
 		} 
 		//ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting");
@@ -231,7 +231,7 @@
 	}
 
 	private boolean checkAck(REPCommand command) {
-		assert(!merging);
+		assert(!isMerging());
 		REPCommand prev;
 		if (sentList.getFirst()==mergeMark) prev=sentList.remove(1); else prev=sentList.remove(0);		
 		if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) {
@@ -256,7 +256,7 @@
 		// START_MERGE を送る
 		//    送らないで良い場合もある?
 		REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,"");
-		send(cmd);
+		sendToEditor(cmd);
 		merging = true;
 		// Session Manager 側で、このeditorへの他のeditorからの
 		// 入力を止めて、merge にそなえる。merge は、eidtor 側から
@@ -301,7 +301,7 @@
 	private void endMerge() {
 		translator.endMerge();
 		REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,"");
-		send(mergeEnd);
+		sendToEditor(mergeEnd);
 		if (preMergeCommand.eid==eid) {
 			// Ackの場合はcheckAck() で既にremoveされている
 			if (sentList.getFirst()==mergeMark) sentList.remove(1); else sentList.remove(0);		
@@ -326,7 +326,7 @@
 
 	private boolean checkQuit() {
 		if (quit2!=null && sentList.size()==1&&!isMerging()) {
-			send(quit2);
+			sendToEditor(quit2);
 			quit2 = null;
 			return true;
 		}
@@ -364,7 +364,7 @@
 		
 		case SMCMD_SYNC:
 			if (isMaster()) 
-				send(command);
+				sendToEditor(command);
 			else
 				next.send(command);
 			
@@ -403,18 +403,33 @@
 	}
 	
 	/**
-	 * send command to the editor
+	 * write command to the editor
 	 *   called from another Editor instance such as next.send(command)
 	 */
 	@Override
-	public void send(REPCommand command) {
-		if (command.eid == REP.MERGE_EID.id || 
-				command.cmd==REP.SMCMD_END_MERGE ||
-				!waitingRequired(command,channel)) {
-			super.send(command);
+	public void write(REPCommand command) {
+		if (	!waitingRequired(command,channel)) {
+			if (isMergeCommand(command)) {
+				merging = true;
+			}
+			super.write(command);
 		}
 	}
 	
+	private boolean isMergeCommand(REPCommand command) {
+		switch(command.cmd) {
+		case REPCMD_INSERT: case REPCMD_DELETE:
+			return command.eid==eid;
+		case REPCMD_INSERT_ACK: case REPCMD_DELETE_ACK:
+			return command.eid!=eid;
+		}
+		return false;
+	}
+	
+	public void sendToEditor(REPCommand command) {
+		super.write(command);
+	}
+
 	@Override
 	public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException {
 		//ServerMainLoop.logger.writeLog("Manager "+manager.getId()+" read : command = " + command 
@@ -476,12 +491,12 @@
 			PacketSet p = w.remove(0);
 			try {
 				if (p.channel!=null)
-					send(p.command);
+					write(p.command);
 				else
 					manage(p.command);
 			} catch (Exception e1) {
 				assert false;
-				manager.close(p.channel);
+				manager.close(p.channel.channel);
 				return;
 			}
 		}
--- a/rep/handler/FirstConnector.java	Wed Sep 22 22:11:58 2010 +0900
+++ b/rep/handler/FirstConnector.java	Thu Sep 23 18:15:37 2010 +0900
@@ -98,6 +98,6 @@
 		assert(command!=null && command.cmd==REP.SMCMD_SM_JOIN);
 		assert(channel!=null);
 		REPCommand c = new REPCommand(command);
-		manager.addWriteQueue(new PacketSet(channel,null,  c));
+		manager.addWriteQueue(new PacketSet(this,  c));
 	}
 }
--- a/rep/handler/Forwarder.java	Wed Sep 22 22:11:58 2010 +0900
+++ b/rep/handler/Forwarder.java	Thu Sep 23 18:15:37 2010 +0900
@@ -42,7 +42,7 @@
 		assert(command!=null);
 		assert(channel!=null);
 		REPCommand c = new REPCommand(command);
-		manager.addWriteQueue(new PacketSet(channel,null,  c));
+		manager.addWriteQueue(new PacketSet(this, c));
 	}
 	
 	public void sendWithSeq(REPCommand command) {
@@ -50,7 +50,7 @@
 		assert(channel!=null);
 		REPCommand c = new REPCommand(command);
 		c.setSEQID(seq());
-		manager.addWriteQueue(new PacketSet(channel,null,  c));
+		manager.addWriteQueue(new PacketSet(this, c));
 	}
 	
 	public REPSocketChannel<REPCommand> getChannel() {
@@ -151,6 +151,11 @@
 		return null;
 	}
 
+	@Override
+	public void write(REPCommand command) {
+		channel.write(command);		
+	}
+
 	
 	
 	
--- a/rep/handler/REPNode.java	Wed Sep 22 22:11:58 2010 +0900
+++ b/rep/handler/REPNode.java	Thu Sep 23 18:15:37 2010 +0900
@@ -110,8 +110,17 @@
 
 	public abstract String getLocalHostName();
 
-
+	/**
+	 * Send to the next REPNode (with possible blocking)
+	 * @param command
+	 */
 	public abstract void send(REPCommand command) ;
+	
+	/**
+	 * write command to the socket channel
+	 * @param command
+	 */
+	public abstract void write(REPCommand command) ;
 
 	public abstract void sendWithSeq(REPCommand command) ;
 
@@ -119,9 +128,6 @@
 
 	public abstract int seq() ;
 
-	public abstract boolean isMerging() ;
-
-
 	public abstract boolean manage(REPCommand command) ;
 
 
@@ -163,6 +169,10 @@
 
 	public abstract List<REPCommand> getSentList() ;
 
+	public void sendToEditor(REPCommand m) {
+		send(m);		
+	}
+
 		
 	
 	
--- a/rep/handler/Translator.java	Wed Sep 22 22:11:58 2010 +0900
+++ b/rep/handler/Translator.java	Thu Sep 23 18:15:37 2010 +0900
@@ -116,7 +116,7 @@
 			m.setEID(REP.MERGE_EID.id);
 			m.setSEQID(editor.seq());
 			sentMergedList.addLast(m);
-			editor.send(m);
+			editor.sendToEditor(m);
 		}
 		logger.writeLog("OptimizedOutputMerge"+eid+":"+sentMergedList);
 		merge_mode = true;
--- a/test/mergertest/EditorSimulator.java	Wed Sep 22 22:11:58 2010 +0900
+++ b/test/mergertest/EditorSimulator.java	Thu Sep 23 18:15:37 2010 +0900
@@ -41,11 +41,6 @@
 
 	}
 
-	@Override
-	public boolean isMerging() {
-		// TODO Auto-generated method stub
-		return false;
-	}
 
 	@Override
 	public void joinAck(REPCommand sendCommand, int sid) {
--- a/test/mergertest/EditorSimulatorImpl.java	Wed Sep 22 22:11:58 2010 +0900
+++ b/test/mergertest/EditorSimulatorImpl.java	Thu Sep 23 18:15:37 2010 +0900
@@ -28,4 +28,9 @@
 		return null;
 	}
 
+	@Override
+	public void write(REPCommand command) {
+		
+	}
+
 }
--- a/test/mergertest/TestMerger.java	Wed Sep 22 22:11:58 2010 +0900
+++ b/test/mergertest/TestMerger.java	Thu Sep 23 18:15:37 2010 +0900
@@ -87,12 +87,6 @@
 	}
 
 	@Override
-	public boolean isMerging() {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
 	public void joinAck(REPCommand sendCommand, int sid) {
 		// TODO Auto-generated method stub
 		
@@ -138,4 +132,10 @@
 		// TODO Auto-generated method stub
 		return null;
 	}
+
+	@Override
+	public void write(REPCommand command) {
+		// TODO Auto-generated method stub
+		
+	}
 }