Mercurial > hg > FederatedLinda
view src/fdl/PSX.java @ 34:e7c5958fd285
*** empty log message ***
author | kono |
---|---|
date | Sun, 24 Aug 2008 17:36:14 +0900 |
parents | 64071f8e2e0d |
children | fe338d497c72 |
line wrap: on
line source
/* * @(#)PSXQueueInterface.java 1.1 06/04/01 * * Copyright 2006 Shinji KONO * PSX Lidna Trasport layer of PSX Linda library */ package fdl; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; /** PSX Tuple Command Protocol Format All PSX offset command operation should be here. Some public library */ public class PSX { static final int PSX_IN = 'i'; static final int PSX_OUT = 'o'; static final int PSX_UPDATE = 'u'; static final int PSX_RD = 'r'; static final int PSX_CHECK = 'c'; static final int PSX_REPLY = '?'; static final int PSX_WAIT_RD = 'w'; static final int PSX_ANSWER = 'a'; static final int PSX_HTTP_ANSWER = 'P'; // Put static final int PSX_HTTP_REQUEST = 'G'; // Get static final int PSX_COM_DEBUG = 'D'; //Communication DEBUG static final int LINDA_PACKET_LENGTH_OFFSET =0; static final int LINDA_MODE_OFFSET =0+4; static final int LINDA_ID_OFFSET =1+4; static final int LINDA_SEQ_OFFSET =3+4; static final int LINDA_DATA_LENGTH_OFFSET =7+4; static final int LINDA_HEADER_SIZE =12+4; static final int INT_SIZE =4; static final int SHORT_SIZE =2; static final int PRIVILEGED_ID_START = 32768; static final int PRIVILEGED_ID_END = 36864; static public final int META_STOP = PRIVILEGED_ID_START; static public final int META_MONITOR = PRIVILEGED_ID_START+1; static public final int META_MONITOR_DATA = PRIVILEGED_ID_START+2; static void printCommand(String comment, ByteBuffer command, ByteBuffer data) { char id = (char)command.getShort(LINDA_ID_OFFSET); System.err.println(comment+" LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ "ID:"+(int)id+" "+ "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ "DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" "); if(data!=null) { System.err.println("DATA:"+data); data.rewind(); } command.rewind(); } static void printData(String comment,ByteBuffer command) { /*** print data ***/ char id = (char)command.getShort(LINDA_ID_OFFSET); System.out.println(comment+" LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ "ID:"+(int)id+" "+ "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "); command.rewind(); } static ByteBuffer setCommand(int _mode, int _id, int _seq, ByteBuffer data) { int _datalen = data==null?0:data.remaining(); ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); command.order(ByteOrder.BIG_ENDIAN); command.putInt(LINDA_PACKET_LENGTH_OFFSET, _datalen+LINDA_HEADER_SIZE-INT_SIZE); command.put(LINDA_MODE_OFFSET,(byte)_mode); command.putShort(LINDA_ID_OFFSET,(short)_id); command.putInt(LINDA_SEQ_OFFSET,_seq); command.putInt(LINDA_DATA_LENGTH_OFFSET,_datalen); command.rewind(); return command; } static void setAnserCommand(ByteBuffer command, int seq) { command.put(LINDA_MODE_OFFSET, (byte)'a'); command.putInt(LINDA_SEQ_OFFSET, seq); command.rewind(); } public static ByteBuffer string2ByteBuffer(String log) { ByteBuffer blog = ByteBuffer.allocate(log.length()*2); // this is incorrect... for(int i=0;i<log.length();i++) { blog.putChar(log.charAt(i)); } blog.flip(); return blog; } public static String getdataString(ByteBuffer data) { String sendtext; //Decode UTF-8 to System Encoding(UTF-16) Charset charset = Charset.forName("UTF-8"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer cb = null; try { cb = decoder.decode(data); } catch (CharacterCodingException e) { } cb.rewind(); data.rewind(); sendtext = cb.toString(); return sendtext; } public static void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) { // if datalen in the header is different from ByteBuffer remaining(), we lost // protocol synchronization. Make sure to have correct length now. if (true) { int datalen = data==null?0:(data.limit()-data.position()); if (command.getInt(LINDA_DATA_LENGTH_OFFSET)>0 && (data==null||data.remaining()!=command.getInt(LINDA_DATA_LENGTH_OFFSET))) { System.err.println("Missing data."); } command.putInt(LINDA_DATA_LENGTH_OFFSET,datalen); command.putInt(LINDA_PACKET_LENGTH_OFFSET,datalen+LINDA_HEADER_SIZE-INT_SIZE); command.rewind(); } try { //command Send while(command.remaining() > 0){ int count = ch.write(command); if(count <= 0) throw new IOException(); } command.rewind(); if (data==null) return; //data Send while(data.remaining() > 0){ int count = ch.write(data); if(count < 0) throw new IOException(); } } catch (IOException e) { System.out.println("Write Falied on:"+ch); return; } // command or data may be shared among PSX queue data.rewind(); } public static void send(SelectionKey key, ByteBuffer command, ByteBuffer data) { SocketChannel ch = (SocketChannel)key.channel(); send(ch,command,data); } static String getRemoteHostAndPort(SocketChannel ch) { String socketString = ch.socket().getRemoteSocketAddress().toString(); String[] split = socketString.split("/"); int length = split.length; String hostAndPort = split[length-1]; split = hostAndPort.split(":"); String host = split[0]; String port = split[1]; int portnum = Integer.parseInt(port); try { InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), portnum); host = address.getHostName().toString(); return (host +":"+port); } catch( UnknownHostException e ){ return hostAndPort; } } static String getLocalHostAndPort(SocketChannel ch) { String socketString = ch.socket().getLocalSocketAddress().toString(); String[] split = socketString.split("/"); int length = split.length; String hostAndPort = split[length-1]; split = hostAndPort.split(":"); String host = split[0]; String port = split[1]; try { InetAddress localhost = InetAddress.getLocalHost(); host = localhost.getHostName(); return (host +":"+port); } catch( UnknownHostException e ){ return (host +":"+port); } } static void receive(SocketChannel channel, ByteBuffer command, int readsize) throws IOException { int count; if (command.capacity()!=readsize) { System.err.println("read size mismatch"+readsize+" and "+command.capacity()); } while(readsize>0) { if(false && IOHandler.debug){ System.out.println("reading packet..."+readsize); } count = channel.read(command); if(count<0) throw new IOException(); readsize -= count; } command.rewind(); } static ByteBuffer receivePacket(SocketChannel channel, ByteBuffer command) throws IOException { /** * Receive a command and data according to the command. */ receive(channel, command,LINDA_HEADER_SIZE); int datalen = command.getInt(LINDA_DATA_LENGTH_OFFSET); int packetlen = command.getInt(LINDA_PACKET_LENGTH_OFFSET); command.rewind(); if (datalen+LINDA_HEADER_SIZE-INT_SIZE!=packetlen) { System.err.println("Bad packet received. "+(datalen+LINDA_HEADER_SIZE-INT_SIZE)+"!="+packetlen); throw new IOException(); } ByteBuffer data = ByteBuffer.allocate(datalen); data.order(ByteOrder.BIG_ENDIAN); receive(channel, data, datalen); return data; } } /* end */