view src/main/java/alice/datasegment/DataSegmentKey.java @ 448:4840d0e2b605 dispose

add compress and serialize Flag.
author sugi
date Tue, 28 Oct 2014 00:04:04 +0900
parents 86b74532e66c
children f68d103498e0
line wrap: on
line source

package alice.datasegment;

import java.util.ArrayList;
import java.util.Iterator;

import alice.datasegment.Command; 

/**
 * Synchronized DataSegment for each DataSegment key
 * @author kazz
 *
 */
public class DataSegmentKey {

    private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
    private ArrayList<Command> waitList = new ArrayList<Command>();
    private int tailIndex = 1;

    public synchronized void runCommand(Command cmd) {
        switch (cmd.type) {
        case UPDATE:
            if (dataList.size() != 0) {
                dataList.remove(0);
            }
        case PUT:
            int index = tailIndex;
            tailIndex++;
            DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey); 
            dataList.add(dsv);
            // Process waiting peek and take commands
            for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
                Command waitCmd = iter.next();
                if (waitCmd.index < index) {
                    replyValue(waitCmd ,dsv);
                    iter.remove();
                    if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
                        dataList.remove(dsv);
                        break;
                    }
                }
            }
            break;
        case PEEK:
            if (cmd.index >= tailIndex) {
                waitList.add(cmd);
                break;
            }
            boolean waitFlag2 = true;
            for (DataSegmentValue data : dataList) {
                if (data.index > cmd.index) {
                    replyValue(cmd ,data);
                    waitFlag2 = false;
                    break;
                }
            }
            if (waitFlag2)
                waitList.add(cmd);
            break;
        case TAKE:
            if (cmd.index >= tailIndex) {
                waitList.add(cmd);
                break;
            }
            boolean waitFlag = true;
            for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
                DataSegmentValue data = iter.next();
                if (data.index > cmd.index) {
                    replyValue(cmd ,data);
                    iter.remove();
                    waitFlag = false;
                    break;
                }
            }
            if (waitFlag)
                waitList.add(cmd);
            break;
        case REMOVE:
            // TODO: implements later
            break;
        default:
        }

    }

    public void replyValue(Command cmd, DataSegmentValue data){
        if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local.
            cmd.cs.ids.reply(cmd.receiver, new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
        } else {
            try {
                if (!cmd.getQuickFlag()){ 
                    cmd.connection.sendQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
                }
                else {
                    cmd.connection.write(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}