Mercurial > hg > Database > Alice
view src/main/java/alice/datasegment/MulticastDataSegmentManager.java @ 366:abc54fa0c81b multicast
MulticastDataSegment's extend class change from DataSegmentManager from LocalDataSegmentManager
author | sugi |
---|---|
date | Sat, 17 May 2014 21:34:01 +0900 |
parents | 1494d44392a2 |
children | 0c24894db37e |
line wrap: on
line source
package alice.datasegment; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.SocketAddress; import java.net.StandardProtocolFamily; import java.net.StandardSocketOptions; import java.nio.channels.DatagramChannel; import org.apache.log4j.Logger; import alice.daemon.IncomingUdpConnection; import alice.daemon.MulticastConnection; import alice.daemon.OutboundTcpConnection; public class MulticastDataSegmentManager extends LocalDataSegmentManager { private MulticastConnection sender; private Logger logger; public MulticastDataSegmentManager(String connectionKey ,final String MCASTADDR, final int port, final String nis) { super.setReverseKey(connectionKey); logger = Logger.getLogger(connectionKey); InetAddress mAddr; try { mAddr = InetAddress.getByName(MCASTADDR); DatagramChannel dcr = createDatagramChannel(mAddr, port, nis); dcr.bind(new InetSocketAddress(port)); SocketAddress sAddrr = new InetSocketAddress(mAddr,port); MulticastConnection receiver = new MulticastConnection(dcr, sAddrr); new IncomingUdpConnection(receiver).start(); DatagramChannel dcs = createDatagramChannel(mAddr, port, nis); SocketAddress sAddrs = new InetSocketAddress(mAddr,port); sender = new MulticastConnection(dcs, sAddrs); new OutboundTcpConnection(sender).start(); // OutboundUdpConnection sender } catch (Exception e) { e.printStackTrace(); } } private DatagramChannel createDatagramChannel(InetAddress group, int port, String nis) { DatagramChannel dc = null; NetworkInterface ni; try { ni = NetworkInterface.getByName(nis); if (ni==null) { System.err.println("Can't open network interface "+nis); throw new IOException(); } if (!ni.supportsMulticast()) { System.err.println("Network interface does not support multicast"+nis); throw new IOException(); } dc = DatagramChannel.open(StandardProtocolFamily.INET); dc.setOption(StandardSocketOptions.SO_REUSEADDR, true); dc.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni); dc.join(group, ni); } catch (Exception e) { e.printStackTrace(); } return dc; } @Override public void put(String key, Object val) { Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); sender.sendCommand(cmd); // put command on the transmission thread if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void update(String key, Object val) { Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); sender.sendCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void quickPut(String key, Object val) { Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); sender.write(cmd); // put command is executed right now } @Override public void quickUpdate(String key, Object val) { Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); sender.write(cmd); } }