diff src/fdl/TupleSpace.java @ 19:0243987383b7

Meta Protocol Engine and sample implementation of event logger. ComDebug_Client needs fixes.
author kono
date Tue, 19 Aug 2008 05:33:32 +0900
parents 609b288f47f9
children b4fd7fb9135a
line wrap: on
line diff
--- a/src/fdl/TupleSpace.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/TupleSpace.java	Tue Aug 19 05:33:32 2008 +0900
@@ -2,56 +2,75 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
 
 public class TupleSpace {
     static final boolean debug = true;
     static final int CAPSIZE = 4194304;
+	public static int user = 0;
+	public byte userchar[] = new byte[2];
     public Tuple[] tuple_space;
+    public IOHandlerHook hook = new NullIOHandlerHook();
     
-	public TupleSpace(Tuple[] _tuple_space) {
-		super();
+	public TupleSpace() {
 		// 読みこんだデータを格納するためのリストの初期化
-        tuple_space = _tuple_space;      
+        tuple_space = new Tuple[TupleHandler.MAX_TUPLE];      
 	}
     
-	public TupleSpace() {
-		// TODO Auto-generated constructor stub
+
+	public void newUser() {
+		Tuple tmpTuple;
+		//初期生成        
+        if((tmpTuple = tuple_space[TupleHandler.MAX_TUPLE-1]) == null) {
+        	tmpTuple = tuple_space[TupleHandler.MAX_TUPLE-1] = new Tuple();
+        	tmpTuple.next = null;
+        } else {
+        	while(tmpTuple.next != null) tmpTuple = tmpTuple.next;
+        	tmpTuple.next = new Tuple();
+        	tmpTuple = tmpTuple.next;
+        	tmpTuple.next = null;
+        }
+        
+        user++;
+        
+        ByteBuffer data = ByteBuffer.allocate(2);
+        data.clear();
+        userchar[0] = (byte) (user/10 + '0');
+        userchar[1] = (byte) (user%10 + '0');
+
+        data.put(userchar[0]);
+        data.put(userchar[1]);
+        
+        data.rewind();
+        tmpTuple.setData(data);
+        //Tuple
+        int id = TupleHandler.MAX_TUPLE-1;
+        tmpTuple.setTuple('o', id, 0, data.limit(), data);
+        System.out.println("Server: assign id "+user);
 	}
-
-	protected String Out(ByteBuffer command, ByteBuffer data) throws IOException {
+	
+	protected void Out(SelectionKey key,ByteBuffer command, ByteBuffer data) {
 		Tuple tuple;
 		int id;
 		int datasize;
 		char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET);
 		command.rewind();
 		id = (int)idc;
-		String sendtext = "none";
-		
+
 		datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
 		command.rewind();
 		
 		System.out.println("*** out command : id = "+id +" ***");
+		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
+		command.rewind();
+		hook.outHook(key,id,seq,'o',data);    
 		
 		while((tuple_space[id] != null) &&
 				((tuple_space[id].mode == PSX.PSX_WAIT_RD)||(tuple_space[id].mode == PSX.PSX_RD))) {
 			PSX.setAnserCommand(command, tuple_space[id].getSeq());
-			//if(debug){
-				//int sendsize = tmpTuple.getdataLength()+PSX.LINDA_HEADER_SIZE;
-				//System.out.println("send size "+sendsize+" : mode = "+(char)mode);
-			//}
-			//ByteBuffer sendcommand = tmpTuple.getCommand();
-			//ByteBuffer senddata = tmpTuple.getData();
 			send(tuple_space[id].ch, command, data);
 
-			sendtext = getdataString(data);
-
-			
 			removeTuple(id);
 			tuple = null;
 		}
@@ -62,12 +81,7 @@
 				int sendsize = datasize+PSX.LINDA_HEADER_SIZE;
 				System.out.println("send size "+sendsize+" : mode = "+(char)'a');
 			}
-			//ByteBuffer sendcommand = tmpTuple.getCommand();
-			//ByteBuffer senddata = tmpTuple.getData();
 			send(tuple_space[id].ch, command, data);
-
-			sendtext = getdataString(data);
-			
 			removeTuple(id);
 			tuple = null;
 		} else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX.PSX_OUT)) {
@@ -83,8 +97,6 @@
 			}
 	
 			tuple.setMode('o');
-			int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
-			command.rewind();
 			tuple.setSeq(seq);
 			tuple.setData(data);
 			tuple.setDataLength(datasize);
@@ -98,7 +110,6 @@
 			data.clear();
 			System.exit(1);
 		}
-		return sendtext;
 	}
 
 	private void removeTuple(int id) {
@@ -124,6 +135,9 @@
 		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
 		command.rewind();
 		tuple.setSeq(seq);
+		
+		hook.waitReadHook(key,id,seq,(char)mode);
+		
 		tuple.ch = (SocketChannel) key.channel();
 		tuple.setDataLength(0);
 		ByteBuffer buff = ByteBuffer.allocate(0);
@@ -135,7 +149,7 @@
 		}
 	}
 
-	protected String In_Rd(SelectionKey key, ByteBuffer command, int mode)
+	protected void In_Rd(SelectionKey key, ByteBuffer command, int mode)
 			throws IOException {
 		Tuple tuple = read_in_1(key, command, mode);
 
@@ -145,8 +159,6 @@
 			ByteBuffer senddata = tuple.getData();
 			send(key,sendcommand, senddata);
 		}
-		String sendtext = getdataString(tuple.getData());
-		return sendtext;
 	}
 
 	private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) {
@@ -159,10 +171,12 @@
 		char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET);
 		command.rewind();
 		id = (int)idc;
-		
+		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
+		command.rewind();
 		
 		System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n");    		
-		
+		hook.inHook(key,id,seq,(char)mode);
+
 		tuple = tuple_space[id];
 			
 		//wを無視
@@ -172,79 +186,106 @@
 		}
 		
 		if (tuple != null && (tuple.mode == 'o')){
-			//tmpTuple = new Tuple((SocketChannel)key.channel());
-			int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
-			command.rewind();
-			tuple.setCommand('a', seq);
-			
-			if(debug){
-				int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE;
-				System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode());
-			}
-				
-
-			
-			//INの場合はremoveする
-			if(mode == PSX.PSX_IN) {
-				if(tuple.data != null){
-					//ByteBuffer buff = ByteBuffer.allocate(0);
-					//tmpTuple.setData(buff);
-					tuple.data = null;
-				}
-				if(temp != null){
-					temp.next = tuple.next;
-				}
-				else {
-					tuple_space[id] = tuple.next;
-				}
-			}
+			tupleIsAvailable(command, mode, tuple, id, temp);
 		} else {
-			if(tuple == null) {
-				//ServerSocketChannel sc = (ServerSocketChannel)key.channel();
-				tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel());
-				tuple.next = null;
-			}else {
-				while(tuple.next !=null) tuple =tuple.next;
-				tuple.next= new Tuple((SocketChannel)key.channel());
-				tuple = tuple.next;
-				tuple.next = null;
-			}
-			
-			tuple.setMode(mode);
-			int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET);
-			command.rewind();
-			tuple.setSeq(seq2);
-			tuple.ch = (SocketChannel) key.channel();
-			tuple.setDataLength(0);
-			ByteBuffer buff = ByteBuffer.allocate(0);
-			buff.rewind();
-			tuple.setData(buff);
-			tuple = null;
-				
-			if(debug){
-				System.out.println("data inserted insert seq = "+seq2 +", id = "+id);
-			}
+			tuple = setupWait(key, command, mode, tuple, id);
 		}
 		return tuple;
 	}
 
-	protected String Check(SelectionKey key, ByteBuffer command) throws IOException {
-		String sendtext;
-		ByteBuffer data = check1(command);
-		send(key, command, data);
+	public ByteBuffer IN(int id,int mode, ByteBuffer command) {
+		Tuple tuple,temp=null;
+		tuple = tuple_space[id];
+
+		//wを無視
+		while(tuple != null && tuple.next != null && (tuple.mode == 'w')){
+			temp = tuple;
+			tuple = tuple.next;
+		}
+
+		if (tuple != null && (tuple.mode == 'o')){
+			ByteBuffer data = tuple.data;
+			tupleIsAvailable(command, mode, tuple, id, temp);
+			return data;
+		} 
+		return null;
+	}
+	
+	private void tupleIsAvailable(ByteBuffer command, int mode, Tuple tuple,
+			int id, Tuple temp) {
+		//tmpTuple = new Tuple((SocketChannel)key.channel());
+		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
+		command.rewind();
+		tuple.setCommand('a', seq);
+
+		if(debug){
+			int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE;
+			System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode());
+		}
+			
+
 		
-		sendtext = getdataString(data);
-	    
-	    return sendtext;
+		//INの場合はremoveする
+		if(mode == PSX.PSX_IN) {
+			if(tuple.data != null){
+				//ByteBuffer buff = ByteBuffer.allocate(0);
+				//tmpTuple.setData(buff);
+				tuple.data = null;
+			}
+			if(temp != null){
+				temp.next = tuple.next;
+			}
+			else {
+				tuple_space[id] = tuple.next;
+			}
+		}
 	}
 
-	private ByteBuffer check1(ByteBuffer command) {
+	private Tuple setupWait(SelectionKey key, ByteBuffer command, int mode,
+			Tuple tuple, int id) {
+		if(tuple == null) {
+			//ServerSocketChannel sc = (ServerSocketChannel)key.channel();
+			tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel());
+			tuple.next = null;
+		}else {
+			while(tuple.next !=null) tuple =tuple.next;
+			tuple.next= new Tuple((SocketChannel)key.channel());
+			tuple = tuple.next;
+			tuple.next = null;
+		}
+		
+		tuple.setMode(mode);
+		int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET);
+		command.rewind();
+		tuple.setSeq(seq2);
+		tuple.ch = (SocketChannel) key.channel();
+		tuple.setDataLength(0);
+		ByteBuffer buff = ByteBuffer.allocate(0);
+		buff.rewind();
+		tuple.setData(buff);
+		tuple = null;
+			
+		if(debug){
+			System.out.println("data inserted insert seq = "+seq2 +", id = "+id);
+		}
+		return tuple;
+	}
+
+	protected void Check(SelectionKey key, ByteBuffer command) throws IOException {
+		ByteBuffer data = check1(key,command);
+		send(key, command, data);
+	}
+
+	public ByteBuffer check1(SelectionKey key,ByteBuffer command) {
 		ByteBuffer data;
 		Tuple tmpTuple;
 		int id;
 		char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET);
 		command.rewind();
 		id = (int)idc;
+		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
+		command.rewind();
+		hook.checkHook(key,id,seq,'c');
 		
 		tmpTuple = tuple_space[id];
 		while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){
@@ -263,64 +304,44 @@
 		return data;
 	}
 
-	public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data)
-			throws IOException {
-				if (debug) {
-			        if (command == null) {
-			            System.out.println("Manager_run: command is null");
-			        }
-			        if (data == null) {
-			            System.out.println("Manager_run: data is null");
-			        }
-				}
-				int send_size = PSX.LINDA_HEADER_SIZE;
-				int count = 0;
-			
-				//command Send
-				command.rewind();
-				while(send_size > 0){
-					count = ch.write(command);
-					if(count < 0){
-						System.out.println("Write Falied! close ch:"+ch);
-						ch.close();
-						return;
-					}
-					send_size -= count;
-				}
-				
-				//data Send
-				data.rewind();
-				if(data != null) {
-					data.rewind();
-					ch.write(data);
-				}
+	public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) {
+		if (debug) {
+			if (command == null) {
+				System.out.println("Manager_run: command is null");
+			}
+			if (data == null) {
+				System.out.println("Manager_run: data is null");
+			}
+		}
+		int send_size = PSX.LINDA_HEADER_SIZE;
+		int count = 0;
+
+		try {
+			//command Send
+			command.rewind();
+			while(send_size > 0){
+				count = ch.write(command);
+				if(count < 0) throw new IOException();
+				send_size -= count;
 			}
 
-	public void send(SelectionKey key, ByteBuffer command, ByteBuffer data)
-			throws IOException {
-				SocketChannel ch = (SocketChannel)key.channel();
-				send(ch,command,data);
+			if (data==null) return;
+			//data Send
+			data.rewind();
+			while(data.remaining() > 0){
+				count = ch.write(data);
+				if(count < 0) throw new IOException();
 			}
-
-	private String getdataString(ByteBuffer data) {
-		String sendtext;
-		data.rewind();
-		//set sendtext
-		//CharBuffer chardata = data.asCharBuffer();
-		
-		//Decode UTF-8 to System Encoding(UTF-16) 
-		Charset charset = Charset.forName("UTF-8");
-		CharsetDecoder decoder = charset.newDecoder();
-		CharBuffer cb = null;
-		try {
-			cb = decoder.decode(data);
-		} catch (CharacterCodingException e) {
-			e.printStackTrace();
+		} catch (IOException e) {
+			System.out.println("Write Falied on:"+ch);
+			return;
 		}
-		cb.rewind();
-		
-		sendtext = cb.toString();
-		return sendtext;
 	}
 
+	public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) {
+		SocketChannel ch = (SocketChannel)key.channel();
+		send(ch,command,data);
+	}
+
+
 }
\ No newline at end of file