多線程在現(xiàn)在工作中出現(xiàn)越來越頻繁、需要我們熟記并且能熟練地使用之、對相關(guān)線程池的一些配置需要我們非常熟悉。
1. 參數(shù)詳解
1.1 corePoolSize
corePoolSize 核心線程數(shù)
– 核心線程會一直存活,即使沒有任務(wù)需要執(zhí)行- 當線程數(shù)小于核心線程數(shù)時,即使有線程空閑,線程池也會優(yōu)先創(chuàng)建新線程處理- 設(shè)置allowCoreThreadTimeOut=true(默認false)時,核心線程會超時關(guān)閉
1.2 queueCapacity
queueCapacity 任務(wù)隊列容量(阻塞隊列)
- 當核心線程數(shù)達到最大時,新任務(wù)會放在隊列中排隊等待
1.3 maxPoolSize
maxPoolSize 最大線程數(shù)
– 當線程數(shù) >= corePoolSize 且任務(wù)隊列已滿時,線程池會創(chuàng)建新線程來處理任務(wù)- 當線程數(shù) = maxPoolSize且任務(wù)隊列已滿時,線程池會拒絕處理任務(wù)而拋出異常
1.4 keepAliveTime
keepAliveTime 線程空閑時間
– 當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數(shù)量=coolPoolSize- 如果allowCoreThreadTimeOut=true時,則會直到線程數(shù)量=0
1.5 allowCoreThreadTimeOut
allowCoreThreadTimeOut 允許核心線程超時
1.6 rejectedExecutionHandler
rejectedExecutionHandler 任務(wù)拒絕處理器
- 兩種情況會拒絕處理任務(wù)
- 當線程數(shù)已經(jīng)達到maxPoolSize且任務(wù)隊列已滿,就會拒絕新的任務(wù)
- 當線程池被調(diào)運shutdown() 然后,會等待線程池里的任務(wù)執(zhí)行完畢在shutdown。如果在調(diào)運shutdown() 和線程池真正shutdown之間提交的任務(wù),會拒絕新的任務(wù)
- 線程池會調(diào)運rejectedExecutionHandler來處理這個任務(wù),如果沒有設(shè)置默認是AbortPolicy,會拋出異常
– ThreadPoolExecutor類有幾個內(nèi)部實現(xiàn)類來處理這類情況 1. **AbortPolicy** 丟棄任務(wù)、拋運行時異常 1. **CallerRunsPolicy** 執(zhí)行任務(wù) 1. **DisCardPolicy** 忽視,什么都不會發(fā)生 1. **DisCardOldestPolicy** 從隊列中提出最先 進入隊列(最后一個執(zhí)行)的任務(wù)
- 實現(xiàn)RejectedExecutionHandler接口,可自定義處理器
2. ThreadPoolExecutor 執(zhí)行順序
線程池按照以下行為執(zhí)行任務(wù)
1. 當線程數(shù) = 核心線程數(shù),且任務(wù)隊列未滿時,將任務(wù)放入任務(wù)隊列。3. 當線程數(shù) >= 核心線程數(shù),且任務(wù)隊列已滿- 若線程數(shù) < 最大線程數(shù),創(chuàng)建線程- 若線程數(shù) = 最大線程數(shù),拋出異常,拒絕任務(wù)
3. 如何設(shè)置參數(shù)
3.1 默認值
corePoolSize = 1queueCapacity = Integer.MAX_VALUEmaxPoolSize = Integer.MAX_VALUEkeepAliveTime = 60sallowCoreThreadTimeOut = falserejectedExecutionHandler = AbortPolicy
3.2 如何來設(shè)置
- 需要根據(jù)幾個值來決定
tasks : 每秒的任務(wù)數(shù),假設(shè)為 500 – 1000taskcost : 每個任務(wù)花費時間,假設(shè)為0.1sresponsetime : 系統(tǒng)允許容忍的最大響應(yīng)時間,假設(shè)為1s
- 計算方式
corePoolSize = 每秒需要多少個線程處理 * threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 個線程。corePoolSize設(shè)置應(yīng)該大于50 * 根據(jù)8020原則,如果80%的每秒任務(wù)數(shù)小于800,那么corePoolSize設(shè)置為80即可queueCapacity = (coreSizePool/taskcost)*responsetime * 計算可得 queueCapacity = 80/0.1*1 = 80。意思是隊列里的線程可以等待1s,超過了的需要新開線程來執(zhí)行 * 切記不能設(shè)置為Integer.MAX_VALUE,這樣隊列會很大,線程數(shù)只會保持在corePoolSize大小,當任務(wù)陡增時,不能新開線程來執(zhí)行,響應(yīng)時間會隨之陡增。maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost) * 計算可得 maxPoolSize = (1000-80)/10 = 92 * (最大任務(wù)數(shù)-隊列容量)/每個線程每秒處理能力 = 最大線程數(shù)rejectedExecutionHandler:根據(jù)具體情況來決定,任務(wù)不重要可丟棄,任務(wù)重要則要利用一些緩沖機制來處理keepAliveTime和allowCoreThreadTimeout采用默認通常能滿足
4. 使用案例
@Componentpublic class KafkaReceiver { @Autowired private BroadbandService broadbandService; private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat(“deal-pool-%d”) .build(); private ExecutorService threadPool = new ThreadPoolExecutor(10, 20, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(10), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); /** * 監(jiān)聽kafka消息受理業(yè)務(wù) */ @KafkaListener(topics = “${kafka.topics[1]}”) public void listenerKafkaProductCommit(ConsumerRecord record) { threadPool.execute(() -> { try { BusinessOrder businessOrder = JsonUtil.jsonToObject(record.value(), BusinessOrder.class); log.info(“消息隊列正在工作,訂單號: {}”, businessOrder.getOrderNo()); //嘗試多線程處理 broadbandService.asynSchoolBroadbandRemoveService(businessOrder); } catch (Exception e) { log.error(“消息隊列正在工作異常”, e); } }); } }