12
|
1 package jp.ac.u_ryukyu.treevnc;
|
3
|
2
|
74
|
3 import java.nio.ByteBuffer;
|
3
|
4 import java.util.concurrent.CountDownLatch;
|
|
5
|
|
/**
 * Multicast queue: every item that is put is delivered to every registered
 * client.
 *
 * <p>The queue is a singly linked list of {@link Node}s. The queue itself only
 * keeps a reference to the tail node; each {@link Client} keeps a reference to
 * the last node it has consumed. Nodes that are behind the tail and behind
 * every client are therefore no longer referenced from here.
 *
 * <p>Another time out thread should be used to limit the total size
 * (see {@link Node#clear()}).
 *
 * @param <T> type of the items carried by the queue
 */
public class MulticastQueue<T>
{

    /** Most recently added node; updated by {@link #put(Object)} under this object's monitor. */
    Node<T> tail;

    /**
     * Creates an empty queue. The initial tail is a sentinel node that carries
     * no item.
     */
    public MulticastQueue()
    {
        tail = new Node<T>(null);
    }

    /**
     * Tries to allocate a byte buffer and, if the heap is exhausted, waits
     * until {@link #heapAvailable()} is called and tries again.
     *
     * <p>Interruption does not abort the wait; the interrupt status of the
     * calling thread is restored before this method returns.
     *
     * @param size capacity of the requested buffer in bytes
     * @return a newly allocated buffer of the requested capacity, never {@code null}
     * @throws IllegalArgumentException if {@code size} is negative
     *         (thrown by {@link ByteBuffer#allocate(int)})
     */
    public ByteBuffer allocate(int size)
    {
        // Fast path: no locking at all while the heap has room.
        ByteBuffer b = tryAllocate(size);
        if (b != null) {
            return b;
        }

        boolean interrupted = false;
        try {
            // wait() requires ownership of the monitor. Retrying inside the
            // same synchronized block also guarantees that heapAvailable()
            // cannot fire between a failed attempt and the wait.
            synchronized (this) {
                while ((b = tryAllocate(size)) == null) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        // Keep waiting for heap, but remember the request.
                        interrupted = true;
                    }
                }
            }
            return b;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Single allocation attempt.
     *
     * @return the buffer, or {@code null} if the allocation failed with an
     *         {@link OutOfMemoryError}
     */
    private static ByteBuffer tryAllocate(int size)
    {
        try {
            return ByteBuffer.allocate(size);
        } catch (OutOfMemoryError e) {
            System.err.println("multicastqueue : wait for heap : " + e);
            return null;
        }
    }

    /**
     * Wakes up every thread that is blocked in {@link #allocate(int)} so that
     * it retries its allocation.
     */
    public synchronized void heapAvailable()
    {
        notifyAll();
    }

    /**
     * Puts an item to the queue. All clients blocked in {@link Client#poll()}
     * on the current tail are released and start reading it.
     *
     * @param item item to deliver; a {@code null} item is skipped by
     *        {@link Client#poll()} and never delivered
     */
    public synchronized void put(T item)
    {
        Node<T> next = new Node<T>(item);
        tail.set(next);
        tail = next;
    }

    /**
     * Registers a new client. The client starts at the current tail, so it
     * receives only items that are put after this call.
     *
     * @return a new client positioned at the current end of the queue
     */
    public synchronized Client<T> newClient()
    {
        // synchronized: tail is written by put() under the same monitor.
        return new Client<T>(tail);
    }

    /**
     * Reader side of the queue. Each client has its own position in the list.
     *
     * @author kono
     * @param <T> type of the items carried by the queue
     */
    public static class Client<T>
    {
        /** Last node this client has consumed. */
        Node<T> node;

        Client(Node<T> tail)
        {
            node = tail;
        }

        /**
         * Reads the next item, waiting for it if it is not available yet.
         * All clients wait on the CountDownLatch of their current node;
         * {@link Node#set(Node)} counts it down and all of them proceed.
         * Nodes whose item is {@code null} (never set, or cleared) are skipped.
         *
         * <p>Interruption does not abort the wait; the interrupt status of the
         * calling thread is restored before this method returns.
         *
         * @return the next non-null item
         */
        public T poll()
        {
            boolean interrupted = false;
            try {
                while (true) {
                    Node<T> next;
                    try {
                        next = node.next();
                    } catch (InterruptedException e) {
                        // Retry on the same node, but remember the request.
                        interrupted = true;
                        continue;
                    }
                    node = next;
                    T item = next.getItem();
                    if (item != null) {
                        return item;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * List cell: holds one item, the link to its successor, and a latch that
     * opens once the successor has been linked.
     *
     * @param <T> type of the item
     */
    static class Node<T>
    {
        private T item;
        private Node<T> next;
        private CountDownLatch latch;

        public Node(T item)
        {
            this.item = item;
            this.next = null;
            latch = new CountDownLatch(1);
        }

        /**
         * @return the item of this node, or {@code null} if it has none or was cleared
         */
        public T getItem()
        {
            return item;
        }

        /**
         * Links the successor and releases every thread waiting in {@link #next()}.
         *
         * @param next the node that follows this one
         */
        public void set(Node<T> next)
        {
            // Assign before countDown() so that readers released by the latch see the link.
            this.next = next;
            latch.countDown();
        }

        /**
         * Waits until a successor has been linked by {@link #set(Node)}.
         *
         * @return the successor node
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public Node<T> next() throws InterruptedException
        {
            latch.await();
            return next;
        }

        /**
         * Drops the item of this node; clients that reach the node afterwards skip it.
         */
        public void clear()
        {
            item = null;
        }
    }
}
|