Mercurial > hg > Applications > TreeVNC
view src/main/java/jp/ac/u_ryukyu/treevnc/MulticastQueue.java @ 354:7ef4ac588459
remove flag in writeToClinet
author | oc |
---|---|
date | Mon, 23 Feb 2015 18:14:34 +0900 |
parents | 42fcc9419498 |
children | 57e0d052b126 |
line wrap: on
line source
package jp.ac.u_ryukyu.treevnc; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; public class MulticastQueue<T> { Node<T> tail; /** * Multicastcast Queue * Pass a data to multiple clients. * element Node T * Another time out thread should be used to limit the total size. */ public MulticastQueue() { tail = new Node<T>(null); } /** * @param size * @return * * try to allocate byteBuffer. * wait until heap is available. */ public ByteBuffer allocate(int size) { ByteBuffer b=null; while(true){ try { b = ByteBuffer.allocate(size); } catch (OutOfMemoryError e) { b = null; System.err.println("multicastqueue : wait for heap : " + e); } if (b!=null) { break; } try { wait(); } catch (InterruptedException e) { System.out.println("thread has interrupted the current thread."); } } return b; } public synchronized void heapAvailable() { notifyAll(); } /** * put item to the queue * all client threads start read it * @param item */ public synchronized void put(T item) { Node<T> next = new Node<T>(item); tail.set(next); tail = next; } /** * register new clients. Clients read this queue, if all clients read the queue, item is removed * @return */ public Client<T> newClient() { return new Client<T>(tail); } /** * @author kono * Inner Client class * @param <T> */ public static class Client<T> { Node<T> node; Client(Node<T> tail) { node = tail; } /** * try to read next item, if not available, wait for the next item * All clients wait for a CountDownLatch in the next item. * set operation count down it, and all clients get the item. * @return */ public T poll() { Node<T> next = null; T item = null; do { try { next = node.next(); } catch(InterruptedException _e) { System.out.println("thread has interrupted the current thread."); continue; } item = next.getItem(); node = next; } while ( item == null); return item; } } static class Node<T> { private T item; private Node<T> next; private CountDownLatch latch; public Node(T item) { this.item = item; this.next = null; latch = new CountDownLatch(1); } public T getItem() { return item; } public void set(Node<T> next) { this.next = next; latch.countDown(); } public Node<T> next() throws InterruptedException { latch.await(); return next; } public void clear() { item = null; } } }