view src/fdl/TupleSpace.java @ 25:330fa49bc4fd

*** empty log message ***
author kono
date Wed, 20 Aug 2008 14:12:15 +0900
parents 35375016b2f0
children 846c6c14cf04
line wrap: on
line source

package fdl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

public class TupleSpace {
    static final boolean debug = true;
	public static int user = 0;
	public byte userchar[] = new byte[2];
    public Tuple[] tuple_space;
    public IOHandlerHook hook = new NullIOHandlerHook();
	public static final int MAX_TUPLE_ID = 65536;
    
	public TupleSpace() {
		// 読みこんだデータを格納するためのリストの初期化
        tuple_space = new Tuple[TupleSpace.MAX_TUPLE_ID];      
	}
    

	public void newUser() {
		Tuple tmpTuple;
		//初期生成        
        if((tmpTuple = tuple_space[TupleSpace.MAX_TUPLE_ID-1]) == null) {
        	tmpTuple = tuple_space[TupleSpace.MAX_TUPLE_ID-1] = new Tuple();
        	tmpTuple.next = null;
        } else {
        	while(tmpTuple.next != null) tmpTuple = tmpTuple.next;
        	tmpTuple.next = new Tuple();
        	tmpTuple = tmpTuple.next;
        	tmpTuple.next = null;
        }
        
        user++;
        
        ByteBuffer data = ByteBuffer.allocate(2);
        userchar[0] = (byte) (user/10 + '0');
        userchar[1] = (byte) (user%10 + '0');
        data.put(userchar[0]);
        data.put(userchar[1]);
        data.flip();
        
        tmpTuple.setData(data);
        //Tuple
        int id = TupleSpace.MAX_TUPLE_ID-1;
        int seq = 0;
        tmpTuple.setTuple('o', id, seq, data);
        System.out.println("Server: assign id "+user);
	}
	
	protected void Out(SelectionKey key,ByteBuffer command, ByteBuffer data) {
		Tuple tuple;
		int id;
		int datasize;
		char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET);
		command.rewind();
		id = (int)idc;

		datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
		command.rewind();
		
		if (debug) System.out.println("*** out command : id = "+id +" ***");
		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();
		hook.outHook(key,id,seq,'o',data);    
		
		while((tuple_space[id] != null) &&
				((tuple_space[id].mode == PSX.PSX_WAIT_RD)||(tuple_space[id].mode == PSX.PSX_RD))) {
			PSX.setAnserCommand(command, tuple_space[id].getSeq());
			PSX.send(tuple_space[id].ch, command, data);

			removeTuple(id);
			tuple = null;
		}
		if(tuple_space[id] != null && tuple_space[id].mode == PSX.PSX_IN) {
			PSX.setAnserCommand(command, tuple_space[id].getSeq());
			
			if(debug){
				int sendsize = datasize+PSX.LINDA_HEADER_SIZE;
				System.out.println("send size "+sendsize+" : mode = "+(char)'a');
			}
			PSX.send(tuple_space[id].ch, command, data);
			removeTuple(id);
			tuple = null;
		} else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX.PSX_OUT)) {
			if((tuple = tuple_space[id]) == null) {
				tuple = tuple_space[id] = new Tuple();
				tuple.next = null;
			}
			else {
				while(tuple.next != null) tuple = tuple.next;
				tuple.next = new Tuple();
				tuple = tuple.next;
				tuple.next = null;
			}
	
			tuple.setMode('o');
			tuple.setSeq(seq);
			tuple.setData(data);
			if(debug){
				System.out.println("data inserted len = "+tuple.getdataLength()+" : id = "+id);
			}
		}
		else {
			System.out.println("Incorrect mode :"+(char)tuple_space[id].getMode());
			command.clear();
			data.clear();
			System.exit(1);
		}
	}

	private void removeTuple(int id) {
		Tuple tuple;
		//後処理
		tuple = tuple_space[id];
		tuple_space[id] = tuple.next;
	}

	protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) {
		Tuple tuple;
		int id;

		char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET);
		command.rewind();
		id = (int)idc;
		
		if (debug)
			System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n");
		
		tuple = new Tuple();
		tuple.setMode(mode);
		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();
		tuple.setSeq(seq);
		
		hook.waitReadHook(key,id,seq,(char)mode);
		
		tuple.ch = (SocketChannel) key.channel();
		ByteBuffer buff = ByteBuffer.allocate(0);
		tuple.setData(buff);
		tuple.next = tuple_space[id];
		tuple_space[id] = tuple;
		if(debug){
			System.out.println("data inserted insert seq = "+seq +", id = "+id);
		}
	}

	protected void In_Rd(SelectionKey key, ByteBuffer command, int mode)
			throws IOException {
		Tuple tuple = read_in_1(key, command, mode);

		if (tuple!=null) {
		//send
			ByteBuffer sendcommand = tuple.getCommand();
			ByteBuffer senddata = tuple.getData();
			PSX.send(key,sendcommand, senddata);
		}
	}

	private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) {
		Tuple tuple;
		int id;
		//id = command.getInt(PSX.LINDA_ID_OFFSET);
		//int mode = command.getInt(PSX.LINDA_MODE_OFFSET);
		Tuple temp = null;
		
		char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET);
		command.rewind();
		id = (int)idc;
		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();
		
		if (debug) System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n");    		
		hook.inHook(key,id,seq,(char)mode);

		tuple = tuple_space[id];
			
		//wを無視
		while(tuple != null && tuple.next != null && (tuple.mode == 'w')){
			temp = tuple;
			tuple = tuple.next;
		}
		
		if (tuple != null && (tuple.mode == 'o')){
			tuple = tupleIsAvailable(command, mode, tuple, id, temp);
		} else {
			tuple = setupWait(key, command, mode, tuple, id);
		}
		return tuple;
	}

	public ByteBuffer IN(int id,int mode, ByteBuffer command) {
		Tuple tuple,temp=null;
		tuple = tuple_space[id];

		//wを無視
		while(tuple != null && tuple.next != null && (tuple.mode == 'w')){
			temp = tuple;
			tuple = tuple.next;
		}

		if (tuple != null && (tuple.mode == 'o')){
			ByteBuffer data = tuple.data;
			tupleIsAvailable(command, mode, tuple, id, temp);
			return data;
		} 
		return null;
	}
	
	private Tuple tupleIsAvailable(ByteBuffer command, int mode, Tuple tuple,
			int id, Tuple temp) {
		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();
		tuple.setCommand('a', seq);

		if(debug){
			int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE;
			System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode());
		}
		//INの場合はremoveする
		if(mode == PSX.PSX_IN) {
			if(temp != null){
				temp.next = tuple.next;
			}
			else {
				tuple_space[id] = tuple.next;
			}
		}
		return tuple;
	}

	private Tuple setupWait(SelectionKey key, ByteBuffer command, int mode,
			Tuple tuple, int id) {
		if(tuple == null) {
			//ServerSocketChannel sc = (ServerSocketChannel)key.channel();
			tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel());
			tuple.next = null;
		}else {
			while(tuple.next !=null) tuple =tuple.next;
			tuple.next= new Tuple((SocketChannel)key.channel());
			tuple = tuple.next;
			tuple.next = null;
		}
		
		tuple.setMode(mode);
		int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();
		tuple.setSeq(seq2);
		tuple.ch = (SocketChannel) key.channel();
		ByteBuffer buff = ByteBuffer.allocate(0);
		tuple.setData(buff);
		tuple = null;
			
		if(debug){
			System.out.println("data inserted insert seq = "+seq2 +", id = "+id);
		}
		return tuple;
	}

	protected void Check(SelectionKey key, ByteBuffer command) throws IOException {
		ByteBuffer data = check1(key,command);
		PSX.send(key, command, data);
	}

	public ByteBuffer check1(SelectionKey key,ByteBuffer command) {
		ByteBuffer data;
		Tuple tmpTuple;
		int id;
		char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET);
		command.rewind();
		id = (int)idc;
		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();
		hook.checkHook(key,id,seq,'c');
		
		tmpTuple = tuple_space[id];
		while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){
			tmpTuple = tmpTuple.next;
		}
		if (tmpTuple != null && (tmpTuple.mode == 'o')) {
			command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, tmpTuple.getdataLength());
			command.rewind();
			data = tmpTuple.getData();
		}else {
			//means no out tuple
			command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, 0);
			command.rewind();
			data = ByteBuffer.allocate(0);
		}
		return data;
	}


}