/*
 * view src/main/java/alice/codesegment/InputDataSegment.java @ 452:f68d103498e0 dispose
 *
 * refactor (InputDataSegment holder class changed)
 * author sugi
 * date Tue, 28 Oct 2014 17:24:16 +0900 (2014-10-28)
 * parents aadea6a59376
 * children b004f62b83e5
 * line wrap: on
 * line source
 */
package alice.codesegment;

import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.ReceiveData;
import alice.datasegment.Receiver;

/**
 * InputDataSegment manager.
 *
 * <p>Keeps track of how many inputs of a {@link CodeSegment} are still
 * unbound and submits the code segment to {@link CodeSegmentManager} once the
 * pending counter reaches zero.
 *
 * <p>Two counters are maintained:
 * <ul>
 *   <li>{@code count} - pending events; starts at 1 so that a code segment
 *       without any input data segment still needs one {@link #receive()}
 *       before it is submitted.</li>
 *   <li>{@code keyCount} - registered inputs whose key has not been set yet;
 *       when it drops to zero {@link #setKey()} issues one {@link #receive()}.</li>
 * </ul>
 *
 * @author kazz
 */
public class InputDataSegment {

    /** Code segment whose inputs are tracked by this manager. */
    public CodeSegment cs;

    // The counters are final and reset in place (see init()); replacing the
    // AtomicInteger instances would not be safely visible to other threads.
    private final AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments
    private final AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments

    /**
     * Creates a manager bound to the given code segment.
     *
     * @param cs code segment to submit once all pending events have arrived
     */
    public InputDataSegment(CodeSegment cs) {
        this.cs = cs;
    }

    /**
     * Resets both counters to their initial values
     * ({@code count = 1}, {@code keyCount = 0}).
     */
    public void init() {
        count.set(1);
        keyCount.set(0);
    }

    /**
     * Adds the receiver to the code segment's receiver list and issues a
     * quickPeek on the local manager (when {@code receiver.managerKey} is
     * {@code null}) or on the manager registered under that key.
     *
     * @param receiver receiver to be filled by the reply
     */
    public void quickPeek(Receiver receiver) {
        cs.list.add(receiver);
        if (receiver.managerKey == null) {
            // Was peek(...): dispatch to quickPeek so the local branch agrees
            // with the remote branch and with quickTake().
            DataSegment.getLocal().quickPeek(receiver, cs);
        } else {
            DataSegment.get(receiver.managerKey).quickPeek(receiver, cs);
        }
    }

    /**
     * Adds the receiver to the code segment's receiver list and issues a
     * quickTake on the local manager (when {@code receiver.managerKey} is
     * {@code null}) or on the manager registered under that key.
     *
     * @param receiver receiver to be filled by the reply
     */
    public void quickTake(Receiver receiver) {
        cs.list.add(receiver);
        if (receiver.managerKey == null) {
            DataSegment.getLocal().quickTake(receiver, cs);
        } else {
            DataSegment.get(receiver.managerKey).quickTake(receiver, cs);
        }
    }

    /**
     * Adds the receiver to the code segment's receiver list and issues a
     * peek on the local manager (when {@code receiver.managerKey} is
     * {@code null}) or on the manager registered under that key.
     *
     * @param receiver receiver to be filled by the reply
     */
    public void peek(Receiver receiver) {
        cs.list.add(receiver);
        if (receiver.managerKey == null) {
            DataSegment.getLocal().peek(receiver, cs);
        } else {
            DataSegment.get(receiver.managerKey).peek(receiver, cs);
        }
    }

    /**
     * Adds the receiver to the code segment's receiver list and issues a
     * take on the local manager (when {@code receiver.managerKey} is
     * {@code null}) or on the manager registered under that key.
     *
     * @param receiver receiver to be filled by the reply
     */
    public void take(Receiver receiver) {
        cs.list.add(receiver);
        if (receiver.managerKey == null) {
            DataSegment.getLocal().take(receiver, cs);
        } else {
            DataSegment.get(receiver.managerKey).take(receiver, cs);
        }
    }

    /**
     * Stores a reply into the receiver and counts it as one received event.
     *
     * <p>Copies {@code index} and {@code reverseKey} from the reply, wraps the
     * reply value together with its compress/serialize flags into a
     * {@link ReceiveData}, then calls {@link #receive()}.
     *
     * @param receiver receiver that requested the data
     * @param reply    command carrying the reply value
     */
    public void reply(Receiver receiver, Command reply) {
        receiver.index = reply.index;
        receiver.from = reply.reverseKey;
        receiver.setData(new ReceiveData(reply.val, reply.getCompressFlag(), reply.getSerializeFlag()));
        receive();
    }

    /**
     * Registers one more input: increments both the pending counter and the
     * unset-key counter.
     */
    public void register() {
        count.getAndIncrement();
        keyCount.getAndIncrement();
    }

    /**
     * Marks one registered input as having its key set. When the unset-key
     * counter reaches zero, one {@link #receive()} is issued.
     */
    public void setKey() {
        if (keyCount.decrementAndGet() == 0) {
            receive();
        }
    }

    /**
     * Counts one received event. When the pending counter reaches zero the
     * code segment is handed to {@link CodeSegmentManager#submit}.
     */
    public void receive() {
        if (count.decrementAndGet() == 0) {
            CodeSegmentManager.submit(cs);
        }
    }

    /**
     * InputDataSegment factory
     * @param type PEEK or TAKE
     * @return Receiver of DataSegment reply
     */
    public Receiver create(CommandType type) {
        return new Receiver(this, type);
    }

    /**
     * Re-issues the receiver's command through the local manager.
     *
     * @param receiver receiver whose command is issued again
     */
    public void recommand(Receiver receiver) {
        // TODO why only local?
        DataSegment.getLocal().recommand(receiver, cs);
    }

    /**
     * Overwrites the pending counter.
     *
     * @param cnt new value of the pending counter
     */
    public void setCounter(int cnt) {
        count.set(cnt);
    }
}