comparison src/queue/MulticastQueue.java @ 1:15920e9b562d

added MulticastQueue.java the BlockingMulticastQueue
author Shoshi TAMAKI <shoshi@cr.ie.u-ryukyu.ac.jp>
date Tue, 09 Aug 2011 11:13:36 +0900
parents
children
comparison
equal deleted inserted replaced
0:a9cb12a7f995 1:15920e9b562d
1 package queue;
2
3 import java.io.BufferedReader;
4 import java.io.IOException;
5 import java.io.InputStreamReader;
6 import java.util.concurrent.CountDownLatch;
7
8 public class MulticastQueue<T>
9 {
10 public static void main(String args[]) throws IOException
11 {
12 int threads = 5;
13 final MulticastQueue<String> queue = new MulticastQueue<String>();
14
15 Runnable type2 = new Runnable(){
16
17 @Override
18 public void run()
19 {
20 Client<String> client = queue.newClient();
21
22 for(;;){
23 String str = client.poll();
24 try {
25 Thread.sleep(10000);
26 } catch (InterruptedException e) {
27 // TODO Auto-generated catch block
28 e.printStackTrace();
29 }
30 System.out.println(Thread.currentThread().getName()+":"+str);
31 }
32 }
33 };
34
35 Runnable thread = new Runnable(){
36
37 @Override
38 public void run()
39 {
40 Client<String> client = queue.newClient();
41
42 for(;;){
43 String str = client.poll();
44 System.out.println(Thread.currentThread().getName()+":"+str);
45 }
46 }
47 };
48
49 for(int i = 0;i < threads;i ++){
50 new Thread(thread).start();
51 }
52 new Thread(type2).start();
53
54 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
55 for(;;){
56 String str = br.readLine();
57 queue.put(str);
58 }
59 }
60
61
62 Node<T> tail;
63
64 public MulticastQueue()
65 {
66 tail = new Node<T>(null);
67 }
68
69 public synchronized void put(T item)
70 {
71 Node<T> next = new Node<T>(item);
72 tail.set(next);
73 tail = next;
74 }
75
76 public Client<T> newClient()
77 {
78 return new Client<T>(tail);
79 }
80
81 static class Client<T>
82 {
83 Node<T> node;
84
85 Client(Node<T> tail)
86 {
87 node = tail;
88 }
89
90 public T poll()
91 {
92 Node<T> next = null;
93
94 try {
95 next = node.next();
96 }catch(InterruptedException _e){
97 _e.printStackTrace();
98 }
99 node = next;
100 return next.item;
101 }
102 }
103
104 private static class Node<T>
105 {
106 private T item;
107 private Node<T> next;
108 private CountDownLatch latch;
109
110 public Node(T item)
111 {
112 this.item = item;
113 this.next = null;
114 latch = new CountDownLatch(1);
115 }
116
117 public void set(Node<T> next)
118 {
119 this.next = next;
120 latch.countDown();
121 }
122
123 public Node<T> next() throws InterruptedException
124 {
125 latch.await();
126 return next;
127 }
128 }
129 }