view src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java @ 574:ea21af9a4762 dispose

delete serializeFlag, fix MessagePack pack&unpack
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Tue, 15 Dec 2015 11:49:07 +0900
parents 8c17a9e66cc7
children
line wrap: on
line source

package alice.datasegment;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import alice.codesegment.CodeSegment;

public class CompressedLocalDataSegmentManager extends DataSegmentManager {

    LocalDataSegmentManager manager;
    private String reverseKey = "compressedlocal";

    public CompressedLocalDataSegmentManager(LocalDataSegmentManager manager) {
        this.manager = manager;
    }

    public void setReverseKey(String s){
        reverseKey = s;
    }

    public void submitCommand(DataSegmentKey key, Command cmd) {
        manager.submitCommand(key, cmd);
    }

    public DataSegmentKey getDataSegmentKey(String key) {
        return manager.getDataSegmentKey(key);
    }

    public void removeDataSegmentKey(String key) {
        manager.removeDataSegmentKey(key);
    }

    @Override
    public void put(String key, ReceiveData rData, boolean quickFlag) {
        if (!rData.compressed()){
            try {
                rData.zip();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);
        cmd.setCompressFlag(true);

        manager.put1(key, cmd);
    }

    /**
     * Enqueue update command to the queue of each DataSegment key
     */

    @Override
    public void update(String key, ReceiveData rData, boolean quickFlag) {

        if (!rData.compressed()){
            try {
                rData.zip();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey);
        cmd.setCompressFlag(true);

        manager.put1(key, cmd);
    }

    @Override
    public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) {
        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
        cmd.setCompressFlag(true);

        manager.take1(receiver, cmd);
    }

    @Override
    public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {
        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
        cmd.setCompressFlag(true);

        manager.take1(receiver, cmd);
    }

    @Override
    public void remove(String key) {
        manager.remove(key);
    }

    @Override public void finish() {
        manager.finish();
    }

    @Override
    public void close() {

    }

    public void recommand(Receiver receiver, CodeSegment cs) {
        manager.recommand(receiver, cs);
    }

    @Override
    public void ping(String returnKey) {

    }

    @Override
    public void response(String returnKey) {

    }

    @Override
    public void shutdown() {

    }

    @Override
    public void setSendError(boolean b) {

    }
}