Java 线程池详解(ThreadPoolExecutor)

2021/5/24 ThreadPoolExecutor随笔

什么是线程池?简单理解,它就是一个用于管理线程的池子。掌握并发编程以及线程池是一名程序员的基本要求,相信在日常编程开发中也会经常直接或者间接的用到线程池(例如网络访问,数据处理等场景),求职面试过程中,几乎都会被问到有关于线程池的相关问题。

# 1. 线程池的特点

  • 管理线程,避免增加创建线程和销毁线程的资源损耗,因为线程对象的创建、运行、停止、销毁都是需要资源的开销的。
  • 提高响应速度,相对于从线程池拿一个现有的线程去执行任务和重新去创建一条线程执行,速度肯定慢很多。
  • 可重复利用资源 ,线程任务执行完毕后,可以再放回池子等待下次使用,可以达到重复利用的效果,节省资源。

# 2. 创建线程池的方式

# Executors 创建线程池

Java 中可以使用Executors快捷的创建线程池,只需要调用 Executors 中相应的便捷创建线程池的方法即可,比如Executors.newFixedThreadPool(int nThreads),当然了,方便的同时,为我们埋下了潜在的各种局限性和隐患(OOM,资源耗尽等)。Executors 中创建线程池的快捷方法,实际上是调用了ThreadPoolExecutor的对应的构造方法(定时任务使用的是ScheduledThreadPoolExecutor)。

Executors 快速创建线程池的几种方法如下:

方法名 功能
newFixedThreadPool(int nThreads) 创建固定大小的线程池
newSingleThreadExecutor() 创建只有一个线程的线程池
newCachedThreadPool() 创建一个不限线程数上限的线程池,任何提交的任务都将立即执行
newScheduledThreadPool() 创建一个线程池,该线程池可以安排任务在给定延迟后运行,或定期执行

提示:

学习、测试、验证、小业务等场景使用 Executors 快捷方式创建线程池应该没有什么大的问题,但是在生产环境、复杂业务等场景就不建议使用了,因为便捷的使用方法所带来的很多潜在的隐患问题是无法控制的,推荐使用ThreadPoolExecutor的构造方法进行精确配置方法来创建线程池。

# ThreadPoolExecutor 创建线程池

线程池可以通过 ThreadPoolExecutor 构造函数来精确创建,一起来看一下构造函数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
       ...
    }

构造函数的相关参数介绍:

  • corePoolSize: 线程池核心线程数最大值,线程池长期维持的线程数,即使线程处于Idle状态,也不会回收。
  • maximumPoolSize: 线程池最大线程数大小,线程池线程数量的上限。
  • keepAliveTime: 线程池中非核心线程空闲的存活时间大小,超过这个时间,多余的线程会被回收。
  • TimeUnit: 非核心线程空闲存活时间的单位。
  • BlockingQueue: 存放任务的阻塞队列。
  • ThreadFactory: 用于设置创建线程的工厂(新线程的产生方式),可以给创建的线程设置有意义的名字,可方便排查问题。
  • RejectedExecutionHandler: 线程池的饱和拒绝策略。

# 3. 线程池执行流程

线程池的工作流程介绍

If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.

If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

核心线程 -> 任务队列 -> 最大线程数 -> 拒绝策略

执行流程示意图:

线程池执行流程

# 4. 提交的任务对象

可以向线程池提交的任务有两种:RunnableCallable,二者的区别如下:

  1. 方法签名不同,void Runnable.run(), V Callable.call() throws Exception
  2. 是否允许有返回值,Callable允许有返回值。
  3. 是否允许抛出异常,Callable允许抛出异常。

Callable是 JDK1.5 时加入的接口,作为Runnable的一种补充,允许有返回值,允许抛出异常。

# 5. 提交任务的方式

提交方式 是否关心返回结果
Future submit(Callable task)
void execute(Runnable command)
Future<?> submit(Runnable task) 否,虽然返回Future,但是其get()方法总是返回null

# 6. 几种工作队列

  • ArrayBlockingQueue:数组队列,是一个用数组实现的有界阻塞队列,按FIFO排序量。
  • LinkedBlockingQueue:链表队列,基于链表结构的阻塞队列,按 FIFO 排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为 Integer.MAX_VALUE,吞吐量通常要高于 ArrayBlockingQuene;newFixedThreadPool 线程池使用了这个队列。
  • DelayQueue:延迟队列,是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。
  • PriorityBlockingQueue:优先级队列,是具有优先级的无界阻塞队列。
  • SynchronousQueue:同步队列,一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool 线程池使用了这个队列。

# 7. 几种拒绝策略

拒绝策略 拒绝行为
AbortPolicy 抛出RejectedExecutionException
DiscardPolicy 什么也不做,直接丢弃任务
DiscardOldestPolicy 丢弃执行队列中最老的任务,尝试为当前提交的任务腾出位置
CallerRunsPolicy 交给线程池调用所在的线程进行处理,由提交任务者执行这个任务

# 8. 几种常用的线程池

# newFixedThreadPool

固定数目线程的线程池线程池特点:

  • 核心线程数和最大线程数大小一样
  • 没有所谓的非空闲时间,即keepAliveTime为0
  • 阻塞队列为无界队列LinkedBlockingQueue

工作机制示意图:

newFixedThreadPool_flow

# newCachedThreadPool

可缓存线程的线程池线程池特点:

  • 核心线程数为 0
  • 最大线程数为 Integer.MAX_VALUE
  • 阻塞队列是 SynchronousQueue
  • 非核心线程空闲存活时间为 60 秒

当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

工作机制示意图:

CachedThreadPool

# newSingleThreadExecutor

单线程的线程池线程池特点:

  • 核心线程数为1
  • 最大线程数也为1
  • 阻塞队列是LinkedBlockingQueue
  • keepAliveTime为0

工作机制示意图:

newSingleThreadExecutor

# newScheduledThreadPool

定时及周期执行的线程池线程池特点:

  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列是DelayedWorkQueue
  • keepAliveTime为0
  • scheduleAtFixedRate() :按某种速率周期执行
  • scheduleWithFixedDelay():在某个延迟后执行

工作原理与 Timer 类似,但是精确度比 Timer 高。

简单工作原理:

  1. 添加一个任务。
  2. 线程池中的线程从 DelayQueue 中取任务。
  3. 线程从 DelayQueue 中获取 time 大于等于当前时间的task。
  4. 执行完后修改这个 task 的 time 为下次被执行的时间。
  5. 这个 task 放回DelayQueue队列中。

# 9. 线程池异常处理

在使用线程池处理任务的时候,任务代码可能抛出 Exception,抛出异常后,我们可能无法感知任务出现了异常,因此我们需要考虑线程池异常情况。

# Future 处理异常

submit()方法提交 Callable 任务对象可以返回一个 Future 对象,submit 方法本身不会传递结果和任务执行过程中的异常,线程池的处理结果、以及处理过程中的异常都被包装到Future中,并在调用Future.get()方法时获取,执行过程中的异常会被捕获并包装ExecutionException

示例代码:

ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Integer> future = pool.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        throw new Exception("futureExceptionHandler test!");
    }
});

try {
    Integer result = future.get();
    System.out.println("--> " + result);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    // 捕获的线程异常处理
    System.out.println("--> " + e.getMessage());
    e.printStackTrace();
}

运行结果:

--> java.lang.Exception: futureExceptionHandler test!
java.util.concurrent.ExecutionException: java.lang.Exception: futureExceptionHandler test!
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.jm.demo.ThreadPoolFunc.futureExceptionHandler(ThreadPoolFunc.java:17)
	at org.jm.demo.Main.main(Main.java:5)
Caused by: java.lang.Exception: futureExceptionHandler test!
Caused by: java.lang.Exception: futureExceptionHandler test!

	at org.jm.demo.ThreadPoolFunc$1.call(ThreadPoolFunc.java:12)
	at org.jm.demo.ThreadPoolFunc$1.call(ThreadPoolFunc.java:9)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

# Thread 异常处理器

使用 Thread 对象中的 setUncaughtExceptionHandler() 方法来给当前工作的线程设置一个异常监听处理,这个只能用于未被捕获的异常,所以不能使用线程池的 submit 方式提交执行任务,因为submit方法自己捕获了异常并进行了处理,可以使用 execute 的方式来提交执行任务,进行线程异常监听捕获处理。

JDK5 之后允许我们在每一个Thread对象上添加一个异常处理器 UncaughtExceptionHandlerThread.UncaughtExceptionHandler.uncaughtException()方法会在线程因未捕获的异常而面临死亡时被调用。

示例代码:

ExecutorService pool = Executors.newFixedThreadPool(1, new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                // 线程异常捕获处理
                System.out.println("--> " + e);
                e.printStackTrace();
            }
        });
        return thread;
    }
});

pool.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("--> run");
        String str = null;
        str.toString();
    }
});

运行结果:

--> run
--> java.lang.NullPointerException
java.lang.NullPointerException
	at org.jm.demo.ThreadPoolFunc$3.run(ThreadPoolFunc.java:49)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

# ThreadGroup 异常处理器

可以给线程对象设置到 ThreadGroup 中,可以分组进行不同异常场景的处理。

示例代码:

ExecutorService pool = Executors.newFixedThreadPool(1, new ThreadFactory() {
    // 创建一个线程组,实现异常捕获方法
    private ThreadGroup mThreadGroup = new ThreadGroup("threadGroup") {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            // 线程组异常捕获处理
            System.out.println("--> " + e);
            e.printStackTrace();
        }
    };

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(mThreadGroup, r);
        return thread;
    }
});

pool.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("--> run");
        String str = null;
        str.toString();
    }
});

运行结果:

--> run
--> java.lang.NullPointerException
java.lang.NullPointerException
	at org.jm.demo.ThreadPoolFunc$5.run(ThreadPoolFunc.java:77)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

# Thread 默认线程异常处理器

Java 中可以通过 Thread.setDefaultUncaughtExceptionHandler() 静态方法设置一个默认的线程异常捕获监听处理。当线程出现异常时,如果我们没有指定线程的异常处理器,而且线程组也没有设置异常处理器,那么就会使用默认的线程异常处理器。

示例代码:

// 设置默认线程异常处理
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        // 线程默认异常捕获处理
        System.out.println("--> " + e);
        e.printStackTrace();
    }
});
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("--> run");
        String str = null;
        str.toString();
    }
});

运行结果:

--> run
--> java.lang.NullPointerException
java.lang.NullPointerException
	at org.jm.demo.ThreadPoolFunc$7.run(ThreadPoolFunc.java:97)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

# 10. 线程池状态

线程池有这 5 种状态:RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED

线程状态 说明
RUNNING 1. 该状态的线程池会接收新任务,并处理阻塞队列中的任务。
2. 调用线程池的shutdown()方法,可以切换到SHUTDOWN状态。
3. 调用线程池的shutdownNow()方法,可以切换到STOP状态。
SHUTDOWN 1. 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务。
2. 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态。
STOP 1. 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
2. 线程池中执行的任务为空,进入TIDYING状态;
TIDYING 1. 该状态表明所有的任务已经运行终止,记录的任务数量为0。
2. terminated()执行完毕,进入TERMINATED状态。
TERMINATED 该状态表示线程池彻底终止。

状态流程示意图:

ThreadPoolState

# 11. 线程池的使用

# Executors 使用示例

/**
* Executors 线程池使用示例。
*/

// Runnable任务执行对象
Runnable runnable = new Runnable() {
    @Override
    public void run() {
        System.out.println("--> runnable run");
        String str = null;
        str.toString();
    }
};

// Callable任务执行对象
Callable<Integer> callable = new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        System.out.println("--> callable call");
        return 1;
    }
};

// 1. newFixedThreadPool
ExecutorService pool1 = Executors.newFixedThreadPool(1, new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r);
    }
});

// 2. newCachedThreadPool
ExecutorService pool2 = Executors.newCachedThreadPool(new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r);
    }
});

// 3. newSingleThreadExecutor
ExecutorService pool3 = Executors.newSingleThreadExecutor(new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r);
    }
});

/**
* 线程池任务执行使用示例。
*/
// 通过execute执行任务
pool1.execute(runnable);
// 通过submit执行runnable任务
Future<?> submit1 = pool1.submit(runnable);
// 通过submit执行callable任务
Future<Integer> submit2 = pool1.submit(callable);
// 通过submit执行runnable任务,执行成功后会通过Future返回resultObject
int resultObject = 100;
Future<Integer> submit3 = pool1.submit(runnable, resultObject);
try {
    // get()方法总是返回null
    Integer result1 = submit2.get();
    System.out.println("--> result1 = " + result1);

    // 正常成功执行完毕情况,返回callable中的call()的返回值
    Integer result2 = submit2.get();
    System.out.println("---> result2 = " + result2);

    // 正常成功执行完毕情况,返回调用时传入的resultObject
    Integer result3 = submit3.get();
    System.out.println("---> result3 = " + result3);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (Exception e) {
    e.printStackTrace();
}

// 4. newScheduledThreadPool
ScheduledExecutorService pool4 = Executors.newScheduledThreadPool(1, new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r);
    }
});

/**
* 线程池计划任务基本示例。
*/
// 在指定延迟时间后执行runnable任务
ScheduledFuture<?> schedule1 = pool4.schedule(runnable, 1, TimeUnit.SECONDS);

// 在指定延迟时间后执行callable任务
ScheduledFuture<Integer> schedule2 = pool4.schedule(callable, 1, TimeUnit.SECONDS);

// 固定速率周期执行runnable任务
ScheduledFuture<?> schedule3 = pool4.scheduleAtFixedRate(runnable, 0, 3, TimeUnit.SECONDS);

// 指定延迟后执行runnable任务
ScheduledFuture<?> scheduled4 = pool4.scheduleWithFixedDelay(runnable, 1, 3, TimeUnit.SECONDS);

# ThreadPoolExecutor 使用示例

/**
* ThreadPoolExecutor 线程池使用示例。
*/

// Runnable任务执行对象
Runnable runnable = new Runnable() {
    @Override
    public void run() {
        System.out.println("--> runnable run");
        String str = null;
        str.toString();
    }
};

// Callable任务执行对象
Callable<Integer> callable = new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        System.out.println("--> callable call");
        return 1;
    }
};

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
    3,  // 指定核心线程数为3
    10, // 指定线程池最大线程数为10
    10, // 指定非核心线程存活时间数值为10
    TimeUnit.SECONDS,   // 非核心线程存活时间的单位为秒
    new LinkedBlockingQueue<>(10), // 指定阻塞队列为链表队列,也可以自定义实现
    new ThreadFactory() {   // 自定义线程创建工厂对象,也可以使用默认的Executors.defaultThreadFactory()工厂对象
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r);
        }
    },
    new RejectedExecutionHandler() {    // 自定义拒绝策略,也可以使用默认的几种拒绝策略
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // 自定义拒绝策略,丢弃任务,打印线程池饱和信息
            System.out.println("pool is full!");
        }
    }
);

// 通过execute执行任务
poolExecutor.execute(runnable);
// 通过submit执行runnable任务
Future<?> submit1 = poolExecutor.submit(runnable);
// 通过submit执行callable任务
Future<Integer> submit2 = poolExecutor.submit(callable);
// 通过submit执行runnable任务,执行成功后会通过Future返回resultObject
int resultObject = 100;
Future<Integer> submit3 = poolExecutor.submit(runnable, resultObject);

try {
    // get()方法总是返回null
    Integer result1 = submit2.get();
    System.out.println("--> result1 = " + result1);

    // 正常成功执行完毕情况,返回callable中的call()的返回值
    Integer result2 = submit2.get();
    System.out.println("---> result2 = " + result2);

    // 正常成功执行完毕情况,返回调用时传入的resultObject
    Integer result3 = submit3.get();
    System.out.println("---> result3 = " + result3);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (Exception e) {
    e.printStackTrace();
}

# 总结

ThreadPoolExecutor 通过几个核心的参数来定义不同类型的线程池,适用于不同的使用场景,其中在任务提交时,会依次判断 corePoolSize, workQueque, 及 maximumPoolSize,不同的状态不同的处理。在复杂庞大的业务处理场景中线程池会提高执行效率减少资源的消耗可重复利用资源等等优点。