并发编程
1、线程池中提交一个任务的流程是怎样的?
1、提交任务:首先,一个任务被提交到线程池。这个任务通常是一个实现了Runnable
或Callable
接口的对象;
2、检测线程池状态:线程池会首先检测其运行状态。如果线程池不是RUNNING
状态,任务会被直接拒绝
3、核心线程判断:如果当前工作线程数workerCount
小于核心线程数corePoolSize
,线程池会创建一个新的核心线程来执行提交的任务;
4、阻塞队列判断:如果工作线程数已经达到核心线程数,但线程池内的阻塞队列workQueue
还未满,任务会被添加到这个阻塞队列中,等待执行。随后,空闲的核心线程会依次从队列中取出任务来执行;
5、非核心线程判断:如果阻塞队列已满,则入队失败,那么会尝试增加线程,如果当前线程池中线程数小于最大线程数maximumPoolSize
, 线程池会创建一个新的非核心线程(也称为临时线程)来执行任务;
6、拒绝策略:如果阻塞队列满了,且工作线程数已达到最大线程数,线程池会根据预设的拒绝策略来处理这个任务。默认的处理方式是直接抛出一个RejectedExecutionException
异常,但还有其他策略如CallerRunsPolicy
(在调用者线程执行)、DiscardPolicy
(任务直接丢弃,不做任何处理)和DiscardOldestPolicy
(丢弃队列里最旧的那个任务,再尝试执行当前任务)等。但是这种策略有一个弊端就是任务执行的轨迹不会被记录下来。所以,我们往往需要实现自定义的拒绝策略, 通过实现RejectedExecutionHandler
接口的方式。
在整个过程中,线程池会优先使用核心线程来执行任务,其次是阻塞队列,最后是非核心线程。如果所有资源都已经用尽,任务会根据拒绝策略进行处理。
注意,线程池提供了两种主要的方法来执行任务:execute()
和submit()
。其中,execute()
方法用于提交不需要返回值的任务,而submit()
方法用于提交一个任务并带有返回值,这个方法将返回一个Future类型对象
,可以通过这个返回对象判断任务是否执行成功,并且可以通过future.get()
方法来获取返回值。
流程图:
代码:
public void test() {
// 定义线程池的参数
int corePoolSize = 5; // 核心线程数
int maximumPoolSize = 10; // 最大线程数
long keepAliveTime = 60L; // 非核心线程的空闲存活时间
TimeUnit unit = TimeUnit.SECONDS; // 时间单位
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // 阻塞队列
ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 线程工厂
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒绝策略
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler
);
// 提交任务到线程池
executor.execute(new Runnable() {
@Override
public void run() {
// 模拟耗时操作
System.out.println("nihao");
}
});
}
// 线程池execute()方法源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 当前线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 新创建一个线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果当前线程数大于核心线程数,则添加到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果阻塞队列满了,则创建非核心线程
else if (!addWorker(command, false))
// 当前线程数是否>=最大线程数,执行拒绝策略
reject(command);
}
// 创建线程源码
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 判断当前线程数是否>=最大线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
ThreadPoolExecutor.Worker w = null;
try {
w = new ThreadPoolExecutor.Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2、线程池有几种状态?分别是如何变化的?
线程池有五种状态,分别是:
1、Running(运行状态):
线程池经过初始化后,就进入了运行状态。这个状态下,线程池中的线程开始执行任务,并且会接受新任务、会处理队列中的任务;
2、Shutdown(关闭状态):
当调用线程池的shutdown()
方法后,线程池的状态会变为Shutdown。这个状态下,线程池不会接收新的任务,会处理队列中的任务,任务处理完后会中断所有线程;
3、Stop(停止状态):
当调用线程池的shutdownNow()
方法后,线程池的状态会变为Stop。这个状态下,线程池不会接收新的任务,不会处理队列中的任务,并且会直接中断所有线程;
4、Tidying(整理状态):
所有的线程都停止了之后,线程池的状态会转变为TIDYING,一旦达到此状态,就会调用线程池的terminated()方法;
5、Terminated(终止状态):
线程池处于TIDYING状态后,会执行terminated()方法,执行完后就进入Terminated状态。
RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
线程池的状态变化通常是通过调用shutdown()
或shutdownNow()
方法来实现的。当调用shutdown()
方法时,线程池的状态会从Running
到Shutdown
,再到Tidying
,最后到Terminated
销毁状态。当调用shutdownNow()
方法时,线程池的状态会从Running
到Stop
,再到Tidying
,最后到Terminated
销毁状态
代码:
// shutdown()方法源码
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 关闭线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 当线程都关闭后,执行tryTerminate()方法,将线程池的状态修改为TIDYING
tryTerminate();
}
// shutdownNow()方法源码
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态为STOP,只修改状态。先修改状态,在关闭线程,防止有新的任务进来
advanceRunState(STOP);
// 关闭线程
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 当线程都关闭后,执行tryTerminate()方法,将线程池的状态修改为TIDYING
tryTerminate();
return tasks;
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// ctlOf(TIDYING, 0) 修改线程池状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 空方法,当线程池都关闭后,如果想要做其他额外处理,可重写此方法进行扩展
terminated();
} finally {
// 修改线程池状态为TERMINATED,最终状态,线程池到此真正关闭
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
3、如何停止一个线程?
Thread
类中有两个方法:
start()
:开启一个线程
stop()
:停止一个线程
但是stop()
方法不建议使用,并且是有可能在未来版本中删除掉的
因为stop()
方法太粗暴了,一旦调用了stop()
方法,就会直接停掉线程,这样就可能造成严重的问题,比如任务执行到哪一步了?锁释放了吗?等一系列问题。
注意:stop()
方法会释放线程占用的synchronized锁
,而不会自动释放ReentrantLock锁
public static void main(String[] args) {
Object lock = new Object();
Thread thread = new Thread(()->{
synchronized (lock) {
for (int i = 0; i < 100; i++) {
System.out.println(i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
thread.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
thread.stop();
synchronized (lock){
// 程序能正常打印这条语句,说明线程调用stop()方法会释放synchronized锁
System.out.println("拿到锁了");
}
}
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread thread = new Thread(() -> {
// 加ReentrantLock锁
lock.lock();
for (int i = 0; i < 100; i++) {
System.out.println(i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 释放锁
lock.unlock();
});
thread.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
thread.stop();
lock.lock();
// 程序不能正常打印这条语句,说明线程调用stop()方法不会释放ReentrantLock锁
System.out.println("拿到锁了");
lock.unlock();
}
正常情况下我们可以使用中断机制(Interrupt)来停止一个线程
Thread
类中有interrupt()
方法可以来停止线程,interrupt()
方法并不是直接停止线程,而是在线程中设置一个中断状态。线程可以使用isInterrupted()
方法检查中断状态,并据此决定是否继续执行;
如果线程处于阻塞状态(如sleep()
或wait()
等),调用interrupt()
会唤醒线程,并抛出InterruptedException异常,同时会清除isInterrupted()
方法的中断状态,重新置为false
;
另外,线程池中也是通过interrupt()
来停止线程的,比如shutdownNow()
方法中就会调用
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 0; i < 100; i++) {
// 由当前线程决定是否中断线程
if (Thread.currentThread().isInterrupted() && i > 50) {
break;
}
// 打印到50线程被就中止掉
System.out.println(i);
}
});
thread.start();
// 中断线程
thread.interrupt();
System.out.println("end");
}
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 0; i < 100; i++) {
// 由当前线程决定是否中断线程
if (Thread.currentThread().isInterrupted() && i > 50) {
break;
}
System.out.println(i);
try {
// 当线程存在阻塞状态,会清除中止状态,这里会打印到99
Thread.sleep(1000);
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
});
thread.start();
// 中断线程
thread.interrupt();
System.out.println("end");
}