view src/alice/datasegment/LocalDataSegmentManager.java @ 30:b5a21baf0b07

implements RingTopology
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 17 Jan 2012 16:13:03 +0900
parents 98ab26e09a98
children 20c67f673224
line wrap: on
line source

package alice.datasegment;

import org.msgpack.type.Value;

import alice.codesegment.CodeSegment;
import alice.datasegment.CommandType;

public class LocalDataSegmentManager extends DataSegmentManager {
	
	public String reverseKey = "local";
	
	public LocalDataSegmentManager() {
		new Thread(replyThread, "LocalDataSegmentManager").start();
	}
	
	public DataSegmentKey getDataSegmentKey(String key) {
		if (key == null) {
			return null;
		}
		DataSegmentKey newDataSegmentKey = new DataSegmentKey(key);
		DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
		if (dataSegmentKey == null) {
			newDataSegmentKey.runKeyThread();
			dataSegmentKey = newDataSegmentKey;
		}
		return dataSegmentKey;
	}
	
	@Override
	public void put(String key, Value val) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		dataSegmentKey.addCommand(new Command(CommandType.PUT, null, null, val, 0, 0, replyQueue, null, reverseKey));
	}

	@Override
	public void update(String key, Value val) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, null, val, 0, 0, replyQueue, null, reverseKey));
	}

	@Override
	public void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.TAKE, receiver, null, null, index, seq, replyQueue, cs, null);
		seqHash.put(seq, cmd);
		dataSegmentKey.addCommand(cmd);
	}

	@Override
	public void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.PEEK, receiver, null, null, index, seq, replyQueue, cs, null);
		seqHash.put(seq, cmd);
		dataSegmentKey.addCommand(cmd);
	}

	@Override
	public void remove(String key) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, null, 0, 0, replyQueue, null, null));
	}
	
	@Override public void finish() {
		System.exit(0);
	}
	
}