Mercurial > hg > Database > Christie
diff src/main/java/christie/codegear/PriorityThreadPoolExecutors.java @ 196:ad49723367c2
add priority
author | akahori |
---|---|
date | Sat, 09 Mar 2019 14:03:06 +0900 |
parents | |
children | ef5aad739292 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/codegear/PriorityThreadPoolExecutors.java Sat Mar 09 14:03:06 2019 +0900 @@ -0,0 +1,133 @@ +package christie.codegear; + +import java.util.Comparator; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + + +// https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172 +public class PriorityThreadPoolExecutors { + + public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) { + return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS); + } + private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor { + private static final int DEFAULT_PRIORITY = 0; + private static AtomicLong instanceCounter = new AtomicLong(); + + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, + int keepAliveTime, TimeUnit unit) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10, + ComparableTask.comparatorByPriorityAndSequentialOrder())); + } + + @Override + public void execute(Runnable command) { + // If this is ugly then delegator pattern needed + if (command instanceof ComparableTask) //Already wrapped + super.execute(command); + else { + super.execute(newComparableRunnableFor(command)); + } + } + + private Runnable newComparableRunnableFor(Runnable runnable) { + return new ComparableRunnable(ensurePriorityRunnable(runnable)); + } + + @Override + protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { + return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value); + } + + private PriorityRunnable ensurePriorityRunnable(Runnable runnable) { + return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable + : PriorityRunnable.of(runnable, DEFAULT_PRIORITY); + } + + private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask { + private Long sequentialOrder = instanceCounter.getAndIncrement(); + private HasPriority hasPriority; + + public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) { + super(priorityRunnable, result); + this.hasPriority = priorityRunnable; + } + + @Override + public long getInstanceCount() { + return sequentialOrder; + } + + @Override + public int getPriority() { + return hasPriority.getPriority(); + } + } + + private static class ComparableRunnable implements Runnable, ComparableTask { + private Long instanceCount = instanceCounter.getAndIncrement(); + private HasPriority hasPriority; + private Runnable runnable; + + public ComparableRunnable(PriorityRunnable priorityRunnable) { + this.runnable = priorityRunnable; + this.hasPriority = priorityRunnable; + } + + @Override + public void run() { + runnable.run(); + } + + @Override + public int getPriority() { + return hasPriority.getPriority(); + } + + @Override + public long getInstanceCount() { + return instanceCount; + } + } + + private interface ComparableTask extends Runnable { + int getPriority(); + + long getInstanceCount(); + + static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() { + return (o1, o2) -> { + int priorityResult = o2.getPriority() - o1.getPriority(); + return priorityResult != 0 ? priorityResult + : (int) (o1.getInstanceCount() - o2.getInstanceCount()); + }; + } + + } + + } + + public interface HasPriority{ + int getPriority(); + } + + + public interface PriorityRunnable extends Runnable, HasPriority{ + + public static PriorityRunnable of(Runnable runnable, int priority) { + return new PriorityRunnable() { + @Override + public void run() { + runnable.run(); + } + + @Override + public int getPriority() { + return priority; + } + }; + } + } + +} \ No newline at end of file