view src/fdl/IOParam.java @ 15:aced4bfc15af

add Meta Linda Interface for debugger.
author kono
date Sun, 17 Aug 2008 20:24:24 +0900
parents 4391c9fac885
children
line wrap: on
line source

package fdl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

public class IOParam implements TupleHandler, PSXQueueInterface{
    static final boolean debug = true;
    static final int CAPSIZE = 4194304;
    public Tuple[] tuple_space;
    
	public IOParam(Tuple[] _tuple_space) {
		super();
		// 読みこんだデータを格納するためのリストの初期化
        tuple_space = _tuple_space;      
	}
    
	public IOParam() {
		// TODO Auto-generated constructor stub
	}

	protected String Out(ByteBuffer command, ByteBuffer data) throws IOException {
		Tuple tmpTuple;
		int id;
		int datasize;
		char idc = (char)command.getShort(LINDA_ID_OFFSET);
		command.rewind();
		id = (int)idc;
		String sendtext = "none";
		
		datasize = command.getInt(LINDA_DATA_LENGTH_OFFSET);
		command.rewind();
		
		System.out.println("*** out command : id = "+id +" ***");
		
		while((tuple_space[id] != null) &&
				((tuple_space[id].mode == PSX_WAIT_RD)||(tuple_space[id].mode == PSX_RD))) {
			command.put(LINDA_MODE_OFFSET, (byte)'a');
			command.rewind();
			command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq());
			command.rewind();
			//if(debug){
				//int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE;
				//System.out.println("send size "+sendsize+" : mode = "+(char)mode);
			//}
			//ByteBuffer sendcommand = tmpTuple.getCommand();
			//ByteBuffer senddata = tmpTuple.getData();
			send(tuple_space[id].ch, command, data);

			sendtext = getdataString(data);

			
			//後処理
			tmpTuple = tuple_space[id];
			tuple_space[id] = tmpTuple.next;
			tmpTuple = null;
		}
		if(tuple_space[id] != null && tuple_space[id].mode == PSX_IN) {
			command.put(LINDA_MODE_OFFSET, (byte)'a');
			command.rewind();
			command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq());
			command.rewind();
			
			if(debug){
				int sendsize = datasize+LINDA_HEADER_SIZE;
				System.out.println("send size "+sendsize+" : mode = "+(char)'a');
			}
			//ByteBuffer sendcommand = tmpTuple.getCommand();
			//ByteBuffer senddata = tmpTuple.getData();
			send(tuple_space[id].ch, command, data);

			sendtext = getdataString(data);
			
			//後処理
			tmpTuple = tuple_space[id];
			tuple_space[id] = tmpTuple.next;
			tmpTuple = null;
		} else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX_OUT)) {
			if((tmpTuple = tuple_space[id]) == null) {
				tmpTuple = tuple_space[id] = new Tuple();
				tmpTuple.next = null;
			}
			else {
				while(tmpTuple.next != null) tmpTuple = tmpTuple.next;
				tmpTuple.next = new Tuple();
				tmpTuple = tmpTuple.next;
				tmpTuple.next = null;
			}
	
			tmpTuple.setMode('o');
			int seq = command.getInt(LINDA_SEQ_OFFSET);
			command.rewind();
			tmpTuple.setSeq(seq);
			tmpTuple.setData(data);
			tmpTuple.setDataLength(datasize);
			if(debug){
				System.out.println("data inserted len = "+tmpTuple.getdataLength()+" : id = "+id);
			}
		}
		else {
			System.out.println("Uncorrect mode :"+(char)tuple_space[id].getMode());
			command.clear();
			data.clear();
			System.exit(1);
		}
		return sendtext;
	}

	protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) {
		Tuple tmpTuple;
		int id;

		char idc = (char)command.getShort(LINDA_ID_OFFSET);
		command.rewind();
		id = (int)idc;
		
		System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n");
		
		tmpTuple = new Tuple();
		tmpTuple.setMode(mode);
		int seq = command.getInt(LINDA_SEQ_OFFSET);
		command.rewind();
		tmpTuple.setSeq(seq);
		tmpTuple.ch = (SocketChannel) key.channel();
		tmpTuple.setDataLength(0);
		ByteBuffer buff = ByteBuffer.allocate(0);
		tmpTuple.setData(buff);
		tmpTuple.next = tuple_space[id];
		tuple_space[id] = tmpTuple;
		if(debug){
			System.out.println("data inserted insert seq = "+seq +", id = "+id);
		}
	}

	protected String In_Rd(SelectionKey key, ByteBuffer command, int mode)
			throws IOException {
		Tuple tmpTuple;
		int id;
		//id = command.getInt(LINDA_ID_OFFSET);
		//int mode = command.getInt(LINDA_MODE_OFFSET);
		Tuple temp = null;
		
		char idc = (char)command.getShort(LINDA_ID_OFFSET);
		command.rewind();
		id = (int)idc;
		
		String sendtext = "none";
		
		System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n");    		
		
		tmpTuple = tuple_space[id];
			
		//wを無視
		while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){
			temp = tmpTuple;
			tmpTuple = tmpTuple.next;
		}
		
		if (tmpTuple != null && (tmpTuple.mode == 'o')){
			//tmpTuple = new Tuple((SocketChannel)key.channel());
			int seq = command.getInt(LINDA_SEQ_OFFSET);
			command.rewind();
			tmpTuple.setCommand('a', seq);
			
			if(debug){
				int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE;
				System.out.println("send size "+sendsize+" : mode = "+(char)tmpTuple.getMode());
			}
				
			//send
			ByteBuffer sendcommand = tmpTuple.getCommand();
			ByteBuffer senddata = tmpTuple.getData();
			send(key,sendcommand, senddata);
			
			sendtext = getdataString(senddata);
			
			
			//INの場合はremoveする
			if(mode == PSX_IN) {
				if(tmpTuple.data != null){
					//ByteBuffer buff = ByteBuffer.allocate(0);
					//tmpTuple.setData(buff);
					tmpTuple.data = null;
				}
				if(temp != null){
					temp.next = tmpTuple.next;
				}
				else {
					tuple_space[id] = tmpTuple.next;
				}
				tmpTuple = null;
			}
		} else {
			if(tmpTuple == null) {
				//ServerSocketChannel sc = (ServerSocketChannel)key.channel();
				tmpTuple = tuple_space[id] = new Tuple((SocketChannel)key.channel());
				tmpTuple.next = null;
			}else {
				while(tmpTuple.next !=null) tmpTuple =tmpTuple.next;
				tmpTuple.next= new Tuple((SocketChannel)key.channel());
				tmpTuple = tmpTuple.next;
				tmpTuple.next = null;
			}
			
			tmpTuple.setMode(mode);
			int seq2 = command.getInt(LINDA_SEQ_OFFSET);
			command.rewind();
			tmpTuple.setSeq(seq2);
			tmpTuple.ch = (SocketChannel) key.channel();
			tmpTuple.setDataLength(0);
			ByteBuffer buff = ByteBuffer.allocate(0);
			buff.rewind();
			tmpTuple.setData(buff);
				
			if(debug){
				System.out.println("data inserted insert seq = "+seq2 +", id = "+id);
			}
		}
		
		//} else if (command.getInt(LINDA_MODE_OFFSET) == PSX_WAIT_RD) {
		return sendtext;
	}

	protected String Check(SelectionKey key, ByteBuffer command) throws IOException {
		ByteBuffer data;
		Tuple tmpTuple;
		int id;
		char idc = (char)command.getShort(LINDA_ID_OFFSET);
		command.rewind();
		id = (int)idc;
		String sendtext = "none";
		
		tmpTuple = tuple_space[id];
		while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){
			tmpTuple = tmpTuple.next;
		}
		if (tmpTuple != null && (tmpTuple.mode == 'o')) {
			command.putInt(LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen);
			command.rewind();
			data = tmpTuple.getData();
		}else {
			//means no out tuple
			command.putInt(LINDA_DATA_LENGTH_OFFSET, 0);
			command.rewind();
			data = null;
			data = ByteBuffer.allocate(0);
		}
		send(key, command, data);
		
		sendtext = getdataString(data);
	    
	    return sendtext;
	}

	public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data)
			throws IOException {
				if (debug) {
			        if (command == null) {
			            System.out.println("Manager_run: command is null");
			        }
			        if (data == null) {
			            System.out.println("Manager_run: data is null");
			        }
				}
				int send_size = LINDA_HEADER_SIZE;
				int count = 0;
			
				//command Send
				command.rewind();
				while(send_size > 0){
					count = ch.write(command);
					if(count < 0){
						System.out.println("Write Falied! close ch:"+ch);
						ch.close();
						return;
					}
					send_size -= count;
				}
				
				//data Send
				data.rewind();
				if(data != null) {
					data.rewind();
					ch.write(data);
				}
			}

	public void send(SelectionKey key, ByteBuffer command, ByteBuffer data)
			throws IOException {
				SocketChannel ch = (SocketChannel)key.channel();
				send(ch,command,data);
			}

	private String getdataString(ByteBuffer data) {
		String sendtext;
		data.rewind();
		//set sendtext
		//CharBuffer chardata = data.asCharBuffer();
		
		//Decode UTF-8 to System Encoding(UTF-16) 
		Charset charset = Charset.forName("UTF-8");
		CharsetDecoder decoder = charset.newDecoder();
		CharBuffer cb = null;
		try {
			cb = decoder.decode(data);
		} catch (CharacterCodingException e) {
			e.printStackTrace();
		}
		cb.rewind();
		
		sendtext = cb.toString();
		return sendtext;
	}

	public void handle(SelectionKey key) throws ClosedChannelException,
			IOException {
		
	}
}