Mercurial > hg > Database > Christie
comparison src/main/java/christie/codegear/PriorityThreadPoolExecutors.java @ 196:ad49723367c2
add priority
author | akahori |
---|---|
date | Sat, 09 Mar 2019 14:03:06 +0900 |
parents | |
children | ef5aad739292 |
comparison
equal
deleted
inserted
replaced
195:a0be7c83fff8 | 196:ad49723367c2 |
---|---|
1 package christie.codegear; | |
2 | |
3 import java.util.Comparator; | |
4 import java.util.concurrent.*; | |
5 import java.util.concurrent.atomic.AtomicLong; | |
6 | |
7 | |
8 // https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172 | |
9 public class PriorityThreadPoolExecutors { | |
10 | |
11 public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) { | |
12 return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS); | |
13 } | |
14 private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor { | |
15 private static final int DEFAULT_PRIORITY = 0; | |
16 private static AtomicLong instanceCounter = new AtomicLong(); | |
17 | |
18 public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, | |
19 int keepAliveTime, TimeUnit unit) { | |
20 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10, | |
21 ComparableTask.comparatorByPriorityAndSequentialOrder())); | |
22 } | |
23 | |
24 @Override | |
25 public void execute(Runnable command) { | |
26 // If this is ugly then delegator pattern needed | |
27 if (command instanceof ComparableTask) //Already wrapped | |
28 super.execute(command); | |
29 else { | |
30 super.execute(newComparableRunnableFor(command)); | |
31 } | |
32 } | |
33 | |
34 private Runnable newComparableRunnableFor(Runnable runnable) { | |
35 return new ComparableRunnable(ensurePriorityRunnable(runnable)); | |
36 } | |
37 | |
38 @Override | |
39 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { | |
40 return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value); | |
41 } | |
42 | |
43 private PriorityRunnable ensurePriorityRunnable(Runnable runnable) { | |
44 return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable | |
45 : PriorityRunnable.of(runnable, DEFAULT_PRIORITY); | |
46 } | |
47 | |
48 private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask { | |
49 private Long sequentialOrder = instanceCounter.getAndIncrement(); | |
50 private HasPriority hasPriority; | |
51 | |
52 public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) { | |
53 super(priorityRunnable, result); | |
54 this.hasPriority = priorityRunnable; | |
55 } | |
56 | |
57 @Override | |
58 public long getInstanceCount() { | |
59 return sequentialOrder; | |
60 } | |
61 | |
62 @Override | |
63 public int getPriority() { | |
64 return hasPriority.getPriority(); | |
65 } | |
66 } | |
67 | |
68 private static class ComparableRunnable implements Runnable, ComparableTask { | |
69 private Long instanceCount = instanceCounter.getAndIncrement(); | |
70 private HasPriority hasPriority; | |
71 private Runnable runnable; | |
72 | |
73 public ComparableRunnable(PriorityRunnable priorityRunnable) { | |
74 this.runnable = priorityRunnable; | |
75 this.hasPriority = priorityRunnable; | |
76 } | |
77 | |
78 @Override | |
79 public void run() { | |
80 runnable.run(); | |
81 } | |
82 | |
83 @Override | |
84 public int getPriority() { | |
85 return hasPriority.getPriority(); | |
86 } | |
87 | |
88 @Override | |
89 public long getInstanceCount() { | |
90 return instanceCount; | |
91 } | |
92 } | |
93 | |
94 private interface ComparableTask extends Runnable { | |
95 int getPriority(); | |
96 | |
97 long getInstanceCount(); | |
98 | |
99 static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() { | |
100 return (o1, o2) -> { | |
101 int priorityResult = o2.getPriority() - o1.getPriority(); | |
102 return priorityResult != 0 ? priorityResult | |
103 : (int) (o1.getInstanceCount() - o2.getInstanceCount()); | |
104 }; | |
105 } | |
106 | |
107 } | |
108 | |
109 } | |
110 | |
111 public interface HasPriority{ | |
112 int getPriority(); | |
113 } | |
114 | |
115 | |
116 public interface PriorityRunnable extends Runnable, HasPriority{ | |
117 | |
118 public static PriorityRunnable of(Runnable runnable, int priority) { | |
119 return new PriorityRunnable() { | |
120 @Override | |
121 public void run() { | |
122 runnable.run(); | |
123 } | |
124 | |
125 @Override | |
126 public int getPriority() { | |
127 return priority; | |
128 } | |
129 }; | |
130 } | |
131 } | |
132 | |
133 } |