360
|
1 package alice.datasegment;
|
|
2
|
|
3 import java.io.IOException;
|
|
4 import java.net.InetAddress;
|
361
|
5 import java.net.InetSocketAddress;
|
360
|
6 import java.net.NetworkInterface;
|
361
|
7 import java.net.SocketAddress;
|
360
|
8 import java.net.StandardProtocolFamily;
|
|
9 import java.net.StandardSocketOptions;
|
|
10 import java.nio.channels.DatagramChannel;
|
361
|
11
|
360
|
12 import org.apache.log4j.Logger;
|
|
13
|
|
14 import alice.codesegment.CodeSegment;
|
361
|
15 import alice.daemon.IncomingUdpConnection;
|
|
16 import alice.daemon.MulticastConnection;
|
|
17 import alice.daemon.OutboundTcpConnection;
|
360
|
18
|
|
19 public class MulticastDataSegmentManager extends DataSegmentManager {
|
361
|
20 MulticastConnection sender;
|
360
|
21 Logger logger;
|
|
22
|
|
23 public MulticastDataSegmentManager(final String MCASTADDR, final int port, final String nis) {
|
|
24 logger = Logger.getLogger("multicast");
|
361
|
25 InetAddress mAddr;
|
|
26 try {
|
|
27 mAddr = InetAddress.getByName(MCASTADDR);
|
|
28
|
|
29 DatagramChannel dcr = createDatagramChannel(mAddr, port, nis);
|
|
30 dcr.bind(new InetSocketAddress(port));
|
|
31 SocketAddress sAddrr = new InetSocketAddress(mAddr,port);
|
|
32 MulticastConnection receiver = new MulticastConnection(dcr, sAddrr);
|
|
33 new IncomingUdpConnection(receiver).start();
|
|
34
|
|
35 DatagramChannel dcs = createDatagramChannel(mAddr, port, nis);
|
|
36 SocketAddress sAddrs = new InetSocketAddress(mAddr,port);
|
|
37 sender = new MulticastConnection(dcs, sAddrs);
|
|
38 new OutboundTcpConnection(sender).start(); // OutboundUdpConnection sender
|
|
39
|
|
40 } catch (Exception e) {
|
|
41 e.printStackTrace();
|
|
42 }
|
360
|
43
|
|
44 }
|
|
45
|
361
|
46 private DatagramChannel createDatagramChannel(InetAddress group, int port, String nis) {
|
360
|
47 DatagramChannel dc = null;
|
|
48 NetworkInterface ni;
|
|
49 try {
|
|
50 ni = NetworkInterface.getByName(nis);
|
|
51 if (ni==null) {
|
|
52 System.err.println("Can't open network interface "+nis);
|
|
53 throw new IOException();
|
|
54 }
|
|
55 if (!ni.supportsMulticast()) {
|
|
56 System.err.println("Network interface does not support multicast"+nis);
|
|
57 throw new IOException();
|
|
58 }
|
|
59
|
|
60 dc = DatagramChannel.open(StandardProtocolFamily.INET);
|
|
61 dc.setOption(StandardSocketOptions.SO_REUSEADDR, true);
|
|
62 dc.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni);
|
|
63 dc.join(group, ni);
|
|
64 } catch (Exception e) {
|
|
65 e.printStackTrace();
|
|
66 }
|
|
67 return dc;
|
|
68 }
|
|
69
|
|
70 @Override
|
|
71 public void put(String key, Object val) {
|
364
|
72 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
|
|
73 sender.sendCommand(cmd); // put command on the transmission thread
|
|
74 if (logger.isDebugEnabled())
|
|
75 logger.debug(cmd.getCommandString());
|
360
|
76 }
|
|
77
|
|
78 @Override
|
|
79 public void update(String key, Object val) {
|
|
80
|
|
81 }
|
|
82
|
|
83 @Override
|
|
84 public void take(Receiver receiver, CodeSegment cs) {}
|
|
85 @Override
|
|
86 public void peek(Receiver receiver, CodeSegment cs) {}
|
|
87
|
|
88 @Override
|
|
89 public void quickPut(String key, Object val) {
|
|
90
|
|
91 }
|
|
92
|
|
93 @Override
|
|
94 public void quickUpdate(String key, Object val) {
|
|
95
|
|
96 }
|
|
97
|
|
98 @Override
|
|
99 public void quickPeek(Receiver receiver, CodeSegment cs) {}
|
|
100 @Override
|
|
101 public void quickTake(Receiver receiver, CodeSegment cs) {}
|
|
102
|
|
103 @Override
|
|
104 public void remove(String key) {
|
|
105
|
|
106 }
|
|
107
|
|
108 @Override
|
|
109 public void shutdown() {
|
|
110
|
|
111 }
|
|
112
|
|
113 @Override
|
|
114 public void close() {
|
|
115
|
|
116 }
|
|
117
|
|
118 @Override
|
|
119 public void finish() {
|
|
120
|
|
121 }
|
|
122
|
|
123 @Override
|
|
124 public void ping(String returnKey) {}
|
|
125
|
|
126 @Override
|
|
127 public void response(String returnKey) {}
|
|
128
|
|
129 }
|