Mercurial > hg > Database > Christie
changeset 37:b9dd655a54b9
add paxos Code
author | akahori |
---|---|
date | Tue, 24 Jul 2018 20:31:21 +0900 |
parents | 4479d37c8e53 |
children | 02991eabdcbe |
files | src/main/java/christie/test/Paxos/AcceptCodeGear.java src/main/java/christie/test/Paxos/AcceptorCodeGear.java src/main/java/christie/test/Paxos/PromiseCodeGear.java src/main/java/christie/test/Paxos/Proposal.java src/main/java/christie/test/Paxos/ProposerCodeGear.java src/main/java/christie/test/Paxos/ReceivePromiseCodeGear.java src/main/java/christie/test/Paxos/SendAcceptRequestCodeGear.java src/main/java/christie/test/Paxos/SendPrepareRequestCodeGear.java src/main/java/christie/test/Paxos/StartPaxos.java |
diffstat | 9 files changed, 300 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Paxos/AcceptCodeGear.java Tue Jul 24 20:31:21 2018 +0900 @@ -0,0 +1,13 @@ +package christie.test.Paxos; + +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; + +public class AcceptCodeGear extends CodeGear { + + @Override + protected void run(CodeGearManager cgm) { + + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Paxos/AcceptorCodeGear.java Tue Jul 24 20:31:21 2018 +0900 @@ -0,0 +1,14 @@ +package christie.test.Paxos; + +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; + + +public class AcceptorCodeGear extends CodeGear { + + @Override + protected void run(CodeGearManager cgm) {//できるだけ並列に走らせるためにStartCodeGearには書かない + cgm.setup(new PromiseCodeGear()); + put("promisedProposal", new Proposal()); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Paxos/PromiseCodeGear.java Tue Jul 24 20:31:21 2018 +0900 @@ -0,0 +1,38 @@ +package christie.test.Paxos; + +import christie.annotation.Take; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; + +public class PromiseCodeGear extends CodeGear { + + + Proposal acceptedProposal; + + @Take + Proposal promisedProposal; + + @Take + Proposal prepareProposal; + + public PromiseCodeGear(){ } + + + + @Override + protected void run(CodeGearManager cgm) { + + if(promisedProposal.getNumber() < prepareProposal.getNumber()) { + System.out.println("promised < prepare : " + promisedProposal.getNumber() + " < " + prepareProposal.getNumber()); + System.out.println("Acceptor" + cgm.cgmID + " Recive Proposal from : " + prepareProposal.getCgmID()); + put("promisedProposal", prepareProposal); + cgm.getCgmList().get(prepareProposal.getCgmID()).getLocalDGM().put("recievePromise", prepareProposal); + //cgm.setup(new AcceptCodeGear()); + }else { + put("promisedProposal", promisedProposal); + } + + cgm.setup(new PromiseCodeGear()); + } + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Paxos/Proposal.java Tue Jul 24 20:31:21 2018 +0900 @@ -0,0 +1,52 @@ +package christie.test.Paxos; + +import christie.codegear.CodeGearManager; +import org.msgpack.annotation.Message; + +@Message +public class Proposal { + private int cgmNum = 0; + private int cgmID = 0; + private int number = 0; + private int value = 0; + + public Proposal(){} + + + + public Proposal(CodeGearManager cgm, int value){ + + this.cgmNum = cgm.getCgmList().size(); + this.cgmID = cgm.cgmID; + + this.value = value; + incrementNumber(); + + } + + + + public void setValue(int value){ + this.value = value; + } + + public int getNumber(){ + return this.number; + } + + public int incrementNumber() { + this.number += cgmNum + cgmID; + return this.number; + } + + public Proposal getIncrementedProposal(){ + incrementNumber(); + return this; + } + + public int getCgmID(){ + return this.cgmID; + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Paxos/ProposerCodeGear.java Tue Jul 24 20:31:21 2018 +0900 @@ -0,0 +1,24 @@ +package christie.test.Paxos; + + +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; +import christie.datagear.DataGearManager; +import java.util.concurrent.ConcurrentHashMap; + +public class ProposerCodeGear extends CodeGear { + + ConcurrentHashMap<String, DataGearManager> acceptors; + + @Override + protected void run(CodeGearManager cgm) { + + cgm.setup(new SendPrepareRequestCodeGear()); + put("acceptors", cgm.getRemoteDgmList()); + put("sendProposal", new Proposal(cgm,cgm.cgmID)); + put("promiseCount", 0); + + } +} + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Paxos/ReceivePromiseCodeGear.java Tue Jul 24 20:31:21 2018 +0900 @@ -0,0 +1,57 @@ +package christie.test.Paxos; + +import christie.annotation.Peek; +import christie.annotation.Take; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; +import christie.datagear.DataGearManager; + +import java.util.concurrent.ConcurrentHashMap; + + +public class ReceivePromiseCodeGear extends CodeGear{ + @Peek + ConcurrentHashMap<String, DataGearManager> acceptors; + + @Peek + Proposal sendedProposal; + + @Take + Proposal recievePromise; + + @Take + int promiseCount; + + @Take + long startTimeMillis; + + int timeOut_ms = 1 * 1000; + + + @Override + protected void run(CodeGearManager cgm) { + + if(promiseCount > acceptors.size()/2){ + System.out.println(cgm.cgmID + " Can send AcceptRequest"); + return; + } + + if(recievePromise.getNumber() == sendedProposal.getNumber()){ + promiseCount = promiseCount + 1; + + } + + long endTimeMillis = System.currentTimeMillis(); + if(endTimeMillis - startTimeMillis < timeOut_ms){ + put("promiseCount", promiseCount); + put("recievePromise", new Proposal()); + put("startTimeMillis", startTimeMillis); + cgm.setup(new ReceivePromiseCodeGear()); + + }else{ + put("sendProposal", sendedProposal.getIncrementedProposal()); + cgm.setup(new SendPrepareRequestCodeGear()); + } + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Paxos/SendAcceptRequestCodeGear.java Tue Jul 24 20:31:21 2018 +0900 @@ -0,0 +1,22 @@ +package christie.test.Paxos; + +import christie.annotation.Peek; +import christie.annotation.Take; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; +import christie.datagear.DataGearManager; + +import java.util.concurrent.ConcurrentHashMap; + +public class SendAcceptRequestCodeGear extends CodeGear { + @Peek + ConcurrentHashMap<String, DataGearManager> acceptors; + + @Take + Proposal sendedProposal; + + @Override + protected void run(CodeGearManager cgm) { + System.out.println("send Accept Request"); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Paxos/SendPrepareRequestCodeGear.java Tue Jul 24 20:31:21 2018 +0900 @@ -0,0 +1,29 @@ +package christie.test.Paxos; + +import christie.annotation.Peek; +import christie.annotation.Take; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; +import christie.datagear.DataGearManager; + +import java.util.concurrent.ConcurrentHashMap; + +public class SendPrepareRequestCodeGear extends CodeGear{ + @Peek + ConcurrentHashMap<String, DataGearManager> acceptors; + + @Take + Proposal sendProposal; + + @Override + protected void run(CodeGearManager cgm) { + for (String acceptorName : acceptors.keySet()){ + put(acceptorName,"prepareProposal", sendProposal); + } + + put("sendedProposal", sendProposal); + put("promiseCount", 0); + put("startTimeMillis", System.currentTimeMillis()); + cgm.setup(new ReceivePromiseCodeGear()); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Paxos/StartPaxos.java Tue Jul 24 20:31:21 2018 +0900 @@ -0,0 +1,51 @@ +package christie.test.Paxos; + +import christie.codegear.CodeGearManager; + +import christie.codegear.StartCodeGear; + +import java.util.ArrayList; + +public class StartPaxos extends StartCodeGear{ + + public StartPaxos(CodeGearManager cgm) { + super(cgm); + + } + + public static void main(String args[]){ + int proposer_port = 10000; + int proposers_num = 2; + int acceptor_port = proposer_port + proposers_num; + int acceptors_num = 3; + + ArrayList<CodeGearManager> proposers = new ArrayList<CodeGearManager>(); + ArrayList<CodeGearManager> acceptors = new ArrayList<CodeGearManager>(); + + + for(int i = 0; i < acceptors_num; i++){ + CodeGearManager acceptor = createCGM(acceptor_port + i); + acceptor.setup(new AcceptorCodeGear()); + acceptors.add(acceptor); + } + + for(int i = 0; i < proposers_num; i++){ + proposers.add(createCGM(proposer_port + i)); + } + + + for(int i = 0; i < proposers_num; i++){ + + for(int j = 0; j < acceptors_num; j++){ + proposers.get(i).createRemoteDGM("acceptor" + j, "localhost", acceptors.get(j).localPort); + acceptors.get(j).createRemoteDGM("proposer" + i, "localhost", proposers.get(i).localPort); + } + proposers.get(i).setup(new ProposerCodeGear()); + } + + } + + + + +} \ No newline at end of file