线程池的底层是基于线程和任务队列来实现的,创建线程池的创建方式通常有以下两种:
普通 Java 项目,使用 ThreadPoolExecutor 来创建线程池,这点《阿里巴巴Java开发手册》中也有说明,如下图所示:
图片
Spring 项目中,会使用代码可读性更高的 ThreadPoolTaskExecutor 来创建线程池,虽然它的底层也是通过 ThreadPoolExecutor 来实现的,但 ThreadPoolTaskExecutor 可读性更高,因为它不需要在构造方法中设置参数,而是通过属性设置的方式来设置参数的,所以可读性更高。
Spring 内置的线程池 ThreadPoolTaskExecutor 的使用示例如下:
@Configuration public class AsyncConfig { @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心线程数 executor.setCorePoolSize(5); // 最大线程数 executor.setMaxPoolSize(10); // 队列容量 executor.setQueueCapacity(20); // 线程池维护线程所允许的空闲时间 executor.setKeepAliveSeconds(60); // 线程池对拒绝任务(无线程可用)的处理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 初始化 executor.initialize(); return executor; } }
当有任务来了之后,线程池的执行流程是这样的:
先判断当前线程数是否大于核心线程数,如果结果为 false,则新建线程并执行任务。
如果大于核心线程数,则判断任务队列是否已满,如果结果为 false,则把任务添加到任务队列中等待线程执行。
如果任务队列已满,则判断当前线程数量是否超过最大线程数,如果结果为 false,则新建线程执行此任务。
如果超过最大线程数,则将执行线程池的拒绝策略。
如下图所示:
图片
当线程池无法接受新任务时,会触发拒绝策略,内置的拒绝策略有四种:
AbortPolicy:默认策略,直接抛出 RejectedExecutionException 异常。
CallerRunsPolicy:由调用者线程执行任务。
DiscardPolicy:默默地丢弃任务,没有任何异常抛出。
DiscardOldestPolicy:尝试抛弃队列中最旧的任务,然后重新尝试提交当前任务。
除了内置的拒绝策略之外,我们还可以设置自定义拒绝策略,它的实现如下:
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 在这里处理拒绝的任务 System.err.println("任务被拒绝执行: " + r.toString()); // 可以选择记录日志、抛出自定义异常或采取其他措施 // 例如,可以将任务保存到某个队列中,稍后再尝试重新执行 } }
使用自定义拒绝策略:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolDemo { public static void main(String[] args) { // 配置线程池参数 int corePoolSize = 5; int maximumPoolSize = 10; long keepAliveTime = 60L; TimeUnit unit = TimeUnit.SECONDS; int queueCapacity = 25; // 创建一个阻塞队列 ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(queueCapacity); // 创建 ThreadPoolExecutor 实例 ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new CustomRejectedExecutionHandler() // 使用自定义的拒绝策略 ); // 提交任务 for (int i = 0; i < 50; i++) { final int taskId = i; executor.execute(() -> { System.out.println("执行任务: " + taskId + " 由线程 " + Thread.currentThread().getName() + " 执行"); try { Thread.sleep(1000); // 模拟耗时任务 } catch (InterruptedException e) { e.printStackTrace(); } }); } // 关闭线程池(这不会立即停止所有正在执行的任务) executor.shutdown(); } }
实际项目中线程池会使用哪种拒绝策略?为什么?线程池是通过什么机制来创建线程的?线程池创建线程时可以设置哪些属性?