view src/main/java/alice/codesegment/InputDataSegment.java @ 533:b3c9554ccb1b dispose

change compressed API to set data specified DSM name
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Mon, 04 May 2015 16:02:22 +0900
parents bfec2c3ff1b8
children 0832af83583f 3284428f525e 33f300d0720a
line wrap: on
line source

package alice.codesegment;

import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.Receiver;
import alice.datasegment.SendOption;

/**
 * InputDataSegment manager for a single {@link CodeSegment}.
 *
 * <p>Keeps track of how many input data segments are still unbound and hands
 * the code segment to {@link CodeSegmentManager#submit} once the pending
 * counter reaches zero.
 *
 * <p>Original author's note (translated): requests are handled differently
 * depending on whether the target is remote or local; forwarding to the
 * compressed DSM is added here.
 *
 * @author kazz
 */
public class InputDataSegment {

    /** Code segment whose inputs this object tracks. */
    public CodeSegment cs;

    /**
     * Number of outstanding {@link #receive()} calls before {@link #cs} is
     * submitted. Starts at 1 (original note: "1 for no input data segments").
     * Declared final so that every thread always operates on the same counter
     * object; it is reset in place by {@link #init()}.
     */
    private final AtomicInteger count = new AtomicInteger(1);

    /**
     * Number of registered data segments for which {@link #setKey()} has not
     * been called yet. Declared final for the same reason as {@link #count}.
     */
    private final AtomicInteger keyCount = new AtomicInteger(0);

    /**
     * Creates a manager bound to the given code segment.
     *
     * @param cs code segment to submit once all inputs have arrived
     */
    public InputDataSegment(CodeSegment cs) {
        this.cs = cs;
    }

    /**
     * Resets both counters to their initial values (pending count 1, key
     * count 0).
     *
     * <p>The counters are reset in place instead of being replaced by new
     * {@code AtomicInteger} instances, so a concurrent caller can never keep
     * updating a stale counter object.
     */
    public void init() {
        count.set(1);
        keyCount.set(0);
    }

    /**
     * Registers the receiver with the code segment and issues a peek on the
     * data segment manager selected by {@code receiver.managerKey}, passing
     * {@code true} as the quick flag.
     *
     * <p>Original note (translated): executed via SEDA.
     *
     * @param receiver receiver that will be given the reply
     */
    public void quickPeek(Receiver receiver) {
        cs.register(receiver);
        DataSegment.get(receiver.managerKey).peek(receiver, cs, true);
    }

    /**
     * Registers the receiver with the code segment and issues a peek on the
     * data segment manager selected by {@code receiver.managerKey}, passing
     * {@code false} as the quick flag.
     *
     * @param receiver receiver that will be given the reply
     */
    public void peek(Receiver receiver) {
        cs.register(receiver);
        DataSegment.get(receiver.managerKey).peek(receiver, cs, false);
    }

    /**
     * Registers the receiver with the code segment and issues a take on the
     * data segment manager selected by {@code receiver.managerKey}, passing
     * {@code true} as the quick flag.
     *
     * @param receiver receiver that will be given the reply
     */
    public void quickTake(Receiver receiver) {
        cs.register(receiver);
        DataSegment.get(receiver.managerKey).take(receiver, cs, true);
    }

    /**
     * Registers the receiver with the code segment and issues a take on the
     * data segment manager selected by {@code receiver.managerKey}, passing
     * {@code false} as the quick flag.
     *
     * @param receiver receiver that will be given the reply
     */
    public void take(Receiver receiver) {
        cs.register(receiver);
        DataSegment.get(receiver.managerKey).take(receiver, cs, false);
    }

    /**
     * Copies the reply's index, reverse key and data into the receiver and
     * then counts the input as received.
     *
     * @param receiver receiver to fill in
     * @param reply    reply command carrying the data
     */
    public void reply(Receiver receiver, Command reply) {
        receiver.index = reply.index;
        receiver.from = reply.reverseKey;
        receiver.setData(reply.rData);
        receive();
    }

    /**
     * Accounts for one more input data segment: increments both the pending
     * counter and the key counter.
     */
    public void register() {
        count.incrementAndGet();
        keyCount.incrementAndGet();
    }

    /**
     * Marks one registered data segment as having its key set. When the key
     * counter drops to exactly zero, {@link #receive()} is called once, which
     * consumes the initial value of the pending counter.
     */
    public void setKey() {
        if (keyCount.decrementAndGet() == 0) {
            receive();
        }
    }

    /**
     * Decrements the pending counter and submits the code segment to
     * {@link CodeSegmentManager} when the counter reaches exactly zero.
     */
    public void receive() {
        if (count.decrementAndGet() == 0) {
            CodeSegmentManager.submit(cs);
        }
    }

    /**
     * InputDataSegment factory: creates a {@link Receiver} bound to this
     * manager.
     *
     * @param type PEEK or TAKE
     * @return Receiver of DataSegment reply
     */
    public Receiver create(CommandType type) {
        return new Receiver(this, type);
    }

    /**
     * Re-issues the receiver's command through the local data segment manager.
     *
     * @param receiver receiver whose command is issued again
     */
    public void recommand(Receiver receiver) {
        // TODO why only local?
        DataSegment.getLocal().recommand(receiver, cs);
    }

    /**
     * Overwrites the pending counter with the given value.
     *
     * @param cnt new value of the pending counter
     */
    public void setCounter(int cnt) {
        count.set(cnt);
    }
}