Mercurial > hg > Database > Christie-sharp
view datagear/RemoteDataGearManager.cs @ 34:1236da135f79
update
author | riono <e165729@ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 27 Apr 2021 22:57:14 +0900 |
parents | 7575980bffc9 |
children | 090be804eaa9 |
line wrap: on
line source
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Christie_net.codegear;
using Christie_net.daemon;
using Christie_net.datagear.command;
using Christie_net.datagear.dg;

namespace Christie_net.datagear {
    /// <summary>
    /// DataGearManager that sends its commands to a remote peer through a TCP
    /// <c>Connection</c>. The constructor starts a dedicated thread that keeps
    /// trying to connect; once connected it starts the incoming and outbound
    /// TCP worker threads. Methods that send commands block until the
    /// connection has been established.
    /// </summary>
    public class RemoteDataGearManager : DataGearManager {
        // Pause between failed connection attempts so the retry loop does not
        // spin the CPU and flood the console while the remote side is down.
        private const int ConnectRetryDelayMilliseconds = 100;

        private Connection connection;
        private CodeGearManager cgm;

        // Written under syncObj by the connect thread; volatile because the
        // fast path in EnsureConnected reads it without taking the lock.
        private volatile bool connect = false;
        private readonly object syncObj = new object();

        /// <summary>
        /// Starts a thread named "Connect-{dgmname}" that resolves
        /// <paramref name="address"/>, connects to it on <paramref name="port"/>
        /// (retrying until it succeeds) and then starts the incoming/outbound
        /// TCP threads for the resulting connection.
        /// </summary>
        /// <param name="dgmname">Name assigned to the connection and used in thread names.</param>
        /// <param name="address">Host name or address passed to <c>Dns.GetHostEntry</c>.</param>
        /// <param name="port">Remote TCP port.</param>
        /// <param name="cgm">CodeGearManager handed to the created <c>Connection</c>.</param>
        public RemoteDataGearManager (string dgmname ,string address, int port, CodeGearManager cgm) {
            this.cgm = cgm;

            Thread thread = new Thread(() => {
                do {
                    Socket socket = null;
                    try {
                        IPHostEntry host = Dns.GetHostEntry(address);
                        IPAddress ipAddress = host.AddressList[0];
                        IPEndPoint remoteEndPoint = new IPEndPoint(ipAddress, port);

                        socket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                        socket.Connect(remoteEndPoint);
                        socket.NoDelay = true;

                        // Fully initialise the connection before publishing it
                        // through the field.
                        Connection established = new Connection(socket, cgm);
                        established.name = dgmname;
                        connection = established;

                        lock (syncObj) {
                            connect = true;
                            // PulseAll, not Pulse: several threads may be
                            // blocked in ConnectWait and all must be released.
                            Monitor.PulseAll(syncObj);
                        }
                    } catch (Exception e) {
                        Console.WriteLine(e.StackTrace);

                        // The attempt failed before the connection was
                        // published, so this socket is unused: release it
                        // instead of leaking one handle per retry.
                        if (socket != null) {
                            socket.Dispose();
                        }

                        Thread.Sleep(ConnectRetryDelayMilliseconds);
                    }
                } while (!connect);

                IncomingTcpConnection incoming = new IncomingTcpConnection(connection);
                incoming.SetManager(this);
                Thread incomingThread = new Thread(incoming.Run);
                incomingThread.Name = dgmname + "-IncomingTcp";
                incomingThread.Priority = ThreadPriority.Highest;
                incomingThread.Start();

                OutboundTcpConnection outbound = new OutboundTcpConnection(connection);
                Thread outboundThread = new Thread(outbound.Run);
                outboundThread.Name = dgmname + "-OutboundTcp";
                outboundThread.Priority = ThreadPriority.Highest;
                outboundThread.Start();
            });
            thread.Name = "Connect-" + dgmname;
            thread.Start();
        }

        /// <summary>
        /// Builds a PUT command carrying <paramref name="data"/> under
        /// <paramref name="key"/> and writes it to the connection, waiting for
        /// the connection to be established first.
        /// </summary>
        public override void Put(string key, object data) {
            Command cm = new CommandBuilder().Init(CommandType.PUT).Key(key)
                .Dg(new DataGear<object>(data)).Build();

            // TODO: the Java version reportedly sometimes sees a null connection
            // here; not starting the connect thread from the constructor is said
            // to fix it.
            EnsureConnected();

            connection.Write(cm);
        }

        /// <summary>
        /// Registers <paramref name="cm"/> in the wait list and writes the
        /// corresponding remote command to the connection. PEEK and TAKE are
        /// sent as REMOTEPEEK and REMOTETAKE; other types are sent unchanged.
        /// Waits for the connection to be established before using it.
        /// </summary>
        public override void RunCommand(Command cm) {
            waitList.Add(cm);

            // connection is null until the connect thread succeeds; wait rather
            // than fail with a NullReferenceException.
            EnsureConnected();

            CommandType type = cm.type;
            switch (cm.type) {
                case CommandType.PEEK:
                    type = CommandType.REMOTEPEEK;
                    break;
                case CommandType.TAKE:
                    type = CommandType.REMOTETAKE;
                    break;
            }

            Command remoteCmd = new CommandBuilder().Init(type).FromDgmName(connection.name).Key(cm.key)
                .Clazz(cm.clazz).Connection(connection).Build();
            connection.Write(remoteCmd);
        }

        /// <summary>
        /// Removes the waiting command registered for <paramref name="key"/>,
        /// attaches <paramref name="dg"/> to it and executes it.
        /// </summary>
        public override void ResolveWaitCommand(string key, DataGear<object> dg) {
            Command cm = waitList.GetAndRemoveCommand(key);
            cm.SetDg(dg);
            cm.Execute();
        }

        /// <summary>
        /// Sends a FINISH command over the connection, waiting for the
        /// connection to be established first.
        /// </summary>
        public override void Finish() {
            Command cm = new CommandBuilder().Init(CommandType.FINISH).Build();
            EnsureConnected();
            connection.SendCommand(cm);
        }

        /// <summary>
        /// Sends a CLOSE command over the connection, waiting for the
        /// connection to be established first.
        /// </summary>
        public override void Close() {
            EnsureConnected();
            Command cm = new CommandBuilder().Init(CommandType.CLOSE).Connection(connection).Build();
            connection.SendCommand(cm);
        }

        /// <summary>
        /// Closes the connection and disposes its send queue if the queue is
        /// empty.
        /// </summary>
        /// <remarks>
        /// NOTE(review): unlike the sending methods this does not wait for the
        /// connection, so it must only be called after the connection has been
        /// established; blocking here could hang shutdown indefinitely.
        /// </remarks>
        public override void Shutdown() {
            connection.Close();
            BlockingCollection<Command> queue = connection.sendQueue;
            if (queue.Count == 0) {
                queue.Dispose();
            }
        }

        /// <summary>
        /// Blocks the calling thread until the connect thread has signalled
        /// that the connection is established.
        /// </summary>
        public void ConnectWait() {
            lock (syncObj) {
                while (!connect) {
                    try {
                        Monitor.Wait(syncObj);
                    } catch (ThreadInterruptedException) {
                        // Interrupted while waiting: keep waiting, the loop
                        // condition decides when to stop.
                    }
                }
            }
        }

        // Fast path that skips the lock once connected; otherwise blocks in
        // ConnectWait until the connection field has been published.
        private void EnsureConnected() {
            if (!connect) {
                ConnectWait();
            }
        }
    }
}