diff src/queue/MulticastQueue.java @ 1:15920e9b562d

added MulticastQueue.java the BlockingMulticastQueue
author Shoshi TAMAKI <shoshi@cr.ie.u-ryukyu.ac.jp>
date Tue, 09 Aug 2011 11:13:36 +0900
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/queue/MulticastQueue.java	Tue Aug 09 11:13:36 2011 +0900
@@ -0,0 +1,129 @@
+package queue;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.concurrent.CountDownLatch;
+
+public class MulticastQueue<T>
+{
+	public static void main(String args[]) throws IOException
+	{
+		int threads = 5;
+		final MulticastQueue<String> queue = new MulticastQueue<String>();
+		
+		Runnable type2 = new Runnable(){
+
+			@Override
+			public void run()
+			{
+				Client<String> client = queue.newClient();
+				
+				for(;;){
+					String str = client.poll();
+					try {
+						Thread.sleep(10000);
+					} catch (InterruptedException e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+					}
+					System.out.println(Thread.currentThread().getName()+":"+str);
+				}
+			}
+		};
+		
+		Runnable thread = new Runnable(){
+
+			@Override
+			public void run()
+			{
+				Client<String> client = queue.newClient();
+				
+				for(;;){
+					String str = client.poll();
+					System.out.println(Thread.currentThread().getName()+":"+str);
+				}
+			}
+		};
+		
+		for(int i = 0;i < threads;i ++){
+			new Thread(thread).start();
+		}
+		new Thread(type2).start();
+		
+		BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
+		for(;;){
+			String str = br.readLine();
+			queue.put(str);
+		}
+	}
+	
+	
+	Node<T> tail;
+	
+	public MulticastQueue()
+	{
+		tail = new Node<T>(null);
+	}
+	
+	public synchronized void put(T item)
+	{
+		Node<T> next = new Node<T>(item);
+		tail.set(next);
+		tail = next;
+	}
+	
+	public Client<T> newClient()
+	{
+		return new Client<T>(tail);
+	}
+	
+	static class Client<T>
+	{
+		Node<T> node;
+		
+		Client(Node<T> tail)
+		{
+			node = tail;
+		}
+		
+		public T poll()
+		{
+			Node<T> next = null;
+			
+			try {
+				next = node.next();
+			}catch(InterruptedException _e){
+				_e.printStackTrace();
+			}
+			node = next;
+			return next.item;
+		}
+	}
+	
+	private static class Node<T>
+	{
+		private T item;
+		private Node<T> next;
+		private CountDownLatch latch;
+		
+		public Node(T item)
+		{
+			this.item = item;
+			this.next = null;
+			latch = new CountDownLatch(1);
+		}
+		
+		public void set(Node<T> next)
+		{
+			this.next = next;
+			latch.countDown();
+		}
+		
+		public Node<T> next() throws InterruptedException
+		{
+			latch.await();
+			return next;
+		}
+	}
+}