52
|
1 package test;
|
|
2
|
|
3 import java.io.BufferedReader;
|
|
4 import java.io.IOException;
|
|
5 import java.io.InputStreamReader;
|
|
6 import java.util.concurrent.CountDownLatch;
|
|
7
|
|
8 public class MulticastQueue<T>
|
|
9 {
|
|
10 public static void main(String args[]) throws IOException
|
|
11 {
|
|
12 int threads = 5;
|
|
13 final MulticastQueue<String> queue = new MulticastQueue<String>();
|
|
14
|
|
15 Runnable type2 = new Runnable(){
|
|
16
|
|
17 @Override
|
|
18 public void run()
|
|
19 {
|
|
20 Client<String> client = queue.newClient();
|
|
21
|
|
22 for(;;){
|
|
23 String str = client.poll();
|
|
24 try {
|
|
25 Thread.sleep(10000);
|
|
26 } catch (InterruptedException e) {
|
|
27 // TODO Auto-generated catch block
|
|
28 e.printStackTrace();
|
|
29 }
|
|
30 System.out.println(Thread.currentThread().getName()+":"+str);
|
|
31 }
|
|
32 }
|
|
33 };
|
|
34
|
|
35 Runnable thread = new Runnable(){
|
|
36
|
|
37 @Override
|
|
38 public void run()
|
|
39 {
|
|
40 Client<String> client = queue.newClient();
|
|
41
|
|
42 for(;;){
|
|
43 String str = client.poll();
|
|
44 System.out.println(Thread.currentThread().getName()+":"+str);
|
|
45 }
|
|
46 }
|
|
47 };
|
|
48
|
|
49 for(int i = 0;i < threads;i ++){
|
|
50 new Thread(thread).start();
|
|
51 }
|
|
52 new Thread(type2).start();
|
|
53
|
|
54 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
|
|
55 for(;;){
|
|
56 String str = br.readLine();
|
|
57 queue.put(str);
|
|
58 }
|
|
59 }
|
|
60
|
|
61
|
|
62 Node<T> tail;
|
|
63
|
|
64 public MulticastQueue()
|
|
65 {
|
|
66 tail = new Node<T>(null);
|
|
67 }
|
|
68
|
|
69 public synchronized void put(T item)
|
|
70 {
|
|
71 Node<T> next = new Node<T>(item);
|
|
72 tail.set(next);
|
|
73 tail = next;
|
|
74 }
|
|
75
|
|
76 public Client<T> newClient()
|
|
77 {
|
|
78 return new Client<T>(tail);
|
|
79 }
|
|
80
|
|
81 static class Client<T>
|
|
82 {
|
|
83 Node<T> node;
|
|
84
|
|
85 Client(Node<T> tail)
|
|
86 {
|
|
87 node = tail;
|
|
88 }
|
|
89
|
|
90 public T poll()
|
|
91 {
|
|
92 Node<T> next = null;
|
|
93
|
|
94 try {
|
|
95 next = node.next();
|
|
96 }catch(InterruptedException _e){
|
|
97 _e.printStackTrace();
|
|
98 }
|
|
99 node = next;
|
|
100 return next.item;
|
|
101 }
|
|
102 }
|
|
103
|
|
104 private static class Node<T>
|
|
105 {
|
|
106 private T item;
|
|
107 private Node<T> next;
|
|
108 private CountDownLatch latch;
|
|
109
|
|
110 public Node(T item)
|
|
111 {
|
|
112 this.item = item;
|
|
113 this.next = null;
|
|
114 latch = new CountDownLatch(1);
|
|
115 }
|
|
116
|
|
117 public void set(Node<T> next)
|
|
118 {
|
|
119 this.next = next;
|
|
120 latch.countDown();
|
|
121 }
|
|
122
|
|
123 public Node<T> next() throws InterruptedException
|
|
124 {
|
|
125 latch.await();
|
|
126 return next;
|
|
127 }
|
|
128 }
|
|
129 }
|