Mercurial > hg > Members > shoshi > distributedalgorithm
changeset 1:d24bcb819032
trying to add Selector
line wrap: on
line diff
--- a/src/main/java/suikwasha/distributedalgorithm/algorithms/cr/ChangRoberts.java Fri Oct 19 00:05:41 2012 +0900 +++ b/src/main/java/suikwasha/distributedalgorithm/algorithms/cr/ChangRoberts.java Fri Oct 19 23:48:11 2012 +0900 @@ -15,7 +15,7 @@ { LinkedList<Algorithm> algoList = new LinkedList<Algorithm>(); - for(long num = 0;num <= 40;num ++){ + for(long num = 0;num <= 1000;num ++){ algoList.add(new ChangRobertsAlgorithm(num)); }
--- a/src/main/java/suikwasha/distributedalgorithm/algorithms/cr/ChangRobertsAlgorithm.java Fri Oct 19 00:05:41 2012 +0900 +++ b/src/main/java/suikwasha/distributedalgorithm/algorithms/cr/ChangRobertsAlgorithm.java Fri Oct 19 23:48:11 2012 +0900 @@ -36,17 +36,17 @@ public void execute(Context _c) { Iterator<Port> ports = _c.getPorts().iterator(); - Port receiving = ports.next(); - Port sending = ports.next(); + Port in = ports.next(); + Port out = ports.next(); max = num; Message newMessage = new Message(toByteBuffer(false,max)); - sending.send(newMessage); + out.send(newMessage); while(true){ Message receivedMessage = null; try{ - receivedMessage = receiving.blockingReceive(); + receivedMessage = in.blockingReceive(); }catch(InterruptedException _e){ _e.printStackTrace(); } @@ -57,20 +57,20 @@ if(flagMax == (byte)1){ // maximum value detected. newMessage = receivedMessage.newMessage(toByteBuffer(true,value)); - sending.send(newMessage); + out.send(newMessage); //stop return; }else{ if(value == max){ newMessage = receivedMessage.newMessage(toByteBuffer(true,max)); - sending.send(newMessage); + out.send(newMessage); //stop return; } if(value > max){ max = value; newMessage = receivedMessage.newMessage(toByteBuffer(false,max)); - sending.send(newMessage); + out.send(newMessage); } } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/algorithms/krs/KorachRotemSantoro.java Fri Oct 19 23:48:11 2012 +0900 @@ -0,0 +1,32 @@ +package suikwasha.distributedalgorithm.algorithms.krs; + +import java.util.LinkedList; + +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.link.ReliableLinkBuilder; +import suikwasha.distributedalgorithm.machines.SimpleMachineBuilder; +import suikwasha.distributedalgorithm.simulator.Simulator; +import suikwasha.distributedalgorithm.simulator.Summary; +import suikwasha.distributedalgorithm.topologies.RingTopologyBuilder; + +public class KorachRotemSantoro +{ + public static void main(String _args[]) throws InterruptedException + { + LinkedList<Algorithm> algoList = new LinkedList<Algorithm>(); + + long num = 0; + + for(num = 0;num <= 10;num ++){ + algoList.add(new KorachRotemSantoroAlgorithm(num,false,true)); + } + + ReliableLinkBuilder linkBuilder = new ReliableLinkBuilder(); + SimpleMachineBuilder machineBuilder = new SimpleMachineBuilder(); + RingTopologyBuilder ringBuilder = new RingTopologyBuilder(); + + Simulator sim = new Simulator(algoList,linkBuilder,machineBuilder,ringBuilder); + Summary sum = sim.startSimulation(); + sum.print(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/algorithms/krs/KorachRotemSantoroAlgorithm.java Fri Oct 19 23:48:11 2012 +0900 @@ -0,0 +1,152 @@ +package suikwasha.distributedalgorithm.algorithms.krs; + +import java.nio.ByteBuffer; +import java.util.Iterator; + +import fj.P; +import fj.P2; + +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.framework.Context; +import suikwasha.distributedalgorithm.framework.Message; +import suikwasha.distributedalgorithm.framework.Port; + +public class KorachRotemSantoroAlgorithm implements Algorithm +{ + private final boolean isInitProcess; + private final boolean direction; + private final long num; + private long max; + + public static final int MESSAGE_SIZE = 1 + 8; // size of (byte + long) in java + + public KorachRotemSantoroAlgorithm(long _num,boolean _direction,boolean _isInitProcess) + { + num = _num; + max = -1; + direction = _direction; + isInitProcess = _isInitProcess; + } + + /* + * message format. + * flag | long value + * 0xFF | 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF + */ + public static ByteBuffer s(boolean _flagMax,long _value) + { + ByteBuffer b = ByteBuffer.allocate(MESSAGE_SIZE); + b.put((_flagMax) ? (byte)1 : (byte)0); + b.putLong(_value); + b.rewind(); + return b; + } + + public P2<Message,Port> RECV(Port... ports) + { + while(true){ + for(Port p : ports){ + Message message = p.tryReceive(); + if(message != null){ + return P.p(message,p); + } + try{ + Thread.sleep(1); + }catch(Exception _e){ + _e.printStackTrace(); + } + } + } + + } + + public void execute(Context _c) + { + Iterator<Port> ports = _c.getPorts().iterator(); + Port left,right; + if(direction){ + left = ports.next(); + right = ports.next(); + }else{ + right = ports.next(); + left = ports.next(); + } + + Message newMessage,receivedMessage; + + if(isInitProcess){ + /* + * initial process starts from here. + */ + max = num; + right.send(new Message(s(false,max))); + }else{ + /* + * waiting process starts from here. + */ + + P2<Message,Port> ret = RECV(right,left); + receivedMessage = ret._1(); + Port receivedPort = ret._2(); + + ByteBuffer b = receivedMessage.getMessage(); + long value = b.getLong(1); // see message format above + max = Math.max(num,value); + + newMessage = receivedMessage.newMessage(s(false,max)); + if(receivedPort == right){ + left.send(newMessage); + }else{ + right.send(newMessage); + } + } + + /* + * label : L in text book. + */ + while(true){ + P2<Message,Port> ret = RECV(right,left); + + receivedMessage = ret._1(); + Port x = ret._2(); + + ByteBuffer b = receivedMessage.getMessage(); + byte flagMax = b.get(); + long value = b.getLong(); + if(max == value){ + newMessage = receivedMessage.newMessage(s(true,max)); + if(x == right){ + left.send(newMessage); + }else{ + right.send(newMessage); + } + + // stop; + return; + } + + if(flagMax == 1){ + max = value; + newMessage = receivedMessage.newMessage(s(true,max)); + if(x == right){ + left.send(newMessage); + }else{ + right.send(newMessage); + } + + // stop; + return; + } + + if(value > max){ + max = value; + newMessage = receivedMessage.newMessage(s(false,max)); + if(x == right){ + left.send(newMessage); + }else{ + right.send(newMessage); + } + } + } + } +}
--- a/src/main/java/suikwasha/distributedalgorithm/algorithms/peterson/Perterson.java Fri Oct 19 00:05:41 2012 +0900 +++ b/src/main/java/suikwasha/distributedalgorithm/algorithms/peterson/Perterson.java Fri Oct 19 23:48:11 2012 +0900 @@ -15,7 +15,7 @@ { LinkedList<Algorithm> algoList = new LinkedList<Algorithm>(); - for(long num = 0;num <= 10;num ++){ + for(long num = 0;num <= 1000;num ++){ algoList.add(new PertersonAlgorithm(num)); }
--- a/src/main/java/suikwasha/distributedalgorithm/framework/Port.java Fri Oct 19 00:05:41 2012 +0900 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/Port.java Fri Oct 19 23:48:11 2012 +0900 @@ -3,6 +3,9 @@ public interface Port { public void send(Message _mes); + public boolean isReady(); public Message blockingReceive() throws InterruptedException; public Message tryReceive(); + + public void register(Selector _s); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/Selector.java Fri Oct 19 23:48:11 2012 +0900 @@ -0,0 +1,42 @@ +package suikwasha.distributedalgorithm.framework; + +import java.util.LinkedList; +import java.util.concurrent.SynchronousQueue; + +public class Selector +{ + private final SynchronousQueue<Port> queue; + private final LinkedList<Port> watchList; + + public Selector() + { + queue = new SynchronousQueue<Port>(); + watchList = new LinkedList<Port>(); + } + + public void signal(Port _p) + { + queue.offer(_p); + } + + public void register(Port _p) + { + watchList.add(_p); + } + + public Port select() throws InterruptedException + { + Port availablePort; + int length = watchList.size(); + for(int i = 0;i < length;i ++){ + availablePort = watchList.poll(); + if(availablePort.isReady()){ + watchList.addLast(availablePort); + return availablePort; + } + watchList.addLast(availablePort); + } + + return queue.take(); + } +}
--- a/src/main/java/suikwasha/distributedalgorithm/simulator/Summary.java Fri Oct 19 00:05:41 2012 +0900 +++ b/src/main/java/suikwasha/distributedalgorithm/simulator/Summary.java Fri Oct 19 23:48:11 2012 +0900 @@ -60,7 +60,7 @@ { System.out.println("Summary:"); System.out.println(String.format("MessageComplexity :\t\t%d",messageComplexity.get())); - System.out.println(String.format("MessageBitComplexity :\t\t%d",messageComplexity.get())); + System.out.println(String.format("MessageBitComplexity :\t\t%d",bitComplexity.get())); System.out.println(String.format("TimeComplexity :\t\t%d",longestMessageChain.get().getMessageCount())); } }
--- a/target/classes/META-INF/maven/suikwasha/distributedalgorithm/pom.properties Fri Oct 19 00:05:41 2012 +0900 +++ b/target/classes/META-INF/maven/suikwasha/distributedalgorithm/pom.properties Fri Oct 19 23:48:11 2012 +0900 @@ -1,5 +1,5 @@ #Generated by Maven Integration for Eclipse -#Fri Oct 19 00:04:33 JST 2012 +#Fri Oct 19 23:47:17 JST 2012 version=0.0.1-SNAPSHOT groupId=suikwasha m2e.projectName=distributedalgorithm