免费爱碰视频在线观看,九九精品国产屋,欧美亚洲尤物久久精品,1024在线观看视频亚洲

      造輪子之MemorySafeLinkedBlockingQueue-LinkBlockingQueue改進

      LinkBlockingQueue改進

      問題背景

      https://github.com/apache/dubbo/pull/9722/files使用線程池的同學對于標題中的隊列想必都有過使用,但上述隊列使用不當時則會造成程序OOM,那怎么來控制呢?

      使用ArrayBlockingQueue?如何來評估長度?

      是否有一個完美的解決方案呢,MemorySafeLinkedBlockingQueue則通過對內(nèi)存的限制判斷盡面控制隊列的容量,完成解決了可能存在的OOM問題。

      獲取內(nèi)存大小(注:單位大B;支持準實時更新):

      Runtime.getRuntime().freeMemory()//JVM中已經(jīng)申請到的堆內(nèi)存中還未使用的大小Runtime.getRuntime().maxMemory()// JVM可從操作系統(tǒng)申請到的最大內(nèi)存值 -XxmRuntime.getRuntime().totalMemory()// JVM已從操作系統(tǒng)申請到的內(nèi)存大小 —Xxs可設(shè)置該值大小-初始堆的大小

      線程池在excute任務(wù)時,放隊列,放不進去,使用新線程運行任務(wù)。這個放不進行,是使用的offer??非阻塞方法嗎?

      參考:https://blog.csdn.net/weixin_43108539/article/details/125190023

      public void execute(Runnable command) {if (command == null)throw new NullPointerException(); //拿到32位的intint c = ctl.get(); //工作線程數(shù)<核心線程數(shù)if (workerCountOf(c) < corePoolSize) {//進入if,代表可以創(chuàng)建 核心 線程數(shù)if (addWorker(command, true))return;//如果沒進入if,代表創(chuàng)建核心線程數(shù)失敗,重新獲取 ctlc = ctl.get();}//判斷線程池為Running狀態(tài),將任務(wù)添加入阻塞隊列,使用offerif (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再次判斷是否為Running狀態(tài),若不是Running狀態(tài),remove任務(wù)if (! isRunning(recheck) && remove(command))reject(command);//如果線程池在Running狀態(tài),線程池數(shù)量為0else if (workerCountOf(recheck) == 0)//阻塞隊列有任務(wù),但是沒有工作線程,添加一個任務(wù)為空的工作線程處理阻塞隊列中的任務(wù)addWorker(null, false);}//阻塞隊列已滿,創(chuàng)建非核心線程,拒絕策略-addWorker中有判斷核心線程數(shù)是否超過最大線程數(shù)else if (!addWorker(command, false))reject(command);}

      空閑內(nèi)存計算

      package com.zte.sdn.oscp.queue;import cn.hutool.core.thread.NamedThreadFactory;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;public class MemoryLimitCalculator {private static volatile long maxAvailable;private static final AtomicBoolean refreshStarted = new AtomicBoolean(false);private static void refresh() {maxAvailable = Runtime.getRuntime().freeMemory();}private static void checkAndScheduleRefresh() {if (!refreshStarted.get()) {// immediately refresh when first call to prevent maxAvailable from being 0// to ensure that being refreshed before refreshStarted being set as true// notice: refresh may be called for more than once because there is no lockrefresh();if (refreshStarted.compareAndSet(false, true)) {ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(“Dubbo-Memory-Calculator”));// check every 50 ms to improve performancescheduledExecutorService.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS);Runtime.getRuntime().addShutdownHook(new Thread(() -> {refreshStarted.set(false);scheduledExecutorService.shutdown();}));}}}/** * Get the maximum available memory of the current JVM. * * @return maximum available memory */public static long maxAvailable() {checkAndScheduleRefresh();return maxAvailable;}/** * Take the current JVM’s maximum available memory * as a percentage of the result as the limit. * * @param percentage percentage * @return available memory */public static long calculate(final float percentage) {if (percentage1) {throw new IllegalArgumentException();}checkAndScheduleRefresh();return (long) (maxAvailable() * percentage);}/** * By default, it takes 80% of the maximum available memory of the current JVM. * * @return available memory */public static long defaultLimit() {checkAndScheduleRefresh();return (long) (maxAvailable() * 0.8);}}

      內(nèi)存安全隊列

      package com.zte.sdn.oscp.queue;import java.util.Collection;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;public class MemorySafeLinkedBlockingQueue extends LinkedBlockingQueue {private static final long serialVersionUID = 8032578371739960142L;public static int THE_256_MB = 256 * 1024 * 1024;private int maxFreeMemory;private Rejector rejector;public MemorySafeLinkedBlockingQueue() {this(THE_256_MB);}public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {super(Integer.MAX_VALUE);this.maxFreeMemory = maxFreeMemory;//default as DiscardPolicy to ensure compatibility with the old versionthis.rejector = new DiscardPolicy();}public MemorySafeLinkedBlockingQueue(final Collection c, final int maxFreeMemory) {super(c);this.maxFreeMemory = maxFreeMemory;//default as DiscardPolicy to ensure compatibility with the old versionthis.rejector = new DiscardPolicy();}/** * set the max free memory. * * @param maxFreeMemory the max free memory */public void setMaxFreeMemory(final int maxFreeMemory) {this.maxFreeMemory = maxFreeMemory;}/** * get the max free memory. * * @return the max free memory limit */public int getMaxFreeMemory() {return maxFreeMemory;}/** * set the rejector. * * @param rejector the rejector */public void setRejector(final Rejector rejector) {this.rejector = rejector;}/** * determine if there is any remaining free memory. * * @return true if has free memory */public boolean hasRemainedMemory() {return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;}@Overridepublic void put(final E e) throws InterruptedException {if (hasRemainedMemory()) {super.put(e);} else {rejector.reject(e, this);}}@Overridepublic boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {if (!hasRemainedMemory()) {rejector.reject(e, this);return false;}return super.offer(e, timeout, unit);}@Overridepublic boolean offer(final E e) {if (!hasRemainedMemory()) {rejector.reject(e, this);return false;}return super.offer(e);}}

      拒絕策略

      注意其中的rejector是拒絕策略,默認的DiscardPolicy什么也不處理;

      而DiscardOldPolicy的處理邏輯很簡單

      public class DiscardOldestPolicy implements Rejector {@Overridepublic void reject(final E e, final Queue queue) {queue.poll();queue.offer(e);}}

      AbortPolicy則直接拋出異常

      public class AbortPolicy implements Rejector {@Overridepublic void reject(final E e, final Queue queue) {throw new RejectException(“no more memory can be used !”);}}

      個人建議增加日志打印即可。

      鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場,版權(quán)歸原作者所有,如有侵權(quán)請聯(lián)系管理員(admin#wlmqw.com)刪除。
      上一篇 2022年9月9日 12:18
      下一篇 2022年9月9日 12:18

      相關(guān)推薦

      聯(lián)系我們

      聯(lián)系郵箱:admin#wlmqw.com
      工作時間:周一至周五,10:30-18:30,節(jié)假日休息