Mercurial > hg > FederatedLinda
view src/fdl/PSXLindaImpl.java @ 90:9cdc24bae625
ring test
author | one |
---|---|
date | Sat, 13 Feb 2010 04:16:15 +0900 |
parents | b658ff1eb90f |
children | 7bf2eeea23a0 |
line wrap: on
line source
/*
 * @(#)PSXLinda.java       1.1 06/04/01
 *
 * Copyright 2006  Shinji KONO
 *
 * PSX Linda transport layer of the PSX Linda library
 */
package fdl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;

/**
 * Client-side connection to one tuple space server.
 *
 * <p>One instance is created per tuple space connection. The instance owns the
 * {@link SocketChannel}, registers itself on the caller's {@link Selector} as
 * the attachment for {@code OP_READ}, forwards every tuple request to
 * {@link FederatedLinda#psx_queue}, and hands packets read from the socket
 * back to the matching {@link PSXReply} in {@link #handle(SelectionKey)}.
 *
 * @author Shinji Kono
 */
public class PSXLindaImpl implements PSXLinda,TupleHandler {
    private FederatedLinda fdl;
    SocketChannel socketChannel;
    public String host;
    public int port;
    /** Tuple space ID given to the constructor. */
    public int mytsid;
    /** Link set by {@link #add(PSXLinda)}. */
    public PSXLinda next;
    static final boolean debug = false;

    /**
     * Opens a TCP connection to the tuple space server and registers it on
     * {@code selector} for read events, with this object as the attachment.
     *
     * <p>The connect is performed in blocking mode and the channel is switched
     * to non-blocking mode afterwards (a selector requires non-blocking
     * channels). This replaces a busy loop on {@code finishConnect()} that
     * spun a CPU core for the whole duration of the handshake.
     *
     * @param _fdl     owning FederatedLinda, used for logging and request queueing
     * @param selector selector the connected channel is registered on
     * @param _mytsid  tuple space ID of this connection
     * @param _host    server host name
     * @param _port    server port
     * @throws IOException if the channel cannot be opened, connected or registered;
     *                     the channel is closed before the exception propagates
     */
    public PSXLindaImpl(FederatedLinda _fdl,Selector selector,int _mytsid,String _host,int _port) throws IOException {
        host = _host;
        port = _port;
        mytsid = _mytsid;
        fdl = _fdl;

        socketChannel = SocketChannel.open();
        boolean ready = false;
        try {
            Socket socket = socketChannel.socket();
            // socket.setReuseAddress(true); is not needed on the client side.
            socket.setTcpNoDelay(true);

            // Still in blocking mode here: connect() returns only once connected.
            socketChannel.connect(new InetSocketAddress(_host, _port));
            socketChannel.configureBlocking(false);  // Selector needs this

            fdl.log(Level.INFO,"Linda client connect to "+socketChannel);
            socketChannel.register(selector,SelectionKey.OP_READ,this);
            checkConnect("PSXLinda");
            ready = true;
        } finally {
            if (!ready) {
                // Do not leak the descriptor when construction fails, whether by
                // IOException or by an unchecked one such as UnresolvedAddressException.
                try {
                    socketChannel.close();
                } catch (IOException ignored) {
                    // The original failure is the one worth reporting.
                }
            }
        }
    }

    /**
     * Reads one reply packet from the channel behind {@code key} and delivers
     * it to the pending request with the same sequence number.
     *
     * <p>The sequence number and mode are taken from the packet header at
     * {@code PSX.LINDA_SEQ_OFFSET} and {@code PSX.LINDA_MODE_OFFSET}. If no
     * reply is registered for that sequence number the packet is dropped after
     * logging. If the reply has a callback it is invoked with the packet data.
     *
     * @param key selection key whose channel has data to read
     * @throws ClosedChannelException propagated from the packet read
     * @throws IOException            propagated from the packet read
     */
    public void handle(SelectionKey key) throws ClosedChannelException, IOException {
        SocketChannel sock = (SocketChannel)key.channel();
        if (sock!=socketChannel) {
            // Logged only; the packet is still read from the channel that fired.
            fdl.log(Level.SEVERE,"wrong socket on PSXLindaImple.");
        }
        ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
        command.order(ByteOrder.BIG_ENDIAN);

        ByteBuffer data = PSX.receivePacket(sock, command);
        if (debug) {
            PSX.printCommand("chkServe:",command, data);
        }

        int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET);
        int mode = command.get(PSX.LINDA_MODE_OFFSET);

        PSXReply r = fdl.getReply(rseq);
        if (r==null) {
            fdl.log(Level.SEVERE,"Illegal answer sequence.");
            return;
        }
        r.setAnswer(mode,command,data);
        if (r.callback != null) {
            r.callback.callback(data);
        }
    }

    /** Last-resort cleanup: closes the channel if the owner never called {@link #close()}. */
    @Override
    protected void finalize() {
        close();
    }

    /**
     * Closes the socket channel if one was opened. A failure to close is
     * logged rather than thrown, so callers can treat this as best-effort.
     */
    public void close() {
        if (socketChannel == null) {
            return;
        }
        try {
            socketChannel.close();
        } catch (IOException e) {
            fdl.log(Level.WARNING,"PSXLindaImpl close failed: "+e);
        }
    }

    /** Logs whether the channel reports itself connected, tagged with {@code s}. */
    private void checkConnect(String s) {
        fdl.log(Level.INFO, "Connected:"+ s +": " +socketChannel.isConnected());
    }

    /**
     * Queues a {@code PSX_IN} request for tuple {@code id} without a callback.
     *
     * @return the reply object created by the queue
     */
    public PSXReply in(int id) {
        return fdl.psx_queue(this, id, null, PSX.PSX_IN, (PSXCallback)null);
    }

    /** Queues a {@code PSX_IN} request for tuple {@code id}; {@code callback} receives the answer data. */
    public void in(int id, PSXCallback callback) {
        fdl.psx_queue(this, id, null, PSX.PSX_IN, callback);
    }

    /**
     * Queues a check request for tuple {@code id} without a callback.
     *
     * <p>NOTE(review): this sends {@code PSX.PSX_IN}, the same mode as
     * {@link #in(int)}. That looks like a copy/paste slip; confirm against
     * the PSX mode constants whether a dedicated check mode should be used.
     *
     * @return the reply object created by the queue
     */
    public PSXReply ck(int id) {
        return fdl.psx_queue(this, id, null, PSX.PSX_IN, null);
    }

    /**
     * Queues a check request for tuple {@code id} with a callback.
     *
     * <p>NOTE(review): sends {@code PSX.PSX_IN}, as {@link #ck(int)} does; confirm the mode.
     */
    public void ck(int id, PSXCallback callback) {
        fdl.psx_queue(this, id, null, PSX.PSX_IN, callback);
    }

    /**
     * Queues a {@code PSX_OUT} request carrying {@code data} for tuple {@code id}.
     *
     * @return the reply object created by the queue
     */
    public PSXReply out(int id, ByteBuffer data) {
        return fdl.psx_queue(this, id, data, PSX.PSX_OUT, null);
    }

    /**
     * Queues a {@code PSX_UPDATE} request carrying {@code data} for tuple {@code id}.
     *
     * @return the reply object created by the queue
     */
    public PSXReply update(int id, ByteBuffer data) {
        return fdl.psx_queue(this, id, data, PSX.PSX_UPDATE, null);
    }

    /** Queues a {@code PSX_UPDATE} request; {@code callback} receives the answer data. */
    public void update(int id, ByteBuffer data,PSXCallback callback) {
        fdl.psx_queue(this, id, data, PSX.PSX_UPDATE, callback);
    }

    /**
     * Queues a {@code PSX_RD} request for tuple {@code id} without a callback.
     *
     * @return the reply object created by the queue
     */
    public PSXReply rd(int id) {
        return fdl.psx_queue(this, id, null, PSX.PSX_RD, null);
    }

    /** Queues a {@code PSX_RD} request for tuple {@code id}; {@code callback} receives the answer data. */
    public void rd(int id, PSXCallback callback) {
        fdl.psx_queue(this, id, null, PSX.PSX_RD, callback);
    }

    /**
     * Stores {@code linda} as the {@link #next} link of this connection.
     *
     * @return this instance
     */
    public PSXLinda add(PSXLinda linda) {
        next = linda;
        return this;
    }

    /** Delegates to {@code FederatedLinda.sync()}. */
    public int sync() throws IOException {
        return fdl.sync();
    }

    /** Delegates to {@code FederatedLinda.sync(long)} with {@code mtime}. */
    public int sync(long mtime) throws IOException {
        return fdl.sync(mtime);
    }

    /** Writes {@code command} and {@code data} to this connection's channel via {@code PSX.send}. */
    public void send(ByteBuffer command, ByteBuffer data) {
        PSX.send(socketChannel, command, data);
    }
}

/* end */