1.?线程̳߳ص?һ??Դ??
2.java线程池(一):java线程池基本使用及Executors
3.ThreadPoolExecutor简介&源码解析
4.深度分析Binder线程池的启动流程
5.线程池newCachedThreadPool
6.太强了!阿里老哥分享的池第程池JDK源码学习指南,含8大核心内容讲解
?章源̳߳ص?һ??Դ??
本文旨在通过手写一个线程池,来深入理解ThreadPoolExecutor线程池的码线实现原理。首先,源码线程池的线程rtmp源码查询核心目标是资源管理和性能优化,通过池化技术减少线程创建和销毁的池第程池开销。手写线程池的章源实现步骤包括确定核心流程和添加辅助流程,虽然代码简单,码线但能体现核心的源码池化思想。
手写线程池的线程实现涉及到状态管理,如线程池数量和状态的池第程池记录,这部分在ThreadPoolExecutor中通过AtomicInteger的章源高3位和低位实现。线程池的码线状态流转包括RUNNING、BLOCKED等,源码并通过execute方法提交任务,这个过程与我们自己的实现类似,包括任务的执行、加入队列和策略决策。
添加执行任务的过程分为增加线程数量和启动线程,这部分与我们最初的设想基本一致。在runWorker方法中,执行线程的核心是调用task.run(),同时还会涉及队列任务的获取。这些步骤与手写线程池的逻辑相吻合,帮助我们更好地理解线程池的工作机制。
总结来说,通过对比分析和实践,我们对ThreadPoolExecutor的线程池实现有了更深入的理解,包括状态管理、任务提交和执行流程等。深入阅读源码后,你会发现线程池的复杂性和优化设计。如果你对Java线程池感兴趣,这将是一次很好的学习和实践机会。
java线程池(一):java线程池基本使用及Executors
@[toc] 在前面学习线程组的时候就提到过线程池。实际上线程组在我们的kubeedge调度源码日常工作中已经不太会用到,但是线程池恰恰相反,是我们日常工作中必不可少的工具之一。现在开始对线程池的使用,以及底层ThreadPoolExecutor的源码进行分析。1.为什么需要线程池我们在前面对线程基础以及线程的生命周期有过详细介绍。一个基本的常识就是,线程是一个特殊的对象,其底层是依赖于JVM的native方法,在jvm虚拟机内部实现的。线程与普通对象不一样的地方在于,除了需要在堆上分配对象之外,还需要给每个线程分配一个线程栈、以及本地方法栈、程序计数器等线程的私有空间。线程的初始化工作相对于线程执行的大多数任务而言,都是一个耗时比较长的工作。这与数据库使用一样。有时候我们连接数据库,仅仅只是为了执行一条很小的sql语句。但是在我们日常的开发工作中,我们的绝大部分工作内容,都会分解为一个个短小的执行任务来执行。这样才能更加合理的复用资源。这种思想就与我们之前提到的协程一样。任务要尽可能的小。但是在java中,任务不可能像协程那样拆分得那么细。那么试想,如果说,有一个已经初始化好的很多线程,在随时待命,那么当我们有任务提交的时候,这些线程就可以立即工作,无缝接管我们的任务请求。那么效率就会大大增加。这些个线程可以处理任何任务。async源码详解这样一来我们就把实际的任务与线程本身进行了解耦。从而将这些线程实现了复用。 这种复用的一次创建,可以重复使用的池化的线程对象就被成为线程池。 在线程池中,我们的线程是可以复用的,不用每次都创建一个新的线程。减少了创建和销毁线程的时间开销。 同时,线程池还具有队列缓冲策略,拒绝机制和动态线程管理。可以实现线程环境的隔离。当一个线程有问题的时候,也不会对其他的线程造成影响。 以上就是我们使用线程池的原因。一句话来概括就是资源复用,降低开销。
2.java中线程池的实现在java中,线程池的主要接口是Executor和ExecutorService在这两个接口中分别对线程池的行为进行了约束,最主要的是在ExecutorService。之后,线程池的实际实现类是AbstractExecutorService类。这个类有三个主要的实现类,ThreadpoolExecutorService、ForkJoinPool以及DelegatedExecutorService。
后面我们将对这三种最主要的实现类的源码以及实现机制进行分析。
3.创建线程的工厂方法Executors在java中, 已经给我们提供了创建线程池的工厂方法类Executors。通过这个类以静态方法的模式可以为我们创建大多数线程池。Executors提供了5种创建线程池的方式,我们先来看看这个类提供的工厂方法。
3.1 newFixedThreadPool/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue.At any point, at most * { @code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks.The threads in the pool will exist * until it is explicitly { @link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}这个方法能够创建一个固定线程数量的无界队列的线程池。参数nthreads是最多可同时处理的活动的线程数。如果在所有线程都在处理任务的情况下,提交了其他的任务,那么这些任务将处于等待队列中。dede企业源码直到有一个线程可用为止。如果任何线程在关闭之前的执行过程中,由于失败而终止,则需要在执行后续任务的时候,创建一个新的线程来替换。线程池中的所有线程都将一直存在,直到显示的调用了shutdown方法。 上述方法能创建一个固定线程数量的线程池。内部默认的是使用LinkedBlockingQueue。但是需要注意的是,这个LinkedBlockingQueue底层是链表结构,其允许的最大队列长度为Integer.MAX_VALUE。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}这样在使用的过程中如果我们没有很好的控制,那么就可能导致内存溢出,出现OOM异常。因此这种方式实际上已经不被提倡。我们在使用的过程中应该谨慎使用。 newFixedThreadPool(int nThreads, ThreadFactory threadFactory)方法:
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue, using the provided * ThreadFactory to create new threads when needed.At any point, * at most { @code nThreads} threads will be active processing * tasks.If additional tasks are submitted when all threads are * active, they will wait in the queue until a thread is * available.If any thread terminates due to a failure during * execution prior to shutdown, a new one will take its place if * needed to execute subsequent tasks.The threads in the pool will * exist until it is explicitly { @link ExecutorService#shutdown * shutdown}. * * @param nThreads the number of threads in the pool * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}这个方法与3.1中newFixedThreadPool(int nThreads)的方法的唯一区别就是,增加了threadFactory参数。在前面方法中,对于线程的创建是采用的默认实现Executors.defaultThreadFactory()。而在此方法中,可以根据需要自行定制。
3.2 newSingleThreadExecutor/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.)Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * { @code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}此方法将会创建指有一个线程和一个无届队列的线程池。需要注意的是,如果这个执行线程在执行过程中由于失败而终止,那么需要在执行后续任务的时候,用一个新的线程来替换。 那么这样一来,上述线程池就能确保任务的顺序性,并且在任何时间都不会有多个线程处于活动状态。与newFixedThreadPool(1)不同的是,使用newSingleThreadExecutor返回的ExecutorService不能被重新分配线程数量。而使用newFixExecutor(1)返回的ExecutorService,其活动的线程的数量可以重新分配。后面专门对这个问题进行详细分析。redis源码级别 newSingleThreadExecutor(ThreadFactory threadFactory) 方法:
/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue, and uses the provided ThreadFactory to * create a new thread when needed. Unlike the otherwise * equivalent { @code newFixedThreadPool(1, threadFactory)} the * returned executor is guaranteed not to be reconfigurable to use * additional threads. * * @param threadFactory the factory to use when creating new * threads * * @return the newly created single-threaded Executor * @throws NullPointerException if threadFactory is null */public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}这个方法与3.3中newSingleThreadExecutor的区别就在于增加了一个threadFactory。可以自定义创建线程的方法。
3.3 newCachedThreadPool/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available.These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to { @code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using { @link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}这个方法用来创建一个线程池,该线程池可以根据需要自动增加线程。以前的线程也可以复用。这个线程池通常可以提高很多执行周期短的异步任务的性能。对于execute将重用以前的构造线程。如果没有可用的线程,就创建一个 新的线程添加到pool中。秒内,如果该线程没有被使用,则该线程将会终止,并从缓存中删除。因此,在足够长的时间内,这个线程池不会消耗任何资源。可以使用ThreadPoolExecutor构造函数创建具有类似属性但是详细信息不同的线程池。 ?需要注意的是,这个方法创建的线程池,虽然队列的长度可控,但是线程的数量的范围是Integer.MAX_VALUE。这样的话,如果使用不当,同样存在OOM的风险。比如说,我们使用的每个任务的耗时比较长,任务的请求又非常快,那么这样势必会造成在单位时间内创建了大量的线程。从而造成内存溢出。 newCachedThreadPool(ThreadFactory threadFactory)方法:
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}这个方法区别同样也是在于,增加了threadFactory可以自行指定线程的创建方式。
2.4 newScheduledThreadPool/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}创建一个线程池,该线程池可以将任务在指定的延迟时间之后运行。或者定期运行。这个方法返回的是ScheduledThreadPoolExecutor。这个类是ThreadPoolExecutor的子类。在原有线程池的的基础之上,增加了延迟和定时功能。我们在后面分析了ThreadPoolExecutor源码之后,再来分析这个类的源码。 与之类似的方法:
/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param threadFactory the factory to use when the executor * creates a new thread * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} * @throws NullPointerException if threadFactory is null */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}通过这个方法,我们可以指定threadFactory。自定义线程创建的方式。 同样,我们还可以只指定一个线程:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));}上述两个方法都可以实现这个功能,但是需要注意的是,这两个方法的返回在外层包裹了一个包装类。
3.5 newWorkStealingPool这种方式是在jdk1.8之后新增的。我们先来看看其源码:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}0这个方法实际上返回的是ForkJoinPool。该方法创建了一
ThreadPoolExecutor简介&源码解析
线程池是通过池化管理线程的高效工具,尤其在多核CPU时代,利用线程池进行并行处理任务有助于提升服务器性能。ThreadPoolExecutor是线程池的具体实现,它负责线程管理和任务管理,以及处理任务拒绝策略。这个类提供了多种功能,如通过Executors工厂方法配置,执行Runnable和Callable任务,维护任务队列,统计任务完成情况等。
创建线程池需要考虑关键参数,如核心线程数(任务开始执行时立即创建),最大线程数(任务过多时限制新线程生成),线程存活时间,任务队列大小,线程工厂以及拒绝策略。JDK提供了四种拒绝策略,如默认的AbortPolicy,当资源饱和时抛出异常。此外,线程池还提供了beforeExecute和afterExecute钩子函数,用于在任务执行前后执行自定义操作。
当任务提交到线程池时,会经历一系列处理流程,包括任务的执行和线程池状态的管理。例如,如果任务队列和线程池满,会根据拒绝策略处理新任务。使用线程池时,需注意线程池容量与状态的计算,以及线程池工作线程的动态调整。
示例中,自定义线程池并重写钩子函数,创建任务后向线程池提交,可以看到线程池如何根据配置动态调整资源。但要注意,如果任务过多且无法处理,可能会抛出异常。源码分析中,submit方法实际上是调用execute,而execute内部包含Worker类和runWorker方法的逻辑,包括任务的获取和执行。
线程池的容量上限并非Integer.MAX_VALUE,而是由ctl变量的低位决定。 Doug Lea的工具函数简化了ctl的操作,使得计算线程池状态和工作线程数更加便捷。通过深入了解ThreadPoolExecutor,开发者可以更有效地利用线程池提高应用性能。
深度分析Binder线程池的启动流程
理论基础Binder
Binder它是Android中的一种进程间通信机制,它主要采用的是CS架构模式。Binder框架中主要涉及到4个角色Client、Server、ServiceManager及Binder驱动,其中Client、Server、ServiceManager运行在用户空间,Binder驱动运行在内核空间。
线程池线程池它是一种用于多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。
简单的说:线程池就是创建一些线程,它们的集合称为线程池。
Binder线程池启动流程我们知道一个新的app应用程序进程在创建完成之后,它会通过调用RunTimeInit类的静态成员函数zygoteInitNative来进行启动Binder线程池。
Binder线程池启动过程中,主要调用几个关键函数:ZygoteInitNative--->onZygoteInit--->startThreadPool。
下面的源码分析主要是以android5.0版本为例。
ZygoteInitNative源码分析由于ZygoteInitNative函数是java实现的代码,实践上最终调用的是由C++实现的JNI方法。以下代码来源于系统的/frameworks/base/core/jni/androidRuntime.cpp文件中
staticvoidcom_android_internal_os_RuntimeInit_nativeZygoteInit(JNIEnv*env,jobjectclazz){ //gCurRuntime是个全局的变量,后面跟上的是另外实现的方法。gCurRuntime->onZygoteInit();}onZygoteInit源码分析onZygoteInit函数在需要源码的位置:/frameworks/base/cmds/app_process/app_main.cpp文件中。
该函数是个虚函数,并且是一个无返回值和无参数的函数virtualvoidonZygoteInit(){ //Re-enabletracingnowthatwe'renolongerinZygote.atrace_set_tracing_enabled(true);//获取进程的状态信息sp<ProcessState>proc=ProcessState::self();//打印日志信息ALOGV("Appprocess:startingthreadpool.\n");//启动线程池proc->startThreadPool();}startThreadPool源码分析startThreadPool系统实现在\frameworks\native\libs\binder\ProcessState.cpp文件中。
每一个支持Binder进程间通信机制的进程内都有一个唯一的ProcessState对象,当这个ProcessState对象的成员函数StartThreadPool函数被第一次调用的时候,它就会在当前进程中启动一个线程池,并将mThreadPoolStarted这个成员变量设置为true。
//该函数是个无参数,无返回值的函数voidProcessState::startThreadPool(){ AutoMutex_l(mLock);//判断线程池是否启动状态,启动的话就将标志信息设置为true属性。if(!mThreadPoolStarted){ mThreadPoolStarted=true;spawnPooledThread(true);}}总结Binder在android底层中是一个非常重要的机制,我们在实际的项目调用过程中,我们在app应用程序中只要实现自己的Binder本地对象的时候,跟其他服务一样,只需要将它进行启动起来,并且进行注册到ServerMananger就可以了。至于内部的实现一般是不需要去关心的。
线程池newCachedThreadPool
新线程池newCachedThreadPool的源码揭示了其独特设计和功能。它的核心特点在于动态创建和重用线程,以提高执行短暂异步任务的程序性能。此池允许在先前构造的线程可用时重复使用它们,且最大线程数为Integer.MAX_VALUE,意味着资源使用相对灵活。
在newCachedThreadPool中,线程的存活时间设置为秒,超过此时间未使用的线程将被终止并从池中移除。这一特性有助于避免资源浪费,保持空闲时间足够长的池不会消耗任何资源。此外,新线程池不包含核心线程,其操作基于SynchronousQueue队列,确保线程间高效同步。
使用newCachedThreadPool时,程序执行到大约秒后自动终止,因为线程池已完成所有任务。存活线程在超过秒的闲置后被终止和移除,这体现了其设计原理。
为何newCachedThreadPool选择SynchronousQueue而不是其他线程池通常采用的LinkedBlockQueue?SynchronousQueue是一个特殊的阻塞队列,旨在实现线程间高效同步。它没有内部容量,且插入操作需等待相应的删除操作。此特性使其成为切换设计的理想选择,允许线程在需要时安全地传递信息、事件或任务,尤其适用于需要多线程间同步的应用场景。
SynchronousQueue通过实现Collection和Iterator接口支持所有可选方法,包括支持可选的公平性策略。默认情况下,不保证生产者和使用者线程的FIFO顺序访问,但通过将公平性策略设置为true,可以确保按此顺序授予访问权限。
总之,newCachedThreadPool通过动态线程重用和SynchronousQueue的高效同步机制,提供了一种灵活且高效的处理短暂异步任务的方法。其设计旨在优化资源使用,通过在任务完成后的秒内自动清理资源,保持系统性能高效。
太强了!阿里老哥分享的JDK源码学习指南,含8大核心内容讲解
Java开发中,JDK源码的重要性不言而喻。作为Java运行环境的基石,JDK涵盖了Java的全部运行环境和开发工具,没有它,程序编译都无从谈起。为此,本文将分享一份来自阿里的资深程序员整理的JDK源码学习指南。
这份指南详尽介绍了JDK源码的多个核心内容,包括多线程基础、Atomic类、Lock与Condition接口、同步工具类、并发容器、线程池与Future、ForkJoinPool分治算法、异步编程工具CompletableFuture等。需要这份资料的朋友,请点击此处获取完整版。
以下是学习指南的具体章节:
第1章 多线程基础
第2章 Atomic类
第3章 Lock与Condition
第4章 同步工具类
第5章 并发容器
第6章 线程池与Future
第7章 ForkJoinPool
第8章 CompletableFuture
以上就是这份JDK源码学习笔记的概述,感兴趣的朋友可以点击此处获取完整版资料。