view src/main/java/alice/datasegment/DataSegmentManager.java @ 650:4289b232b3fd

nulValue
author suruga
date Fri, 02 Feb 2018 18:26:49 +0900
parents ea21af9a4762
children
line wrap: on
line source

package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;

import alice.codesegment.CodeSegment;

/**
 * Base class for data segment managers.
 *
 * <p>Holds the state shared by concrete managers: a table of commands keyed by
 * sequence number, a queue of incoming reply commands, and a sequence number
 * generator. It also declares the command entry points that each concrete
 * manager has to implement.
 */
public abstract class DataSegmentManager {

    /** Commands keyed by an integer; the reply loop looks entries up by {@code reply.seq}. */
    protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();

    /** Reply commands queued by {@link #addReplyCommand(Command)} and drained by {@link #replyThread}. */
    protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();

    /**
     * Sequence number source, starting at 1. Per the original author's note it
     * identifies commands waiting for a PUT or UPDATE by a unique sequence
     * number, and is not needed by the local manager.
     */
    protected AtomicInteger seq = new AtomicInteger(1);

    /**
     * Loop that takes replies from {@link #replyQueue}, finds the command that was
     * registered under the same sequence number and hands the reply to that
     * command's code segment.
     *
     * <p>Original author's note: this is a remnant of the SEDA REPLY thread and
     * may be deleted.
     */
    protected Runnable replyThread = new Runnable() {
        private final Logger logger = Logger.getLogger("reply");

        @Override
        public void run() {
            while (true) {
                try {
                    Command reply = replyQueue.take();
                    Command cmd = getAndRemoveCmd(reply.seq);
                    if (cmd == null) {
                        // Nothing is registered under this sequence number; drop the reply.
                        logger.warn("conflict sequence number");
                        continue;
                    }
                    cmd.cs.ids.reply(cmd.receiver, reply);
                    if (logger.isDebugEnabled()) {
                        logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt flag and leave the loop. Looping again would
                    // either swallow the interruption (flag cleared) or spin on take()
                    // throwing immediately (flag restored).
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

    };

    /**
     * Removes and returns the command stored under the given sequence number.
     *
     * @param index key to look up in {@link #seqHash}
     * @return the removed command, or {@code null} if no command was stored under {@code index}
     */
    public Command getAndRemoveCmd(int index) {
        return seqHash.remove(index);
    }

    /**
     * Appends a reply command to {@link #replyQueue}.
     *
     * <p>If the calling thread is interrupted, the stack trace is printed, the
     * command is not queued, and the thread's interrupt flag is restored so the
     * caller can still observe the interruption.
     *
     * @param cmd reply command to enqueue
     */
    public void addReplyCommand(Command cmd) {
        try {
            replyQueue.put(cmd);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    // Abstract methods for each command. Their exact semantics are defined by the
    // concrete managers, which are not visible here; the names suggest data
    // segment operations (put/update/peek/take) -- confirm against implementations.
    public abstract void put(String key, ReceiveData rData, boolean quickFlag);
    public abstract void update(String key, ReceiveData rData, boolean quickFlag);
    public abstract void peek(Receiver receiver, CodeSegment cs, boolean quickFlag);
    public abstract void take(Receiver receiver, CodeSegment cs, boolean quickFlag);

    // Lifecycle and housekeeping operations; behavior is implementation-defined.
    public abstract void remove(String key);
    public abstract void shutdown();
    public abstract void close();
    public abstract void finish();

    // Ping/response pair; presumably a liveness check that answers under returnKey -- TODO confirm.
    public abstract void ping(String returnKey);
    public abstract void response(String returnKey);

    // Toggles a send-error setting on the manager; meaning of the flag is implementation-defined.
    public abstract void setSendError(boolean b);

}