java官网中关于线程池的介绍(一文读懂Java线程池)

为什么要用线程池

在生产环境,我们经常面临的情况是: 处理某次请求的时间非常短暂,但是请求量很大。

在这种情况下,如果为每个请求单独创建一个线程,有限的硬件资源有可能会被OS创建线程,切换线程状态、销毁线程这些操作所占用,用于业务处理的资源反而减少了。

所以理想的处理方式是: 将请求的线程数量控制在一个范围内,既保证后续的请求不会等待太长时间,又保证物理机将足够的资源用于请求处理本身。

线程池设计与结构

开发者通常利用 ThreadPoolExecutor 类的构造函数来创建不同配置的线程池:

ThreadPoolExecutor(int corePoolSize,

int maximußmPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler)

透过这个构造方法,可以大致勾勒出线程池的基本组成,其大致的结构如下图所示。

java官网中关于线程池的介绍(一文读懂Java线程池)(1)

这里一定要注意: 存在于线程池中的一定是Thread对象,而不是你要处理的任务 。 因此才叫线程池而不是任务池,线程池会分配池中的一个空闲线程对象来运行提交的任务。

构成线程池的几个要素:

  • 等待队列:即将要执行的任务队列,因为某些原因,线程池并没有马上运行这些任务
  • 核心线程:执行任务的线程对象,其数量由 corePoolSize 指定
  • 非核心线程:一旦任务数量过多,线程池将创建非核心线程临时帮助运行任务

注意: 实际上,并没有「核心线程」与「非核心线程」这样的概念,只是为了方便大家理解,而所有区分而已。 因此,全文对这两个概念均加了引号。

线程池的大致工作流程:

1、开发者提交待执行任务,线程池收到这个任务请求后,有以下几种处理情况:

  • 当前线程池中运行的线程数量还没有达到 corePoolSize 大小时,线程池会创建一个新线程执行提交的任务,无论之前创建的线程是否处于空闲状态。
  • 当前线程池中运行的线程数量已经达到 corePoolSize 大小时,线程池会把任务加入到等待队列中,直到某一个线程空闲了,线程池会根据我们设置的等待队列规则,从队列中取出一个新的任务执行。
  • 根据队列规则,这个任务无法加入到等待队列,这时线程池就会创建一个“非核心线程”直接运行这个任务。注意,如果这种情况下任务执行成功,那么当前线程池中的线程数量一定大于 corePoolSize。
  • 如果这个任务,无法被”核心线程”直接执行,又无法加入等待队列,又无法创建“非核心线程”直接执行,线程池将根据拒绝处理器定义的策略处理这个任务。比如在 ThreadPoolExecutor 中,如果你没有为线程池设置 RejectedExecutionHandler。这时线程池会抛出 RejectedExecutionException 异常,即线程池拒绝接受这个任务。实际上抛出 RejectedExecutionException异常的操作,是 ThreadPoolExecutor 线程池中一个默认的RejectedExecutionHandler 实现。

2、一旦线程池中某个线程完成了任务的执行,它就会试图到任务等待队列中拿去下一个等待任务 ( 所有的等待任务都实现了 BlockingQueue 接口,这是一个可阻塞的队列接口 ) ,它会调用等待队列的 poll 方法,并停留在哪里。

3、当线程池中的线程超过您设置的 corePoolSize 参数,说明当前线程池中有所谓的“非核心线程”。 那么当某个线程处理完任务后,如果等待 keepAliveTime 时间后仍然没有新的任务分配给它,那么这个线程将会被回收。 线程池回收线程时,并不是所谓的“非核心线程”才会被回收,而是谁的空闲时间达到 keepAliveTime 这个阀值,就会被回收,直到线程池中线程的数量等于您设置的corePoolSize参数时,回收过程才会停止。

对所谓的“核心线程”和“非核心线程”是一视同仁的,直到线程池中线程的数量等于您设置的corePoolSize参数时,回收过程才会停止。

Executor 框架基本组成

使用 ThreadPoolExecutor 类的构造方法可以创建不同配置的线程池,但平时却很少使用。 大多数应用场景下,使用Java并发包中的 Executors 提供的5个静态工厂方法就足够了。

首先,来看看 Executor 框架的基本组成:

java官网中关于线程池的介绍(一文读懂Java线程池)(2)

Executor 是一个基础接口,只有一个 execute(Runnable) 方法,用于任务执行,它屏蔽了许多任务提交、线程创建和调度等不相关细节。

ExecutorService 接口则更加完善,提供了一些管理功能,比如 shutdown 方法用于关闭线程池; 还提供了更加全面的任务提交机制,比如用 submit 方法提交任务,返回的是一个 Future 对象,用于获取任务执行结果; 甚至还有批量处理任务的功能,比如 invokeAll 或者 invokeAny 等方法。

ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool 则是 Java 提供的几种基础线程池实现,以满足复杂多变的应用场景。

Executors 是一个工具类,从简化使用的角度,提供了各种静态工厂方法来创建不同配置的线程池。

使用 ThreadPoolExecutor 创建线程池

前面的内容已经给出 ThreadPoolExecutor 的构造方法,这里对构造方法的后3个参数作详细的说明,帮你更好的理解和使用线程池。

线程池的等待队列

只要实现了 BlockingQueue 接口的队列,都可以作为线程池的等待队列,常见的比如: ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue、LinkedTransferQueue等,至于每种队列的区别以及原理,不再本文讨论范文内。

但网上有些内容真的很容易误导人,这里提两句。

SynchronousQueue 也可以存储数据,且是无锁实现,只是size()方法直接返回0而已。 因此,网上的很多内容在把 SynchronousQueue 和 LinkedBlockingQueue 与 LinkedTransferQueue 作对比时,部分内容的正确性是有待确认的,这点需要读者自己注意。

PriorityBlockingQueue 会按照优先级进行对内部元素进行排序,优先级最高的元素将始终排在队列的头部。 但它不会保证优先级一样的元素的排序,也不保证当前队列中除了优先级最高的元素以外的元素,处于正确排序的位置。 因此,它并不是真正意义的排序。

线程池的 ThreadFactory

线程池最主要的一项工作,就是在满足某些条件的情况下创建线程。 而在 ThreadPoolExecutor 线程池中,创建线程的工作交给 ThreadFactory 来完成。 要使用线程池,就必须要指定 ThreadFactory。 如果没有指定,会使用默认的 ThreadFactory: DefaultThreadFactory (这个类在Executors工具类中)。

当然,在某些特殊业务场景下,您还可以使用一个自定义的 ThreadFactory 线程工厂,如下代码片段:

public class CustomThreadFactory implements ThreadFactory {

@Override

public Thread newThread(Runnable r) {

return new Thread(r);

}

}

线程池的拒绝策略

在 ThreadPoolExecutor 线程池中还有一个重要的接口: RejectedExecutionHandler。 当提交任务给线程池时,出现以下情况时,线程池会拒绝处理这个任务,并触发线程池创建时定义的拒绝策略:

  • 新任务无法直接被线程池中“核心线程”直接处理,又无法加入等待队列,也无法创建新的线程执行
  • 线程池已经调用 shutdown 方法停止了工作
  • 线程池不是处于正常的工作状态

实际上,在 ThreadPoolExecutor 中已经提供了四种可以直接使用的 RejectedExecutionHandler 接口的实现:

  • CallerRunsPolicy:在非线程池以外直接调用这个任务的run方法
  • DiscardPolicy:直接丢弃这个被拒绝的任务,且没有任何提示
  • DiscardOldestPolicy:丢弃等待队列队首的任务,将当前被拒绝的任务提交到线程池执行
  • AbortPolicy:拒绝任务并抛出 RejectedExecutionException 异常

其中,CallerRunsPolicy 直接调用任务的 run 方法,可能会造成线程安全问题; DiscardPolicy 默默的忽略掉被拒绝任务,也没有输出日志或者任何提示,开发者就无法得知线程池在处理过程出现的错误; DiscardOldestPolicy 貌似是科学的,但如果等待队列出现了容量问题,很多任务会被直接丢弃,这时,业务会出现BUG,但开发者却很难定位到。

因此,比较科学的还是 AbortPolicy 提供的处理方式: 抛出异常,由开发人员进行处理。 当然,特殊情况下,我也建议使用自定义拒绝策略,可以缓存任务到Redis,或者发送消息到MQ通知业务方。

扩展 ThreadPoolExecutor 线程池

在 ThreadPoolExecutor 中提供了3个方法供子类重写,它们可以帮助开发者在线程池处理任务的不同阶段,进行额外的业务处理操作:

  • beforeExecute:当线程池正要开始执行某个任务时,线程池会触发这个方法的调用。
  • afterExecute:当线程池完成了某个任务的执行后,线程池就会触发这个方法。
  • terminated:当线程池本身停止执行的时候,该方法就会被调用。

Execute 和 Submit 方法的区别

ThreadPoolExecutor 提供 execute 和 submit 两个方法用于提交任务,其中:

execute : 提交的任务实现 Runnable 接口,任务没有任何返回值,因此,无法获取任何执行结果。

submit : 提交的任务实现 Callable 接口,在任务运行完成后,会返回执行结果。

当然 submit 方法也可以提交实现 Runnable 接口的任务,但其处理方式与 execute 方法的处理方式完全不同: 使用 submit 方法提交的实现了 Runnable 接口的任务,将会被封装到线程池内部使用 Executors.callable 方法创建的 RunnableAdapter 对象中,而 RunnableAdapter 又继承自 Callable。

线程池实践

理解 Executors 创建的线程池

如果使用 Executors 创建线程池,那么一定要理解每个方法创建出来的线程池的配置是什么。

比如,newCachedThreadPool 方法创建的线程池,其 corePoolSize = 0,而 maximumPoolSize = Integer.MAX,而其等待队列是 SynchronousQueue,不能缓冲数据。 它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程; 如果线程闲置的时间超过 60 秒,则被终止并移出缓存; 长时间闲置时,这种线程池,不会消耗什么资源。 但需要着重注意的是,过快的任务提交速度,不但会导致线程数的急剧增加,也增加了程序OOM的风险。

而newFixedThreadPool ( int nThreads) 方法中,corePoolSize=maximumPoolSize=nThreads,任何时候最多有nThreads 个工作线程是活动的,这也意味着如果任务数量超过了活动队列数目,将在工作队列中等待空闲线程出现; 如果有工作线程退出,将会有新的工作线程被创建,以补足指定的数目 nThreads。

实际应用场景中,可能有很多人滥用了这些方法,所以,阿里Java规约建议使用 ThreadPoolExecutor 构造方法来代替 Executors。

不要随意创建线程池

站在应用或者服务的角度,对整个服务中线程的用途归类,每个分类创建合适的线程池即可。

看到过挺多代码,只要用到线程,就是用 Executors 在类中创建了一个线程池,这样挺不好的。

记住一点: 服务中创建的线程池越多,就越不好监控,也越不好配置合适的线程池数量。

线程池大小如何配置

也许你已经看到过无数计算最佳线程数的公式,首先,随便选一个,真的无所谓。

然后,随时监控线程池状态。 最简单的方式,每次提交任务时,在日志中记录下线程池的几个关键参数: corePoolSize、maximumPoolSize、线程池当前线程数量、工作队列长度。

最后,相信我,有上面4个参数,再结合硬件资源,你能够很清楚的知道,自己的线程池配置是否合适。

避免任务堆积

开发者要随时注意,任务的耗时,以免线程池被耗尽,等待队列被填满。

如果任务依赖于第三方服务,一定要设置超时时间。

避免任务丢失

在一些特殊的情况下,线程池的负载短时间内快速升高,有可能会触发拒绝策略,如果提交的任务不允许丢失,那么需要自定义拒绝策略,将任务暂存到数据库或者缓存中。

也就是在使用线程池时,要尽量兼顾到线程池的所有使用场景。

正确的关闭线程池

调用线程池的 shutdown 方法后,线程池不再接受新任务,如果继续提交,线程池会使用拒绝策略响应。

调用线程池的 shutdownNow 方法,会中断所有线程,然后取出工作队列中所有未完成的任务返回给调用者。

调用这两个方法都不会主动等待任务执行结束,在某些场景下,在关闭线程池时,需要保证所有的任务均执行完毕,可以调用 awaitTermination 方法判断。

如果线程池任务执行结束,awaitTermination 方法将会返回 true,否则在等待指定时间后将会返回 false。

// 阻止接收新任务

threadPool.shutdown();

// 等待线程池中的任务执行完成

while (!threadPool.awaitTermination(60,TimeUnit.SECONDS)){

// do nothing

}

// 示例代码,不建议生产环境直接使用

// 如果有任务卡住,会导致线程池不能关闭

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页