view src/main/java/alice/datasegment/DataSegmentManager.java @ 467:6e304a7a60e7 dispose

remove white space
author sugi
date Sat, 22 Nov 2014 12:08:24 +0900
parents bcf6f4a6fcd0
children 86c45738dd9e
line wrap: on
line source

package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;

import alice.codesegment.CodeSegment;

/**
 * Base class for data segment managers.
 *
 * <p>Holds the state shared by concrete managers: a table of commands keyed by
 * sequence number ({@link #seqHash}), a queue of incoming replies
 * ({@link #replyQueue}) and a {@link Runnable} ({@link #replyThread}) that pairs
 * each queued reply with the command stored under the same sequence number.
 * The data segment operations themselves are declared abstract and implemented
 * by subclasses.
 */
public abstract class DataSegmentManager {

    /** Logger shared by the reply loop and {@link #addReplyCommand(Command)}. */
    private static final Logger REPLY_LOGGER = Logger.getLogger("reply");

    /** Commands looked up (and removed) by sequence number when a reply arrives. */
    protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();

    /** Replies waiting to be dispatched by {@link #replyThread}. */
    protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();

    /**
     * Sequence number counter, starting at 1.
     * Original author's note: used as the unique sequence number a command waits on
     * for PUT or UPDATE; a local manager does not need it.
     */
    protected AtomicInteger seq = new AtomicInteger(1);

    /**
     * Reply dispatch loop. Blocks on {@link #replyQueue}, removes the command
     * registered under the reply's sequence number and hands the reply to that
     * command's code segment. Runs until the executing thread is interrupted.
     */
    protected Runnable replyThread = new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    Command reply = replyQueue.take();
                    Command cmd = getAndRemoveCmd(reply.seq);
                    if (cmd == null) {
                        // No command is registered under this sequence number; skip the reply.
                        REPLY_LOGGER.warn("conflict sequence number");
                        continue;
                    }
                    cmd.cs.ids.reply(cmd.receiver, reply);
                    if (REPLY_LOGGER.isDebugEnabled()) {
                        REPLY_LOGGER.debug(reply.getCommandString() + " " + cmd.getCommandString());
                    }
                } catch (InterruptedException e) {
                    // Interruption is the only stop signal this loop has. Restore the flag
                    // (it is cleared when the exception is thrown) and leave the loop instead
                    // of silently discarding the request and blocking again.
                    REPLY_LOGGER.warn("reply thread interrupted; stopping reply dispatch", e);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

    };

    /**
     * Removes and returns the command stored under the given sequence number.
     *
     * @param index sequence number used as the key in {@link #seqHash}
     * @return the removed command, or {@code null} if none is stored under {@code index}
     */
    public Command getAndRemoveCmd(int index) {
        return seqHash.remove(index);
    }

    /**
     * Queues a reply for dispatch by {@link #replyThread}.
     *
     * <p>If the calling thread is interrupted while queueing, the reply is not
     * enqueued; the drop is logged and the thread's interrupt status is restored
     * so the caller can still observe the interruption.
     *
     * @param cmd the reply command to enqueue
     */
    public void addReplyCommand(Command cmd) {
        try {
            replyQueue.put(cmd);
        } catch (InterruptedException e) {
            REPLY_LOGGER.warn("interrupted while queueing reply; reply dropped", e);
            Thread.currentThread().interrupt();
        }
    }

    // Data segment operations. Their concrete behavior is defined by each subclass.
    public abstract void put(String key, ReceiveData rData, SendOption option);
    public abstract void update(String key, ReceiveData rData, SendOption option);
    public abstract void peek(Receiver receiver, CodeSegment cs, SendOption option);
    public abstract void take(Receiver receiver, CodeSegment cs, SendOption option);

    // Lifecycle operations, implemented by subclasses.
    public abstract void remove(String key);
    public abstract void shutdown();
    public abstract void close();
    public abstract void finish();

    // Ping/response operations, implemented by subclasses.
    public abstract void ping(String returnKey);
    public abstract void response(String returnKey);

}