changeset 37:b9dd655a54b9

add paxos Code
author akahori
date Tue, 24 Jul 2018 20:31:21 +0900
parents 4479d37c8e53
children 02991eabdcbe
files src/main/java/christie/test/Paxos/AcceptCodeGear.java src/main/java/christie/test/Paxos/AcceptorCodeGear.java src/main/java/christie/test/Paxos/PromiseCodeGear.java src/main/java/christie/test/Paxos/Proposal.java src/main/java/christie/test/Paxos/ProposerCodeGear.java src/main/java/christie/test/Paxos/ReceivePromiseCodeGear.java src/main/java/christie/test/Paxos/SendAcceptRequestCodeGear.java src/main/java/christie/test/Paxos/SendPrepareRequestCodeGear.java src/main/java/christie/test/Paxos/StartPaxos.java
diffstat 9 files changed, 300 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/Paxos/AcceptCodeGear.java	Tue Jul 24 20:31:21 2018 +0900
@@ -0,0 +1,13 @@
+package christie.test.Paxos;
+
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+
+public class AcceptCodeGear extends CodeGear {
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+
+
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/Paxos/AcceptorCodeGear.java	Tue Jul 24 20:31:21 2018 +0900
@@ -0,0 +1,14 @@
+package christie.test.Paxos;
+
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+
+
+public class AcceptorCodeGear extends CodeGear {
+
+    @Override
+    protected void run(CodeGearManager cgm) {//できるだけ並列に走らせるためにStartCodeGearには書かない
+        cgm.setup(new PromiseCodeGear());
+        put("promisedProposal", new Proposal());
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/Paxos/PromiseCodeGear.java	Tue Jul 24 20:31:21 2018 +0900
@@ -0,0 +1,38 @@
+package christie.test.Paxos;
+
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+
+public class PromiseCodeGear extends CodeGear {
+
+
+    Proposal acceptedProposal;
+
+    @Take
+    Proposal promisedProposal;
+
+    @Take
+    Proposal prepareProposal;
+
+    public PromiseCodeGear(){ }
+
+
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+
+        if(promisedProposal.getNumber() < prepareProposal.getNumber()) {
+            System.out.println("promised < prepare : " + promisedProposal.getNumber() +  " < " + prepareProposal.getNumber());
+            System.out.println("Acceptor" + cgm.cgmID + " Recive Proposal from : " + prepareProposal.getCgmID());
+            put("promisedProposal", prepareProposal);
+            cgm.getCgmList().get(prepareProposal.getCgmID()).getLocalDGM().put("recievePromise", prepareProposal);
+            //cgm.setup(new AcceptCodeGear());
+        }else {
+            put("promisedProposal", promisedProposal);
+        }
+
+        cgm.setup(new PromiseCodeGear());
+    }
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/Paxos/Proposal.java	Tue Jul 24 20:31:21 2018 +0900
@@ -0,0 +1,52 @@
+package christie.test.Paxos;
+
+import christie.codegear.CodeGearManager;
+import org.msgpack.annotation.Message;
+
+@Message
+public class Proposal {
+    private int cgmNum = 0;
+    private int cgmID = 0;
+    private int number = 0;
+    private int value = 0;
+
+    public Proposal(){}
+
+
+
+    public Proposal(CodeGearManager cgm, int value){
+
+        this.cgmNum = cgm.getCgmList().size();
+        this.cgmID = cgm.cgmID;
+
+        this.value = value;
+        incrementNumber();
+
+    }
+
+
+
+    public void setValue(int value){
+        this.value = value;
+    }
+
+    public int getNumber(){
+        return this.number;
+    }
+
+    public int incrementNumber() {
+        this.number += cgmNum + cgmID;
+        return this.number;
+    }
+
+    public Proposal getIncrementedProposal(){
+        incrementNumber();
+        return this;
+    }
+
+    public int getCgmID(){
+        return this.cgmID;
+    }
+
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/Paxos/ProposerCodeGear.java	Tue Jul 24 20:31:21 2018 +0900
@@ -0,0 +1,24 @@
+package christie.test.Paxos;
+
+
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.datagear.DataGearManager;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ProposerCodeGear extends CodeGear {
+
+    ConcurrentHashMap<String, DataGearManager> acceptors;
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+
+        cgm.setup(new SendPrepareRequestCodeGear());
+        put("acceptors", cgm.getRemoteDgmList());
+        put("sendProposal", new Proposal(cgm,cgm.cgmID));
+        put("promiseCount", 0);
+
+    }
+}
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/Paxos/ReceivePromiseCodeGear.java	Tue Jul 24 20:31:21 2018 +0900
@@ -0,0 +1,57 @@
+package christie.test.Paxos;
+
+import christie.annotation.Peek;
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.datagear.DataGearManager;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class ReceivePromiseCodeGear extends CodeGear{
+    @Peek
+    ConcurrentHashMap<String, DataGearManager> acceptors;
+
+    @Peek
+    Proposal sendedProposal;
+
+    @Take
+    Proposal recievePromise;
+
+    @Take
+    int promiseCount;
+
+    @Take
+    long startTimeMillis;
+
+    int timeOut_ms = 1 * 1000;
+
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+
+        if(promiseCount > acceptors.size()/2){
+            System.out.println(cgm.cgmID + " Can send AcceptRequest");
+            return;
+        }
+
+        if(recievePromise.getNumber() == sendedProposal.getNumber()){
+            promiseCount = promiseCount + 1;
+
+        }
+
+        long endTimeMillis = System.currentTimeMillis();
+        if(endTimeMillis - startTimeMillis < timeOut_ms){
+            put("promiseCount", promiseCount);
+            put("recievePromise", new Proposal());
+            put("startTimeMillis", startTimeMillis);
+            cgm.setup(new ReceivePromiseCodeGear());
+
+        }else{
+            put("sendProposal", sendedProposal.getIncrementedProposal());
+            cgm.setup(new SendPrepareRequestCodeGear());
+        }
+
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/Paxos/SendAcceptRequestCodeGear.java	Tue Jul 24 20:31:21 2018 +0900
@@ -0,0 +1,22 @@
+package christie.test.Paxos;
+
+import christie.annotation.Peek;
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.datagear.DataGearManager;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SendAcceptRequestCodeGear extends CodeGear {
+    @Peek
+    ConcurrentHashMap<String, DataGearManager> acceptors;
+
+    @Take
+    Proposal sendedProposal;
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        System.out.println("send Accept Request");
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/Paxos/SendPrepareRequestCodeGear.java	Tue Jul 24 20:31:21 2018 +0900
@@ -0,0 +1,29 @@
+package christie.test.Paxos;
+
+import christie.annotation.Peek;
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.datagear.DataGearManager;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SendPrepareRequestCodeGear extends CodeGear{
+    @Peek
+    ConcurrentHashMap<String, DataGearManager> acceptors;
+
+    @Take
+    Proposal sendProposal;
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        for (String acceptorName : acceptors.keySet()){
+            put(acceptorName,"prepareProposal", sendProposal);
+        }
+
+        put("sendedProposal", sendProposal);
+        put("promiseCount", 0);
+        put("startTimeMillis", System.currentTimeMillis());
+        cgm.setup(new ReceivePromiseCodeGear());
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/Paxos/StartPaxos.java	Tue Jul 24 20:31:21 2018 +0900
@@ -0,0 +1,51 @@
+package christie.test.Paxos;
+
+import christie.codegear.CodeGearManager;
+
+import christie.codegear.StartCodeGear;
+
+import java.util.ArrayList;
+
+public class StartPaxos extends StartCodeGear{
+
+    public StartPaxos(CodeGearManager cgm) {
+        super(cgm);
+
+    }
+
+    public static void main(String args[]){
+        int proposer_port = 10000;
+        int proposers_num = 2;
+        int acceptor_port = proposer_port + proposers_num;
+        int acceptors_num = 3;
+
+        ArrayList<CodeGearManager> proposers = new ArrayList<CodeGearManager>();
+        ArrayList<CodeGearManager> acceptors = new ArrayList<CodeGearManager>();
+
+
+        for(int i = 0; i < acceptors_num; i++){
+            CodeGearManager acceptor = createCGM(acceptor_port + i);
+            acceptor.setup(new AcceptorCodeGear());
+            acceptors.add(acceptor);
+        }
+
+        for(int i = 0; i < proposers_num; i++){
+            proposers.add(createCGM(proposer_port + i));
+        }
+
+
+        for(int i = 0; i < proposers_num; i++){
+
+            for(int j = 0; j < acceptors_num; j++){
+                proposers.get(i).createRemoteDGM("acceptor" + j, "localhost", acceptors.get(j).localPort);
+                acceptors.get(j).createRemoteDGM("proposer" + i, "localhost", proposers.get(i).localPort);
+            }
+            proposers.get(i).setup(new ProposerCodeGear());
+        }
+
+    }
+
+
+
+
+}
\ No newline at end of file