comparison src/main/java/christie/codegear/PriorityThreadPoolExecutors.java @ 196:ad49723367c2

add priority
author akahori
date Sat, 09 Mar 2019 14:03:06 +0900
parents
children ef5aad739292
comparison
equal deleted inserted replaced
195:a0be7c83fff8 196:ad49723367c2
1 package christie.codegear;
2
3 import java.util.Comparator;
4 import java.util.concurrent.*;
5 import java.util.concurrent.atomic.AtomicLong;
6
7
8 // https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172
/**
 * Factory for thread pools that run queued tasks in priority order.
 *
 * <p>Tasks with a larger {@link HasPriority#getPriority()} value are taken from the queue first.
 * Tasks with equal priority are taken in the order in which they were wrapped for submission
 * (first in, first out). A plain {@link Runnable} that does not implement
 * {@link PriorityRunnable} is treated as having priority {@code 0}.
 */
public class PriorityThreadPoolExecutors {

    /**
     * Creates a pool whose core size and maximum size are both {@code nThreads} and whose work
     * queue is ordered by task priority.
     *
     * @param nThreads      core and maximum number of worker threads
     * @param keepAliveTime keep-alive time handed to {@link ThreadPoolExecutor}, in milliseconds
     * @return a new priority-aware executor
     */
    public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) {
        return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS);
    }

    /** {@link ThreadPoolExecutor} that wraps every task so the work queue can sort it. */
    private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
        /** Priority given to tasks that are not {@link PriorityRunnable}s. */
        private static final int DEFAULT_PRIORITY = 0;
        /** Initial capacity of the (unbounded) priority queue. */
        private static final int INITIAL_QUEUE_CAPACITY = 10;
        /** Source of submission sequence numbers, shared by all pools, used as FIFO tie-breaker. */
        private static final AtomicLong instanceCounter = new AtomicLong();
        /** Ordering applied to queued tasks. */
        private static final Comparator<ComparableTask> TASK_ORDER =
                ComparableTask.comparatorByPriorityAndSequentialOrder();

        public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                          int keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                    new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_CAPACITY,
                            PriorityThreadPoolExecutor::compareQueued));
        }

        /**
         * Compares two queued elements. The casts are safe because {@link #execute(Runnable)}
         * only ever hands {@link ComparableTask} instances to the superclass.
         */
        private static int compareQueued(Runnable left, Runnable right) {
            return TASK_ORDER.compare((ComparableTask) left, (ComparableTask) right);
        }

        /**
         * Executes {@code command}, wrapping it in a {@link ComparableTask} first unless it
         * already is one (which is the case for tasks created by {@code submit(Runnable)}).
         *
         * @throws NullPointerException if {@code command} is null
         */
        @Override
        public void execute(Runnable command) {
            // Fail at the call site; wrapping a null would defer the NPE to a worker thread.
            if (command == null) {
                throw new NullPointerException("command");
            }
            if (command instanceof ComparableTask) {
                super.execute(command);
            } else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        /** Returns {@code runnable} itself if it carries a priority, else a default-priority wrapper. */
        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable)
                    ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        /** Future task that exposes the priority of the runnable it was built from. */
        private static class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private final long sequentialOrder = instanceCounter.getAndIncrement();
            private final HasPriority hasPriority;

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        /** Plain runnable wrapper that exposes the priority of the runnable it delegates to. */
        private static class ComparableRunnable implements Runnable, ComparableTask {
            private final long instanceCount = instanceCounter.getAndIncrement();
            private final HasPriority hasPriority;
            private final Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        /** A queued task that can be ordered by priority and by submission sequence. */
        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            /**
             * Orders tasks by descending priority, then by ascending sequence number.
             *
             * <p>Uses {@link Integer#compare} and {@link Long#compare} rather than subtraction:
             * a difference of two ints (or a long difference narrowed to int) can overflow and
             * report the wrong sign for widely separated values.
             */
            static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (first, second) -> {
                    int byPriority = Integer.compare(second.getPriority(), first.getPriority());
                    return byPriority != 0
                            ? byPriority
                            : Long.compare(first.getInstanceCount(), second.getInstanceCount());
                };
            }
        }
    }

    /** Something that carries a scheduling priority; larger values are scheduled earlier. */
    public interface HasPriority {
        int getPriority();
    }

    /** A {@link Runnable} with an attached priority. */
    public interface PriorityRunnable extends Runnable, HasPriority {

        /**
         * Wraps {@code runnable} so that it reports the given {@code priority}.
         *
         * @param runnable the work to run
         * @param priority the priority to report
         * @return a runnable that delegates to {@code runnable}
         * @throws NullPointerException if {@code runnable} is null
         */
        public static PriorityRunnable of(Runnable runnable, int priority) {
            if (runnable == null) {
                throw new NullPointerException("runnable");
            }
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }
}