免费爱碰视频在线观看,九九精品国产屋,欧美亚洲尤物久久精品,1024在线观看视频亚洲

      面試官:有了解過線程池的工作原理嗎?說說看

      面試官: 有了解過線程池的工作原理嗎?說說看

      前言

      目前正在出一個Java多線程專題長期系列教程,從入門到進階源碼解讀, 篇幅會較多, 喜歡的話,給個關(guān)注 ~

      本節(jié)主要帶大家從ThreadPoolExecutor源碼角度來了解一下線程池的工作原理,一起來看下吧~

      Executor 接口

      首先Executor這個接口是線程池實現(xiàn)的頂層接口類,我們上節(jié)遇到的ExecutorService也是繼承了Executor

      public interface ExecutorService extends Executor {…}

      ExecutorService的上層AbstractExecutorService這個抽象類實現(xiàn)了接口ExecutorService

      public abstract class AbstractExecutorService implements ExecutorService {…}

      ThreadPoolExecutor繼承了AbstractExecutorService

      public class ThreadPoolExecutor extends AbstractExecutorService {…}

      ThreadPoolExecutor這個類我們需要重點看一下,它是接口的實現(xiàn)類,我們以newCachedThreadPool為例

      public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }

      可以看到內(nèi)部其實還是調(diào)用了ThreadPoolExecutor,我們再看newFixedThreadPool

      public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }

      內(nèi)部也是調(diào)了它, 下面我們就看下這個類

      ThreadPoolExecutor

      首先我們從構(gòu)造函數(shù)看起,它主要有四個構(gòu)造函數(shù)

      構(gòu)造函數(shù)一

      public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }

      一共有五個參數(shù):

      • corePoolSize – 保留在池中的線 程數(shù),即使是空閑的,除非設(shè)置allowCoreThreadTimeOut
      • maximumPoolSize – 池中允許的最大線程數(shù)
      • keepAliveTime – 當(dāng)線程數(shù)大于核心時,這是多余的空閑線程在終止前等待新任務(wù)的最長時間。
      • unit – keepAliveTime參數(shù)的時間單位
      • workQueue – 用于在執(zhí)行任務(wù)之前保存任務(wù)的隊列。此隊列將僅保存由execute方法提交的Runnable任務(wù)。

      構(gòu)造函數(shù)二

      public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }

      其它參數(shù)同上

      • threadFactory 執(zhí)行器創(chuàng)建新線程時使用的工廠

      構(gòu)造函數(shù)三

      public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }

      • handler 由于達到線程邊界和隊列容量而阻塞執(zhí)行時使用的處理程序

      構(gòu)造函數(shù)四

      public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

      這個把之前都綜合了一下,其實可以看到前幾個內(nèi)部都調(diào)用了this,調(diào)用自身,也就是調(diào)用這個構(gòu)造函數(shù),進行一些初始化

      BlockingQueue 阻塞隊列

      有幾個參數(shù)比較好理解,我們來看下這個參數(shù)workQueue, 它是一個阻塞隊列,這里簡要給大家提一下,這塊內(nèi)容也比較重要,后邊會專門去講

      BlockingQueue本身是一個還接口,它有幾個比較常用的阻塞隊列

      • LinkedBlockingQueue 鏈?zhǔn)阶枞犃校讓訑?shù)據(jù)結(jié)構(gòu)是鏈表
      • ArrayBlockingQueue 數(shù)組阻塞隊列,底層數(shù)據(jù)結(jié)構(gòu)是數(shù)組,需要指定隊列的大小。
      • SynchronousQueue 同步隊列,內(nèi)部容量為0,每個put操作必須等待一個take操作
      • DelayQueue 延遲隊列,該隊列中的元素只有當(dāng)其指定的延遲時間到了,才能夠從隊列中獲取到該元素

      ThreadFactory 線程工廠

      這個是線程工廠類,統(tǒng)一在創(chuàng)建線程時設(shè)置一些參數(shù),如是否守護線程、線程的優(yōu)先級等, 同樣它也是一個接口,我們在ThreadPoolExecutor內(nèi)部看到了 Executors.defaultThreadFactory(),這個是一個默認(rèn)工廠

      static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = “pool-” + poolNumber.getAndIncrement() + “-thread-“; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }

      沒有指定參數(shù),就會默認(rèn)創(chuàng)建DefaultThreadFactory,還有其它的factory,大家可以自行看下,區(qū)別就在創(chuàng)建線程時指定的參數(shù)

      RejectedExecutionHandler 拒絕策略

      RejectedExecutionHandler同樣是一個接口,這個處理器是用來專門處理拒絕的任務(wù),也就是ThreadPoolExecutor無法處理的程序。同理,我們可以看到ThreadPoolExecutor內(nèi)部有調(diào)了defaultHandler

      private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

      這個是默認(rèn)的拒絕策略, 可以看到它的默認(rèn)處理是拋出拒絕的異常

      public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(“Task ” + r.toString() + ” rejected from ” + e.toString()); } }

      再帶大家看下另外的策略, DiscardPolicy,這個策略不會拋出異常,它會丟棄這個任務(wù)

      public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }

      DiscardOldestPolicy 該策略丟棄最舊的未處理請求,然后重試execute ,除非執(zhí)行程序被關(guān)閉,在這種情況下任務(wù)被丟棄。

      public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 判斷是否關(guān)閉 if (!e.isShutdown()) { e.getQueue().poll(); // 任務(wù)重試 e.execute(r); } } }

      CallerRunsPolicy 直接在execute方法的調(diào)用線程中運行被拒絕的任務(wù),除非執(zhí)行程序已關(guān)閉,在這種情況下,任務(wù)將被丟棄。

      public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 直接判斷是否關(guān)閉 未關(guān)閉就執(zhí)行 if (!e.isShutdown()) { r.run(); } } }

      線程調(diào)度策略

      看完構(gòu)造函數(shù),下面看下它的一些常量

      private static final int COUNT_BITS = Integer.SIZE – 3;// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;

      通過變量名,我們大致知道是用來表示線程池的狀態(tài)。線程池本身有一個調(diào)度線程,這個線程就是用于管理整個線程池的各種任務(wù)和事務(wù),例如創(chuàng)建線程、銷毀線程、任務(wù)隊列管理、線程隊列管理等等,所以它本身也有上面的狀態(tài)值。

      當(dāng)線程池被創(chuàng)建后就會處于RUNNING狀態(tài), 主池控制狀態(tài)ctl是一個原子整數(shù)

      private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

      調(diào)用shutdown()方法后處于「SHUTDOWN」?fàn)顟B(tài),線程池不能接受新的任務(wù),清除一些空閑worker,不會等待阻塞隊列的任務(wù)完成。

      public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }

      另外,還有一個shutdownNow,調(diào)用后處于「STOP」?fàn)顟B(tài),線程池不能接受新的任務(wù),中斷所有線程,阻塞隊列中沒有被執(zhí)行的任務(wù)全部丟棄。此時,poolsize=0,阻塞隊列的size也為0。

      public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 中斷所有線程 interruptWorkers(); // 將任務(wù)隊列排空到一個新列表中 這里要注意下 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }

      當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會變?yōu)椤窽IDYING」?fàn)顟B(tài)。接著會執(zhí)行terminated()函數(shù)。

      final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 不等于0時 中斷任務(wù)線程 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 將狀態(tài)設(shè)置為 TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 終止 terminated(); } finally { // 執(zhí)行完 terminated 轉(zhuǎn)為 TERMINATED狀態(tài) ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }

      execute

      這個是執(zhí)行任務(wù)的核心方法,我們一起看一下

      public void execute(Runnable command) { // 如果任務(wù)不存在 拋空異常 if (command == null) throw new NullPointerException(); // 獲取當(dāng)前狀態(tài)值 int c = ctl.get(); // 當(dāng)前線程數(shù)小于corePoolSize,則調(diào)用addWorker創(chuàng)建核心線程執(zhí)行任務(wù) if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果不小于corePoolSize,則將任務(wù)添加到workQueue隊列。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果isRunning返回false(狀態(tài)檢查),則remove這個任務(wù),然后執(zhí)行拒絕策略。 if (! isRunning(recheck) && remove(command)) reject(command); // 線程池處于running狀態(tài),但是沒有線程,則創(chuàng)建線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果放入workQueue失敗,則創(chuàng)建非核心線程執(zhí)行任務(wù), else if (!addWorker(command, false)) // 如果這時創(chuàng)建失敗,就會執(zhí)行拒絕策略。 reject(command); }

      在源碼中,我們可以看到,多次進行了isRunning判斷。在多線程的環(huán)境下,線程池的狀態(tài)是多變的。很有可能剛獲取線程池狀態(tài)后線程池狀態(tài)就改變了

      總結(jié)

      下面給大家簡要的總結(jié)一下線程池的處理流程

    1. 線程總數(shù)量小于線程池中保留的線程數(shù)量(corePoolSize),無論線程是否空閑,都會新建一個核心線程執(zhí)行任務(wù),這一步需要獲取全局鎖
    2. 線程總數(shù)量大于corePoolSize時,新來的線程任務(wù)會進入任務(wù)隊列中等待,然后空閑的核心線程會依次去緩存隊列中取任務(wù)來執(zhí)行,從而達到線程的復(fù)用
    3. 當(dāng)緩存隊列滿了,會創(chuàng)建非核心線程去執(zhí)行這個任務(wù)。
    4. 緩存隊列滿了, 且總線程數(shù)達到了maximumPoolSize,則會采取拒絕策略進行處理。
    5. 結(jié)束語

      它的源碼還是比較長的,一篇文章說不清楚,有興趣的同學(xué)可以通過本篇文章的理解繼續(xù)閱讀它的源碼。

      下一節(jié), 繼續(xù)帶大家詳探討ThreadPoolExecutor中是如何進行線程復(fù)用 ~

      往期內(nèi)容推薦

      • Java多線程專題之線程與進程概述
      • Java多線程專題之線程類和接口入門
      • Java多線程專題之進階學(xué)習(xí)Thread(含源碼分析)
      • Java多線程專題之Callable、Future與FutureTask(含源碼分析)
      • 面試官: 有了解過線程組和線程優(yōu)先級嗎
      • 面試官: 說一下線程的生命周期過程
      • 面試官: 說一下線程間的通信
      • 面試官: 說一下Java的共享內(nèi)存模型
      • 面試官: 有了解過指令重排嗎,什么是happens-before
      • 面試官: 有了解過volatile關(guān)鍵字嗎 說說看
      • 面試官: 有了解過Synchronized嗎 說說看
      • Java多線程專題之Lock鎖的使用
      • 面試官: 有了解過ReentrantLock的底層實現(xiàn)嗎?說說看
      • 面試官: 有了解過CAS和原子操作嗎?說說看
      • Java多線程專題之線程池的基本使用
      • 我的博客(閱讀體驗較佳)
      • 寫給初學(xué)者的Java基礎(chǔ)教程
      • 一文帶你快速學(xué)習(xí)Java集合類
      • 花幾分鐘快速了解一下泛型與枚舉
      • Java注解與反射入門到進階
      • JavaIO教程從入門到進階

      項目源碼(源碼已更新 歡迎star )

      • java-thread-all
      • 地址: https://github.com/qiuChengleiy/java-thread-all.git

      推薦 SpringBoot & SpringCloud (源碼已更新 歡迎star )

      • springboot-all
      • 地址: https://github.com/qiuChengleiy/springboot-all.git
      • SpringBoot系列教程合集
      • 一起來學(xué)SpringCloud合集
      鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場,版權(quán)歸原作者所有,如有侵權(quán)請聯(lián)系管理員(admin#wlmqw.com)刪除。
      用戶投稿
      上一篇 2022年6月23日 06:09
      下一篇 2022年6月23日 06:09

      相關(guān)推薦

      聯(lián)系我們

      聯(lián)系郵箱:admin#wlmqw.com
      工作時間:周一至周五,10:30-18:30,節(jié)假日休息