Mercurial > hg > Members > shoshi > AADS
diff src/queue/MulticastQueue.java @ 1:15920e9b562d
added MulticastQueue.java the BlockingMulticastQueue
author | Shoshi TAMAKI <shoshi@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 09 Aug 2011 11:13:36 +0900 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/queue/MulticastQueue.java Tue Aug 09 11:13:36 2011 +0900 @@ -0,0 +1,129 @@ +package queue; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.concurrent.CountDownLatch; + +public class MulticastQueue<T> +{ + public static void main(String args[]) throws IOException + { + int threads = 5; + final MulticastQueue<String> queue = new MulticastQueue<String>(); + + Runnable type2 = new Runnable(){ + + @Override + public void run() + { + Client<String> client = queue.newClient(); + + for(;;){ + String str = client.poll(); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + System.out.println(Thread.currentThread().getName()+":"+str); + } + } + }; + + Runnable thread = new Runnable(){ + + @Override + public void run() + { + Client<String> client = queue.newClient(); + + for(;;){ + String str = client.poll(); + System.out.println(Thread.currentThread().getName()+":"+str); + } + } + }; + + for(int i = 0;i < threads;i ++){ + new Thread(thread).start(); + } + new Thread(type2).start(); + + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + for(;;){ + String str = br.readLine(); + queue.put(str); + } + } + + + Node<T> tail; + + public MulticastQueue() + { + tail = new Node<T>(null); + } + + public synchronized void put(T item) + { + Node<T> next = new Node<T>(item); + tail.set(next); + tail = next; + } + + public Client<T> newClient() + { + return new Client<T>(tail); + } + + static class Client<T> + { + Node<T> node; + + Client(Node<T> tail) + { + node = tail; + } + + public T poll() + { + Node<T> next = null; + + try { + next = node.next(); + }catch(InterruptedException _e){ + _e.printStackTrace(); + } + node = next; + return next.item; + } + } + + private static class Node<T> + { + private T item; + private Node<T> next; + private CountDownLatch latch; + + public Node(T item) + { + this.item = item; + this.next = null; + latch = new CountDownLatch(1); + } + + public void set(Node<T> next) + { + this.next = next; + latch.countDown(); + } + + public Node<T> next() throws InterruptedException + { + latch.await(); + return next; + } + } +}