view src/fdl/PSX.java @ 67:264123227f0d

large tuple case (suggestion)
author one
date Sat, 06 Jun 2009 14:33:59 +0900
parents d5bca4b5ee95
children 96c63bc659d4
line wrap: on
line source


/*
 * @(#)PSXQueueInterface.java       1.1 06/04/01
 *
 * Copyright 2006  Shinji KONO
 * 

   PSX Lidna
     Trasport layer of PSX Linda library

 */

package fdl;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;



/**
  PSX Tuple Command Protocol Format

	All PSX offset command operation should be here.
	Some public library
 */

public class PSX {
    static final int PSX_IN =		'i';
    static final int PSX_OUT =		'o';
    static final int PSX_UPDATE =	'u';
    static final int PSX_RD =		'r';
    static final int PSX_CHECK =	'c';
    static final int PSX_REPLY =	'?';
    static final int PSX_WAIT_RD =	'w';
    static final int PSX_ANSWER =	'a';
    static final int PSX_HTTP_ANSWER =	'P';   // Put
    static final int PSX_HTTP_REQUEST =	'G';  // Get
    static final int PSX_COM_DEBUG = 'D';  //Communication DEBUG
    
    static final int LINDA_PACKET_LENGTH_OFFSET	=0;
    static final int LINDA_MODE_OFFSET          =0+4;
    static final int LINDA_ID_OFFSET            =1+4;
    static final int LINDA_SEQ_OFFSET           =3+4;
    static final int LINDA_DATA_LENGTH_OFFSET   =7+4;
    static final int LINDA_HEADER_SIZE          =12+4;
    static final int INT_SIZE          =4;
    static final int SHORT_SIZE          =2;
    
    static final int PRIVILEGED_ID_START   = 32768;
    static final int PRIVILEGED_ID_END   = 36864;
	static public final int META_STOP = PRIVILEGED_ID_START;
	static public final int META_MONITOR = PRIVILEGED_ID_START+1;
	static public final int META_MONITOR_DATA = PRIVILEGED_ID_START+2;
    
	
	static void printCommand(String comment, ByteBuffer command, ByteBuffer data) {
		char id = (char)command.getShort(LINDA_ID_OFFSET);
		System.err.println(comment+" LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
				"MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+
				"ID:"+(int)id+" "+
				"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+
				"DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" ");
		if(data!=null) {
			System.err.println("DATA:"+data);
			data.rewind();
		}
		command.rewind();
	}
	
	static void printData(String comment,ByteBuffer command) {
		/*** print data ***/
		char id = (char)command.getShort(LINDA_ID_OFFSET);
		System.out.println(comment+" LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
				"MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+
				"ID:"+(int)id+" "+
				"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" ");
		command.rewind();
	}
	
	
	static ByteBuffer setCommand(int _mode, int _id, int _seq, ByteBuffer data) {
		int _datalen = data==null?0:data.remaining();
		ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
		command.order(ByteOrder.BIG_ENDIAN);
	
		command.putInt(LINDA_PACKET_LENGTH_OFFSET,
					_datalen+LINDA_HEADER_SIZE-INT_SIZE);
		command.put(LINDA_MODE_OFFSET,(byte)_mode);
		command.putShort(LINDA_ID_OFFSET,(short)_id);
		command.putInt(LINDA_SEQ_OFFSET,_seq);
		command.putInt(LINDA_DATA_LENGTH_OFFSET,_datalen);
		command.rewind();
		return command;
	}

	static void setAnserCommand(ByteBuffer command, int seq) {
		command.put(LINDA_MODE_OFFSET, (byte)'a');
		command.putInt(LINDA_SEQ_OFFSET, seq);
		command.rewind();
	}

	public static ByteBuffer string2ByteBuffer(String log) {
		ByteBuffer blog = ByteBuffer.allocate(log.length()*2); // this is incorrect...
		for(int i=0;i<log.length();i++) {
			blog.putChar(log.charAt(i));
		}
		blog.flip();
		return blog;
	}

	public static String getdataString(ByteBuffer data) {
		String sendtext;
		//Decode UTF-8 to System Encoding(UTF-16) 
		Charset charset = Charset.forName("UTF-8");
		CharsetDecoder decoder = charset.newDecoder();
		CharBuffer cb = null;
		try {
			cb = decoder.decode(data);
		} catch (CharacterCodingException e) {
		}
		cb.rewind();
		data.rewind();
	
		sendtext = cb.toString();
		return sendtext;
	}

	public static void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) {
		// if datalen in the header is different from ByteBuffer remaining(), we lost
		// protocol synchronization. Make sure to have correct length now.
		if (true) { 
			int datalen = data==null?0:(data.limit()-data.position());
//			if (command.getInt(LINDA_DATA_LENGTH_OFFSET)>0 && (data==null||data.remaining()!=command.getInt(LINDA_DATA_LENGTH_OFFSET))) {
//				System.err.println("Missing data.");
//			}
			command.putInt(LINDA_DATA_LENGTH_OFFSET,datalen); 
			command.putInt(LINDA_PACKET_LENGTH_OFFSET,datalen+LINDA_HEADER_SIZE-INT_SIZE);
			command.rewind();
		}
		try {
			//command Send
			while(command.remaining() > 0){
				int count = ch.write(command);
				if(count <= 0) throw new IOException();
			}
	
			command.rewind();
			if (data==null) return;
			//data Send
			while(data.remaining() > 0){
				int count = ch.write(data);
				if(count < 0) throw new IOException();
			}
		} catch (IOException e) {
			System.out.println("Write Falied on:"+ch);
			return;
		}
		// command or data may be shared among PSX queue
		data.rewind();
	}

	public static void send(SelectionKey key, ByteBuffer command, ByteBuffer data) {
		SocketChannel ch = (SocketChannel)key.channel();
		send(ch,command,data);
	}

	static String getRemoteHostAndPort(SocketChannel ch) {
		String socketString = ch.socket().getRemoteSocketAddress().toString();
		String[] split = socketString.split("/");
		int length = split.length;
		String hostAndPort = split[length-1];
		split = hostAndPort.split(":");
		String host = split[0];
		String port = split[1];
		int portnum = Integer.parseInt(port);
		try {
			InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), portnum);
			host = address.getHostName().toString();
	        return (host +":"+port);
	    }
	    catch( UnknownHostException e ){
	    	return hostAndPort;
	    }		
	}

	static String getLocalHostAndPort(SocketChannel ch) {
		String socketString = ch.socket().getLocalSocketAddress().toString();
		String[] split = socketString.split("/");
		int length = split.length;
		String hostAndPort = split[length-1];
		split = hostAndPort.split(":");
		String host = split[0];
		String port = split[1];
		try {
	        InetAddress localhost = InetAddress.getLocalHost();
	        host = localhost.getHostName();            
	        return (host +":"+port);
	    }
	    catch( UnknownHostException e ){
	    	return (host +":"+port);
	    }
	}

	static void receive(SocketChannel channel, ByteBuffer command, int readsize)
			throws IOException {
		int count;
		if (command.capacity()!=readsize) {
    		System.err.println("read size mismatch"+readsize+" and "+command.capacity());
		}
		while(readsize>0) {
	    	if(false && IOHandler.debug){
	    		System.out.println("reading packet..."+readsize);
	    	}
	    	count = channel.read(command);
	    	if(count<0) 
	    		throw new IOException();
	    	readsize -= count;
		}
	    command.rewind();
	}

	static ByteBuffer receivePacket(SocketChannel channel, ByteBuffer command)
			throws IOException {
		/**
		 * Receive a command and data according to the command.
		 * 
		 * This read could be very large. Since this Linda server is a single
		 * thread server, large reading is not welcome. In this case, we should
		 * start another thread for the large reading. If someone access the
		 * tuple it should be wait for the reading. This makes rather big changes.
		 * send routines has the same problem also. I think it is better to limit
		 * basic tuple size, and introduce cascading tuple type.
		 * We should try it later.
		 */
	    receive(channel, command,LINDA_HEADER_SIZE); 
	    int datalen = command.getInt(LINDA_DATA_LENGTH_OFFSET);
	    int packetlen = command.getInt(LINDA_PACKET_LENGTH_OFFSET);
	    command.rewind();
	    if (datalen+LINDA_HEADER_SIZE-INT_SIZE!=packetlen) {
	    	System.err.println("Bad packet received. "+(datalen+LINDA_HEADER_SIZE-INT_SIZE)+"!="+packetlen);
	    	throw new IOException();
	    }
	    ByteBuffer data = ByteBuffer.allocate(datalen);
	    data.order(ByteOrder.BIG_ENDIAN);
//	    if (datalen>SINGLE_THREAD_READ_LIMIT) {
//	    	... make a thread for reading and mark reading flag
//	    } else
	    receive(channel, data, datalen);
		return data;
	}
    

}

/* end */