view src/fdl/TupleSpace.java @ 17:609b288f47f9

*** empty log message ***
author kono
date Mon, 18 Aug 2008 07:28:29 +0900 (2008-08-17)
parents cccf34386cad
children 0243987383b7
line wrap: on
line source
package fdl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

public class TupleSpace {
    // Enables the extra trace output that the methods below wrap in if(debug).
    static final boolean debug = true;
    // 4194304 = 4 * 1024 * 1024. Not referenced inside this class;
    // presumably a buffer capacity limit used by callers (TODO confirm).
    static final int CAPSIZE = 4194304;
    // Table of tuple chains indexed by tuple id. Each slot holds the head of a
    // singly linked list (linked through Tuple.next) or null when the chain is empty.
    public Tuple[] tuple_space;
    
	/**
	 * Creates a tuple space on top of an existing table of tuple chains.
	 * The array is kept by reference, not copied.
	 *
	 * @param _tuple_space table that the methods of this class index by tuple id
	 */
	public TupleSpace(Tuple[] _tuple_space) {
		// Adopt the caller's table as the storage for all tuple chains.
		this.tuple_space = _tuple_space;
	}
    
	/**
	 * Creates a tuple space without a backing table: {@code tuple_space} stays
	 * {@code null} until it is assigned from outside.
	 */
	public TupleSpace() {
		// No table is allocated here.
	}

	/**
	 * Handles an "out" request: delivers the payload to pending readers of the
	 * addressed chain, or stores it when nobody is waiting.
	 *
	 * <p>Steps, in order:
	 * <ol>
	 *   <li>Every head entry whose mode is {@code PSX_WAIT_RD} or {@code PSX_RD}
	 *       is answered with the payload and removed from the chain.</li>
	 *   <li>If the head is then a {@code PSX_IN} entry, it is answered and removed,
	 *       and the payload is not stored.</li>
	 *   <li>Otherwise, if the chain is empty or its head is a {@code PSX_OUT} entry,
	 *       a new tuple carrying the payload is appended at the tail.</li>
	 *   <li>Any other head mode is reported and the process exits with status 1.</li>
	 * </ol>
	 *
	 * @param command request header; the id, data length and sequence number are
	 *                read from it and it is rewritten as the answer header
	 * @param data    payload of the tuple
	 * @return the payload decoded as text if it was sent to at least one reader,
	 *         otherwise {@code "none"}
	 * @throws IOException if writing to a reader's channel fails
	 */
	protected String Out(ByteBuffer command, ByteBuffer data) throws IOException {
		// The id is a 16-bit field; going through char keeps it non-negative.
		final int id = (char) command.getShort(PSX.LINDA_ID_OFFSET);
		command.rewind();
		String sendtext = "none";

		final int datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
		command.rewind();

		System.out.println("*** out command : id = "+id +" ***");

		// Answer every reader parked at the front of the chain.
		Tuple head = tuple_space[id];
		while (head != null
				&& (head.mode == PSX.PSX_WAIT_RD || head.mode == PSX.PSX_RD)) {
			PSX.setAnserCommand(command, head.getSeq());
			send(head.ch, command, data);
			sendtext = getdataString(data);
			removeTuple(id);
			head = tuple_space[id];
		}

		// A pending IN request takes the payload; nothing is stored.
		if (head != null && head.mode == PSX.PSX_IN) {
			PSX.setAnserCommand(command, head.getSeq());
			if (debug) {
				int sendsize = datasize + PSX.LINDA_HEADER_SIZE;
				System.out.println("send size "+sendsize+" : mode = "+'a');
			}
			send(head.ch, command, data);
			sendtext = getdataString(data);
			removeTuple(id);
			return sendtext;
		}

		// Anything other than an empty chain or a chain of stored tuples is fatal.
		if (head != null && head.getMode() != PSX.PSX_OUT) {
			System.out.println("Incorrect mode :"+(char)head.getMode());
			command.clear();
			data.clear();
			System.exit(1);
			return sendtext;
		}

		// Nobody consumed the payload: append a new stored tuple at the tail.
		final Tuple stored;
		if (head == null) {
			stored = new Tuple();
			tuple_space[id] = stored;
		} else {
			Tuple tail = head;
			while (tail.next != null) {
				tail = tail.next;
			}
			stored = new Tuple();
			tail.next = stored;
		}
		stored.next = null;

		stored.setMode('o');
		final int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();
		stored.setSeq(seq);
		stored.setData(data);
		stored.setDataLength(datasize);
		if (debug) {
			System.out.println("data inserted len = "+stored.getdataLength()+" : id = "+id);
		}
		return sendtext;
	}

	/**
	 * Unlinks the first entry of chain {@code id}; its successor (possibly
	 * {@code null}) becomes the new head. The chain must not be empty.
	 *
	 * @param id index of the chain in {@code tuple_space}
	 */
	private void removeTuple(int id) {
		// Clean-up step: drop the head by pointing the slot at the next entry.
		tuple_space[id] = tuple_space[id].next;
	}

	/**
	 * Registers a pending request at the front of the addressed chain without
	 * answering it. The new entry records the request mode, its sequence number
	 * and the requesting channel, and carries an empty payload.
	 *
	 * @param key     selection key whose channel is the requesting socket
	 * @param command request header; the id and sequence number are read from it
	 * @param mode    mode code stored on the new entry
	 */
	protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) {
		// The id is a 16-bit field; going through char keeps it non-negative.
		final int id = (char) command.getShort(PSX.LINDA_ID_OFFSET);
		command.rewind();

		if (debug) {
			System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n");
		}

		final Tuple waiter = new Tuple();
		waiter.setMode(mode);
		final int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();
		waiter.setSeq(seq);
		waiter.ch = (SocketChannel) key.channel();
		waiter.setDataLength(0);
		waiter.setData(ByteBuffer.allocate(0));

		// Push onto the front so the entry becomes the new head of the chain.
		waiter.next = tuple_space[id];
		tuple_space[id] = waiter;

		if (debug) {
			System.out.println("data inserted insert seq = "+seq +", id = "+id);
		}
	}

	/**
	 * Handles an "in" or "rd" request: if a stored tuple is available it is sent
	 * back to the requester immediately; otherwise the request has been queued by
	 * {@code read_in_1} and nothing is sent.
	 *
	 * @param key     selection key whose channel is the requesting socket
	 * @param command request header
	 * @param mode    request mode, passed through to {@code read_in_1}
	 * @return the payload decoded as text when a tuple was sent, or {@code "none"}
	 *         when the request was queued (the same placeholder {@code Out}
	 *         returns when it sends nothing)
	 * @throws IOException if writing the answer to the channel fails
	 */
	protected String In_Rd(SelectionKey key, ByteBuffer command, int mode)
			throws IOException {
		Tuple tuple = read_in_1(key, command, mode);

		if (tuple == null) {
			// No stored tuple matched and the request is now waiting in the chain.
			// The previous code dereferenced the null tuple here and crashed.
			return "none";
		}

		// send
		ByteBuffer sendcommand = tuple.getCommand();
		ByteBuffer senddata = tuple.getData();
		send(key, sendcommand, senddata);
		return getdataString(senddata);
	}

	/**
	 * Looks up a stored tuple for an "in"/"rd" request, or queues the request.
	 *
	 * <p>Leading entries with mode {@code 'w'} are skipped. If the entry reached
	 * has mode {@code 'o'}, its command is set to an {@code 'a'} answer carrying
	 * the request's sequence number and the tuple is returned; for
	 * {@code PSX_IN} it is also unlinked from the chain. Otherwise a new entry
	 * describing the request (mode, sequence number, channel, empty payload) is
	 * appended at the tail of the chain and {@code null} is returned.
	 *
	 * @param key     selection key whose channel is the requesting socket
	 * @param command request header; the id and sequence number are read from it
	 * @param mode    request mode; {@code PSX.PSX_IN} removes the matched tuple
	 * @return the matched tuple, still carrying its payload, or {@code null} when
	 *         the request was queued
	 */
	private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) {
		// The id is a 16-bit field; going through char keeps it non-negative.
		int id = (char) command.getShort(PSX.LINDA_ID_OFFSET);
		command.rewind();

		System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n");

		Tuple tuple = tuple_space[id];
		Tuple prev = null;

		// Skip leading 'w' entries, remembering the predecessor for unlinking.
		while (tuple != null && tuple.next != null && (tuple.mode == 'w')) {
			prev = tuple;
			tuple = tuple.next;
		}

		if (tuple != null && (tuple.mode == 'o')) {
			int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
			command.rewind();
			tuple.setCommand('a', seq);

			if (debug) {
				int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE;
				System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode());
			}

			// IN consumes the tuple, so unlink it from the chain.
			// The payload is deliberately left attached: the caller still has to
			// send it, and clearing it here left nothing to transmit.
			if (mode == PSX.PSX_IN) {
				if (prev != null) {
					prev.next = tuple.next;
				} else {
					tuple_space[id] = tuple.next;
				}
			}
			return tuple;
		}

		// No stored tuple available: append the request at the tail of the chain.
		Tuple waiter = new Tuple((SocketChannel) key.channel());
		if (tuple == null) {
			tuple_space[id] = waiter;
		} else {
			while (tuple.next != null) {
				tuple = tuple.next;
			}
			tuple.next = waiter;
		}
		waiter.next = null;

		waiter.setMode(mode);
		int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();
		waiter.setSeq(seq2);
		waiter.ch = (SocketChannel) key.channel();
		waiter.setDataLength(0);
		ByteBuffer buff = ByteBuffer.allocate(0);
		buff.rewind();
		waiter.setData(buff);

		if (debug) {
			System.out.println("data inserted insert seq = "+seq2 +", id = "+id);
		}
		// Nothing to answer yet.
		return null;
	}

	/**
	 * Handles a "check" request: answers with the payload found by
	 * {@code check1} (empty when no stored tuple is available) without changing
	 * the chain.
	 *
	 * @param key     selection key whose channel is the requesting socket
	 * @param command request header; {@code check1} writes the answer's data
	 *                length into it before it is sent back
	 * @return the sent payload decoded as text
	 * @throws IOException if writing the answer to the channel fails
	 */
	protected String Check(SelectionKey key, ByteBuffer command) throws IOException {
		final ByteBuffer payload = check1(command);
		send(key, command, payload);
		return getdataString(payload);
	}

	/**
	 * Finds the payload to report for a "check" request and records its length
	 * in the header.
	 *
	 * <p>Leading entries with mode {@code 'w'} are skipped. If the entry reached
	 * has mode {@code 'o'}, its {@code datalen} is written to the header's data
	 * length field and its data is returned; otherwise length 0 is written and an
	 * empty buffer is returned. The chain itself is not modified.
	 *
	 * @param command request header; the id is read from it and the data length
	 *                field is overwritten
	 * @return the stored tuple's data, or a zero-capacity buffer when there is none
	 */
	private ByteBuffer check1(ByteBuffer command) {
		// The id is a 16-bit field; going through char keeps it non-negative.
		final int id = (char) command.getShort(PSX.LINDA_ID_OFFSET);
		command.rewind();

		Tuple cursor = tuple_space[id];
		while (cursor != null && cursor.next != null && cursor.mode == 'w') {
			cursor = cursor.next;
		}

		final boolean found = cursor != null && cursor.mode == 'o';

		// Length 0 means that no out tuple is available.
		command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, found ? cursor.datalen : 0);
		command.rewind();
		return found ? cursor.getData() : ByteBuffer.allocate(0);
	}

	/**
	 * Writes a header followed by an optional payload to a channel.
	 *
	 * <p>Both buffers are rewound before writing. Each buffer is written in a
	 * loop because a single {@code write} on a non-blocking channel may transfer
	 * only part of the buffer (or nothing); the loop retries until the buffer is
	 * drained. If a write reports a negative count the channel is closed and the
	 * method returns without sending the rest.
	 *
	 * @param ch      channel to write to
	 * @param command header buffer; must not be {@code null}
	 * @param data    payload buffer, or {@code null} to send the header only
	 * @throws IOException          if the channel write or close fails
	 * @throws NullPointerException if {@code command} is {@code null}
	 */
	public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data)
			throws IOException {
		if (debug) {
			if (command == null) {
				System.out.println("Manager_run: command is null");
			}
			if (data == null) {
				System.out.println("Manager_run: data is null");
			}
		}

		//command Send
		int send_size = PSX.LINDA_HEADER_SIZE;
		command.rewind();
		// Stop when the buffer is exhausted as well: without that check a header
		// buffer shorter than LINDA_HEADER_SIZE would keep this loop spinning on
		// zero-byte writes forever.
		while (send_size > 0 && command.hasRemaining()) {
			int count = ch.write(command);
			if (count < 0) {
				System.out.println("Write Falied! close ch:"+ch);
				ch.close();
				return;
			}
			send_size -= count;
		}

		//data Send
		// The null check has to come before any use of the buffer; the payload
		// is then written until nothing remains so that large tuples are not
		// truncated by a partial write.
		if (data != null) {
			data.rewind();
			while (data.hasRemaining()) {
				int count = ch.write(data);
				if (count < 0) {
					System.out.println("Write Falied! close ch:"+ch);
					ch.close();
					return;
				}
			}
		}
	}

	/**
	 * Convenience overload that sends to the socket channel behind a selection key.
	 *
	 * @param key     selection key whose channel is cast to {@code SocketChannel}
	 * @param command header buffer
	 * @param data    payload buffer
	 * @throws IOException if the underlying send fails
	 */
	public void send(SelectionKey key, ByteBuffer command, ByteBuffer data)
			throws IOException {
		send((SocketChannel) key.channel(), command, data);
	}

	/**
	 * Decodes a payload buffer as UTF-8 text, starting from position 0.
	 *
	 * <p>The buffer is rewound and then read by the decoder. Byte sequences that
	 * are not valid UTF-8 are replaced by the charset's replacement string
	 * instead of aborting the decode, so arbitrary binary payloads still yield a
	 * string. The previous strict decoder failed on such input and the method
	 * then dereferenced a null result.
	 *
	 * @param data payload to decode; {@code null} is treated as an empty payload
	 * @return the decoded text, or {@code ""} when {@code data} is {@code null}
	 */
	private String getdataString(ByteBuffer data) {
		if (data == null) {
			return "";
		}
		data.rewind();

		//Decode UTF-8 to System Encoding(UTF-16)
		// Charset.decode substitutes malformed input rather than throwing.
		CharBuffer cb = Charset.forName("UTF-8").decode(data);
		return cb.toString();
	}

}