view src/main/java/christie/daemon/IncomingTcpConnection.java @ 32:307ac87ddcf7

add TakeFrom annotation
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Thu, 15 Feb 2018 15:48:23 +0900
parents bf8ac57409af
children 671246274719
line wrap: on
line source

package christie.daemon;


import christie.codegear.CodeGearManager;
import christie.datagear.*;

import christie.datagear.command.Command;
import christie.datagear.command.CommandType;
import christie.datagear.RemoteMessage;
import christie.datagear.command.RemotePeekCommand;
import christie.datagear.command.RemoteTakeCommand;
import christie.datagear.dg.MessagePackDataGear;
import org.msgpack.MessagePack;
import org.msgpack.unpacker.Unpacker;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;

/**
 * Receiver thread for a single {@link Connection}.
 *
 * <p>The thread repeatedly unpacks a {@link RemoteMessage} header from the
 * connection's socket and dispatches on its command type:
 * <ul>
 *   <li>{@code PUT} - reads the payload that follows the header and puts it
 *       into the local DataGear manager under {@code msg.key};</li>
 *   <li>{@code REMOTEPEEK} / {@code REMOTETAKE} - runs the corresponding
 *       remote command on the local DataGear manager;</li>
 *   <li>{@code REPLY} - reads the payload and resolves the wait command
 *       registered under {@code msg.key} on the manager named
 *       {@code msg.fromDgmName}.</li>
 * </ul>
 * Any other command type is ignored. The thread ends when the stream is
 * closed or fails.
 */
public class IncomingTcpConnection extends Thread {

    RemoteDataGearManager manager;
    CodeGearManager cgm;
    Connection connection;
    private MessagePack packer = new MessagePack();

    /**
     * @param connection connection whose socket this thread reads from
     * @param cgm        manager used to look up the DataGear managers that
     *                   receive the incoming data
     */
    public IncomingTcpConnection(Connection connection, CodeGearManager cgm) {
        this.connection = connection;
        this.cgm = cgm;
    }

    /**
     * Stores the remote DataGear manager associated with this connection.
     *
     * @param manager the manager to remember
     */
    public void setManager(RemoteDataGearManager manager){
        this.manager = manager;
    }

    /**
     * Receive loop. Returns (ending the thread) when the unpacker cannot be
     * created, when the peer closes the connection, or when reading from the
     * socket fails.
     */
    @Override
    public void run() {
        Unpacker unpacker;
        try {
            unpacker = packer.createUnpacker(connection.socket.getInputStream());
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        if (unpacker == null) {
            return;
        }
        while (true) {
            try {
                RemoteMessage msg = unpacker.read(RemoteMessage.class);
                CommandType type = CommandType.getCommandTypeFromId(msg.type);
                if (type == null) {
                    // NOTE(review): getCommandTypeFromId is not visible here; if it
                    // returns null for an unknown id, switching on it would throw a
                    // NullPointerException. Treat it like the default branch instead.
                    continue;
                }
                byte[] data;

                switch (type) {
                    case PUT:
                        // The payload is always consumed, even if the class lookup
                        // below fails, so the stream stays aligned on the next header.
                        data = readPayload(unpacker);
                        try {
                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
                            cgm.getLocalDGM().put(msg.key, dg);
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }
                        break;

                    case REMOTEPEEK:
                        cgm.getLocalDGM().runCommand(new RemotePeekCommand(msg, connection));
                        break;

                    case REMOTETAKE:
                        cgm.getLocalDGM().runCommand(new RemoteTakeCommand(msg, connection));
                        break;

                    case REPLY:
                        // Hand the received data to the command that was waiting in
                        // the wait list (original note: "and set it on the cs").
                        data = readPayload(unpacker);
                        try {
                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
                            cgm.getDGM(msg.fromDgmName).resolveWaitCommand(msg.key, dg);
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }
                        break;

                    default:
                        break;
                }
            } catch (ClosedChannelException e) {
                return;
            } catch (EOFException e) {
                // The peer closed the connection; nothing more will arrive.
                return;
            } catch (IOException e) {
                // After a failed read the message framing is lost and a broken
                // socket keeps failing, so retrying would only spin forever.
                e.printStackTrace();
                return;
            }
        }
    }

    /**
     * Reads one length-prefixed payload: an int length taken from the unpacker,
     * followed by exactly that many raw bytes taken from the socket's input
     * stream.
     *
     * <p>NOTE(review): as in the previous inline code, the raw bytes are read
     * from the socket stream directly rather than through the unpacker; this
     * presumes the unpacker does not buffer ahead of what it has decoded.
     *
     * @param unpacker unpacker positioned at the payload length
     * @return the complete payload
     * @throws EOFException if the stream ends before the payload is complete
     * @throws IOException  if the length is negative or the read fails
     */
    private byte[] readPayload(Unpacker unpacker) throws IOException {
        int size = unpacker.readInt();
        if (size < 0) {
            throw new IOException("negative payload length: " + size);
        }
        byte[] data = new byte[size];
        InputStream in = connection.socket.getInputStream();
        int filled = 0;
        // A single read() may deliver only part of the payload, so keep reading
        // until the buffer is full.
        while (filled < size) {
            int count = in.read(data, filled, size - filled);
            if (count < 0) {
                throw new EOFException("stream ended after " + filled + " of " + size + " payload bytes");
            }
            filled += count;
        }
        return data;
    }

}