view src/fdl/MetaLinda.java @ 88:5d1189e9e420

MetaLinda.sync() fix
author one
date Thu, 11 Feb 2010 12:22:41 +0900
parents c0591636a71a
children b658ff1eb90f
line wrap: on
line source


/*
 * @(#)MetaLinda.java       1.1 06/04/01
 *
 * Copyright 2008  Shinji KONO
 * 

   Meta Lidna
     Trasport layer of Meta Linda API

 */

package fdl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;

/**
 MetaLinda 
 *
 * @author Shinji Kono
 *

   meta tuple interface in Linda Server

 */

public class MetaLinda implements PSXLinda {

	public TupleSpace ts;
	public FDLindaServ fds;
	public FederatedLinda fdl=FederatedLinda.init();
	public PSXLinda next=null;
	private LinkedList<MetaReply> replies=new LinkedList<MetaReply>();

	public MetaLinda(TupleSpace ts,FDLindaServ fds) {
		this.ts = ts;
		this.fds = fds;
	}

	public PSXLinda open(String _host,int _port) 
	throws IOException {
		return fdl.openFromMetaLinda(this, _host, _port);
	}

	public PSXReply in(int id) {
		MetaReply r = new MetaReply(PSX.PSX_IN,id,ts);
		addReply(r);
		return r;
	}

	public void in(int id, PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_IN,id,ts, callback);
		addReply(r);
	}

	private void addReply(MetaReply r) {
		replies.addLast(r);
	}

	public PSXReply ck(int id) {
		MetaReply r = new MetaReply(PSX.PSX_CHECK,id,ts);
		return r;
	}

	public void ck(int id, PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_CHECK,id,ts,callback);
		addReply(r);
	}

	public PSXReply out(int id, ByteBuffer data) {
		MetaReply r = new MetaReply(PSX.PSX_OUT,id,ts,data,null);
		addReply(r);
		return r;
	}

	public PSXReply update(int id, ByteBuffer data) {
		MetaReply r = new MetaReply(PSX.PSX_UPDATE,id,ts,data,null);
		return r;
	}

	public void update(int id, ByteBuffer data,PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_UPDATE,id,ts,data,callback);
		addReply(r);
	}

	public PSXReply rd(int id) {
		MetaReply r = new MetaReply(PSX.PSX_RD,id,ts);
		return r;
	}

	public void rd(int id, PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_RD,id,ts,callback);
		addReply(r);
	}

	public PSXLinda add(PSXLinda linda) {
		next = linda;
		return this;
	}


	public int sync() {
		return sync(0);
	}

	public int sync(long timeout) {
		fdl.queueExec();
		fds.checkTuple(timeout); // fdl sync is also handled here
		/*
		 * r.callback() may call meta.sync() and modifies the
		 * replies queue. Do current size of queue only. The
		 * rest is checked on the next sync call including
		 * the recursive case.
		 */
		int count = replies.size();
		while(count-->0) {
			MetaReply r = replies.poll();
			// previous call back may call this sync and make 
			// replies shorter.
			if (r==null) break;
			if (r.ready()) {
			} else {
				addReply(r);
			}
		}
		return 0;
	}

	public void send(ByteBuffer command, ByteBuffer data) {
	}

	public void setTupleSpaceHook(IOHandlerHook hook) {
		ts.hook = hook;
	}
}


/* end */