Mercurial > hg > Members > shoshi > distributedalgorithm
changeset 0:38a110b13db1
added SimpleDistributedAlgorithmFramework.
added NaiveAlgorithm
added ChangRobertsAlgorithm
added PertersonAlgorithm
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pom.xml Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,30 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>suikwasha</groupId> + <artifactId>distributedalgorithm</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>distributedalgorithm</name> + <url>http://maven.apache.org</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.functionaljava</groupId> + <artifactId>functionaljava</artifactId> + <version>3.0</version> + </dependency> + </dependencies> +</project>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/App.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,13 @@ +package suikwasha.distributedalgorithm; + +/** + * Hello world! + * + */ +public class App +{ + public static void main( String[] args ) + { + System.out.println( "Hello World!" ); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/algorithms/cr/ChangRoberts.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,30 @@ +package suikwasha.distributedalgorithm.algorithms.cr; + +import java.util.LinkedList; + +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.link.ReliableLinkBuilder; +import suikwasha.distributedalgorithm.machines.SimpleMachineBuilder; +import suikwasha.distributedalgorithm.simulator.Simulator; +import suikwasha.distributedalgorithm.simulator.Summary; +import suikwasha.distributedalgorithm.topologies.RingTopologyBuilder; + +public class ChangRoberts +{ + public static void main(String _args[]) throws InterruptedException + { + LinkedList<Algorithm> algoList = new LinkedList<Algorithm>(); + + for(long num = 0;num <= 40;num ++){ + algoList.add(new ChangRobertsAlgorithm(num)); + } + + ReliableLinkBuilder linkBuilder = new ReliableLinkBuilder(); + SimpleMachineBuilder machineBuilder = new SimpleMachineBuilder(); + RingTopologyBuilder ringBuilder = new RingTopologyBuilder(); + + Simulator sim = new Simulator(algoList,linkBuilder,machineBuilder,ringBuilder); + Summary sum = sim.startSimulation(); + sum.print(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/algorithms/cr/ChangRobertsAlgorithm.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,79 @@ +package suikwasha.distributedalgorithm.algorithms.cr; + +import java.nio.ByteBuffer; +import java.util.Iterator; + +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.framework.Context; +import suikwasha.distributedalgorithm.framework.Message; +import suikwasha.distributedalgorithm.framework.Port; + +public class ChangRobertsAlgorithm implements Algorithm +{ + private final long num; + private long max; + + public ChangRobertsAlgorithm(long _num) + { + max = -1; + num = _num; + } + + public static ByteBuffer toByteBuffer(boolean _max,long _value) + { + ByteBuffer b = ByteBuffer.allocate(9); // size of (flagbit + long) + if(_max){ + b.put((byte)1); + }else{ + b.put((byte)0); + } + + b.putLong(_value); + b.rewind(); // do not forget rewind() after reading/writing + return b.asReadOnlyBuffer(); // message is read-only + } + + public void execute(Context _c) + { + Iterator<Port> ports = _c.getPorts().iterator(); + Port receiving = ports.next(); + Port sending = ports.next(); + + max = num; + Message newMessage = new Message(toByteBuffer(false,max)); + sending.send(newMessage); + + while(true){ + Message receivedMessage = null; + try{ + receivedMessage = receiving.blockingReceive(); + }catch(InterruptedException _e){ + _e.printStackTrace(); + } + + ByteBuffer b = receivedMessage.getMessage(); + byte flagMax = b.get(); + long value = b.getLong(); + if(flagMax == (byte)1){ + // maximum value detected. + newMessage = receivedMessage.newMessage(toByteBuffer(true,value)); + sending.send(newMessage); + //stop + return; + }else{ + if(value == max){ + newMessage = receivedMessage.newMessage(toByteBuffer(true,max)); + sending.send(newMessage); + //stop + return; + } + if(value > max){ + max = value; + newMessage = receivedMessage.newMessage(toByteBuffer(false,max)); + sending.send(newMessage); + } + } + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/algorithms/naive/Naive.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,34 @@ +package suikwasha.distributedalgorithm.algorithms.naive; + +import java.util.LinkedList; + +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.link.ReliableLinkBuilder; +import suikwasha.distributedalgorithm.machines.SimpleMachineBuilder; +import suikwasha.distributedalgorithm.simulator.Simulator; +import suikwasha.distributedalgorithm.simulator.Summary; +import suikwasha.distributedalgorithm.topologies.RingTopologyBuilder; + +public class Naive +{ + public static void main(String _args[]) throws InterruptedException + { + LinkedList<Algorithm> algoList = new LinkedList<Algorithm>(); + + long num = 1; + algoList.add(new NaiveStartAlgorithm(num)); + + num ++; + for(;num <= 40;num ++){ + algoList.add(new NaiveStandbyAlgorithm(num)); + } + + ReliableLinkBuilder linkBuilder = new ReliableLinkBuilder(); + SimpleMachineBuilder machineBuilder = new SimpleMachineBuilder(); + RingTopologyBuilder ringBuilder = new RingTopologyBuilder(); + + Simulator sim = new Simulator(algoList,linkBuilder,machineBuilder,ringBuilder); + Summary sum = sim.startSimulation(); + sum.print(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/algorithms/naive/NaiveStandbyAlgorithm.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,78 @@ +package suikwasha.distributedalgorithm.algorithms.naive; + +import java.util.Iterator; + +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.framework.Context; +import suikwasha.distributedalgorithm.framework.Message; +import suikwasha.distributedalgorithm.framework.Port; + +import static suikwasha.distributedalgorithm.algorithms.naive.NaiveStartAlgorithm.toByteBuffer; + +/* + * this standby algorithm for NAIVE using UnidirectedRing + * Port1 : receiving only + * Port2 : sending only + */ +public class NaiveStandbyAlgorithm implements Algorithm +{ + private long max; + private final long num; + + public NaiveStandbyAlgorithm(long _num) + { + max = -1; + num = _num; + } + + public long getValue() + { + return max; + } + + public void execute(Context _c) + { + Iterator<Port> ports = _c.getPorts().iterator(); + Port receiving = ports.next(); + Port sending = ports.next(); + + Message receivedMessage = null; + try{ + receivedMessage = receiving.blockingReceive(); + }catch(InterruptedException _e){ + Thread.currentThread().interrupt(); + _e.printStackTrace(); + } + + long value = receivedMessage.getMessage().getLong(); + max = Math.max(num,max); + + Message m1 = receivedMessage.newMessage(toByteBuffer(num)); + Message m2 = receivedMessage.newMessage(toByteBuffer(value)); + + sending.send(m1); + sending.send(m2); + + while(true){ + try{ + receivedMessage = receiving.blockingReceive(); + }catch(InterruptedException _e){ + _e.printStackTrace(); + Thread.currentThread().interrupt(); + return; + } + + value = receivedMessage.getMessage().getLong(); + if(max == value){ + Message newMessage = receivedMessage.newMessage(toByteBuffer(max)); + sending.send(newMessage); + return; + } + + max = Math.max(max,value); + Message newMessage = receivedMessage.newMessage(toByteBuffer(value)); + sending.send(newMessage); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/algorithms/naive/NaiveStartAlgorithm.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,73 @@ +package suikwasha.distributedalgorithm.algorithms.naive; + + +import java.nio.ByteBuffer; +import java.util.Iterator; + +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.framework.Context; +import suikwasha.distributedalgorithm.framework.Message; +import suikwasha.distributedalgorithm.framework.Port; + +/* + * this start algorithm for NAIVE using UnidirectedRing + * Port1 : receiving only + * Port2 : sending only + */ +public class NaiveStartAlgorithm implements Algorithm +{ + private long max; + private final long num; + + public NaiveStartAlgorithm(long _num) + { + max = -1; + num = _num; + } + + public long getValue() + { + return max; + } + + public static ByteBuffer toByteBuffer(long _value) + { + ByteBuffer b = ByteBuffer.allocate(8); // size of long in java + b.putLong(_value); + b.rewind(); // do not forget rewind() after reading/writing + return b.asReadOnlyBuffer(); // message is read-only + } + + public void execute(Context _c) + { + Iterator<Port> ports = _c.getPorts().iterator(); + Port receiving = ports.next(); + Port sending = ports.next(); + + max = num; + Message firstMessage = new Message(toByteBuffer(num)); + sending.send(firstMessage); + + while(true){ + Message receivedMessage; + try{ + receivedMessage = receiving.blockingReceive(); + }catch(InterruptedException _e){ + _e.printStackTrace(); + Thread.currentThread().interrupt(); + return; + } + + long value = receivedMessage.getMessage().getLong(); + if(max == value){ + Message newMessage = receivedMessage.newMessage(toByteBuffer(max)); + sending.send(newMessage); + return; + } + + max = Math.max(max,value); + Message newMessage = receivedMessage.newMessage(toByteBuffer(value)); + sending.send(newMessage); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/algorithms/peterson/Perterson.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,31 @@ +package suikwasha.distributedalgorithm.algorithms.peterson; + +import java.util.LinkedList; + +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.link.ReliableLinkBuilder; +import suikwasha.distributedalgorithm.machines.SimpleMachineBuilder; +import suikwasha.distributedalgorithm.simulator.Simulator; +import suikwasha.distributedalgorithm.simulator.Summary; +import suikwasha.distributedalgorithm.topologies.RingTopologyBuilder; + +public class Perterson +{ + public static void main(String _args[]) throws InterruptedException + { + LinkedList<Algorithm> algoList = new LinkedList<Algorithm>(); + + for(long num = 0;num <= 10;num ++){ + algoList.add(new PertersonAlgorithm(num)); + } + + ReliableLinkBuilder linkBuilder = new ReliableLinkBuilder(); + SimpleMachineBuilder machineBuilder = new SimpleMachineBuilder(); + RingTopologyBuilder ringBuilder = new RingTopologyBuilder(); + + Simulator sim = new Simulator(algoList,linkBuilder,machineBuilder,ringBuilder); + Summary sum = sim.startSimulation(); + sum.print(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/algorithms/peterson/PertersonAlgorithm.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,123 @@ +package suikwasha.distributedalgorithm.algorithms.peterson; + +import java.nio.ByteBuffer; +import java.util.Iterator; + +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.framework.Context; +import suikwasha.distributedalgorithm.framework.Message; +import suikwasha.distributedalgorithm.framework.Port; + +public class PertersonAlgorithm implements Algorithm +{ + private final long num; + private long max; + + public PertersonAlgorithm(long _num) + { + max = -1; + num = _num; + } + + public static ByteBuffer toByteBuffer(boolean _max,long _value) + { + ByteBuffer b = ByteBuffer.allocate(9); // size of (flag + long) + if(_max){ + b.put((byte)1); + }else{ + b.put((byte)0); + } + + b.putLong(_value); + b.rewind(); // do not forget rewind() after reading/writing + return b.asReadOnlyBuffer(); // message is read-only + } + + public void execute(Context _c) + { + Iterator<Port> ports = _c.getPorts().iterator(); + Port in = ports.next(); + Port out = ports.next(); + + /* + * Active + */ + + long temp = num; + out.send(new Message(toByteBuffer(false,temp))); + + Message receivedMessage = null; + try{ + receivedMessage = in.blockingReceive(); + }catch(InterruptedException _e){ + _e.printStackTrace(); + } + + ByteBuffer b = receivedMessage.getMessage(); + long next = -1; + if(b.get() == (byte)0){ + next = b.getLong(); + }else{ + System.out.println("b.get() != 0"); + } + + Message newMessage; + while(temp != next){ + newMessage = receivedMessage.newMessage(toByteBuffer(false,next)); + out.send(newMessage); + try{ + receivedMessage = in.blockingReceive(); + }catch(InterruptedException _e){ + _e.printStackTrace(); + } + long nnext = -1; + b = receivedMessage.getMessage(); + if(b.get() == (byte)0){ + nnext = b.getLong(); + } + if(next < Math.max(temp,nnext)){ + break; + } + if(next > Math.max(temp,nnext)){ + temp = next; + } + newMessage = receivedMessage.newMessage(toByteBuffer(false,temp)); + out.send(newMessage); + newMessage = receivedMessage.newMessage(toByteBuffer(false,next)); + out.send(newMessage); + } + + if(temp == next){ + max = temp; + newMessage = receivedMessage.newMessage(toByteBuffer(true,max)); + out.send(newMessage); + + // stop + return; + } + + /* + * Passive + */ + while(true){ + try{ + receivedMessage = in.blockingReceive(); + }catch(InterruptedException _e){ + _e.printStackTrace(); + } + b = receivedMessage.getMessage(); + if(b.get() == (byte)1){ + max = b.getLong(); + newMessage = receivedMessage.newMessage(toByteBuffer(true,max)); + out.send(newMessage); + + //stop + return; + } + + long value = b.getLong(); + newMessage = receivedMessage.newMessage(toByteBuffer(false,value)); + out.send(newMessage); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/Algorithm.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,6 @@ +package suikwasha.distributedalgorithm.framework; + +public interface Algorithm +{ + public void execute(Context _c); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/Context.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,6 @@ +package suikwasha.distributedalgorithm.framework; + +public interface Context +{ + public Iterable<Port> getPorts(); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/Link.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,7 @@ +package suikwasha.distributedalgorithm.framework; + +public interface Link +{ + public Port getPort1(); + public Port getPort2(); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/LinkBuilder.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,6 @@ +package suikwasha.distributedalgorithm.framework; + +public interface LinkBuilder +{ + public Link build(); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/Machine.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,7 @@ +package suikwasha.distributedalgorithm.framework; + +public abstract class Machine extends Thread +{ + public abstract void addPort(Port _p); + public abstract void run(); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/MachineBuilder.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,6 @@ +package suikwasha.distributedalgorithm.framework; + +public interface MachineBuilder +{ + public Machine createMachine(Algorithm _algo); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/Message.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,42 @@ +package suikwasha.distributedalgorithm.framework; + +import java.nio.ByteBuffer; + +public class Message +{ + private final ByteBuffer message; + private final MessageChain chain; + + public Message(ByteBuffer _message) + { + message = _message; + chain = MessageChain.NIL_MESSAGECHAIN; + } + + private Message(ByteBuffer _message,MessageChain _chain) + { + message = _message; + chain = _chain; + } + + public ByteBuffer getMessage() + { + return message.asReadOnlyBuffer(); + } + + public MessageChain getMessageChain() + { + return chain; + } + + public long getSize() + { + return message.limit(); + } + + public Message newMessage(ByteBuffer _message) + { + MessageChain newChain = chain.add(this); + return new Message(_message,newChain); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/MessageChain.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,37 @@ +package suikwasha.distributedalgorithm.framework; + +import java.util.Iterator; + +import fj.data.List; + +public class MessageChain implements Iterable<Message> +{ + private static final List<Message> NIL_LIST = List.nil(); + private final List<Message> list; + public static final MessageChain NIL_MESSAGECHAIN = new MessageChain(); + + private MessageChain() + { + list = NIL_LIST; + } + + private MessageChain(List<Message> _list,Message _message) + { + list = _list.snoc(_message); + } + + public MessageChain add(Message _message) + { + return new MessageChain(list,_message); + } + + public long getMessageCount() + { + return list.length(); + } + + public Iterator<Message> iterator() + { + return list.iterator(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/Port.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,8 @@ +package suikwasha.distributedalgorithm.framework; + +public interface Port +{ + public void send(Message _mes); + public Message blockingReceive() throws InterruptedException; + public Message tryReceive(); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/Topology.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,7 @@ +package suikwasha.distributedalgorithm.framework; + +public interface Topology +{ + public Iterable<Machine> getMachines(); + public long getMachineCount(); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/framework/TopologyBuilder.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,6 @@ +package suikwasha.distributedalgorithm.framework; + +public interface TopologyBuilder +{ + public Topology build(Iterable<Algorithm> _algos,MachineBuilder _machineBuilder,LinkBuilder _linkBuilder); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/link/ReliableLinkBuilder.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,67 @@ +package suikwasha.distributedalgorithm.link; + +import java.util.concurrent.LinkedBlockingQueue; + +import suikwasha.distributedalgorithm.framework.Link; +import suikwasha.distributedalgorithm.framework.LinkBuilder; +import suikwasha.distributedalgorithm.framework.Message; +import suikwasha.distributedalgorithm.framework.Port; + +public class ReliableLinkBuilder implements LinkBuilder +{ + public Link build() + { + return new LinkImpl(); + } + + private static class LinkImpl implements Link + { + private final Port port1,port2; + + public LinkImpl() + { + LinkedBlockingQueue<Message> queue1 = new LinkedBlockingQueue<Message>(); + LinkedBlockingQueue<Message> queue2 = new LinkedBlockingQueue<Message>(); + + port1 = new PortImpl(queue1,queue2); + port2 = new PortImpl(queue2,queue1); + } + + public Port getPort1() + { + return port1; + } + + public Port getPort2() + { + return port2; + } + } + + private static class PortImpl implements Port + { + private final LinkedBlockingQueue<Message> in; + private final LinkedBlockingQueue<Message> out; + + public PortImpl(LinkedBlockingQueue<Message> _in,LinkedBlockingQueue<Message> _out) + { + in = _in; + out = _out; + } + + public void send(Message _mes) + { + out.add(_mes); + } + + public Message blockingReceive() throws InterruptedException + { + return in.take(); + } + + public Message tryReceive() + { + return in.poll(); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/machines/SimpleMachineBuilder.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,54 @@ +package suikwasha.distributedalgorithm.machines; + + +import fj.data.List; +import suikwasha.distributedalgorithm.framework.Context; +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.framework.Machine; +import suikwasha.distributedalgorithm.framework.MachineBuilder; +import suikwasha.distributedalgorithm.framework.Port; + +public class SimpleMachineBuilder implements MachineBuilder +{ + public Machine createMachine(Algorithm _algo) + { + return new MachineImpl(_algo); + } + + private static class MachineImpl extends Machine + { + private List<Port> ports; + private final Algorithm algo; + + public MachineImpl(Algorithm _algo) + { + ports = List.nil(); + algo = _algo; + } + + public void addPort(Port _p) + { + ports = ports.snoc(_p); + } + + public void run() + { + algo.execute(new ContextImpl(ports)); + } + } + + private static class ContextImpl implements Context + { + public List<Port> ports; + + public ContextImpl(List<Port> _ports) + { + ports = _ports; + } + + public Iterable<Port> getPorts() + { + return ports; + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/simulator/LoggingMessageLinkBuilder.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,77 @@ +package suikwasha.distributedalgorithm.simulator; + +import suikwasha.distributedalgorithm.framework.Link; +import suikwasha.distributedalgorithm.framework.LinkBuilder; +import suikwasha.distributedalgorithm.framework.Message; +import suikwasha.distributedalgorithm.framework.Port; + +public class LoggingMessageLinkBuilder implements LinkBuilder +{ + private final LinkBuilder builder; + private final Summary summary; + + public LoggingMessageLinkBuilder(Summary _summary,LinkBuilder _builder) + { + summary = _summary; + builder = _builder; + } + + public Link build() + { + return new LoggingLink(summary,builder.build()); + } + + public static class LoggingLink implements Link + { + private Link link; + private Summary summary; + + public LoggingLink(Summary _summary,Link _link) + { + summary = _summary; + link = _link; + } + + public Port getPort1() + { + return new LoggingPort(summary,link.getPort1()); + } + + public Port getPort2() + { + return new LoggingPort(summary,link.getPort2()); + } + } + + public static class LoggingPort implements Port + { + private Summary summary; + private Port port; + + public LoggingPort(Summary _summary,Port _port) + { + summary = _summary; + port = _port; + } + + public void send(Message _mes) + { + long bits = _mes.getMessage().limit(); + summary.addMessageBitCount(bits); + summary.incrementMessageCount(); + summary.trySetMessageChain(_mes.getMessageChain()); + + port.send(_mes); + } + + public Message blockingReceive() throws InterruptedException + { + return port.blockingReceive(); + } + + public Message tryReceive() + { + return port.tryReceive(); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/simulator/Simulator.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,43 @@ +package suikwasha.distributedalgorithm.simulator; + +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.framework.LinkBuilder; +import suikwasha.distributedalgorithm.framework.Machine; +import suikwasha.distributedalgorithm.framework.MachineBuilder; +import suikwasha.distributedalgorithm.framework.Topology; +import suikwasha.distributedalgorithm.framework.TopologyBuilder; + +public class Simulator +{ + private final Iterable<Algorithm> algos; + private final LinkBuilder linkBuilder; + private final MachineBuilder machineBuilder; + private final TopologyBuilder topologyBuilder; + + public Simulator(Iterable<Algorithm> _algos,LinkBuilder _linkBuilder + ,MachineBuilder _machineBuilder,TopologyBuilder _topologyBuilder) + { + algos = _algos; + linkBuilder = _linkBuilder; + machineBuilder = _machineBuilder; + topologyBuilder = _topologyBuilder; + } + + public Summary startSimulation() throws InterruptedException + { + Summary summary = new Summary(); + LoggingMessageLinkBuilder logLinkBuilder = new LoggingMessageLinkBuilder(summary,linkBuilder); + SyncMachineBuilder syncMachineBuilder = new SyncMachineBuilder(machineBuilder); + + Topology topologies = topologyBuilder.build(algos,syncMachineBuilder,logLinkBuilder); + + for(Machine machine : topologies.getMachines()){ + machine.start(); + } + + Synchronizer sync = syncMachineBuilder.getSynchronizer(); + sync.await(); + + return summary; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/simulator/Summary.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,66 @@ +package suikwasha.distributedalgorithm.simulator; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import suikwasha.distributedalgorithm.framework.MessageChain; + +public class Summary +{ + private final AtomicLong messageComplexity; + private final AtomicLong bitComplexity; + private final AtomicReference<MessageChain> longestMessageChain; + + public Summary() + { + messageComplexity = new AtomicLong(0); + bitComplexity = new AtomicLong(0); + longestMessageChain = new AtomicReference<MessageChain>(MessageChain.NIL_MESSAGECHAIN); + } + + public void incrementMessageCount() + { + messageComplexity.incrementAndGet(); + } + + public void addMessageBitCount(long _size) + { + bitComplexity.addAndGet(_size); + } + + public long getCurrentMessageComplexity() + { + return messageComplexity.get(); + } + + public long getCurrentBitComplexity() + { + return bitComplexity.get(); + } + + public MessageChain getLongestMessageChain() + { + return longestMessageChain.get(); + } + + public boolean trySetMessageChain(MessageChain _chain) + { + MessageChain currentLongestChain; + do{ + currentLongestChain = longestMessageChain.get(); + if(currentLongestChain.getMessageCount() > _chain.getMessageCount()){ + return false; + } + }while(!longestMessageChain.compareAndSet(currentLongestChain,_chain)); + + return true; + } + + public void print() + { + System.out.println("Summary:"); + System.out.println(String.format("MessageComplexity :\t\t%d",messageComplexity.get())); + System.out.println(String.format("MessageBitComplexity :\t\t%d",messageComplexity.get())); + System.out.println(String.format("TimeComplexity :\t\t%d",longestMessageChain.get().getMessageCount())); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/simulator/SyncMachineBuilder.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,53 @@ +package suikwasha.distributedalgorithm.simulator; + +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.framework.Machine; +import suikwasha.distributedalgorithm.framework.MachineBuilder; +import suikwasha.distributedalgorithm.framework.Port; + +public class SyncMachineBuilder implements MachineBuilder +{ + private final MachineBuilder builder; + private final Synchronizer synchronizer; + + public SyncMachineBuilder(MachineBuilder _builder) + { + builder = _builder; + synchronizer = new Synchronizer(); + } + + public Machine createMachine(Algorithm _algo) + { + Machine newMachine = builder.createMachine(_algo); + synchronizer.countup(); + return new SyncMachine(synchronizer,newMachine); + } + + public Synchronizer getSynchronizer() + { + return synchronizer; + } + + private static class SyncMachine extends Machine + { + private final Synchronizer synchronizer; + private final Machine machine; + + public SyncMachine(Synchronizer _synchronizer,Machine _machine) + { + synchronizer = _synchronizer; + machine = _machine; + } + + public void addPort(Port _p) + { + machine.addPort(_p); + } + + public void run() + { + machine.run(); + synchronizer.countdown(); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/simulator/Synchronizer.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,48 @@ +package suikwasha.distributedalgorithm.simulator; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +public class Synchronizer +{ + private AtomicLong counter; + private CountDownLatch latch; + + public Synchronizer() + { + counter = new AtomicLong(); + latch = new CountDownLatch(1); + } + + public long countup() throws IllegalStateException + { + // double checking , is it effective? + if(latch.getCount() != 0){ + long value = counter.incrementAndGet(); + if(latch.getCount() != 0){ + return value; + } + } + + throw new IllegalStateException("latch.getCount() == 0"); + } + + public long countdown() throws IllegalStateException + { + long value = counter.decrementAndGet(); + if(value < 0){ + throw new IllegalStateException("counter < 0"); + } + + if(value == 0){ + latch.countDown(); + } + + return value; + } + + public void await() throws InterruptedException + { + latch.await(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/suikwasha/distributedalgorithm/topologies/RingTopologyBuilder.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,72 @@ +package suikwasha.distributedalgorithm.topologies; + +import fj.data.List; +import suikwasha.distributedalgorithm.framework.Algorithm; +import suikwasha.distributedalgorithm.framework.Link; +import suikwasha.distributedalgorithm.framework.LinkBuilder; +import suikwasha.distributedalgorithm.framework.Machine; +import suikwasha.distributedalgorithm.framework.MachineBuilder; +import suikwasha.distributedalgorithm.framework.Topology; +import suikwasha.distributedalgorithm.framework.TopologyBuilder; + +public class RingTopologyBuilder implements TopologyBuilder +{ + public Topology build(Iterable<Algorithm> _algos,MachineBuilder _machineBuilder,LinkBuilder _linkBuilder) + { + List<Machine> list = List.nil(); + + // first , create machines + + for(Algorithm algo : _algos){ + Machine machine = _machineBuilder.createMachine(algo); + list = list.snoc(machine); + } + + if(list.length() < 1){ + throw new IllegalArgumentException("list.length() < 1"); + } + + // create links + + Link firstLink = _linkBuilder.build(); + Machine head = list.head(); + head.addPort(firstLink.getPort2()); + + List<Machine> tail = list.tail(); + Machine prevMachine = head; + if(tail.length() != 0){ + for(Machine machine : tail){ + Link link = _linkBuilder.build(); + prevMachine.addPort(link.getPort1()); + machine.addPort(link.getPort2()); + prevMachine = machine; + } + Machine last = tail.last(); + last.addPort(firstLink.getPort1()); + }else{ + head.addPort(firstLink.getPort1()); + } + + return new RingTopology(list); + } + + private class RingTopology implements Topology + { + private final List<Machine> machines; + + public RingTopology(List<Machine> _machines) + { + machines = _machines; + } + + public Iterable<Machine> getMachines() + { + return machines; + } + + public long getMachineCount() + { + return machines.length(); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/test/java/suikwasha/distributedalgorithm/AppTest.java Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,38 @@ +package suikwasha.distributedalgorithm; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/target/classes/META-INF/MANIFEST.MF Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,5 @@ +Manifest-Version: 1.0 +Built-By: shoshi +Build-Jdk: 1.6.0_35 +Created-By: Maven Integration for Eclipse +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/target/classes/META-INF/maven/suikwasha/distributedalgorithm/pom.properties Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,7 @@ +#Generated by Maven Integration for Eclipse +#Fri Oct 19 00:04:33 JST 2012 +version=0.0.1-SNAPSHOT +groupId=suikwasha +m2e.projectName=distributedalgorithm +m2e.projectLocation=/Users/shoshi/Documents/old/distributedalgorithm +artifactId=distributedalgorithm
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/target/classes/META-INF/maven/suikwasha/distributedalgorithm/pom.xml Fri Oct 19 00:05:41 2012 +0900 @@ -0,0 +1,30 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>suikwasha</groupId> + <artifactId>distributedalgorithm</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>distributedalgorithm</name> + <url>http://maven.apache.org</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.functionaljava</groupId> + <artifactId>functionaljava</artifactId> + <version>3.0</version> + </dependency> + </dependencies> +</project>