欢迎来到皮皮网网首页

【即可源码网】【营销推广系统 源码】【无未来源码】excutorservice源码

来源:渔枫源码分享网源码下载 时间:2025-01-17 09:51:04

1.java线程池(一):java线程池基本使用及Executors
2.ForkjoinPool -1
3.yarn源码分析(四)AppMaster启动
4.Java原理系列ScheduledThreadPoolExecutor原理用法示例源码详解

excutorservice源码

java线程池(一):java线程池基本使用及Executors

       @[toc] 在前面学习线程组的源码时候就提到过线程池。实际上线程组在我们的源码日常工作中已经不太会用到,但是源码线程池恰恰相反,是源码我们日常工作中必不可少的工具之一。现在开始对线程池的源码使用,以及底层ThreadPoolExecutor的源码即可源码网源码进行分析。

1.为什么需要线程池

       我们在前面对线程基础以及线程的源码生命周期有过详细介绍。一个基本的源码常识就是,线程是源码一个特殊的对象,其底层是源码依赖于JVM的native方法,在jvm虚拟机内部实现的源码。线程与普通对象不一样的源码地方在于,除了需要在堆上分配对象之外,源码还需要给每个线程分配一个线程栈、源码以及本地方法栈、源码程序计数器等线程的私有空间。线程的初始化工作相对于线程执行的大多数任务而言,都是一个耗时比较长的工作。这与数据库使用一样。有时候我们连接数据库,仅仅只是为了执行一条很小的sql语句。但是在我们日常的开发工作中,我们的营销推广系统 源码绝大部分工作内容,都会分解为一个个短小的执行任务来执行。这样才能更加合理的复用资源。这种思想就与我们之前提到的协程一样。任务要尽可能的小。但是在java中,任务不可能像协程那样拆分得那么细。那么试想,如果说,有一个已经初始化好的很多线程,在随时待命,那么当我们有任务提交的时候,这些线程就可以立即工作,无缝接管我们的任务请求。那么效率就会大大增加。这些个线程可以处理任何任务。这样一来我们就把实际的任务与线程本身进行了解耦。从而将这些线程实现了复用。 这种复用的一次创建,可以重复使用的池化的线程对象就被成为线程池。 在线程池中,我们的线程是可以复用的,不用每次都创建一个新的无未来源码线程。减少了创建和销毁线程的时间开销。 同时,线程池还具有队列缓冲策略,拒绝机制和动态线程管理。可以实现线程环境的隔离。当一个线程有问题的时候,也不会对其他的线程造成影响。 以上就是我们使用线程池的原因。一句话来概括就是资源复用,降低开销。

2.java中线程池的实现

       在java中,线程池的主要接口是Executor和ExecutorService在这两个接口中分别对线程池的行为进行了约束,最主要的是在ExecutorService。之后,线程池的实际实现类是AbstractExecutorService类。这个类有三个主要的实现类,ThreadpoolExecutorService、ForkJoinPool以及DelegatedExecutorService。

       后面我们将对这三种最主要的实现类的源码以及实现机制进行分析。

3.创建线程的工厂方法Executors

       在java中, 已经给我们提供了创建线程池的工厂方法类Executors。通过这个类以静态方法的个人博客我源码模式可以为我们创建大多数线程池。Executors提供了5种创建线程池的方式,我们先来看看这个类提供的工厂方法。

3.1 newFixedThreadPool/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue.At any point, at most * { @code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks.The threads in the pool will exist * until it is explicitly { @link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

       这个方法能够创建一个固定线程数量的无界队列的线程池。参数nthreads是最多可同时处理的活动的线程数。如果在所有线程都在处理任务的情况下,提交了其他的任务,那么这些任务将处于等待队列中。直到有一个线程可用为止。如果任何线程在关闭之前的执行过程中,由于失败而终止,则需要在执行后续任务的时候,创建一个新的线程来替换。线程池中的所有线程都将一直存在,直到显示的调用了shutdown方法。 上述方法能创建一个固定线程数量的线程池。内部默认的是使用LinkedBlockingQueue。但是需要注意的是,这个LinkedBlockingQueue底层是链表结构,其允许的最大队列长度为Integer.MAX_VALUE。

public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}

       这样在使用的过程中如果我们没有很好的控制,那么就可能导致内存溢出,出现OOM异常。csdn 仿qq 源码因此这种方式实际上已经不被提倡。我们在使用的过程中应该谨慎使用。 newFixedThreadPool(int nThreads, ThreadFactory threadFactory)方法:

/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue, using the provided * ThreadFactory to create new threads when needed.At any point, * at most { @code nThreads} threads will be active processing * tasks.If additional tasks are submitted when all threads are * active, they will wait in the queue until a thread is * available.If any thread terminates due to a failure during * execution prior to shutdown, a new one will take its place if * needed to execute subsequent tasks.The threads in the pool will * exist until it is explicitly { @link ExecutorService#shutdown * shutdown}. * * @param nThreads the number of threads in the pool * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}

       这个方法与3.1中newFixedThreadPool(int nThreads)的方法的唯一区别就是,增加了threadFactory参数。在前面方法中,对于线程的创建是采用的默认实现Executors.defaultThreadFactory()。而在此方法中,可以根据需要自行定制。

3.2 newSingleThreadExecutor/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.)Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * { @code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}

       此方法将会创建指有一个线程和一个无届队列的线程池。需要注意的是,如果这个执行线程在执行过程中由于失败而终止,那么需要在执行后续任务的时候,用一个新的线程来替换。 那么这样一来,上述线程池就能确保任务的顺序性,并且在任何时间都不会有多个线程处于活动状态。与newFixedThreadPool(1)不同的是,使用newSingleThreadExecutor返回的ExecutorService不能被重新分配线程数量。而使用newFixExecutor(1)返回的ExecutorService,其活动的线程的数量可以重新分配。后面专门对这个问题进行详细分析。 newSingleThreadExecutor(ThreadFactory threadFactory) 方法:

/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue, and uses the provided ThreadFactory to * create a new thread when needed. Unlike the otherwise * equivalent { @code newFixedThreadPool(1, threadFactory)} the * returned executor is guaranteed not to be reconfigurable to use * additional threads. * * @param threadFactory the factory to use when creating new * threads * * @return the newly created single-threaded Executor * @throws NullPointerException if threadFactory is null */public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}

       这个方法与3.3中newSingleThreadExecutor的区别就在于增加了一个threadFactory。可以自定义创建线程的方法。

3.3 newCachedThreadPool/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available.These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to { @code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using { @link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}

       这个方法用来创建一个线程池,该线程池可以根据需要自动增加线程。以前的线程也可以复用。这个线程池通常可以提高很多执行周期短的异步任务的性能。对于execute将重用以前的构造线程。如果没有可用的线程,就创建一个 新的线程添加到pool中。秒内,如果该线程没有被使用,则该线程将会终止,并从缓存中删除。因此,在足够长的时间内,这个线程池不会消耗任何资源。可以使用ThreadPoolExecutor构造函数创建具有类似属性但是详细信息不同的线程池。 ?需要注意的是,这个方法创建的线程池,虽然队列的长度可控,但是线程的数量的范围是Integer.MAX_VALUE。这样的话,如果使用不当,同样存在OOM的风险。比如说,我们使用的每个任务的耗时比较长,任务的请求又非常快,那么这样势必会造成在单位时间内创建了大量的线程。从而造成内存溢出。 newCachedThreadPool(ThreadFactory threadFactory)方法:

/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}

       这个方法区别同样也是在于,增加了threadFactory可以自行指定线程的创建方式。

2.4 newScheduledThreadPool/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}

       创建一个线程池,该线程池可以将任务在指定的延迟时间之后运行。或者定期运行。这个方法返回的是ScheduledThreadPoolExecutor。这个类是ThreadPoolExecutor的子类。在原有线程池的的基础之上,增加了延迟和定时功能。我们在后面分析了ThreadPoolExecutor源码之后,再来分析这个类的源码。 与之类似的方法:

/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param threadFactory the factory to use when the executor * creates a new thread * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} * @throws NullPointerException if threadFactory is null */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}

       通过这个方法,我们可以指定threadFactory。自定义线程创建的方式。 同样,我们还可以只指定一个线程:

public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));}

       上述两个方法都可以实现这个功能,但是需要注意的是,这两个方法的返回在外层包裹了一个包装类。

3.5 newWorkStealingPool

       这种方式是在jdk1.8之后新增的。我们先来看看其源码:

public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}0

       这个方法实际上返回的是ForkJoinPool。该方法创建了一

ForkjoinPool -1

        ForkJoin是用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。

        下面是一个是一个简单的Join/Fork计算过程,将1—数字相加

        通常这样个模型,你们会想到什么?

        Release Framework ? 常见的处理模型是什么? task pool - worker pool的模型。 但是Forkjoinpool 采取了完全不同的模型。

        ForkJoinPool一种ExecutorService的实现,运行ForkJoinTask任务。ForkJoinPool区别于其它ExecutorService,主要是因为它采用了一种工作窃取(work-stealing)的机制。所有被ForkJoinPool管理的线程尝试窃取提交到池子里的任务来执行,执行中又可产生子任务提交到池子中。

        ForkJoinPool维护了一个WorkQueue的数组(数组长度是2的整数次方,自动增长)。每个workQueue都有任务队列(ForkJoinTask的数组),并且用base、top指向任务队列队尾和队头。work-stealing机制就是工作线程挨个扫描任务队列,如果队列不为空则取队尾的任务并执行。示意图如下

        流程图:

        pool属性

        workQueues是pool的属性,它是WorkQueue类型的数组。externalPush和externalSubmit所创建的workQueue没有owner(即不是worker),且会被放到workQueues的偶数位置;而createWorker创建的workQueue(即worker)有owner,且会被放到workQueues的奇数位置。

        WorkQueue的几个重要成员变量说明如下:

        这是WorkQueue的config,高位跟pool的config值保持一致,而低位则是workQueue在workQueues数组的位置。

        从workQueues属性的介绍中,我们知道,不是所有workQueue都有worker,没有worker的workQueue称为公共队列(shared queue),config的第位就是用来判断是否是公共队列的。在externalSubmit创建工作队列时,有:

        q.config = k | SHARED_QUEUE;

        其中q是新创建的workQueue,k就是q在workQueues数组中的位置,SHARED_QUEUE=1<<,注意这里config没有保留mode的信息。

        而在registerWorker中,则是这样给workQueue的config赋值的:

        w.config = i | mode;

        w是新创建的workQueue,i是其在workQueues数组中的位置,没有设置SHARED_QUEUE标记位

        scanState是workQueue的属性,是int类型的。scanState的低位可以用来定位当前worker处于workQueues数组的哪个位置。每个worker在被创建时会在其构造函数中调用pool的registerWorker,而registerWorker会给scanState赋一个初始值,这个值是奇数,因为worker是由createWorker创建,并会被放到WorkQueues的奇数位置,而createWorker创建worker时会调用registerWorker。

        简言之,worker的scanState初始值是奇数,非worker的scanstate初始值=INACTIVE=1<<,小于0(非worker的workQueue在externalSubmit中创建)。

        当每次调用signalWork(或tryRelease)唤醒worker时,worker的高位就会加1

        另外,scanState<0表示worker未激活,当worker调用runtask执行任务时,scanState会被置为偶数,即设置scanState的最右边一位为0。

        worker休眠时,是这样存储的

        worker的唤醒类似这样:

        在worker休眠的4行伪码中,让ctl的低位的值变为worker.scanState,这样下次就可以通过scanState唤醒该worker。唤醒该worker时,把该worker的preStack设置为ctl低位的值,这样下下次唤醒的worker就是scanState等于该preStack的worker。

        这里通过preStack保存下一个worker,这个worker比当前worker更早地在等待,所以形成一个后进先出的栈。

        runState是int类型的值,控制整个pool的运行状态和生命周期,有下面几个值(可以好几个值同时存在):

        如果runState值为0,表示pool尚未初始化。

        RSLOCK表示锁定pool,当添加worker和pool终止时,就要使用RSLOCK锁定整个pool。如果由于runState被锁定,导致其他操作等待runState解锁(通常用wait进行等待),当runState设置了RSIGNAL,表示runState解锁,并通知(notifyAll)等待的操作。

        剩下4个值都跟runState生命周期有关,都可以顾名思义:

        当需要停止时,设置runState的STOP值,表示准备关闭,这样其他操作看到这个标记位,就不会继续操作,比如tryAddWorker看到STOP就不会再创建worker:

        而tryTerminate对这些生命周期状态的处理则是这样的:

        当前top和base的初始值为 INITIAL_QUEUE_CAPACITY >>>1= (1 << )>>>1 = /2。然后push一个task之后,top+=1,也就是说,top对应的位置是没有task的,最近push进来的task在top-1的位置。而base的位置则能对应到task,base对应最先放进队列的task,top-1对应最后放进队列的task。

        qlock值含义:1: locked, < 0: terminate; else 0

        即当qlock值位0时,可以正常操作,值=1时,表示锁定

        int SQMASK=0xe,则任何整数跟SQMASK位与后,得到的数就是偶数。

        证明:

        注意这里化为二进制是 ,尤其注意最右边第一位是0,任何数跟最右边第一位是0的数位与后,得到的数就是偶数,因为位与之后,第一位就是0,比如s=A&SQMASK,A可以是任意整数,然后把s按二进制进行多项式展开,则有s=2 n1+2 n2 ……+2^nn,这里n≥1,所以s可以被2整除,即s是偶数。

        所以一个数是奇数还是偶数,看其最右边第一位即可。

        我们知道workQueue有externalPush创建的和createWorker创建的worker,两种方式创建的workQueue,其放置到workQueues的位置是不同的,前者放到workQueue的偶数位置,而后者则放到奇数位置。不同workQueue找到自己在workQueues的位置的算法有点不同。

        下面看一下forkjoin框架获取workQueues中的偶数位置的workQueue的算法:

        这样就能获取workQueues的偶数位置的workQueue。m保证m & r & SQMASK这整个运算结果不会超出workQueues的下标,SQMASK保证取到的是偶数位置的workQueue。这里有一个有趣的现象,假设0到workQueues.length-1之间有n个偶数,m & r & SQMASK每次都能取到其中一个偶数,而且连续n次取到的偶数不会出现重复值,散列性非常好。而且是循环的,即1到n次取n个不同偶数,n+1到2n也是取n次不同偶数,此时n个偶数每个都被重新取一次。下面分析下r值有什么秘密,为何能保证这样的散列性

        ThreadLocalRandom内有一常量PROBE_INCREMENT = 0x9eb9,以及一个静态的probeGenerator =new AtomicInteger() ,然后每个线程的probe= probeGenerator.addAndGet(PROBE_INCREMENT)所以第一个线程的probe值是0x9eb9,第二个线程的值就是0x9eb9+0x9eb9,第三个线程的值就是0x9eb9+0x9eb9+0x9eb9以此类推,整个值是线性的,可以用y=kx表示,其中k=0x9eb9,x表示第几个线程。这样每个线程的probe可以保证不一样,而且具有很好的离散性。

        实际上,可以不用0x9eb9这个值,用任意一个奇数都是可以的,比如1。如果用1的话,probe+=1,这样每个线程的probe就都是不同的,而且具有很好的离散性。也就是说,假设有限制条件probe<n,超过n则产生溢出。则probe自加n次后才会开始出现重复值,n次前probe每次自加的值都不同。实际上用任意一个奇数,都可以保证probe自加n次后才会开始出现重复值,有兴趣可看本文最后附录部分。由于奇数的离散性,所以只要线程数小于m或者SQMASK两者中的最小值,则每个线程都能唯一地占据一个ws中的一个位置

        当一个操作是在非ForkjoinThread的线程中进行的,则称该操作为外部操作。比如我们前面执行pool.invoke,invoke内又执行externalPush。由于invoke是在非ForkjoinThread线程中进行的(这里是在main线程中进行),所以是一个外部操作,调用的是externalPush。之后task的执行是通过ForkJoinThread来执行的,所以task中的fork就是内部操作,调用的是push,把任务提交到工作队列。其实fork的实现是类似下面这样的:

        即fork会根据执行自身的线程是否是ForkJoinThread的实例来判断是处于外部还是内部。那为何要区分内外部?

        任何线程都可以使用ForkJoin框架,但是对于非ForkJoinThread的线程,它到底是怎样的,ForkJoin无法控制,也无法对其优化。因此区分出内外部,这样方便ForkJoin框架对任务的执行进行控制和优化

        forkJoinPool.invoke(task)是把任务放入工作队列,并等待任务执行。源码如下

        这里externalPush负责任务提交,externalPush源码如下:

yarn源码分析(四)AppMaster启动

       在容器分配完成之后,启动容器的代码主要在ContainerImpl.java中进行。通过状态机转换,container从NEW状态向其他状态转移时,会调用RequestResourceTransition对象。RequestResourceTransition负责将所需的资源进行本地化,或者避免资源本地化。若需本地化,还需过渡到LOCALIZING状态。为简化理解,此处仅关注是否进行资源本地化的情况。

       为了将LAUNCH_CONTAINER事件加入事件处理队列,调用了sendLaunchEvent方法。该事件由ContainersLauncher负责处理。ContainersLauncher的handle方法中,使用一个ExecutorService(线程池)容器Launcher。ContainerLaunch实现了Callable接口,其call方法生成并执行launch_container脚本。以MapReduce框架为例,该脚本在hadoop.tmp.dir/application name/container name目录下生成,其主要作用是启动MRAppMaster进程,即MapReduce的ApplicationMaster。

Java原理系列ScheduledThreadPoolExecutor原理用法示例源码详解

       ScheduledThreadPoolExecutor是Java中实现定时任务与周期性执行任务的高效工具。它继承自ThreadPoolExecutor类,能够提供比常规Timer类更强大的灵活性与功能,特别是在需要多个工作线程或有特殊调度需求的场景下。

       该类主要功能包含但不限于提交在指定延迟后执行的任务,以及按照固定间隔周期执行的任务。它实现了ScheduledExecutorService接口,进而提供了丰富的API以实现任务的调度与管理。其中包括now()、getDelay()、compareTo()等方法,帮助开发者更精确地处理任务调度与延迟。

       在实际应用中,ScheduledThreadPoolExecutor的使用案例广泛。比如,初始化一个ScheduledThreadPoolExecutor实例,设置核心线程数,从而为定时任务提供资源保障。提交延迟任务,例如在5秒后执行特定操作,并输出相关信息。此外,提交周期性任务,如每隔2秒执行一次特定操作,用于实时监控或数据更新。最后,通过调用shutdown()与shutdownNow()方法来关闭执行器并等待所有任务完成,确保系统资源的合理释放与任务的有序结束。

       总的来说,ScheduledThreadPoolExecutor在处理需要精确时间控制的任务时展现出了强大的功能与灵活性,是Java开发者在实现定时与周期性任务时的首选工具。