mirror of
https://github.com/openjdk/jdk.git
synced 2026-05-13 06:59:38 +00:00
6776941: Improve thread pool shutdown
Reviewed-by: dl, skoivu
This commit is contained in:
parent
2414c54cc1
commit
1193ef2418
@ -34,8 +34,10 @@
|
||||
*/
|
||||
|
||||
package java.util.concurrent;
|
||||
import java.util.concurrent.locks.*;
|
||||
import java.util.concurrent.atomic.*;
|
||||
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
@ -491,10 +493,15 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
* policy limiting the number of threads. Even though it is not
|
||||
* treated as an error, failure to create threads may result in
|
||||
* new tasks being rejected or existing ones remaining stuck in
|
||||
* the queue. On the other hand, no special precautions exist to
|
||||
* handle OutOfMemoryErrors that might be thrown while trying to
|
||||
* create threads, since there is generally no recourse from
|
||||
* within this class.
|
||||
* the queue.
|
||||
*
|
||||
* We go further and preserve pool invariants even in the face of
|
||||
* errors such as OutOfMemoryError, that might be thrown while
|
||||
* trying to create threads. Such errors are rather common due to
|
||||
* the need to allocate a native stack in Thread#start, and users
|
||||
* will want to perform clean pool shutdown to clean up. There
|
||||
* will likely be enough memory available for the cleanup code to
|
||||
* complete without encountering yet another OutOfMemoryError.
|
||||
*/
|
||||
private volatile ThreadFactory threadFactory;
|
||||
|
||||
@ -568,9 +575,13 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
* task execution. This protects against interrupts that are
|
||||
* intended to wake up a worker thread waiting for a task from
|
||||
* instead interrupting a task being run. We implement a simple
|
||||
* non-reentrant mutual exclusion lock rather than use ReentrantLock
|
||||
* because we do not want worker tasks to be able to reacquire the
|
||||
* lock when they invoke pool control methods like setCorePoolSize.
|
||||
* non-reentrant mutual exclusion lock rather than use
|
||||
* ReentrantLock because we do not want worker tasks to be able to
|
||||
* reacquire the lock when they invoke pool control methods like
|
||||
* setCorePoolSize. Additionally, to suppress interrupts until
|
||||
* the thread actually starts running tasks, we initialize lock
|
||||
* state to a negative value, and clear it upon start (in
|
||||
* runWorker).
|
||||
*/
|
||||
private final class Worker
|
||||
extends AbstractQueuedSynchronizer
|
||||
@ -594,6 +605,7 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
* @param firstTask the first task (null if none)
|
||||
*/
|
||||
Worker(Runnable firstTask) {
|
||||
setState(-1); // inhibit interrupts until runWorker
|
||||
this.firstTask = firstTask;
|
||||
this.thread = getThreadFactory().newThread(this);
|
||||
}
|
||||
@ -609,7 +621,7 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
// The value 1 represents the locked state.
|
||||
|
||||
protected boolean isHeldExclusively() {
|
||||
return getState() == 1;
|
||||
return getState() != 0;
|
||||
}
|
||||
|
||||
protected boolean tryAcquire(int unused) {
|
||||
@ -630,6 +642,16 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
public boolean tryLock() { return tryAcquire(1); }
|
||||
public void unlock() { release(1); }
|
||||
public boolean isLocked() { return isHeldExclusively(); }
|
||||
|
||||
void interruptIfStarted() {
|
||||
Thread t;
|
||||
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
|
||||
try {
|
||||
t.interrupt();
|
||||
} catch (SecurityException ignore) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@ -728,12 +750,8 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
final ReentrantLock mainLock = this.mainLock;
|
||||
mainLock.lock();
|
||||
try {
|
||||
for (Worker w : workers) {
|
||||
try {
|
||||
w.thread.interrupt();
|
||||
} catch (SecurityException ignore) {
|
||||
}
|
||||
}
|
||||
for (Worker w : workers)
|
||||
w.interruptIfStarted();
|
||||
} finally {
|
||||
mainLock.unlock();
|
||||
}
|
||||
@ -790,19 +808,6 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
|
||||
private static final boolean ONLY_ONE = true;
|
||||
|
||||
/**
|
||||
* Ensures that unless the pool is stopping, the current thread
|
||||
* does not have its interrupt set. This requires a double-check
|
||||
* of state in case the interrupt was cleared concurrently with a
|
||||
* shutdownNow -- if so, the interrupt is re-enabled.
|
||||
*/
|
||||
private void clearInterruptsForTaskRun() {
|
||||
if (runStateLessThan(ctl.get(), STOP) &&
|
||||
Thread.interrupted() &&
|
||||
runStateAtLeast(ctl.get(), STOP))
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
/*
|
||||
* Misc utilities, most of which are also exported to
|
||||
* ScheduledThreadPoolExecutor
|
||||
@ -862,12 +867,13 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
* Checks if a new worker can be added with respect to current
|
||||
* pool state and the given bound (either core or maximum). If so,
|
||||
* the worker count is adjusted accordingly, and, if possible, a
|
||||
* new worker is created and started running firstTask as its
|
||||
* new worker is created and started, running firstTask as its
|
||||
* first task. This method returns false if the pool is stopped or
|
||||
* eligible to shut down. It also returns false if the thread
|
||||
* factory fails to create a thread when asked, which requires a
|
||||
* backout of workerCount, and a recheck for termination, in case
|
||||
* the existence of this worker was holding up termination.
|
||||
* factory fails to create a thread when asked. If the thread
|
||||
* creation fails, either due to the thread factory returning
|
||||
* null, or due to an exception (typically OutOfMemoryError in
|
||||
* Thread#start), we roll back cleanly.
|
||||
*
|
||||
* @param firstTask the task the new thread should run first (or
|
||||
* null if none). Workers are created with an initial first task
|
||||
@ -910,46 +916,65 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
}
|
||||
}
|
||||
|
||||
Worker w = new Worker(firstTask);
|
||||
Thread t = w.thread;
|
||||
boolean workerStarted = false;
|
||||
boolean workerAdded = false;
|
||||
Worker w = null;
|
||||
try {
|
||||
final ReentrantLock mainLock = this.mainLock;
|
||||
w = new Worker(firstTask);
|
||||
final Thread t = w.thread;
|
||||
if (t != null) {
|
||||
mainLock.lock();
|
||||
try {
|
||||
// Recheck while holding lock.
|
||||
// Back out on ThreadFactory failure or if
|
||||
// shut down before lock acquired.
|
||||
int c = ctl.get();
|
||||
int rs = runStateOf(c);
|
||||
|
||||
if (rs < SHUTDOWN ||
|
||||
(rs == SHUTDOWN && firstTask == null)) {
|
||||
if (t.isAlive()) // precheck that t is startable
|
||||
throw new IllegalThreadStateException();
|
||||
workers.add(w);
|
||||
int s = workers.size();
|
||||
if (s > largestPoolSize)
|
||||
largestPoolSize = s;
|
||||
workerAdded = true;
|
||||
}
|
||||
} finally {
|
||||
mainLock.unlock();
|
||||
}
|
||||
if (workerAdded) {
|
||||
t.start();
|
||||
workerStarted = true;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (! workerStarted)
|
||||
addWorkerFailed(w);
|
||||
}
|
||||
return workerStarted;
|
||||
}
|
||||
|
||||
/**
|
||||
* Rolls back the worker thread creation.
|
||||
* - removes worker from workers, if present
|
||||
* - decrements worker count
|
||||
* - rechecks for termination, in case the existence of this
|
||||
* worker was holding up termination
|
||||
*/
|
||||
private void addWorkerFailed(Worker w) {
|
||||
final ReentrantLock mainLock = this.mainLock;
|
||||
mainLock.lock();
|
||||
try {
|
||||
// Recheck while holding lock.
|
||||
// Back out on ThreadFactory failure or if
|
||||
// shut down before lock acquired.
|
||||
int c = ctl.get();
|
||||
int rs = runStateOf(c);
|
||||
|
||||
if (t == null ||
|
||||
(rs >= SHUTDOWN &&
|
||||
! (rs == SHUTDOWN &&
|
||||
firstTask == null))) {
|
||||
decrementWorkerCount();
|
||||
tryTerminate();
|
||||
return false;
|
||||
}
|
||||
|
||||
workers.add(w);
|
||||
|
||||
int s = workers.size();
|
||||
if (s > largestPoolSize)
|
||||
largestPoolSize = s;
|
||||
if (w != null)
|
||||
workers.remove(w);
|
||||
decrementWorkerCount();
|
||||
tryTerminate();
|
||||
} finally {
|
||||
mainLock.unlock();
|
||||
}
|
||||
|
||||
t.start();
|
||||
// It is possible (but unlikely) for a thread to have been
|
||||
// added to workers, but not yet started, during transition to
|
||||
// STOP, which could result in a rare missed interrupt,
|
||||
// because Thread.interrupt is not guaranteed to have any effect
|
||||
// on a non-yet-started Thread (see Thread#interrupt).
|
||||
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
|
||||
t.interrupt();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1096,15 +1121,25 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
* @param w the worker
|
||||
*/
|
||||
final void runWorker(Worker w) {
|
||||
Thread wt = Thread.currentThread();
|
||||
Runnable task = w.firstTask;
|
||||
w.firstTask = null;
|
||||
w.unlock(); // allow interrupts
|
||||
boolean completedAbruptly = true;
|
||||
try {
|
||||
while (task != null || (task = getTask()) != null) {
|
||||
w.lock();
|
||||
clearInterruptsForTaskRun();
|
||||
// If pool is stopping, ensure thread is interrupted;
|
||||
// if not, ensure thread is not interrupted. This
|
||||
// requires a recheck in second case to deal with
|
||||
// shutdownNow race while clearing interrupt
|
||||
if ((runStateAtLeast(ctl.get(), STOP) ||
|
||||
(Thread.interrupted() &&
|
||||
runStateAtLeast(ctl.get(), STOP))) &&
|
||||
!wt.isInterrupted())
|
||||
wt.interrupt();
|
||||
try {
|
||||
beforeExecute(w.thread, task);
|
||||
beforeExecute(wt, task);
|
||||
Throwable thrown = null;
|
||||
try {
|
||||
task.run();
|
||||
@ -2064,3 +2099,4 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user