Mercurial > hg > Database > Christie
changeset 196:ad49723367c2
add priority
author | akahori |
---|---|
date | Sat, 09 Mar 2019 14:03:06 +0900 |
parents | a0be7c83fff8 |
children | 4d8f90e8a92c |
files | src/main/java/christie/codegear/CodeGear.java src/main/java/christie/codegear/CodeGearExecutor.java src/main/java/christie/codegear/CodeGearManager.java src/main/java/christie/codegear/InputDataGear.java src/main/java/christie/codegear/PriorityThreadPoolExecutors.java src/main/java/christie/codegear/StartCodeGear.java |
diffstat | 6 files changed, 167 insertions(+), 12 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/christie/codegear/CodeGear.java Sat Mar 09 10:19:03 2019 +0900 +++ b/src/main/java/christie/codegear/CodeGear.java Sat Mar 09 14:03:06 2019 +0900 @@ -26,17 +26,17 @@ protected abstract void run(CodeGearManager cgm); - public void setup(CodeGearManager cgm){ + public void setup(CodeGearManager cgm, int priority){ this.cgm = cgm; - this.cge = new CodeGearExecutor(this, this.cgm); + this.cge = new CodeGearExecutor(this, this.cgm, priority); this.localDGM = cgm.getLocalDGM(); for (Field field : this.getClass().getDeclaredFields()) {//AnnotationからInputDataGearをセット if (field.isAnnotationPresent(Take.class)) { - Take ano = field.getAnnotation(Take.class); + //Take ano = field.getAnnotation(Take.class); setTakeCommand("local", field.getName(), new DataGear(field.getType())); } else if (field.isAnnotationPresent(Peek.class)) { - Peek ano = field.getAnnotation(Peek.class); + //Peek ano = field.getAnnotation(Peek.class); setPeekCommand("local", field.getName(), new DataGear(field.getType())); } else if (field.isAnnotationPresent(TakeFrom.class)) { TakeFrom ano = field.getAnnotation(TakeFrom.class); @@ -105,4 +105,5 @@ public CodeGearExecutor getCge() { return cge; } + }
--- a/src/main/java/christie/codegear/CodeGearExecutor.java Sat Mar 09 10:19:03 2019 +0900 +++ b/src/main/java/christie/codegear/CodeGearExecutor.java Sat Mar 09 14:03:06 2019 +0900 @@ -3,14 +3,26 @@ public class CodeGearExecutor implements Runnable { CodeGear cg; CodeGearManager cgm; + private int priority = Thread.NORM_PRIORITY; - public CodeGearExecutor(CodeGear cg, CodeGearManager cgm){ + public CodeGearExecutor(CodeGear cg, CodeGearManager cgm, int priority){ this.cg = cg; this.cgm = cgm; + this.priority = priority; } @Override public void run() { cg.run(cgm); } + + public int getPriority() { + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + }
--- a/src/main/java/christie/codegear/CodeGearManager.java Sat Mar 09 10:19:03 2019 +0900 +++ b/src/main/java/christie/codegear/CodeGearManager.java Sat Mar 09 14:03:06 2019 +0900 @@ -31,6 +31,8 @@ this.localPort = localPort; daemon = new ChristieDaemon(localPort, this); daemon.listen(); + + } public LocalDataGearManager getLocalDGM(){ @@ -53,11 +55,15 @@ } public void submit(CodeGear cg){ - threadPoolExecutor.execute(cg.getCge()); + threadPoolExecutor.execute(PriorityThreadPoolExecutors.PriorityRunnable.of(cg.getCge(), cg.getCge().getPriority())); } public void setup(CodeGear cg){ - cg.setup(this); + setup(cg, Thread.NORM_PRIORITY); + } + + public void setup(CodeGear cg, int priority){ + cg.setup(this, priority); } public ConcurrentHashMap<Integer, CodeGearManager> getCgmList() {
--- a/src/main/java/christie/codegear/InputDataGear.java Sat Mar 09 10:19:03 2019 +0900 +++ b/src/main/java/christie/codegear/InputDataGear.java Sat Mar 09 14:03:06 2019 +0900 @@ -4,6 +4,7 @@ import christie.annotation.PeekFrom; import christie.annotation.Take; import christie.annotation.TakeFrom; +import christie.datagear.DataGears; import christie.datagear.command.Command; import christie.datagear.dg.DataGear; @@ -17,6 +18,7 @@ * inputDataGearの待ち合わせの管理 */ public class InputDataGear { + public DataGears dataGears; public ConcurrentHashMap<String, DataGear> inputValue = new ConcurrentHashMap<String, DataGear>();//受け皿 public CodeGearManager cgm; public CodeGear cg; @@ -41,10 +43,10 @@ public void setInputs(String key, DataGear dg){ inputValue.put(key, dg); - count(); + decriment(); } - public synchronized void count(){//Commandが実行されるたびにデクリメント + public synchronized void decriment(){//Commandが実行されるたびにデクリメント if (count.decrementAndGet() == 0){ setInputValue(); submitCG();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/codegear/PriorityThreadPoolExecutors.java Sat Mar 09 14:03:06 2019 +0900 @@ -0,0 +1,133 @@ +package christie.codegear; + +import java.util.Comparator; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + + +// https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172 +public class PriorityThreadPoolExecutors { + + public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) { + return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS); + } + private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor { + private static final int DEFAULT_PRIORITY = 0; + private static AtomicLong instanceCounter = new AtomicLong(); + + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, + int keepAliveTime, TimeUnit unit) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10, + ComparableTask.comparatorByPriorityAndSequentialOrder())); + } + + @Override + public void execute(Runnable command) { + // If this is ugly then delegator pattern needed + if (command instanceof ComparableTask) //Already wrapped + super.execute(command); + else { + super.execute(newComparableRunnableFor(command)); + } + } + + private Runnable newComparableRunnableFor(Runnable runnable) { + return new ComparableRunnable(ensurePriorityRunnable(runnable)); + } + + @Override + protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { + return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value); + } + + private PriorityRunnable ensurePriorityRunnable(Runnable runnable) { + return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable + : PriorityRunnable.of(runnable, DEFAULT_PRIORITY); + } + + private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask { + private Long sequentialOrder = instanceCounter.getAndIncrement(); + private HasPriority hasPriority; + + public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) { + super(priorityRunnable, result); + this.hasPriority = priorityRunnable; + } + + @Override + public long getInstanceCount() { + return sequentialOrder; + } + + @Override + public int getPriority() { + return hasPriority.getPriority(); + } + } + + private static class ComparableRunnable implements Runnable, ComparableTask { + private Long instanceCount = instanceCounter.getAndIncrement(); + private HasPriority hasPriority; + private Runnable runnable; + + public ComparableRunnable(PriorityRunnable priorityRunnable) { + this.runnable = priorityRunnable; + this.hasPriority = priorityRunnable; + } + + @Override + public void run() { + runnable.run(); + } + + @Override + public int getPriority() { + return hasPriority.getPriority(); + } + + @Override + public long getInstanceCount() { + return instanceCount; + } + } + + private interface ComparableTask extends Runnable { + int getPriority(); + + long getInstanceCount(); + + static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() { + return (o1, o2) -> { + int priorityResult = o2.getPriority() - o1.getPriority(); + return priorityResult != 0 ? priorityResult + : (int) (o1.getInstanceCount() - o2.getInstanceCount()); + }; + } + + } + + } + + public interface HasPriority{ + int getPriority(); + } + + + public interface PriorityRunnable extends Runnable, HasPriority{ + + public static PriorityRunnable of(Runnable runnable, int priority) { + return new PriorityRunnable() { + @Override + public void run() { + runnable.run(); + } + + @Override + public int getPriority() { + return priority; + } + }; + } + } + +} \ No newline at end of file
--- a/src/main/java/christie/codegear/StartCodeGear.java Sat Mar 09 10:19:03 2019 +0900 +++ b/src/main/java/christie/codegear/StartCodeGear.java Sat Mar 09 14:03:06 2019 +0900 @@ -7,12 +7,13 @@ public abstract class StartCodeGear extends CodeGear{ static ConcurrentHashMap<Integer, CodeGearManager> cgmList = new ConcurrentHashMap<>(); - static LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(); - static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads + //static LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(); + /*static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads Runtime.getRuntime().availableProcessors(), Integer.MAX_VALUE, // keepAliveTime TimeUnit.SECONDS, - taskQueue); + taskQueue);*/ + static ThreadPoolExecutor threadPoolExecutor = PriorityThreadPoolExecutors.createThreadPool(Runtime.getRuntime().availableProcessors(), Integer.MAX_VALUE); static int cgmCount = 1; public StartCodeGear(CodeGearManager cgm){