view datagear/RemoteDataGearManager.cs @ 47:61ec3dd0995c

bug fixing
author riono <e165729@ie.u-ryukyu.ac.jp>
date Mon, 10 Jan 2022 00:45:32 +0900
parents 98ee1ee1efb7
children 476b6efeca5b
line wrap: on
line source

using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Christie_net.codegear;
using Christie_net.daemon;
using Christie_net.datagear.command;
using Christie_net.datagear.dg;
using Christie_net.Test.Example.RemoteTake;

namespace Christie_net.datagear {
/// <summary>
/// DataGearManager whose commands are written to a <see cref="Connection"/> backed by a
/// TCP socket to a remote host.
/// </summary>
/// <remarks>
/// The socket is opened on a background thread started by the constructor, so the
/// <c>connection</c> field is null until that thread succeeds. Every public operation that
/// touches the connection first blocks in <see cref="ConnectWait"/> until it is available.
/// </remarks>
public class RemoteDataGearManager : DataGearManager {
    // Pause between failed connection attempts so an unreachable peer does not cause a busy loop.
    private const int ConnectRetryDelayMs = 100;

    private Connection connection;
    private CodeGearManager cgm;
    // volatile: written by the connect thread, read without the lock by Put and the retry loop.
    private volatile bool connect = false;
    private readonly object syncObj = new object();
    
    /// <summary>
    /// Creates the manager and starts a background thread that connects to the remote host,
    /// retrying until it succeeds, and then starts the incoming and outbound TCP threads.
    /// </summary>
    /// <param name="dgmname">Name assigned to the connection and used in the thread names.</param>
    /// <param name="address">Host name or address resolved through DNS.</param>
    /// <param name="port">TCP port of the remote host.</param>
    /// <param name="cgm">CodeGearManager handed to the created <see cref="Connection"/>.</param>
    public RemoteDataGearManager (string dgmname ,string address, int port, CodeGearManager cgm) {
        this.cgm = cgm;
        Thread thread = new Thread(() => {
            while (!connect) {
                Socket socket = null;
                try {
                    IPHostEntry host = Dns.GetHostEntry(address);
                    IPAddress ipAddress = host.AddressList[0];
                    IPEndPoint remoteEndPoint = new IPEndPoint(ipAddress, port);
                    socket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                    socket.Connect(remoteEndPoint);
                    socket.NoDelay = true;

                    // Fully initialise the connection before publishing it through the field.
                    Connection established = new Connection(socket, cgm);
                    established.name = dgmname;
                    connection = established;

                    // Debug
                    //Console.WriteLine("connect:" + connection);

                    lock (syncObj) {
                        connect = true;
                        // PulseAll: several threads may be blocked in ConnectWait at once;
                        // Pulse would release only one of them.
                        Monitor.PulseAll(syncObj);
                    }
                } catch(Exception e) {
                    Console.WriteLine(e.ToString());
                    // Release the socket of the failed attempt before retrying.
                    if (!connect && socket != null) {
                        socket.Close();
                    }
                    Thread.Sleep(ConnectRetryDelayMs);
                }
            }

            IncomingTcpConnection incoming = new IncomingTcpConnection(connection);
            incoming.SetManager(this);
            Thread incomingThread = new Thread(incoming.Run);
            incomingThread.Name = dgmname + "-IncomingTcp";
            //incomingThread.Priority = ThreadPriority.Highest;
            incomingThread.Start();
            
            OutboundTcpConnection outbound = new OutboundTcpConnection(connection);
            Thread outboundThread = new Thread(outbound.Run);
            outboundThread.Name = dgmname + "-OutboundTcp";
            //outboundThread.Priority = ThreadPriority.Highest;
            outboundThread.Start();
        });
        thread.Name = "Connect-" + dgmname;
        thread.Start();
    }
    
    /// <summary>
    /// Builds a PUT command carrying <paramref name="data"/> under <paramref name="key"/> and
    /// writes it to the connection, blocking until the connection is established.
    /// </summary>
    /// <param name="key">Key of the data gear.</param>
    /// <param name="data">Payload wrapped in a <c>DataGear&lt;object&gt;</c>.</param>
    public override void Put(string key, object data) {
        Command cm = new CommandBuilder().Init(CommandType.PUT).Key(key)
            .Dg(new DataGear<object>(data)).Build();

        // TODO: in the Java version the connection can apparently still be null at this point;
        // running the connect logic directly instead of on the thread started by the
        // constructor is said to avoid that.
        if (!connect) {
            ConnectWait();
        }
       
        // Debug
        //Console.WriteLine("connect:" + connection.name);
        //Console.WriteLine("data:" + cmd.command);
        
        connection.Write(cm);
    }

    /// <summary>
    /// Registers <paramref name="cm"/> in the wait list and writes the corresponding remote
    /// command to the connection. PEEK is sent as REMOTEPEEK and TAKE as REMOTETAKE; any other
    /// type is sent unchanged. Blocks until the connection is established.
    /// </summary>
    /// <param name="cm">Command to run against the remote side.</param>
    public override void RunCommand(Command cm) {
        waitList.Add(cm);
        CommandType type = cm.type;
        switch (cm.type) {
            case CommandType.PEEK:
                type = CommandType.REMOTEPEEK;
                break;
            case CommandType.TAKE:
                type = CommandType.REMOTETAKE;
                break;
        }

        // The connection is created asynchronously; wait for it instead of dereferencing null.
        ConnectWait();

        Command remoteCmd = new CommandBuilder().Init(type).FromDgmName(connection.name).Key(cm.key)
            .Clazz(cm.clazz).Connection(connection).Build();

        connection.Write(remoteCmd);
    }

    /// <summary>
    /// Removes the waiting command registered for <paramref name="key"/>, attaches
    /// <paramref name="dg"/> to it and executes it.
    /// </summary>
    /// <param name="key">Key of the waiting command.</param>
    /// <param name="dg">Data gear to hand to the command.</param>
    public override void ResolveWaitCommand(string key, DataGear<object> dg) {
        // NOTE(review): assumes a command is registered for this key; the behaviour of
        // GetAndRemoveCommand for an unknown key is not visible here - confirm.
        Command cm = waitList.GetAndRemoveCommand(key);
        cm.SetDg(dg);
        cm.Execute();
    }

    /// <summary>
    /// Sends a FINISH command over the connection, blocking until the connection is established.
    /// </summary>
    public override void Finish() {
        ConnectWait();
        Command cm = new CommandBuilder().Init(CommandType.FINISH).Build();
        connection.SendCommand(cm);
    }

    /// <summary>
    /// Sends a CLOSE command over the connection, blocking until the connection is established.
    /// </summary>
    public override void Close() {
        ConnectWait();
        Command cm = new CommandBuilder().Init(CommandType.CLOSE).Connection(connection).Build();
        connection.SendCommand(cm);
    }

    /// <summary>
    /// Closes the connection and disposes its send queue when the queue is empty.
    /// Blocks until the connection is established.
    /// </summary>
    public override void Shutdown() {
        ConnectWait();
        connection.Close();
        BlockingCollection<Command> queue = connection.sendQueue;
        if (queue.Count == 0) {
            queue.Dispose();
        }
    }

    /// <summary>
    /// Blocks the calling thread until the background connect thread has established the
    /// connection. Returns immediately when it is already established.
    /// </summary>
    public void ConnectWait() {
        lock (syncObj) {
            while (!connect) {
                try {
                    Monitor.Wait(syncObj);
                } catch (ThreadInterruptedException) {
                    // Interrupted while waiting: keep waiting, the loop re-checks the flag.
                }
            }
        }
    }
}
}