196
|
1 package christie.codegear;
|
|
2
|
|
3 import java.util.Comparator;
|
|
4 import java.util.concurrent.*;
|
|
5 import java.util.concurrent.atomic.AtomicLong;
|
|
6
|
|
7
|
|
8 // https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172
|
|
/**
 * Factory for thread pools whose <em>queued</em> tasks are run in priority order.
 *
 * <p>Tasks that implement {@link PriorityRunnable} are ordered by
 * {@link HasPriority#getPriority()}: a larger value runs earlier. Any other
 * {@link Runnable} (and any task submitted as a {@code Callable}) is treated as
 * priority {@code 0}. Tasks with equal priority run in the order in which they
 * were wrapped by the pool (first in, first out).
 *
 * <p>Ordering only applies to tasks that are waiting in the work queue; a task
 * that {@link ThreadPoolExecutor} hands directly to a freshly started worker
 * thread never passes through the queue.
 */
public class PriorityThreadPoolExecutors {

    /**
     * Creates a pool with {@code nThreads} as both its core and maximum size,
     * backed by a priority-ordered work queue.
     *
     * @param nThreads      core and maximum number of worker threads
     * @param keepAliveTime keep-alive time in milliseconds, passed unchanged to
     *                      {@link ThreadPoolExecutor}
     * @return a new priority-aware {@link ThreadPoolExecutor}
     * @throws IllegalArgumentException if {@link ThreadPoolExecutor} rejects the
     *                                  sizes or the keep-alive time
     */
    public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) {
        return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS);
    }

    /**
     * {@link ThreadPoolExecutor} that wraps every task in a {@link ComparableTask}
     * so that the backing {@link PriorityBlockingQueue} can order them.
     */
    private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
        /** Priority given to tasks that do not implement {@link PriorityRunnable}. */
        private static final int DEFAULT_PRIORITY = 0;
        /** Initial capacity of the (unbounded) priority queue. */
        private static final int INITIAL_QUEUE_CAPACITY = 10;
        /** Source of the sequence numbers used to break priority ties in FIFO order. */
        private static final AtomicLong instanceCounter = new AtomicLong();

        public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                          int keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, newPriorityQueue());
        }

        /**
         * Builds the work queue. The queue is typed as {@code Runnable} because that
         * is what {@link ThreadPoolExecutor} requires; every element is nevertheless a
         * {@link ComparableTask}, since {@link #execute(Runnable)} wraps anything
         * that is not one before it reaches the queue.
         */
        private static BlockingQueue<Runnable> newPriorityQueue() {
            final Comparator<ComparableTask> order = ComparableTask.comparatorByPriorityAndSequentialOrder();
            return new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_CAPACITY,
                    (left, right) -> order.compare((ComparableTask) left, (ComparableTask) right));
        }

        /**
         * Executes {@code command}, wrapping it first unless it is already a
         * {@link ComparableTask} (for example a future created by
         * {@link #newTaskFor(Runnable, Object)}).
         *
         * @throws NullPointerException if {@code command} is {@code null}
         */
        @Override
        public void execute(Runnable command) {
            if (command == null) {
                // Fail on the caller's thread, as Executor.execute specifies, instead of
                // wrapping null and failing later inside a worker thread.
                throw new NullPointerException("command");
            }
            if (command instanceof ComparableTask) { // already wrapped
                super.execute(command);
            } else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        /**
         * Creates the future used by {@code submit(Runnable)} and
         * {@code submit(Runnable, T)} so that it carries the runnable's priority.
         */
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        /** Returns {@code runnable} itself if it has a priority, else a default-priority wrapper. */
        private static PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        /** {@link FutureTask} that exposes its runnable's priority and a creation sequence number. */
        private static class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private final long sequentialOrder = instanceCounter.getAndIncrement();
            private final HasPriority hasPriority;

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        /** Plain runnable wrapper that exposes a priority and a creation sequence number. */
        private static class ComparableRunnable implements Runnable, ComparableTask {
            private final long instanceCount = instanceCounter.getAndIncrement();
            private final HasPriority hasPriority;
            private final Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        /** A queued task that can be ordered by priority and then by creation sequence. */
        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            /**
             * Orders tasks by descending priority, then by ascending sequence number.
             *
             * <p>Uses {@link Integer#compare} and {@link Long#compare} rather than
             * subtraction: subtracting ints overflows for priorities that are far
             * apart, and narrowing a long difference to int can flip its sign.
             */
            static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    // Arguments reversed on purpose: higher priority sorts first.
                    int priorityResult = Integer.compare(o2.getPriority(), o1.getPriority());
                    return priorityResult != 0 ? priorityResult
                            : Long.compare(o1.getInstanceCount(), o2.getInstanceCount());
                };
            }

        }

    }

    /** Something that carries a scheduling priority; larger values are scheduled earlier. */
    public interface HasPriority {
        int getPriority();
    }


    /** A {@link Runnable} with a scheduling priority. */
    public interface PriorityRunnable extends Runnable, HasPriority {

        /**
         * Wraps {@code runnable} so that it reports the given fixed priority.
         *
         * @param runnable the work to run; must not be {@code null}
         * @param priority the priority to report; larger values are scheduled earlier
         * @return a {@code PriorityRunnable} that delegates {@code run()} to {@code runnable}
         * @throws NullPointerException if {@code runnable} is {@code null}
         */
        public static PriorityRunnable of(Runnable runnable, int priority) {
            if (runnable == null) {
                throw new NullPointerException("runnable");
            }
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}