52
|
1 package test;
|
|
2
|
|
3 import java.io.BufferedReader;
|
|
4 import java.io.IOException;
|
|
5 import java.io.InputStreamReader;
|
|
6 import java.util.concurrent.CountDownLatch;
|
|
7
|
|
/**
 * An unbounded multicast (publish/subscribe) queue: every item passed to
 * {@link #put(Object)} is delivered, in order, to every {@link Client} that
 * was created before the item was published.
 *
 * <p>Internally the queue is a singly linked chain of nodes. The queue only
 * remembers the current tail; each client keeps its own cursor into the chain
 * and advances independently of all other clients. A client blocks in
 * {@link Client#poll()} until the node after its cursor has been linked.
 *
 * <p>Thread-safety: {@code put} and {@code newClient} may be called from any
 * thread. A single {@code Client} must only be used by one thread at a time,
 * because its cursor is not synchronized.
 *
 * @param <T> type of the items carried by the queue
 */
public class MulticastQueue<T>
{
    /**
     * Interactive demo: starts five fast consumers and one slow consumer, then
     * publishes every line read from standard input to all of them.
     *
     * <p>Consumers subscribe when their thread starts running, so a line typed
     * before a consumer has subscribed is not seen by that consumer. The
     * consumer threads are not daemon threads and block forever waiting for
     * more items, so the process has to be terminated externally.
     *
     * @param args ignored
     * @throws IOException if reading from standard input fails
     */
    public static void main(String args[]) throws IOException
    {
        int threads = 5;
        final MulticastQueue<String> queue = new MulticastQueue<String>();

        // Slow consumer: waits ten seconds before printing each item.
        Runnable type2 = new Runnable() {
            @Override
            public void run()
            {
                Client<String> client = queue.newClient();

                while (!Thread.currentThread().isInterrupted()) {
                    String str = client.poll();
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible; the loop condition ends
                        // this consumer after the current item is printed.
                        Thread.currentThread().interrupt();
                    }
                    System.out.println(Thread.currentThread().getName() + ":" + str);
                }
            }
        };

        // Fast consumer: prints each item as soon as it arrives.
        Runnable thread = new Runnable() {
            @Override
            public void run()
            {
                Client<String> client = queue.newClient();

                for (;;) {
                    String str = client.poll();
                    System.out.println(Thread.currentThread().getName() + ":" + str);
                }
            }
        };

        for (int i = 0; i < threads; i++) {
            new Thread(thread).start();
        }
        new Thread(type2).start();

        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        String line;
        // readLine() returns null at end of input; stop publishing instead of
        // flooding the consumers with an endless stream of nulls.
        while ((line = br.readLine()) != null) {
            queue.put(line);
        }
    }


    /** Most recently published node; a fresh client starts just after it. Guarded by {@code this}. */
    Node<T> tail;

    /** Creates an empty queue whose tail is a placeholder node carrying no item. */
    public MulticastQueue()
    {
        tail = new Node<T>(null);
    }

    /**
     * Publishes an item to every client created so far.
     *
     * @param item the item to publish; {@code null} is passed through unchanged
     */
    public synchronized void put(T item)
    {
        Node<T> next = new Node<T>(item);
        // Linking releases every client currently waiting on the old tail.
        tail.set(next);
        tail = next;
    }

    /**
     * Subscribes a new client. The client receives only items published after
     * this call.
     *
     * <p>Synchronized so the tail is read under the same lock {@link #put}
     * writes it under; otherwise a client could start from a stale tail.
     *
     * @return a client positioned at the current tail
     */
    public synchronized Client<T> newClient()
    {
        return new Client<T>(tail);
    }

    /**
     * An independent cursor into the queue. Intended for use by one thread.
     *
     * @param <T> type of the items delivered
     */
    static class Client<T>
    {
        /** Last node consumed by this client (initially the tail at subscription time). */
        Node<T> node;

        /**
         * @param tail node after which this client starts receiving items
         */
        Client(Node<T> tail)
        {
            node = tail;
        }

        /**
         * Blocks until the next item is available and returns it.
         *
         * @return the next item in publication order (may be {@code null} if
         *         {@code null} was published)
         * @throws IllegalStateException if the calling thread is interrupted
         *         while waiting; the thread's interrupt flag is restored and
         *         the cursor is left untouched, so a later call can resume
         */
        public T poll()
        {
            Node<T> next;

            try {
                next = node.next();
            } catch (InterruptedException e) {
                // Do not advance the cursor: there is no successor to move to.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the next item", e);
            }
            node = next;
            return next.item;
        }
    }

    /**
     * One link of the chain: an item plus a latch that opens once the
     * successor node has been attached.
     *
     * @param <T> type of the item held
     */
    private static class Node<T>
    {
        private final T item;
        private Node<T> next;
        private final CountDownLatch latch;

        /**
         * @param item the payload of this node; may be {@code null}
         */
        public Node(T item)
        {
            this.item = item;
            this.next = null;
            latch = new CountDownLatch(1);
        }

        /**
         * Attaches the successor and wakes every thread blocked in
         * {@link #next()}.
         *
         * @param next the node that follows this one
         */
        public void set(Node<T> next)
        {
            // Assign before countDown(): the latch publishes the write to
            // threads returning from await().
            this.next = next;
            latch.countDown();
        }

        /**
         * Waits until a successor has been attached and returns it.
         *
         * @return the node following this one
         * @throws InterruptedException if the calling thread is interrupted
         *         while waiting
         */
        public Node<T> next() throws InterruptedException
        {
            latch.await();
            return next;
        }
    }
}
|