view src/main/java/christie/daemon/IncomingTcpConnection.java @ 22:77583ea56656

Add WaitList and implement RemoteTake (not working yet)
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Tue, 23 Jan 2018 22:02:54 +0900
parents 5baccb8f7fbd
children 695705dba324
line wrap: on
line source

package christie.daemon;


import christie.codegear.CodeGearManager;
import christie.datagear.*;

import org.msgpack.MessagePack;
import org.msgpack.unpacker.Unpacker;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Receiver thread for one {@code Connection}: decodes the messages arriving on
 * the connection's socket and applies them to the addressed
 * {@code CodeGearManager}.
 *
 * <p>Wire format, as consumed by {@link #run()}: a MessagePack-encoded
 * {@code RemoteMessage}, then a MessagePack int giving the payload size, then
 * (for PUT and REPLY only) that many raw payload bytes read directly from the
 * socket stream.
 */
public class IncomingTcpConnection extends Thread {

    /** Stored by {@link #setManager}; not read anywhere inside this class. */
    RemoteDataGearManager manager;
    /** CodeGearManagers looked up by the {@code cgmID} carried in each message. */
    ConcurrentHashMap<Integer, CodeGearManager> cgms;
    /** The connection whose socket this thread reads from. */
    Connection connection;
    private MessagePack packer = new MessagePack();

    /**
     * @param connection connection whose socket input is consumed by this thread
     * @param cgm        manager supplying the id-to-CodeGearManager map via {@code getCgms()}
     */
    public IncomingTcpConnection(Connection connection, CodeGearManager cgm) {
        this.connection = connection;
        this.cgms = cgm.getCgms();
    }

    public void setManager(RemoteDataGearManager manager){
        this.manager = manager;
    }

    /**
     * Reads and dispatches messages until the stream ends or fails.
     *
     * <p>The loop terminates on any {@link IOException}: once a read has failed
     * the position inside the message framing is unknown, and retrying on a
     * closed socket would otherwise spin forever.
     *
     * @throws IllegalArgumentException if a message addresses an unknown cgmID
     */
    @Override
    public void run() {
        Unpacker unpacker;
        try {
            unpacker = packer.createUnpacker(connection.socket.getInputStream());
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        while (true) {
            try {
                RemoteMessage msg = unpacker.read(RemoteMessage.class);
                CommandType type = CommandType.getCommandTypeFromId(msg.type);
                // The size field is sent for every command type, so it is always consumed.
                int dataSize = unpacker.readInt();

                switch (type) {
                    case PUT:
                        handlePut(msg, readPayload(dataSize));
                        break;

                    case PEEK:
                    case TAKE:
                        handleTake(type, msg);
                        break;

                    case REPLY:
                        handleReply(msg, readPayload(dataSize));
                        break;

                    default:
                        break;
                }
            } catch (ClosedChannelException e) {
                return;
            } catch (EOFException e) {
                // Peer closed the connection: nothing more will ever arrive.
                return;
            } catch (IOException e) {
                e.printStackTrace();
                return;
            }
        }
    }

    /**
     * Reads exactly {@code size} raw bytes from the socket stream.
     *
     * <p>A single {@code InputStream.read(byte[])} may return fewer bytes than
     * requested, so the read is repeated until the buffer is full.
     *
     * @throws EOFException if the stream ends before {@code size} bytes arrived
     * @throws IOException  if {@code size} is negative or the read fails
     */
    private byte[] readPayload(int size) throws IOException {
        if (size < 0) {
            throw new IOException("negative payload size: " + size);
        }
        byte[] data = new byte[size];
        InputStream in = connection.socket.getInputStream();
        int filled = 0;
        while (filled < size) {
            int n = in.read(data, filled, size - filled);
            if (n < 0) {
                throw new EOFException("stream ended after " + filled + " of " + size + " payload bytes");
            }
            filled += n;
        }
        return data;
    }

    /** Stores the received payload under {@code msg.key} in the addressed manager's local DGM. */
    private void handlePut(RemoteMessage msg, byte[] data) {
        try {
            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));

            // Single lookup instead of containsKey + get, so a concurrent removal cannot slip in between.
            CodeGearManager cgm = cgms.get(msg.cgmID);
            if (cgm == null) {
                throw new IllegalArgumentException("DGM_ID:" + msg.cgmID + "is not found");
            }
            cgm.getLocalDGM().put(msg.key, dg);

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * Forwards a PEEK or TAKE request to the addressed manager's local DGM.
     * The command carries this connection (presumably so the result can be
     * sent back to the requester — confirm against Command/DataGearManager).
     */
    private void handleTake(CommandType type, RemoteMessage msg) {
        Command cmd = new Command(type, null, msg.cgmID, msg.dgmName, msg.key, null, null, connection);

        CodeGearManager cgm = cgms.get(msg.cgmID);
        if (cgm == null) {
            throw new IllegalArgumentException("DGM_ID:" + msg.cgmID + "is not found");
        }
        cgm.getLocalDGM().take(cmd);
    }

    /**
     * Hands a REPLY payload to the command that was waiting for it in the
     * waitList of the named DGM, then runs that command.
     */
    private void handleReply(RemoteMessage msg, byte[] data) {
        try {
            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));

            CodeGearManager cgm = cgms.get(msg.cgmID);
            if (cgm == null) {
                throw new IllegalArgumentException("DGM_ID:" + msg.cgmID + "is not found");
            }
            DataGearManager dgm = cgm.getDGM(msg.dgmName);
            // NOTE(review): assumes a command for msg.key is always waiting; if
            // getAndRemoveCommand can return null this will throw — confirm.
            Command cm = dgm.waitList.getAndRemoveCommand(msg.key);
            cm.dg = dg;
            dgm.runCommand(cm);

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }


}