Mercurial > hg > Database > Christie
view src/main/java/christie/codegear/PriorityThreadPoolExecutors.java @ 196:ad49723367c2
add priority
author | akahori |
---|---|
date | Sat, 09 Mar 2019 14:03:06 +0900 |
parents | |
children | ef5aad739292 |
line wrap: on
line source
package christie.codegear; import java.util.Comparator; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; // https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172 public class PriorityThreadPoolExecutors { public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) { return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS); } private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor { private static final int DEFAULT_PRIORITY = 0; private static AtomicLong instanceCounter = new AtomicLong(); public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, int keepAliveTime, TimeUnit unit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10, ComparableTask.comparatorByPriorityAndSequentialOrder())); } @Override public void execute(Runnable command) { // If this is ugly then delegator pattern needed if (command instanceof ComparableTask) //Already wrapped super.execute(command); else { super.execute(newComparableRunnableFor(command)); } } private Runnable newComparableRunnableFor(Runnable runnable) { return new ComparableRunnable(ensurePriorityRunnable(runnable)); } @Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value); } private PriorityRunnable ensurePriorityRunnable(Runnable runnable) { return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable : PriorityRunnable.of(runnable, DEFAULT_PRIORITY); } private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask { private Long sequentialOrder = instanceCounter.getAndIncrement(); private HasPriority hasPriority; public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) { super(priorityRunnable, result); this.hasPriority = priorityRunnable; } @Override public long getInstanceCount() { return sequentialOrder; } @Override public int getPriority() { return hasPriority.getPriority(); } } private static class ComparableRunnable implements Runnable, ComparableTask { private Long instanceCount = instanceCounter.getAndIncrement(); private HasPriority hasPriority; private Runnable runnable; public ComparableRunnable(PriorityRunnable priorityRunnable) { this.runnable = priorityRunnable; this.hasPriority = priorityRunnable; } @Override public void run() { runnable.run(); } @Override public int getPriority() { return hasPriority.getPriority(); } @Override public long getInstanceCount() { return instanceCount; } } private interface ComparableTask extends Runnable { int getPriority(); long getInstanceCount(); static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() { return (o1, o2) -> { int priorityResult = o2.getPriority() - o1.getPriority(); return priorityResult != 0 ? priorityResult : (int) (o1.getInstanceCount() - o2.getInstanceCount()); }; } } } public interface HasPriority{ int getPriority(); } public interface PriorityRunnable extends Runnable, HasPriority{ public static PriorityRunnable of(Runnable runnable, int priority) { return new PriorityRunnable() { @Override public void run() { runnable.run(); } @Override public int getPriority() { return priority; } }; } } }