view src/main/java/alice/datasegment/RemoteDataSegmentManager.java @ 445:86b74532e66c dispose

change Protocol
author sugi
date Sun, 26 Oct 2014 18:21:48 +0900
parents aefbe41fcf12
children a91890dff56e
line wrap: on
line source

package alice.datasegment;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

import org.apache.log4j.Logger;

import alice.codesegment.CodeSegment;
import alice.daemon.Connection;
import alice.daemon.IncomingTcpConnection;
import alice.daemon.OutboundTcpConnection;

/**
 * {@link DataSegmentManager} that forwards every data segment operation to a
 * remote peer by wrapping it in a {@link Command} and handing it to a TCP
 * {@link Connection}.
 *
 * <p>Two delivery paths are used: {@code connection.sendCommand(cmd)} for the
 * regular operations and {@code connection.write(cmd)} for the {@code quick*}
 * variants, {@code ping} and {@code response}. According to the original
 * inline comments, {@code sendCommand} hands the command to the transmission
 * thread while {@code write} executes it right away; the details live in
 * {@link Connection}.
 *
 * <p>{@code seq}, {@code seqHash}, {@code replyQueue} and {@code replyThread}
 * are not declared here; they are presumably inherited from
 * {@link DataSegmentManager}.
 */
public class RemoteDataSegmentManager extends DataSegmentManager {

    /** Pause between two failed connection attempts, in milliseconds. */
    private static final long CONNECT_RETRY_INTERVAL_MS = 50;

    Connection connection;
    Logger logger;

    /**
     * Creates a manager without a connection or logger. Both fields stay
     * {@code null} until somebody assigns them.
     *
     * NOTE(review): every operation of this class dereferences
     * {@code connection} (and most dereference {@code logger}), so callers of
     * this constructor have to set both before use.
     */
    public RemoteDataSegmentManager(){}

    /**
     * Creates a manager and starts connecting to {@code hostName:port} in the
     * background.
     *
     * <p>Two threads are started: one running {@code replyThread}, and a
     * "Connect-" thread that retries the TCP connection until it succeeds and
     * then starts the {@link IncomingTcpConnection} and
     * {@link OutboundTcpConnection} workers for it. The constructor itself
     * returns without waiting for the connection.
     *
     * NOTE(review): {@code this} is handed to the connect thread before the
     * constructor has returned; subclasses should not rely on their own
     * fields being initialized when that thread runs.
     *
     * @param connectionKey used as the logger name and as thread name suffix
     * @param reverseKey    passed through to {@link IncomingTcpConnection}
     * @param hostName      remote host to connect to
     * @param port          remote TCP port
     */
    public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) {
        logger = Logger.getLogger(connectionKey);
        connection = new Connection();
        final RemoteDataSegmentManager manager = this;
        new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
        new Thread("Connect-" + connectionKey) {
            @Override
            public void run() {
                if (!connectWithRetry(hostName, port)) {
                    // Interrupted before a connection was established:
                    // there is no socket for the worker threads to use.
                    return;
                }
                new IncomingTcpConnection(connection, manager, reverseKey).start();
                new OutboundTcpConnection(connection).start();
            }
        }.start();
    }

    /**
     * Repeatedly tries to open a TCP connection until one attempt succeeds or
     * the calling thread is interrupted.
     *
     * <p>The socket is stored in {@code connection.socket} only after
     * {@code TCP_NODELAY} has been set, so a half-configured socket is never
     * published. A channel whose setup fails is closed before retrying.
     *
     * <p>Only {@link IOException} triggers a retry. Unchecked exceptions from
     * {@code SocketChannel.open} (for example an unresolvable address)
     * propagate and end the calling thread, as before.
     *
     * @return {@code true} once connected, {@code false} if interrupted while
     *         waiting to retry (the interrupt flag is restored)
     */
    private boolean connectWithRetry(String hostName, int port) {
        while (true) {
            SocketChannel sc = null;
            try {
                sc = SocketChannel.open(new InetSocketAddress(hostName, port));
                sc.socket().setTcpNoDelay(true);
                connection.socket = sc.socket();
                logger.info("Connect to " + connection.getInfoString());
                return true;
            } catch (IOException e) {
                // Do not leak the channel of a failed attempt.
                closeQuietly(sc);
                if (logger.isDebugEnabled()) {
                    logger.debug("Connect to " + hostName + ":" + port + " failed, retrying", e);
                }
            }
            try {
                Thread.sleep(CONNECT_RETRY_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and stop retrying.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while connecting to " + hostName + ":" + port + ", giving up");
                return false;
            }
        }
    }

    /** Closes {@code channel} if it is non-null, ignoring any close failure. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException ignored) {
            // The attempt already failed; a close error adds no information.
        }
    }

    /** Writes the command's string form to the logger when debug is enabled. */
    private void logCommand(Command cmd) {
        if (logger.isDebugEnabled()) {
            logger.debug(cmd.getCommandString());
        }
    }

    /**
     * send put command to target DataSegment
     *
     * @param key key of the target data segment
     * @param val value to put
     */
    @Override
    public void put(String key, Object val) {
        Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
        connection.sendCommand(cmd); // put command on the transmission thread
        logCommand(cmd);
    }

    /**
     * Sends a PUT command carrying a byte array, with the command's serialize
     * flag set to {@code false}.
     *
     * @param key key of the target data segment
     * @param val raw bytes to put
     */
    public void put(String key, byte[] val) {
        Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
        cmd.setSerializeFlag(false);
        connection.sendCommand(cmd); // put command on the transmission thread
        logCommand(cmd);
    }

    /**
     * Sends a PUT command through {@code connection.write} instead of
     * {@code sendCommand}.
     *
     * @param key key of the target data segment
     * @param val value to put
     */
    @Override
    public void quickPut(String key, Object val) {
        Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
        connection.write(cmd); // put command is executed right now
        logCommand(cmd);
    }

    /**
     * Sends an UPDATE command via {@code connection.sendCommand}.
     *
     * @param key key of the target data segment
     * @param val new value
     */
    @Override
    public void update(String key, Object val) {
        Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
        connection.sendCommand(cmd);
        logCommand(cmd);
    }

    /**
     * Sends an UPDATE command via {@code connection.write}.
     *
     * @param key key of the target data segment
     * @param val new value
     */
    @Override
    public void quickUpdate(String key, Object val) {
        Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
        connection.write(cmd);
        logCommand(cmd);
    }

    /**
     * Sends a TAKE command for {@code receiver.key} / {@code receiver.index}.
     * The command gets a fresh sequence number and is recorded in
     * {@code seqHash} under that number before it is sent.
     *
     * @param receiver receiver the command is issued for
     * @param cs       code segment attached to the command
     */
    @Override
    public void take(Receiver receiver, CodeSegment cs) {
        int seq = this.seq.getAndIncrement();
        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
        seqHash.put(seq, cmd);
        connection.sendCommand(cmd);
        logCommand(cmd);
    }

    /**
     * Same as {@link #take(Receiver, CodeSegment)} but the command is built
     * with the additional trailing {@code true} constructor argument and is
     * sent via {@code connection.write}.
     *
     * @param receiver receiver the command is issued for
     * @param cs       code segment attached to the command
     */
    public void quickTake(Receiver receiver, CodeSegment cs) {
        int seq = this.seq.getAndIncrement();
        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
        seqHash.put(seq, cmd);
        connection.write(cmd);
        logCommand(cmd);
    }

    /**
     * Sends a PEEK command for {@code receiver.key} / {@code receiver.index}.
     * The command gets a fresh sequence number and is recorded in
     * {@code seqHash} under that number before it is sent.
     *
     * @param receiver receiver the command is issued for
     * @param cs       code segment attached to the command
     */
    @Override
    public void peek(Receiver receiver, CodeSegment cs) {
        int seq = this.seq.getAndIncrement();
        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
        seqHash.put(seq, cmd);
        connection.sendCommand(cmd);
        logCommand(cmd);
    }

    /**
     * Same as {@link #peek(Receiver, CodeSegment)} but the command is built
     * with the additional trailing {@code true} constructor argument and is
     * sent via {@code connection.write}.
     *
     * @param receiver receiver the command is issued for
     * @param cs       code segment attached to the command
     */
    public void quickPeek(Receiver receiver, CodeSegment cs) {
        int seq = this.seq.getAndIncrement();
        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
        seqHash.put(seq, cmd);
        connection.write(cmd);
        logCommand(cmd);
    }

    /**
     * Sends a REMOVE command for the given key.
     *
     * @param key key of the data segment to remove
     */
    @Override
    public void remove(String key) {
        Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null);
        connection.sendCommand(cmd);
        logCommand(cmd);
    }

    /** Sends a FINISH command via {@code connection.sendCommand}. */
    @Override
    public void finish() {
        Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null);
        connection.sendCommand(cmd);
    }

    /**
     * Sends a PING command via {@code connection.write}.
     *
     * @param returnKey placed in the command's key slot
     */
    @Override
    public void ping(String returnKey) {
        Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, null);
        connection.write(cmd);
    }

    /**
     * Sends a RESPONSE command via {@code connection.write}.
     *
     * @param returnKey placed in the command's key slot
     */
    @Override
    public void response(String returnKey) {
        Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, null);
        connection.write(cmd);
    }

    /** Sends a CLOSE command to the peer via {@code connection.sendCommand}. */
    @Override
    public void close() {
        Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null);
        connection.sendCommand(cmd);
    }

    /** Closes the local connection by calling {@code connection.close()}. */
    @Override
    public void shutdown() {
        connection.close();
    }

}