annotate src/main/java/christie/codegear/PriorityThreadPoolExecutors.java @ 196:ad49723367c2

add priority
author akahori
date Sat, 09 Mar 2019 14:03:06 +0900
parents
children ef5aad739292
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
196
ad49723367c2 add priority
akahori
parents:
diff changeset
1 package christie.codegear;
ad49723367c2 add priority
akahori
parents:
diff changeset
2
ad49723367c2 add priority
akahori
parents:
diff changeset
3 import java.util.Comparator;
ad49723367c2 add priority
akahori
parents:
diff changeset
4 import java.util.concurrent.*;
ad49723367c2 add priority
akahori
parents:
diff changeset
5 import java.util.concurrent.atomic.AtomicLong;
ad49723367c2 add priority
akahori
parents:
diff changeset
6
ad49723367c2 add priority
akahori
parents:
diff changeset
7
ad49723367c2 add priority
akahori
parents:
diff changeset
8 // https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172
ad49723367c2 add priority
akahori
parents:
diff changeset
9 public class PriorityThreadPoolExecutors {
ad49723367c2 add priority
akahori
parents:
diff changeset
10
ad49723367c2 add priority
akahori
parents:
diff changeset
11 public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) {
ad49723367c2 add priority
akahori
parents:
diff changeset
12 return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS);
ad49723367c2 add priority
akahori
parents:
diff changeset
13 }
ad49723367c2 add priority
akahori
parents:
diff changeset
14 private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
ad49723367c2 add priority
akahori
parents:
diff changeset
15 private static final int DEFAULT_PRIORITY = 0;
ad49723367c2 add priority
akahori
parents:
diff changeset
16 private static AtomicLong instanceCounter = new AtomicLong();
ad49723367c2 add priority
akahori
parents:
diff changeset
17
ad49723367c2 add priority
akahori
parents:
diff changeset
18 public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
ad49723367c2 add priority
akahori
parents:
diff changeset
19 int keepAliveTime, TimeUnit unit) {
ad49723367c2 add priority
akahori
parents:
diff changeset
20 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
ad49723367c2 add priority
akahori
parents:
diff changeset
21 ComparableTask.comparatorByPriorityAndSequentialOrder()));
ad49723367c2 add priority
akahori
parents:
diff changeset
22 }
ad49723367c2 add priority
akahori
parents:
diff changeset
23
ad49723367c2 add priority
akahori
parents:
diff changeset
24 @Override
ad49723367c2 add priority
akahori
parents:
diff changeset
25 public void execute(Runnable command) {
ad49723367c2 add priority
akahori
parents:
diff changeset
26 // If this is ugly then delegator pattern needed
ad49723367c2 add priority
akahori
parents:
diff changeset
27 if (command instanceof ComparableTask) //Already wrapped
ad49723367c2 add priority
akahori
parents:
diff changeset
28 super.execute(command);
ad49723367c2 add priority
akahori
parents:
diff changeset
29 else {
ad49723367c2 add priority
akahori
parents:
diff changeset
30 super.execute(newComparableRunnableFor(command));
ad49723367c2 add priority
akahori
parents:
diff changeset
31 }
ad49723367c2 add priority
akahori
parents:
diff changeset
32 }
ad49723367c2 add priority
akahori
parents:
diff changeset
33
ad49723367c2 add priority
akahori
parents:
diff changeset
34 private Runnable newComparableRunnableFor(Runnable runnable) {
ad49723367c2 add priority
akahori
parents:
diff changeset
35 return new ComparableRunnable(ensurePriorityRunnable(runnable));
ad49723367c2 add priority
akahori
parents:
diff changeset
36 }
ad49723367c2 add priority
akahori
parents:
diff changeset
37
ad49723367c2 add priority
akahori
parents:
diff changeset
38 @Override
ad49723367c2 add priority
akahori
parents:
diff changeset
39 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
ad49723367c2 add priority
akahori
parents:
diff changeset
40 return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
ad49723367c2 add priority
akahori
parents:
diff changeset
41 }
ad49723367c2 add priority
akahori
parents:
diff changeset
42
ad49723367c2 add priority
akahori
parents:
diff changeset
43 private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
ad49723367c2 add priority
akahori
parents:
diff changeset
44 return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
ad49723367c2 add priority
akahori
parents:
diff changeset
45 : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
ad49723367c2 add priority
akahori
parents:
diff changeset
46 }
ad49723367c2 add priority
akahori
parents:
diff changeset
47
ad49723367c2 add priority
akahori
parents:
diff changeset
48 private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
ad49723367c2 add priority
akahori
parents:
diff changeset
49 private Long sequentialOrder = instanceCounter.getAndIncrement();
ad49723367c2 add priority
akahori
parents:
diff changeset
50 private HasPriority hasPriority;
ad49723367c2 add priority
akahori
parents:
diff changeset
51
ad49723367c2 add priority
akahori
parents:
diff changeset
52 public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
ad49723367c2 add priority
akahori
parents:
diff changeset
53 super(priorityRunnable, result);
ad49723367c2 add priority
akahori
parents:
diff changeset
54 this.hasPriority = priorityRunnable;
ad49723367c2 add priority
akahori
parents:
diff changeset
55 }
ad49723367c2 add priority
akahori
parents:
diff changeset
56
ad49723367c2 add priority
akahori
parents:
diff changeset
57 @Override
ad49723367c2 add priority
akahori
parents:
diff changeset
58 public long getInstanceCount() {
ad49723367c2 add priority
akahori
parents:
diff changeset
59 return sequentialOrder;
ad49723367c2 add priority
akahori
parents:
diff changeset
60 }
ad49723367c2 add priority
akahori
parents:
diff changeset
61
ad49723367c2 add priority
akahori
parents:
diff changeset
62 @Override
ad49723367c2 add priority
akahori
parents:
diff changeset
63 public int getPriority() {
ad49723367c2 add priority
akahori
parents:
diff changeset
64 return hasPriority.getPriority();
ad49723367c2 add priority
akahori
parents:
diff changeset
65 }
ad49723367c2 add priority
akahori
parents:
diff changeset
66 }
ad49723367c2 add priority
akahori
parents:
diff changeset
67
ad49723367c2 add priority
akahori
parents:
diff changeset
68 private static class ComparableRunnable implements Runnable, ComparableTask {
ad49723367c2 add priority
akahori
parents:
diff changeset
69 private Long instanceCount = instanceCounter.getAndIncrement();
ad49723367c2 add priority
akahori
parents:
diff changeset
70 private HasPriority hasPriority;
ad49723367c2 add priority
akahori
parents:
diff changeset
71 private Runnable runnable;
ad49723367c2 add priority
akahori
parents:
diff changeset
72
ad49723367c2 add priority
akahori
parents:
diff changeset
73 public ComparableRunnable(PriorityRunnable priorityRunnable) {
ad49723367c2 add priority
akahori
parents:
diff changeset
74 this.runnable = priorityRunnable;
ad49723367c2 add priority
akahori
parents:
diff changeset
75 this.hasPriority = priorityRunnable;
ad49723367c2 add priority
akahori
parents:
diff changeset
76 }
ad49723367c2 add priority
akahori
parents:
diff changeset
77
ad49723367c2 add priority
akahori
parents:
diff changeset
78 @Override
ad49723367c2 add priority
akahori
parents:
diff changeset
79 public void run() {
ad49723367c2 add priority
akahori
parents:
diff changeset
80 runnable.run();
ad49723367c2 add priority
akahori
parents:
diff changeset
81 }
ad49723367c2 add priority
akahori
parents:
diff changeset
82
ad49723367c2 add priority
akahori
parents:
diff changeset
83 @Override
ad49723367c2 add priority
akahori
parents:
diff changeset
84 public int getPriority() {
ad49723367c2 add priority
akahori
parents:
diff changeset
85 return hasPriority.getPriority();
ad49723367c2 add priority
akahori
parents:
diff changeset
86 }
ad49723367c2 add priority
akahori
parents:
diff changeset
87
ad49723367c2 add priority
akahori
parents:
diff changeset
88 @Override
ad49723367c2 add priority
akahori
parents:
diff changeset
89 public long getInstanceCount() {
ad49723367c2 add priority
akahori
parents:
diff changeset
90 return instanceCount;
ad49723367c2 add priority
akahori
parents:
diff changeset
91 }
ad49723367c2 add priority
akahori
parents:
diff changeset
92 }
ad49723367c2 add priority
akahori
parents:
diff changeset
93
ad49723367c2 add priority
akahori
parents:
diff changeset
94 private interface ComparableTask extends Runnable {
ad49723367c2 add priority
akahori
parents:
diff changeset
95 int getPriority();
ad49723367c2 add priority
akahori
parents:
diff changeset
96
ad49723367c2 add priority
akahori
parents:
diff changeset
97 long getInstanceCount();
ad49723367c2 add priority
akahori
parents:
diff changeset
98
ad49723367c2 add priority
akahori
parents:
diff changeset
99 static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
ad49723367c2 add priority
akahori
parents:
diff changeset
100 return (o1, o2) -> {
ad49723367c2 add priority
akahori
parents:
diff changeset
101 int priorityResult = o2.getPriority() - o1.getPriority();
ad49723367c2 add priority
akahori
parents:
diff changeset
102 return priorityResult != 0 ? priorityResult
ad49723367c2 add priority
akahori
parents:
diff changeset
103 : (int) (o1.getInstanceCount() - o2.getInstanceCount());
ad49723367c2 add priority
akahori
parents:
diff changeset
104 };
ad49723367c2 add priority
akahori
parents:
diff changeset
105 }
ad49723367c2 add priority
akahori
parents:
diff changeset
106
ad49723367c2 add priority
akahori
parents:
diff changeset
107 }
ad49723367c2 add priority
akahori
parents:
diff changeset
108
ad49723367c2 add priority
akahori
parents:
diff changeset
109 }
ad49723367c2 add priority
akahori
parents:
diff changeset
110
ad49723367c2 add priority
akahori
parents:
diff changeset
111 public interface HasPriority{
ad49723367c2 add priority
akahori
parents:
diff changeset
112 int getPriority();
ad49723367c2 add priority
akahori
parents:
diff changeset
113 }
ad49723367c2 add priority
akahori
parents:
diff changeset
114
ad49723367c2 add priority
akahori
parents:
diff changeset
115
ad49723367c2 add priority
akahori
parents:
diff changeset
116 public interface PriorityRunnable extends Runnable, HasPriority{
ad49723367c2 add priority
akahori
parents:
diff changeset
117
ad49723367c2 add priority
akahori
parents:
diff changeset
118 public static PriorityRunnable of(Runnable runnable, int priority) {
ad49723367c2 add priority
akahori
parents:
diff changeset
119 return new PriorityRunnable() {
ad49723367c2 add priority
akahori
parents:
diff changeset
120 @Override
ad49723367c2 add priority
akahori
parents:
diff changeset
121 public void run() {
ad49723367c2 add priority
akahori
parents:
diff changeset
122 runnable.run();
ad49723367c2 add priority
akahori
parents:
diff changeset
123 }
ad49723367c2 add priority
akahori
parents:
diff changeset
124
ad49723367c2 add priority
akahori
parents:
diff changeset
125 @Override
ad49723367c2 add priority
akahori
parents:
diff changeset
126 public int getPriority() {
ad49723367c2 add priority
akahori
parents:
diff changeset
127 return priority;
ad49723367c2 add priority
akahori
parents:
diff changeset
128 }
ad49723367c2 add priority
akahori
parents:
diff changeset
129 };
ad49723367c2 add priority
akahori
parents:
diff changeset
130 }
ad49723367c2 add priority
akahori
parents:
diff changeset
131 }
ad49723367c2 add priority
akahori
parents:
diff changeset
132
ad49723367c2 add priority
akahori
parents:
diff changeset
133 }