view src/main/java/alice/datasegment/LocalDataSegmentManager.java @ 527:bfec2c3ff1b8 dispose

change unzip
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Thu, 30 Apr 2015 18:14:02 +0900
parents 928907206d21
children cb7c31848d16
line wrap: on
line source

package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import alice.codesegment.CodeSegment;

/**
 * localのDSに対する処理。DS自体は持っていない。→ReceivedData
 * DataSegmentKey.runCommandに渡してコマンドを実行する。
 */
public class LocalDataSegmentManager extends DataSegmentManager {

    private String reverseKey = "local";
    private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
    private Logger logger = Logger.getLogger("local");

    private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads
            Runtime.getRuntime().availableProcessors(),
            Integer.MAX_VALUE, // keepAliveTime
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());

    //コンストラクタ。スレッドが走る。
    public LocalDataSegmentManager() {
        new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
    }

    public void setReverseKey(String s){
        reverseKey = s;
    }

    private class RunCommand implements Runnable {

        DataSegmentKey key;
        Command cmd;

        public RunCommand(DataSegmentKey key, Command cmd) {
            this.key = key;
            this.cmd = cmd;
        }

        @Override
        public void run() {
            key.runCommand(cmd);
        }

    }

    public void submitCommand(DataSegmentKey key, Command cmd) {
        dataSegmentExecutor.execute(new RunCommand(key, cmd));
    }

    public DataSegmentKey getDataSegmentKey(String key) {
        DataSegmentKey dsKey = dataSegments.get(key);
        if (dsKey != null)
            return dsKey;
        if (key == null)
            return null;
        DataSegmentKey newDataSegmentKey = new DataSegmentKey();
        DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
        if (dataSegmentKey == null) {
            dataSegmentKey = newDataSegmentKey;
        }
        return dataSegmentKey;
    }

    public void removeDataSegmentKey(String key) {
        if (key!=null)
            dataSegments.remove(key);
    }

    @Override
    public void put(String key, ReceiveData rData, SendOption option) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);//ここでMP変換している
        cmd.setCompressFlag(option.getCompressFlag());
        rData.setCompressFlag(option.getCompressFlag());

        dataSegmentKey.runCommand(cmd);
        if (logger.isDebugEnabled())
            logger.debug(cmd.getCommandString());
    }

    /**
     * Enqueue update command to the queue of each DataSegment key
     */

    @Override
    public void update(String key, ReceiveData rData, SendOption option) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
        Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey);
        cmd.setCompressFlag(option.getCompressFlag());
        rData.setCompressFlag(option.getCompressFlag());

        dataSegmentKey.runCommand(cmd);
        if (logger.isDebugEnabled())
            logger.debug(cmd.getCommandString());
    }

    @Override
    public void take(Receiver receiver, CodeSegment cs, SendOption option) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
        int seq = this.seq.getAndIncrement();
        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
        cmd.setCompressFlag(option.getCompressFlag());

        dataSegmentKey.runCommand(cmd);
        if (logger.isDebugEnabled())
            logger.debug(cmd.getCommandString());
    }

    @Override
    public void peek(Receiver receiver, CodeSegment cs, SendOption option) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
        int seq = this.seq.getAndIncrement();
        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
        cmd.setCompressFlag(option.getCompressFlag());

        dataSegmentKey.runCommand(cmd);
        if (logger.isDebugEnabled())
            logger.debug(cmd.getCommandString());
    }

    //このコマンドは?
    @Override
    public void remove(String key) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
        Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
        dataSegmentKey.runCommand(cmd);
        if (logger.isDebugEnabled())
            logger.debug(cmd.getCommandString());
    }

    @Override public void finish() {
        System.exit(0);
    }

    @Override
    public void close() {

    }

    //?
    public void recommand(Receiver receiver, CodeSegment cs) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
        int seq = this.seq.getAndIncrement();
        Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
        dataSegmentKey.runCommand(cmd);
        if (logger.isDebugEnabled())
            logger.debug(cmd.getCommandString());

    }

    @Override
    public void ping(String returnKey) {

    }

    @Override
    public void response(String returnKey) {

    }

    @Override
    public void shutdown() {

    }

    @Override
    public void setSendError(boolean b) {

    }
}