view src/main/java/christie/daemon/IncomingTcpConnection.java @ 24:0930b0554299

use DataGears
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Thu, 25 Jan 2018 23:02:02 +0900
parents 695705dba324
children 76fac42a840e
line wrap: on
line source

package christie.daemon;


import christie.codegear.CodeGearManager;
import christie.datagear.*;

import christie.datagear.Command.Command;
import christie.datagear.Command.CommandType;
import christie.datagear.RemoteMessage;
import christie.datagear.Command.RemoteTakeCommand;
import org.msgpack.MessagePack;
import org.msgpack.unpacker.Unpacker;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;

public class IncomingTcpConnection extends Thread {

    RemoteDataGearManager manager;
    CodeGearManager cgm;
    Connection connection;
    private MessagePack packer = new MessagePack();

    public IncomingTcpConnection(Connection connection, CodeGearManager cgm) {
        this.connection = connection;
        this.cgm = cgm;
    }

    public void setManager(RemoteDataGearManager manager){
        this.manager = manager;
    }

    public void run() {
        Unpacker unpacker = null;
        try {
            unpacker = packer.createUnpacker(connection.socket.getInputStream());
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (unpacker == null) {
            return;
        }
        while (true) {
            try {
                RemoteMessage msg = unpacker.read(RemoteMessage.class);
                CommandType type = CommandType.getCommandTypeFromId(msg.type);
                int dataSize = unpacker.readInt();//ここでとまる
                byte[] data = new byte[dataSize];
                switch (type) {
                    case PUT:
                        connection.socket.getInputStream().read(data);
                        try {
                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
                            cgm.getLocalDGM().put(msg.key, dg);
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }

                        break;

                    case REMOTEPEEK:
                    case REMOTETAKE:
                        RemoteTakeCommand cmd = null;
                        try {
                            cmd = new RemoteTakeCommand(msg.fromDgmName, msg.key, Class.forName(msg.clazz), connection);
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }

                        cgm.getLocalDGM().take(cmd);

                        break;
                    case REPLY://待っていたwaitListに渡してcsにセット
                        connection.socket.getInputStream().read(data);
                        try {
                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));

                            DataGearManager dgm = cgm.getDGM(msg.fromDgmName);
                            Command cm = dgm.waitList.getAndRemoveCommand(msg.key);
                            cm.dg = dg;
                            dgm.runCommand(cm);

                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }

                        break;
                    default:
                        break;
                }
            } catch (ClosedChannelException e) {
                return;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


}