view src/main/java/alice/datasegment/DataSegmentKey.java @ 650:4289b232b3fd

nulValue
author suruga
date Fri, 02 Feb 2018 18:26:49 +0900
parents 646f705e65b1
children
line wrap: on
line source

package alice.datasegment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import alice.datasegment.Command;

/**
 * This is where the body of each command is actually carried out.
 *
 * Synchronized DataSegment for each DataSegment key: holds the values stored
 * under one key together with the PEEK/TAKE commands that are still waiting
 * for a value. {@link #runCommand(Command)} is synchronized on this instance;
 * {@link #replyValue(Command, DataSegmentValue, boolean)} is not synchronized
 * by itself.
 *
 * @author kazz
 *
 */
public class DataSegmentKey {

    /** Values currently stored under this key, in insertion order. */
    private final ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
    /** PEEK/TAKE commands that could not be answered yet, in arrival order. */
    private final ArrayList<Command> waitList = new ArrayList<Command>();
    /** Index that will be given to the next stored value; the first value gets 1. */
    private int tailIndex = 1;

    /**
     * Executes one command against this key.
     *
     * <ul>
     * <li>UPDATE: drops the oldest stored value (if any) and then behaves like PUT.</li>
     * <li>PUT: stores a new value and answers waiting PEEK/TAKE commands with it.</li>
     * <li>PEEK: replies with the first stored value whose index is greater than
     *     {@code cmd.index}; the value stays stored. Waits if there is none.</li>
     * <li>TAKE: same as PEEK, but the replied value is removed.</li>
     * <li>REMOVE and any other type: currently no effect.</li>
     * </ul>
     *
     * @param cmd the command to execute
     */
    public synchronized void runCommand(Command cmd) {
        switch (cmd.type) {
        case UPDATE:
            if (!dataList.isEmpty()) {
                dataList.remove(0);
            }
            // Intentional fall-through: after dropping the oldest value an
            // UPDATE stores its data exactly like a PUT.
        case PUT:
            storeAndNotifyWaiters(cmd);
            break;
        case PEEK:
            replyOrWait(cmd, false);
            break;
        case TAKE:
            replyOrWait(cmd, true);
            break;
        case REMOVE:
            // TODO: implements later
            break;
        default:
            break;
        }
    }

    /**
     * Stores the data carried by a PUT/UPDATE command under the next index and
     * hands it to the commands in the wait list that asked for an index below it.
     *
     * @param cmd the PUT or UPDATE command that carries the data
     */
    private void storeAndNotifyWaiters(Command cmd) {
        int index = tailIndex;
        tailIndex++;
        DataSegmentValue dsv = new DataSegmentValue(index, cmd.rData, cmd.reverseKey);
        dataList.add(dsv);
        // Process waiting peek and take commands
        for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
            Command waitCmd = iter.next();
            if (waitCmd.index < index) {
                // NOTE(review): the compress flag of the storing command is used
                // here, while the PEEK/TAKE paths use the requester's own flag.
                // Kept as in the original - confirm which one is intended.
                replyValue(waitCmd, dsv, cmd.getCompressFlag());
                iter.remove();
                if (waitCmd.type == CommandType.TAKE) {
                    // The waiting TAKE consumes the value, so nothing is left
                    // for the waiters behind it; they keep waiting.
                    dataList.remove(dsv);
                    break;
                }
            }
        }
    }

    /**
     * Answers a PEEK/TAKE command with the first stored value whose index is
     * greater than {@code cmd.index}, or puts the command on the wait list when
     * no such value is stored.
     *
     * @param cmd     the PEEK or TAKE command
     * @param consume {@code true} to remove the replied value from the store
     *                (TAKE), {@code false} to leave it there (PEEK)
     */
    private void replyOrWait(Command cmd, boolean consume) {
        // Every stored index is below tailIndex, so the scan can only succeed
        // when the requested index is below it as well.
        if (cmd.index < tailIndex) {
            for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
                DataSegmentValue data = iter.next();
                if (data.index > cmd.index) {
                    replyValue(cmd, data, cmd.getCompressFlag());
                    if (consume) {
                        iter.remove();
                    }
                    return;
                }
            }
        }
        waitList.add(cmd);
    }

    /**
     * Builds a REPLY command for {@code data} and delivers it to the issuer of
     * {@code cmd}.
     *
     * When {@code cFlag} is set and the data is not compressed yet, the data is
     * zipped first; an {@link IOException} from zipping is printed and the
     * reply is still sent. If the thread is interrupted while handing the reply
     * to the connection, the stack trace is printed and the thread's interrupt
     * status is restored.
     *
     * @param cmd   the command being answered; its {@code seq} is copied into the reply
     * @param data  the stored value to send back
     * @param cFlag compress flag to set on the reply
     */
    public void replyValue(Command cmd, DataSegmentValue data, boolean cFlag) {
        if (cFlag && !data.rData.compressed()) {
            try {
                data.rData.zip();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from);
        rCmd.setCompressFlag(cFlag);

        if (cmd.cs != null) { // if cmd has cs-instance, it means Command from local.
            cmd.cs.ids.reply(cmd.receiver, rCmd);
            return;
        }
        try {
            if (cmd.getQuickFlag()) {
                cmd.connection.write(rCmd);
            } else {
                cmd.connection.sendQueue.put(rCmd);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Catching InterruptedException clears the interrupt status; set it
            // again so the calling thread can still notice the interruption.
            Thread.currentThread().interrupt();
        }
    }

}