使用ThreadPoolExecutor遇到的核心线程被阻塞,非核心线程未按照预期运行问题
简单分析ThreadPoolExecutor回收工作线程的原理
A thread in the waiting state is waiting for another thread to* perform a particular action.* 处于等待状态的线程正在等待另一个线程执行特定操作。** For example, a thread that has called {@code Object.wait()}* on an object is waiting for another thread to call* {@code Object.notify()} or {@code Object.notifyAll()} on* that object. A thread that has called {@code Thread.join()}* is waiting for a specified thread to terminate.*/WAITING,/*** Thread state for a waiting thread with a specified waiting time.* A thread is in the timed waiting state due to calling one of* the following methods with a specified positive waiting time:* 具有指定等待时间的等待线程的线程状态。由于以指定的正等待时间调用以下方法之一,线程处于定时等待状态:* public enum State {/*** Thread state for a thread which has not yet started. 尚未启动的线程的线程状态。*/NEW,/*** Thread state for a runnable thread. A thread in the runnable* state is executing in the Java virtual machine but it may* be waiting for other resources from the operating system* such as processor. * 可运行线程的线程状态。处于可运行状态的线程正在 Java 虚拟机中执行,但它可能正在等待来自操作系统的其他资源,例如处理器。*/RUNNABLE,/*** Thread state for a thread blocked waiting for a monitor lock.* A thread in the blocked state is waiting for a monitor lock* to enter a synchronized block/method or* reenter a synchronized block/method after calling* {@link Object#wait() Object.wait}.* 线程阻塞等待监视器锁的线程状态。处于阻塞状态的线程在调用后等待监视器锁进入同步块/方法*/BLOCKED,/*** Thread state for a waiting thread.* A thread is in the waiting state due to calling one of the* following methods:* 等待线程的线程状态。由于调用以下方法之一,线程处于等待状态:*
*
** *
*/TIMED_WAITING,/*** Thread state for a terminated thread.* The thread has completed execution.* 已终止线程的线程状态。线程已完成执行。*/TERMINATED;}
1. ThreadPoolExecutor构造函数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}int corePoolSize, // 核心线程
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 持续时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 阻塞线程
ThreadFactory threadFactory //工厂方法
2. public void execute(Runnable command)
public void execute(Runnable command) {if (command &#61;&#61; null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn&#39;t, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c &#61; ctl.get();// 小于核心线程&#xff0c;直接加入if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c &#61; ctl.get();}// 判断是否加入阻塞队列if (isRunning(c) && workQueue.offer(command)) {int recheck &#61; ctl.get();if (! isRunning(recheck) && remove(command))reject(command); // 拒绝策略else if (workerCountOf(recheck) &#61;&#61; 0)addWorker(null, false);}else if (!addWorker(command, false)) // 尝试加入非核心线程reject(command); // 没有加入成功&#xff0c;拒绝策略}
总结&#xff1a;
/** Methods for creating, running and cleaning up after workers*//*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked. If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** &#64;param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** &#64;param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* &#64;return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c &#61; ctl.get();;) {// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask !&#61; null|| workQueue.isEmpty()))return false;for (;;) {// 如果不满足核心和非核心线程数&#xff0c;就falseif (workerCountOf(c)>&#61; ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c))break retry;c &#61; ctl.get(); // Re-read ctlif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted &#61; false;boolean workerAdded &#61; false;Worker w &#61; null;try {// 创建一个Worker工作类w &#61; new Worker(firstTask);final Thread t &#61; w.thread;if (t !&#61; null) {final ReentrantLock mainLock &#61; this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c &#61; ctl.get();if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask &#61;&#61; null)) {if (t.getState() !&#61; Thread.State.NEW)throw new IllegalThreadStateException();workers.add(w);workerAdded &#61; true;int s &#61; workers.size();if (s > largestPoolSize)largestPoolSize &#61; s;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted &#61; true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
总结&#xff1a;
1. 判断是否在核心和非核心线程内
2. 创建一个Worker
3. 添加到workers线程中
4. 开启Worker中Thread的线程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID &#61; 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;// TODO: switch to AbstractQueuedLongSynchronizer and move// completedTasks into the lock word./*** Creates with given first task and thread from ThreadFactory.* &#64;param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask &#61; firstTask;this.thread &#61; getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker. */public void run() {// 真正运行线程开始runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() !&#61; 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >&#61; 0 && (t &#61; thread) !&#61; null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
总结&#xff1a;
/*** Main worker run loop. Repeatedly gets tasks from queue and* executes them, while coping with a number of issues:** 1. We may start out with an initial task, in which case we* don&#39;t need to get the first one. Otherwise, as long as pool is* running, we get tasks from getTask. If it returns null then the* worker exits due to changed pool state or configuration* parameters. Other exits result from exception throws in* external code, in which case completedAbruptly holds, which* usually leads processWorkerExit to replace this thread.** 2. Before running any task, the lock is acquired to prevent* other pool interrupts while the task is executing, and then we* ensure that unless pool is stopping, this thread does not have* its interrupt set.** 3. Each task run is preceded by a call to beforeExecute, which* might throw an exception, in which case we cause thread to die* (breaking loop with completedAbruptly true) without processing* the task.** 4. Assuming beforeExecute completes normally, we run the task,* gathering any of its thrown exceptions to send to afterExecute.* We separately handle RuntimeException, Error (both of which the* specs guarantee that we trap) and arbitrary Throwables.* Because we cannot rethrow Throwables within Runnable.run, we* wrap them within Errors on the way out (to the thread&#39;s* UncaughtExceptionHandler). Any thrown exception also* conservatively causes thread to die.** 5. After task.run completes, we call afterExecute, which may* also throw an exception, which will also cause thread to* die. According to JLS Sec 14.20, this exception is the one that* will be in effect even if task.run throws.** The net effect of the exception mechanics is that afterExecute* and the thread&#39;s UncaughtExceptionHandler have as accurate* information as we can provide about any problems encountered by* user code.** &#64;param w the worker*/final void runWorker(Worker w) {Thread wt &#61; Thread.currentThread();Runnable task &#61; w.firstTask;w.firstTask &#61; null;w.unlock(); // allow interruptsboolean completedAbruptly &#61; true;try {// 1. task不为空// 2. getTask 从阻塞列表中获取while (task !&#61; null || (task &#61; getTask()) !&#61; null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task &#61; null;w.completedTasks&#43;&#43;;w.unlock();}}completedAbruptly &#61; false;} finally {processWorkerExit(w, completedAbruptly);}}
总结;
/*** Performs blocking or timed wait for a task, depending on* current configuration settings, or returns null if this worker* must exit because of any of:* 1. There are more than maximumPoolSize workers (due to* a call to setMaximumPoolSize).* 2. The pool is stopped.* 3. The pool is shutdown and the queue is empty.* 4. This worker timed out waiting for a task, and timed-out* workers are subject to termination (that is,* {&#64;code allowCoreThreadTimeOut || workerCount > corePoolSize})* both before and after the timed wait, and if the queue is* non-empty, this worker is not the last thread in the pool.** &#64;return task, or null if the worker must exit, in which case* workerCount is decremented*/private Runnable getTask() {boolean timedOut &#61; false; // Did the last poll() time out?for (;;) {int c &#61; ctl.get();// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc &#61; workerCountOf(c);// Are workers subject to culling?boolean timed &#61; allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 从阻塞队列中获取&#xff0c; 此处有用到延迟消息// poll(keepAliveTime 是延迟获取// take() 是阻塞获取Runnable r &#61; timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r !&#61; null)return r;timedOut &#61; true;} catch (InterruptedException retry) {timedOut &#61; false;}}}
总结:
/*** Performs cleanup and bookkeeping for a dying worker. Called* only from worker threads. Unless completedAbruptly is set,* assumes that workerCount has already been adjusted to account* for exit. This method removes thread from worker set, and* possibly terminates the pool or replaces the worker if either* it exited due to user task exception or if fewer than* corePoolSize workers are running or queue is non-empty but* there are no workers.** &#64;param w the worker* &#64;param completedAbruptly if the worker died due to user exception*/private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn&#39;t adjusteddecrementWorkerCount();final ReentrantLock mainLock &#61; this.mainLock;mainLock.lock();try {completedTaskCount &#43;&#61; w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c &#61; ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min &#61; allowCoreThreadTimeOut ? 0 : corePoolSize;if (min &#61;&#61; 0 && ! workQueue.isEmpty())min &#61; 1;if (workerCountOf(c) >&#61; min)return; // replacement not needed}addWorker(null, false);}}
总结&#xff1a;
addWorkder(null, false);
添加一个null的runnable, 当为空时候,会从队列中去取.