view src/main/java/jp/ac/u_ryukyu/treevnc/TreeRFBProto.java @ 519:d17c048f356f

blocking worked
author mir3636
date Fri, 22 Feb 2019 15:05:21 +0900
parents c4d1a275b7d5
children 3a9dadc8821f
line wrap: on
line source

package jp.ac.u_ryukyu.treevnc;

import com.glavsoft.exceptions.TransportException;
import com.glavsoft.rfb.client.ClientToServerMessage;
import com.glavsoft.rfb.encoding.EncodingType;
import com.glavsoft.rfb.encoding.decoder.FramebufferUpdateRectangle;
import com.glavsoft.rfb.protocol.Protocol;
import com.glavsoft.rfb.protocol.ProtocolContext;
import com.glavsoft.rfb.protocol.ReceiverTask;
import com.glavsoft.transport.Reader;
import com.glavsoft.transport.Writer;
import com.glavsoft.viewer.ConnectionPresenter;
import com.glavsoft.viewer.ViewerInterface;
import com.glavsoft.viewer.swing.ConnectionParams;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;


public class TreeRFBProto {

    protected final static int FramebufferUpdate = 0;
    protected ProtocolContext context;
    private int clients = 0;

    public MulticastQueue<LinkedList<ByteBuffer>> multicastqueue = new MulticastQueue<LinkedList<ByteBuffer>>();
    public FindRoot findRoot;
    public int acceptPort = 0;
    private String myAddress;
    private long counter = 0; // packet serial number
    public ServerSocket servSock;
    private static final int INFLATE_BUFSIZE = 1024 * 100;
    private Inflater inflater = new Inflater();
    private Deflater deflater = new Deflater();
    ViewerInterface viewer;
    private short id = 0;  // my tree node id ( = 0 in root ), -1 means no parent
    private short sharingId = -1; // VNCServer's id. this is used control visivility
    private TreeVncCommandChannelListener acceptThread;
    private TreeRootFinderListener getCast;
    private CreateConnectionParam cp;
    private ConnectionPresenter connectionPresenter;
    private TreeVNCNetwork nets = new TreeVNCNetwork();
    private TreeVncRootSelectionPanel rootSelectionPanel;
    private String vncInterface;
    private TreeManagement treeManager;
    public LinkedList<TreeVNCNode> nodeList;

    protected boolean readyReconnect = false;
    private boolean cuiVersion;
    private boolean permitChangeScreen = true;
    private boolean leader;
    private boolean hasViewer = false;
    private boolean normalTermination;

    private boolean isTreeManager;
    public boolean showTreeNode = false;
    public boolean checkDelay = false;
    public boolean addSerialNum = false;
    public boolean fixingSize = false;
    public int fixingSizeWidth;
    public int fixingSizeHeight;
    private DatagramSocket socket = null;
    private byte[] originalInitData = null;
    private boolean childrenMulticast = true;
    private static int uniqueNodeId = 0; // uniquenodeid in all trees (less than MAX_UNIQUE_NODE_ID)
    private int deflate_size = 65507;
    private ByteBuffer header;
    private ByteBuffer c1;
    private FramebufferUpdateRectangle c1rect;
    private int c1headerPos;
    private boolean stopBroadcast;
    public boolean multicastBlocking = true;

    public TreeRFBProto(boolean isTreeManager, ViewerInterface viewer) {
        nets.setMyRfb(this);
        this.isTreeManager = isTreeManager;
        this.viewer = viewer;
        startTreeRootFindThread();
    }

    public void startTreeRootFindThread() {
        if (isTreeManager()) {
            getCast = new TreeRootFinderListener(viewer);
            Thread treeRootFindThread = new Thread(getCast, "tree-root-find-listener");
            treeRootFindThread.start();
        }
    }

    public boolean isTreeManager() {
        return isTreeManager;
    }

    public void setIsTreeManager(boolean isTreeManager) {
        this.isTreeManager = isTreeManager;
    }

    public boolean isAddSerialNum() {
        return addSerialNum;
    }

    public ProtocolContext getContext() {
        return context;
    }

    /**
     * handle new client accept
     * it also handle TreeVNC Command
     *
     * @param os
     * @param is
     * @param intf
     * @throws IOException
     * @throws TransportException
     */
    public void newClient(final Writer os, final Reader is, final String intf) {
        final int myId = clients;
        final MulticastQueue.Client<LinkedList<ByteBuffer>> c = multicastqueue.newClient();
        final AtomicInteger writerRunning = new AtomicInteger();
        writerRunning.set(1);
        /**
         * Timeout thread. If a client is suspended, it has top of queue
         * indefinitely, which caused memory overflow. After the timeout, we
         * poll the queue and discard it. Start long wait if writer is running.
         */
        final Runnable timer = new Runnable() {
            public void run() {
                int count = 0;
                for (; ; ) {
                    long timeout = 50000 / 8;
                    try {
                        synchronized (this) {
                            int state, flag;
                            writerRunning.set(0);
                            wait(timeout);
                            flag = 0;
                            while ((state = writerRunning.get()) == 0) {
                                c.poll(); // discard, should be timeout
                                count++;
                                if (flag == 0) {
                                    // System.out.println("Discarding " + myId + " count=" + count);
                                    flag = 1;
                                }
                                wait(10); // if this is too short, writer cannot
                                // take the poll, if this is too
                                // long, memory will overflow...
                            }
                            // if (flag == 1)
                            //    System.out.println("Resuming " + myId + " count=" + count);
                            if (state != 1) {
                                System.out.println("Client died " + myId);
                                // System.out.println("task stop");
                                String rootHostName = null;
                                int rootHostPort = 0;
                                String myHostName = null;
                                int myHostPort = 0;
                                clients = myId;
                                if (!isTreeManager) {
                                    rootHostName = getConnectionParam().getHostName();
                                    rootHostPort = getConnectionParam().getPort();
                                    myHostName = getMyAddress();
                                    myHostPort = getAcceptPort();
                                    TreeVncProtocol echo = new TreeVncProtocol(rootHostName, rootHostPort);
                                    echo.lostChild(myHostName, myHostPort, myId);
                                } else {
                                    getTreeManager(intf).fixLostChild1(myId + 1);
                                }
                                break;
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        System.out.println("timer thread interrupted.");
                    }
                }
            }
        };
        new Thread(timer, "timer-discard-multicastqueue").start();
        /**
         * handle command from lower node
         */
        final Runnable reader = new Runnable() {
            public void run() {
                for (; ; ) {
                    try {
                        final byte b[] = new byte[4096];
                        final int c = is.readByte(b);
                        if (c <= 0)
                            throw new IOException();
                        // case of root node.
                        if (isTreeManager()) {
                            if (b[0] == ClientToServerMessage.SERVER_CHANGE_REQUEST) {
                                if (permitChangeScreen()) {
                                    ByteBuffer buf = ByteBuffer.wrap(b);
                                    buf.order(ByteOrder.BIG_ENDIAN);
                                    buf.get(); // padding
                                    buf.get();
                                    short id = buf.getShort();
                                    int length = buf.getInt();
                                    if (length == 0)
                                        continue;
                                    byte[] byteAddress = new byte[length];
                                    buf.get(byteAddress);
                                    String newHostName = new String(byteAddress, "UTF-8");
                                    int x = buf.getInt();
                                    int y = buf.getInt();
                                    int singleWidth = buf.getInt();
                                    int singleHeight = buf.getInt();
                                    int port = buf.getInt();
                                    int scale = buf.getInt();
                                    System.out.println("Root server change request :" + newHostName + " : " + port + " id:" + id);
                                    changeVNCServer(viewer, newHostName, port, x, y, singleWidth, singleHeight, scale, id, is, os);
                                } else {
                                    continue;
                                }
                            } else if (b[0] == ClientToServerMessage.READY_SHARE_SOUND) {
                                int rtpPort = selectPort(ConnectionParams.DEFAULT_RTP_PORT);
                                InetAddress hostname = InetAddress.getLocalHost();
                                ReceiveSound receiveSound = new ReceiveSound(socket);
                                Thread receiveSoundThread = new Thread(receiveSound, "receive-sound");
                                receiveSoundThread.start();
                            } else if (b[0] == ClientToServerMessage.CHECK_DELAY_REPLY) {
                                ByteBuffer buf = ByteBuffer.wrap(b);
                                buf.order(ByteOrder.BIG_ENDIAN);
                                buf.getShort();
                                Long time = buf.getLong();
                                int port = buf.getInt();
                                int addressLength = buf.getInt();
                                int dataLen = buf.getInt();
                                byte[] byteAddress = new byte[addressLength];
                                buf.get(byteAddress);
                                String address = new String(byteAddress, "UTF-8");
                                int nodeNum = getNodeNum(port, address);
                                if (nodeNum != 0) {
                                    Long delay = System.currentTimeMillis() - time;
                                    if (delay > 3000) {
                                        getTreeManager(intf).fixLostChild1(nodeNum);
                                    }
                                    System.out.println(System.currentTimeMillis() + " : receive checkDelay : nodeNum" + nodeNum + ", port : " + port + ", address : " + address + ", delay : " + delay + ", size : " + dataLen);
                                }
                            }
                            // case of client node.
                        } else if (b[0] == ClientToServerMessage.SERVER_CHANGE_REQUEST) {
                            ClientToServerMessage serverChangeRequest = new ClientToServerMessage() {
                                @Override
                                public void send(Writer writer)
                                        throws TransportException {
                                    writer.write(b, 0, c);
                                }
                            };
                            context.sendMessage(serverChangeRequest);
                        } else if (b[0] == ClientToServerMessage.READY_SHARE_SOUND) {
                            ClientToServerMessage readyShareSound = new ClientToServerMessage() {
                                @Override
                                public void send(Writer writer)
                                        throws TransportException {
                                    writer.write(b, 0, c);
                                }
                            };
                            context.sendMessage(readyShareSound);
                        } else if (b[0] == ClientToServerMessage.CHECK_DELAY_REPLY) {
                            ClientToServerMessage checkDelayReply = new ClientToServerMessage() {
                                @Override
                                public void send(Writer writer)
                                        throws TransportException {
                                    writer.write(b, 0, c);
                                }
                            };
                            context.sendMessage(checkDelayReply);
                        }
                    } catch (Exception e) {
                        try {
                            writerRunning.set(2);
                            os.close();
                            is.close();
                            break;
                        } catch (Exception e1) {
                            System.out.println("cannot close ClientToServerMessage " + e1);

                        }
                        System.out.println("cannot read ClientToServerMessage " + e);

                        return;
                    }

                    /*
                    // あとで検討
                    } catch (TransportException e) {

                    }
                    */
                }
            }
        };
        /**
         * send packets to a client (one thread for each client )
         */
        Runnable sender = new Runnable() {
            public void run() {
                writerRunning.set(1);
                try {
                    // requestThreadNotify(); // send full screen request via fullScreenRequestThread

                    // after this, we discard upward packet.
                    new Thread(reader, "upward-packet-processing").start();

                    for (; ; ) {
                        LinkedList<ByteBuffer> bufs = c.poll();
                        int inputIndex = 0;
                        ByteBuffer header = bufs.get(inputIndex);
                        if (header == null)
                            continue;
                        writeToClient(os, bufs, inputIndex);
                        writerRunning.set(1); // yes my client is awaking.
                        if (!childrenMulticast) {
                            for (; ; ) {
                                LinkedList<ByteBuffer> flag = c.poll();
                                if (flag.size() == 0) {
                                    break;
                                }
                            }
                        }
                    }
                } catch (Exception e) {
                    try {
                        writerRunning.set(2);
                        os.close();
                    } catch (IOException e1) {
                        System.out.println("root writer close failed :" + e1);
                    }
                    System.out.println("root writer failed :" + e);
                    /* if socket closed cliList.remove(newCli); */
                }
            }

            public void writeToClient(final Writer os,
                                      LinkedList<ByteBuffer> bufs, int inputIndex)
                    throws TransportException {
                while (inputIndex < bufs.size()) {
                    ByteBuffer b = bufs.get(inputIndex++);
                    os.write(b.array(), b.position(), b.limit());
                }
                os.flush();
                multicastqueue.heapAvailable();
            }
        };
        clients++;
        new Thread(sender, "writer-to-lower-node").start();
    }

    public boolean permitChangeScreen() {
        return permitChangeScreen;
    }


    public void setPermitChangeScreen(boolean v) {
        permitChangeScreen = v;
    }

    public void requestThreadNotify() {
        // send full screen update
    }


    public void setProtocolContext(Protocol workingProtocol) {
        context = workingProtocol;
    }

    public Socket accept() throws IOException {
        return servSock.accept();
    }

    public int selectPort(int p) {
        int port = p;
        while (true) {
            try {
                servSock = new ServerSocket(port);
                acceptPort = port;
                myAddress = "127.0.0.1";
                nets.getNetworkInterfaces();
                break;
            } catch (BindException e) {
                port++;
                continue;
            } catch (IOException e) {

            }
        }
        System.out.println("accept port = " + port);
        return port;
    }

    public void setViewer(ViewerInterface v) {
        viewer = v;
    }

    public ViewerInterface getViewer() {
        return viewer;
    }


    void sendInitData(OutputStream os) throws IOException {
        os.write(context.getInitData());
    }


    public void setTerminationType(boolean setType) {
        normalTermination = setType;
    }

    public boolean getTerminationType() {
        return normalTermination;
    }


    public void addHostToSelectionPanel(int port, String hostname, String myHostName) {
        if (rootSelectionPanel != null) {
            rootSelectionPanel.checkBox(Integer.toString(port) + ":" + hostname + ":" + myHostName);
            rootSelectionPanel.visible();
        }
    }

    public void createRootSelectionPanel(CreateConnectionParam cp, FindRoot getBcast) {
        rootSelectionPanel = new TreeVncRootSelectionPanel(getBcast);
        rootSelectionPanel.setCp(cp);
        rootSelectionPanel.visible();
    }

    public void close() {
        // none
    }

    public int getAcceptPort() {
        return acceptPort;
    }

    public boolean getReadyReconnect() {
        return readyReconnect;
    }


    public boolean getCuiVersion() {
        return cuiVersion;
    }

    public void setCuiVersion(boolean flag) {
        cuiVersion = flag;
    }

    public void readCheckDelay(Reader reader) throws TransportException {

    }

    public synchronized void vncConnected(boolean ready) {
        enableChildrenTransmission();
        readyReconnect = ready;
        if (ready) {
            notifyAll();
        }
    }

    public void printNetworkInterface() {
        Socket vncSocket = viewer.getVNCSocket();
        NetworkInterface ni = nets.getInterface(vncSocket);
        if (ni != null) {
            vncInterface = ni.getName();
            System.out.print("VNCNetworkInterface :" + vncInterface);
            if (false) {
                Enumeration<InetAddress> addresses = ni.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress adr = addresses.nextElement();
                    System.out.print(" " + adr);
                }
            }
            System.out.println();
        }
    }

    public void sendDesktopSizeChange(short id) {
        LinkedList<ByteBuffer> desktopSize = new LinkedList<ByteBuffer>();
        int singleWidth = connectionPresenter.getSingleWidth();
        int singleHeight = connectionPresenter.getSingleHeight();
        int x = connectionPresenter.getX();
        int y = connectionPresenter.getY();
        desktopSize.add(new ChangeDesktopSize(context.getFbWidth(), context.getFbHeight(), singleWidth,singleHeight, x, y, EncodingType.INIT_DATA, context.getInitData(), id).getMessage());
        if (addSerialNum) {
            addSerialNumber(desktopSize);
        }
        multicastqueue.put(desktopSize);
    }

    //public byte[] createOriginalInitData(int singleWidth, int singleHeight, String remoteDesktopName) {
    //    TreeVncCommandChannelListener treeVncCommandChannelListener = new TreeVncCommandChannelListener(this, acceptPort);
    //    byte[] originalInitData = treeVncCommandChannelListener.createOriginalInitData(singleWidth, singleHeight, remoteDesktopName);
    //    return originalInitData;
    //}

    public void addSerialNumber(LinkedList<ByteBuffer> bufs) {
        ByteBuffer serialNum = multicastqueue.allocate(4 + 8); // addSerialNum flag + SerialNum
        serialNum.putInt(1);
        serialNum.putLong(counter++);
        serialNum.flip();
        bufs.addFirst(serialNum);
    }


    public void resetDecoder() {
        context.resetDecoder();
    }

    public void stopReceiverTask() {
        if (context != null) {
            context.cleanUpSession(null);
            context.getRfb().clearChildrenTransmission();
        }
        // cleanup zlib decoder for new VNCServer
        if (isTreeManager())
            inflater = new Inflater();
    }

    public void clearChildrenTransmission() {
        // discarding does not work now.

        /*
        // set discard flag
        childrenMulticast = false;
        // put gurd erement to restart multicastqueue.
        multicastqueue.put(new LinkedList<ByteBuffer>());
        */
    }

    public void enableChildrenTransmission() {
        // a child senderTask may skip this flag
        // durling write waiting. in this case, remaining bufferes are sent.
        childrenMulticast = true;
    }

    public String getMyAddress() {
        return myAddress;
    }

    /**
     * gzip byte arrays
     *
     * @param deflater
     * @param inputs     byte data[]
     * @param inputIndex
     * @param outputs    byte data[]
     * @return byte length in last byte array
     * @throws IOException
     */
    public int zip(Deflater deflater, LinkedList<ByteBuffer> inputs,
                   int inputIndex, LinkedList<ByteBuffer> outputs) throws IOException {
        int len = 0;
        ByteBuffer c1 = multicastqueue.allocate(INFLATE_BUFSIZE);
        while (inputIndex < inputs.size()) {
            ByteBuffer b1 = inputs.get(inputIndex++);
            deflater.setInput(b1.array(), b1.position(), b1.remaining());
            /**
             * If we finish() stream and reset() it, Deflater start new gzip
             * stream, this makes continuous zlib reader unhappy. if we remove
             * finish(), Deflater.deflate() never flushes its output. The
             * original zlib deflate has flush flag. I'm pretty sure this a kind
             * of bug of Java library.
             */
            if (inputIndex == inputs.size())
                deflater.finish();
            int len1 = 0;
            do {
                len1 = deflater.deflate(c1.array(), c1.position(),
                        c1.remaining());
                if (len1 > 0) {
                    len += len1;
                    c1.position(c1.position() + len1);
                    if (c1.remaining() == 0) {
                        c1.flip();
                        outputs.addLast(c1);
                        c1 = multicastqueue.allocate(INFLATE_BUFSIZE);
                    }
                }
            } while (len1 > 0 || !deflater.needsInput()); // &&!deflater.finished());
        }
        if (c1.position() != 0) {
            c1.flip();
            outputs.addLast(c1);
        }
        deflater.reset();
        return len;
    }

    /**
     * gunzip byte arrays
     *
     * @param inflater
     * @param inputs   byte data[]
     * @param bytes    byte data[]
     * @return number of total bytes
     * @throws IOException
     */
    public int unzip(Inflater inflater, LinkedList<ByteBuffer> inputs,
                     int inputIndex, byte[] bytes, int bufSize) throws DataFormatException {
        int position = 0;
        int limit = bytes.length;
        while (inputIndex < inputs.size()) {
            ByteBuffer input = inputs.get(inputIndex++);
            inflater.setInput(input.array(), input.position(), input.limit());
            // if (inputIndex==inputs.size()) if inflater/deflater has symmetry,
            // we need this
            // inflater.end(); but this won't work
            do {
                int len0;
                len0 = inflater.inflate(bytes, position, limit - position);
                if (len0 > 0) {
                    position += len0;
                    if (position > limit) throw new DataFormatException();
                }
            } while (!inflater.needsInput());
        }
        return position;
    }


    /**
     * Multicast framebufferUpdate to children.
     * read FrameBuffferUpdate. If it is ZLE, make it ZLEE which is self contained compressed packet.
     * put the packet to the multicastqueue. Then normal rendering engine read the same stream using is.reset().
     *
     * @param dataLen
     * @param reader
     * @throws TransportException
     */
    public void readSendData(int dataLen, Reader reader, byte[] bytes, FramebufferUpdateRectangle rect)
            throws TransportException {
        LinkedList<ByteBuffer> bufs = new LinkedList<ByteBuffer>();
        int headerLen = rect.getEncodingType() == EncodingType.CHECK_DELAY ? 24 : 16;
        ByteBuffer header = multicastqueue.allocate(headerLen);
        ByteBuffer serial = multicastqueue.allocate(4 + 8);
        if (!isTreeManager() && addSerialNum) {
            reader.readBytes(serial.array(), 0, 4 + 8);
            serial.limit(4 + 8);
        }
        reader.mark(dataLen);
        reader.readBytes(header.array(), 0, headerLen);
        header.limit(headerLen);
        if (header.get(0) == FramebufferUpdate) {
            int encoding = header.getInt(12);

            if (encoding == EncodingType.ZRLE.getId()
                    || encoding == EncodingType.ZLIB.getId()) {
                if (multicastBlocking)  {
                    try {
                        zrleeBlocking(dataLen, reader, bytes, rect, context.getPixelFormat().bitsPerPixel/8, bufs, header);
                    } catch (DataFormatException e) {
                        e.printStackTrace();
                    }
                    return;
                }
                // recompress into ZREE
                // uncompressed result is remain in bytes
                ByteBuffer len = multicastqueue.allocate(4);
                reader.readBytes(len.array(), 0, 4);
                len.limit(4);
                ByteBuffer inputData = multicastqueue.allocate(dataLen - 20);
                reader.readBytes(inputData.array(), 0, inputData.capacity());
                inputData.limit(dataLen - 20);
                LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>();
                inputs.add(inputData);
                header.putInt(12, EncodingType.ZRLEE.getId()); // means
                // recompress
                // every time
                // using new Deflecter every time is incompatible with the
                // protocol, clients have to be modified.
                Deflater nDeflater = deflater; // new Deflater();
                LinkedList<ByteBuffer> out = new LinkedList<ByteBuffer>();
                try {
                    unzip(inflater, inputs, 0, bytes, INFLATE_BUFSIZE);
                    // dump32(inputs);
                    out.add(ByteBuffer.wrap(bytes));
                    int len2 = zip(nDeflater, out, 0, bufs);
                    ByteBuffer blen = multicastqueue.allocate(4);
                    blen.putInt(len2);
                    blen.flip();
                    bufs.addFirst(blen);
                    if (checkDelay) {
                        bufs = createCheckDelayHeader(bufs, header);
                    } else {
                        bufs.addFirst(header);
                    }
                    if (addSerialNum) {
                        addSerialNumber(bufs);
                    }
                    multicastqueue.waitput(bufs);
                } catch (IOException | InterruptedException e) {
                    throw new TransportException(e);
                } catch (DataFormatException e) {
                    throw new TransportException(e);
                }
                return;
            }

            //    ZRLEE is already compressed
            bufs.add(header);
            if (addSerialNum) {
                this.addSerialNumber(bufs);
            }
            if (dataLen > headerLen) {
                ByteBuffer b = multicastqueue.allocate(dataLen - headerLen);
                reader.readBytes(b.array(), 0, dataLen - headerLen);
                b.limit(dataLen - headerLen);
                bufs.add(b);
                byte[] bytes1 = b.array();
                int headerPos = 4; int len2 = b.limit();
                System.out.println(" bytes: " + bytes1[headerPos + 20] + ", " + bytes1[headerPos + 21] + ", " + bytes1[headerPos + 22] + ", " + bytes1[headerPos + 23]);
                System.out.println(" bytes: " + bytes1[len2 - 8] + ", " + bytes1[len2 - 7] + ", " + bytes1[len2 - 6] + ", " + bytes1[len2 - 5]);
                System.out.println(" bytes: " + bytes1[len2 - 4] + ", " + bytes1[len2 - 3] + ", " + bytes1[len2 - 2] + ", " + bytes1[len2 - 1]);
            }
            multicastqueue.put(bufs);

            return;
        }
        // It may be compressed. We can inflate here to avoid repeating clients
        // decompressing here,
        // but it may generate too many large data. It is better to do it in
        // each client.
        // But we have do inflation for all input data, so we have to do it
        // here.
    }

    /**
     * Multicast framebufferUpdate to children.
     * read FrameBuffferUpdate. If it is ZLE, make it ZLEE which is self contained compressed packet.
     * put the packet to the multicastqueue. Then normal rendering engine read the same stream using is.reset().
     *
     * @param dataLen
     * @param reader
     * @throws TransportException
     * @throws UnsupportedEncodingException
     */


    private void zrleeBlocking(int dataLen, Reader reader, byte[] bytes, FramebufferUpdateRectangle rect, int bytePerPixel, LinkedList<ByteBuffer> bufs, ByteBuffer header) throws TransportException, DataFormatException {
        ByteBuffer len = multicastqueue.allocate(4);
        reader.readBytes(len.array(), 0, 4);
        len.limit(4);
        ByteBuffer inputData = multicastqueue.allocate(dataLen - 20);
        reader.readBytes(inputData.array(), 0, inputData.capacity());
        inputData.limit(dataLen - 20);
        LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>();
        inputs.add(inputData);
        header.putInt(12, EncodingType.ZRLEE.getId()); // means
        // recompress
        // every time
        // using new Deflecter every time is incompatible with the
        // protocol, clients have to be modified.
        Deflater nDeflater = deflater; // new Deflater();
        LinkedList<ByteBuffer> out = new LinkedList<ByteBuffer>();
        int inflate_size = rect.width * rect.height * bytePerPixel;

        unzip(inflater, inputs, 0, bytes, inflate_size);
        this.header = header;
        // dump32(inputs);
        c1 = multicastqueue.allocate(deflate_size);
        if (addSerialNum)
            c1.putLong(counter++);
        if (checkDelay)
            CheckDelay.checkDelay(c1, rect.x, rect.y, rect.width, rect.height, System.currentTimeMillis(), EncodingType.CHECK_DELAY);
        c1headerPos = c1.position();
        c1.put(header);
        header.flip();
        c1.putInt(0);
        c1rect = new FramebufferUpdateRectangle(rect.x, rect.y, 0, 0);
        return;
    }

    public void multicastPut(FramebufferUpdateRectangle rect, byte[] bytes, int prevoffset, int offset, int tilex, int tiley) {
        int span = offset - prevoffset;
        deflater.setInput(bytes,prevoffset,span);
        c1rect.height = tiley;
        if (c1.remaining() < span || c1rect.x + c1rect.width + tilex >= rect.x + rect.width ) {
            deflater.deflate(c1, Deflater.FULL_FLUSH);
            deflater.finish();
            c1.flip();
            System.out.println("multicastPut: " + c1rect + " length: " + (c1.remaining()-c1headerPos-header.limit()));
            try {
                writeUpdateRectangleWithHeader(c1, c1headerPos, c1.remaining()-c1headerPos-header.limit()-4, c1rect.x, c1rect.y, c1rect.width, c1rect.height);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            c1rect.x += c1rect.width;
            if (c1rect.x >= rect.x + rect.width) {
                c1rect.x = rect.x;
                c1rect.y += tiley;
            }
            c1rect.width = 0;
            c1 = multicastqueue.allocate(deflate_size);
            if (addSerialNum)
                c1.putLong(counter++);
            c1headerPos = c1.position();
            c1.put(header);
            header.flip();
            c1.putInt(0);
        } else {
            deflater.deflate(c1, Deflater.SYNC_FLUSH);
        }
        c1rect.width += tilex;
    }

    /**
     * make and send frameBufferUpdateRectangle packet
     * @param c1
     * @param headerPos
     * @param len2
     * @param x
     * @param y
     * @param w
     * @param h
     * @throws InterruptedException
     *
     * [8]    sequence number (if used )
     *  0  0      FRAMEBUFFEUPDATERRECTANGLE  < headerPos
     *  1  0      padding
     *  2  int16  count of frame
     *  4  int16 x
     *  6  int16 y
     *  8  int16 w
     * 10  int16 h
     * 12  int32  encoding type
     * 16  [int32]   datalen if zcompressed
     * 20  compressedData
     */
    private void writeUpdateRectangleWithHeader(ByteBuffer c1, int headerPos, int len2, int x, int y, int w, int h) throws InterruptedException {
        deflater.reset();

        c1.putInt(headerPos + 16, len2);
        c1.putShort(headerPos + 4,(short) x);
        c1.putShort(headerPos + 6,(short) y);
        c1.putShort(headerPos + 8,(short) w);
        c1.putShort(headerPos + 10,(short) h);
        LinkedList<ByteBuffer> bufs = new LinkedList<ByteBuffer>();
        bufs.add(c1);
        if (isTreeManager && connectionPresenter.isUseMulticast()) {
            for(ByteBuffer buf : bufs)
                viewer.getRfbBroadcastListener().multicastUpdateRectangle(buf);
        } else
            multicastqueue.waitput(bufs);
        byte[] bytes = c1.array();
        len2 = c1.remaining();
        System.out.println(" bytes: " + bytes[headerPos + 20] + ", " + bytes[headerPos + 21] + ", " + bytes[headerPos + 22] + ", " + bytes[headerPos + 23]);
        System.out.println(" bytes: " + bytes[len2 - 4] + ", " + bytes[len2 - 3] + ", " + bytes[len2 - 2] + ", " + bytes[len2 - 1]);
    }

    public LinkedList<ByteBuffer> createCheckDelayHeader(LinkedList<ByteBuffer> checkDelay, ByteBuffer header) {
        int x = (int) header.getShort(4);
        int y = (int) header.getShort(6);
        int width = (int) header.getShort(8);
        int height = (int) header.getShort(10);
        long time = System.currentTimeMillis();
        checkDelay.addFirst(new CheckDelay(x, y, width, height, time, EncodingType.CHECK_DELAY).getMessage());
        return checkDelay;
    }

    public void setId(short id) {
        this.id = id;
    }

    public short getId() {
        return id;
    }

    public void setMyAddress(String myHostName) {
        this.myAddress = myHostName;

    }

    public void setLeader(boolean leader) {
        this.leader = leader;
    }

    public boolean isLeader() {
        return leader;
    }

    public void setTreeManager(String intf, TreeManagement clients) {
        nets.setTreeManager(intf, clients);
    }

    public TreeManagement getTreeManager(String intf) {
        this.treeManager = nets.getTreeManager(intf);
        this.nodeList = treeManager.getList();
        return treeManager;
    }

    public int getNewNodeId () {
        return ++uniqueNodeId;
    } // 0 is reserved for root

    /**
     * change VNCServer is called when host change.
     *
     * @param vncProxyService
     * @param hostName          HostAddress
     * @param x
     * @param y
     * @param width             FrameWidth
     * @param height            FrameHeight
     * @param scale
     * @param newVNCServerId
     * @param is
     * @param os
     */
    public void changeVNCServer(ViewerInterface vncProxyService, String hostName, int port, int x, int y, int width, int height, int scale, short newVNCServerId, Reader is, Writer os)
            throws IOException {
        if (newVNCServerId == -1) {
            // change to the tree vnc root on other network.
            vncProxyService.changeToDirectConnectedServer(hostName, is, os, x, y, width, height, scale);
            return;
        }
        // serverChangeの処理
        vncProxyService.inhelitClients(hostName, newVNCServerId, x, y, width, height, scale);
        // after connecting VNC server, rfb send SEND_INIT_DATA command and wakes me up if necessary
        // stop reader stop
    }

    /**
     * start accepting children
     * run rootFinderListener if necessary
     */
    public void createConnectionAndStart(ViewerInterface v) {
        selectPort(ConnectionParams.DEFAULT_VNC_ROOT);
        acceptThread = new TreeVncCommandChannelListener(this, getAcceptPort());
        Thread thread = new Thread(acceptThread, "TreeVNC-accept");
        thread.start();
    }

    public TreeVncCommandChannelListener getAcceptThread() {
        return acceptThread;
    }

    public void setConnectionParam(CreateConnectionParam createConnectionParam) {
        cp = createConnectionParam;
    }

    public CreateConnectionParam getConnectionParam() {
        return cp;
    }

    public boolean hasViewer() {
        return hasViewer;
    }

    public void setHasViewer(boolean b) {
        hasViewer = b;
    }

    public void setShowTree(boolean showTree) {
        this.showTreeNode = showTree;
    }

    public void setCheckDelay(boolean checkDelay) {
        this.checkDelay = checkDelay;
    }

    public void setAddSerialNum(boolean addSerialNum) {
        this.addSerialNum = addSerialNum;
    }

    public int getNodeNum(int port, String address) {
        int nodeNum = 0;

        for (Iterator<TreeVNCNode> i = nodeList.iterator(); i.hasNext(); ) {
            TreeVNCNode tvn = (TreeVNCNode) i.next();
            if (port == tvn.port && address.equals(tvn.hostname)) {
                nodeNum = tvn.treeNum;
                return nodeNum;
            }
        }
        return nodeNum;
    }

    public void setFixingSize(boolean fixingSize) {
        this.fixingSize = fixingSize;
    }

    public boolean hasParent() {
        return id != -1;
    }

    /**
     * server change to directed connected server
     * server maybe on the different network
     * so can not in whereToConnect message
     * reuse dynamic connect socket us a new client
     * reother server exchange socket when serverChangeRequest with id -1
     *
     * @param previousReader
     * @param previousWriter
     */
    public void exchangeDirectConnectedServer(Reader previousReader, Writer previousWriter) {
        String adr = viewer.getRfb().getMyAddress();
        ConnectionPresenter cp1 = viewer.getConnectionPresenter();
        int scale = cp1.getRetinaScale();
        int singleWidth = cp1.getSingleWidth();
        int singleHeight = cp1.getSingleHeight();
        int x = cp1.getX();
        int y = cp1.getY();
        ScreenChangeRequest scr = new ScreenChangeRequest(adr, ConnectionParams.DEFAULT_VNC_ROOT, (short) -1, x, y, singleWidth, singleHeight, scale);
        try {
            scr.send(previousWriter);
        } catch (TransportException e) {
            e.printStackTrace();
            return;
        }
        // newClient(previousWriter, previousReader, null);
    }

    /**
     * Requested server is connected. stop old connection, replace old connection parameter such as
     * context (PROTOCOL)
     * start new connection and send INIT_DATA
     *
     * @param workingProtocol
     * @param connectionPresenter
     */
    public synchronized void newVNCConnection(Protocol workingProtocol, ConnectionPresenter connectionPresenter) {
        ProtocolContext previousContext = getContext();
        stopReceiverTask();
        setProtocolContext(workingProtocol);
        this.connectionPresenter = connectionPresenter;
        connectionPresenter.viewer.setConnectionPresenter(connectionPresenter);
        connectionPresenter.addModel("ConnectionParamsModel", connectionPresenter.getConnectionParams());
        if (previousContext != null && isTreeManager() && hasParent()) {
            Reader previousReader = previousContext.getReader();
            Writer previousWriter = previousContext.getWriter();
            // exchangeDirectConnectedServer(previousReader, previousWriter);
        }
        enableChildrenTransmission();
        printNetworkInterface();
        System.out.println("newVNCConenction reconnectingId: " + connectionPresenter.getReconnectingId());
        if (isTreeManager()) {
            sharingId = connectionPresenter.getReconnectingId();
        }
        sendDesktopSizeChange(connectionPresenter.getReconnectingId());
    }

    public void sendErrorAnnounce(short reconnectingId, String message) {
        LinkedList<ByteBuffer> errorAnnounce = new LinkedList<ByteBuffer>();
        errorAnnounce.add(new ChildNodeAnnounce(EncodingType.ERROR_ANNOUNCE, message.getBytes(), reconnectingId).getMessage());
        if (addSerialNum) {
            addSerialNumber(errorAnnounce);
        }
        multicastqueue.put(errorAnnounce);
    }

    public void setConnectionPresenter(ConnectionPresenter connectionPresenter) {
        this.connectionPresenter = connectionPresenter;
    }

    public short getSharingId() {
        return sharingId;
    }

    public void setSharingId(short sharingId) {
        this.sharingId = sharingId;
    }

    public synchronized void startBroadcast(ReceiverTask receiverTask) {
        BroadcastRFBListener rfbBroadcastListener = viewer.getRfbBroadcastListener();
        rfbBroadcastListener.init(this, receiverTask);
        if (!rfbBroadcastListener.isStopFlag()) {
            Thread rfbBroadcast = new Thread(rfbBroadcastListener, "RFBBroadcast");
            rfbBroadcast.start();
        }
    }
}