老司机91精品网站在线观看_久久69精品久久久久久hb_成人欧美在线观看_免费一级日本c片完整版

首頁>理財 > 正文

Java?CompletableFuture?異步超時實現深入研究|天天速看

2023-07-03 12:45:28    出處:腳本之家
目錄
前言常見使用方式存在的問題分析現有做法解決方式JDK 9JDK 8

前言

作者:京東科技 張天賜

JDK 8 是一次重大的版本升級,新增了非常多的特性,其中之一便是CompletableFuture。自此從 JDK 層面真正意義上的支持了基于事件的異步編程范式,彌補了Future的缺陷。

在我們的日常優化中,最常用手段便是多線程并行執行。這時候就會涉及到CompletableFuture的使用。


(資料圖片)

常見使用方式

下面舉例一個常見場景。

假如我們有兩個 RPC 遠程調用服務,我們需要獲取兩個 RPC 的結果后,再進行后續邏輯處理。

public static void main(String[] args) {
    // 任務 A,耗時 2 秒
    int resultA = compute(1);
    // 任務 B,耗時 2 秒
    int resultB = compute(2);
    // 后續業務邏輯處理
    System.out.println(resultA + resultB);
}

可以預估到,串行執行最少耗時 4 秒,并且 B 任務并不依賴 A 任務結果。

對于這種場景,我們通常會選擇并行的方式優化,Demo 代碼如下:

public static void main(String[] args) {
    // 僅簡單舉例,在生產代碼中可別這么寫!
    // 統計耗時的函數
    time(() -> {
        CompletableFuture result = Stream.of(1, 2)
                                                  // 創建異步任務
                                                  .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
                                                  // 聚合
                                                  .reduce(CompletableFuture.completedFuture(0), (x, y) -> x.thenCombineAsync(y, Integer::sum, executor));
        // 等待結果
        try {
            System.out.println("結果:" + result.get());
        } catch (ExecutionException | InterruptedException e) {
            System.err.println("任務執行異常");
        }
    });
}
輸出:
[async-1]: 任務執行開始:1
[async-2]: 任務執行開始:2
[async-1]: 任務執行完成:1
[async-2]: 任務執行完成:2
結果:3
耗時:2 秒

可以看到耗時變成了 2 秒。

存在的問題

分析

看上去CompletableFuture現有功能可以滿足我們訴求。但當我們引入一些現實常見情況時,一些潛在的不足便暴露出來了。

compute(x)如果是一個根據入參查詢用戶某類型優惠券列表的任務,我們需要查詢兩種優惠券并組合在一起返回給上游。假如上游要求我們 2 秒內處理完畢并返回結果,但compute(x)耗時卻在 0.5 秒 ~ 無窮大波動。這時候我們就需要把耗時過長的compute(x)任務結果放棄,僅處理在指定時間內完成的任務,盡可能保證服務可用。

那么以上代碼的耗時由耗時最長的服務決定,無法滿足現有訴求。通常我們會使用get(long timeout, TimeUnit unit)來指定獲取結果的超時時間,并且我們會給compute(x)設置一個超時時間,達到后自動拋異常來中斷任務。

public static void main(String[] args) {
    // 僅簡單舉例,在生產代碼中可別這么寫!
    // 統計耗時的函數
    time(() -> {
        List> result = Stream.of(1, 2)
                                                        // 創建異步任務,compute(x) 超時拋出異常
                                                        .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
                                                        .toList();
        // 等待結果
        int res = 0;
        for (CompletableFuture future : result) {
            try {
                res += future.get(2, SECONDS);
            } catch (ExecutionException | InterruptedException | TimeoutException e) {
                System.err.println("任務執行異常或超時");
            }
        }
        System.out.println("結果:" + res);
    });
}
輸出:
[async-2]: 任務執行開始:2
[async-1]: 任務執行開始:1
[async-1]: 任務執行完成:1
任務執行異常或超時
結果:1
耗時:2 秒

可以看到,只要我們能夠給compute(x)設置一個超時時間將任務中斷,結合getgetNow等獲取結果的方式,就可以很好地管理整體耗時。

那么問題也就轉變成了,如何給任務設置異步超時時間呢

現有做法

當異步任務是一個 RPC 請求時,我們可以設置一個 JSF 超時,以達到異步超時效果。

當請求是一個 R2M 請求時,我們也可以控制 R2M 連接的最大超時時間來達到效果。

這么看好像我們都是在依賴三方中間件的能力來管理任務超時時間?那么就存在一個問題,中間件超時控制能力有限,如果異步任務是中間件 IO 操作 + 本地計算操作怎么辦?

用 JSF 超時舉一個具體的例子,反編譯 JSF 的獲取結果代碼如下:

public V get(long timeout, TimeUnit unit) throws InterruptedException {
    // 配置的超時時間
    timeout = unit.toMillis(timeout);
    // 剩余等待時間
    long remaintime = timeout - (this.sentTime - this.genTime);
    if (remaintime <= 0L) {
        if (this.isDone()) {
            // 反序列化獲取結果
            return this.getNow();
        }
    } else if (this.await(remaintime, TimeUnit.MILLISECONDS)) {
        // 等待時間內任務完成,反序列化獲取結果
        return this.getNow();
    }
    this.setDoneTime();
    // 超時拋出異常
    throw this.clientTimeoutException(false);
}

當這個任務剛好卡在超時邊緣完成時,這個任務的耗時時間就變成了超時時間 + 獲取結果時間。而獲取結果(反序列化)作為純本地計算操作,耗時長短受 CPU 影響較大。

某些 CPU 使用率高的情況下,就會出現異步任務沒能觸發拋出異常中斷,導致我們無法準確控制超時時間。對上游來說,本次請求全部失敗。

解決方式

JDK 9

這類問題非常常見,如大促場景,服務器 CPU 瞬間升高就會出現以上問題。

那么如何解決呢?其實 JDK 的開發大佬們早有研究。在 JDK 9,CompletableFuture正式提供了orTimeoutcompleteTimeout方法,來準確實現異步超時控制。

public CompletableFuture orTimeout(long timeout, TimeUnit unit) {
    if (unit == null)
        throw new NullPointerException();
    if (result == null)
        whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit)));
    return this;
}

JDK 9orTimeout其實現原理是通過一個定時任務,在給定時間之后拋出異常。如果任務在指定時間內完成,則取消拋異常的操作。

以上代碼我們按執行順序來看下:

首先執行new Timeout(this)

static final class Timeout implements Runnable {
    final CompletableFuture f;
    Timeout(CompletableFuture f) { this.f = f; }
    public void run() {
        if (f != null && !f.isDone())
            // 拋出超時異常
            f.completeExceptionally(new TimeoutException());
    }
}

通過源碼可以看到,Timeout是一個實現 Runnable 的類,run()方法負責給傳入的異步任務通過completeExceptionallyCAS 賦值異常,將任務標記為異常完成。

那么誰來觸發這個run()方法呢?我們看下Delayer的實現。

static final class Delayer {
    static ScheduledFuture delay(Runnable command, long delay,
                                    TimeUnit unit) {
        // 到時間觸發 command 任務
        return delayer.schedule(command, delay, unit);
    }
    static final class DaemonThreadFactory implements ThreadFactory {
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("CompletableFutureDelayScheduler");
            return t;
        }
    }
    static final ScheduledThreadPoolExecutor delayer;
    static {
        (delayer = new ScheduledThreadPoolExecutor(
            1, new DaemonThreadFactory())).
            setRemoveOnCancelPolicy(true);
    }
}

Delayer其實就是一個單例定時調度器,Delayer.delay(new Timeout(this), timeout, unit)通過ScheduledThreadPoolExecutor實現指定時間后觸發Timeoutrun()方法。

到這里就已經實現了超時拋出異常的操作。但當任務完成時,就沒必要觸發Timeout了。因此我們還需要實現一個取消邏輯。

static final class Canceller implements BiConsumer {
    final Future f;
    Canceller(Future f) { this.f = f; }
    public void accept(Object ignore, Throwable ex) {
        if (ex == null && f != null && !f.isDone())
        // 3 未觸發拋異常任務則取消
            f.cancel(false);
    }
}

當任務執行完成,或者任務執行異常時,我們也就沒必要拋出超時異常了。因此我們可以把delayer.schedule(command, delay, unit)返回的定時超時任務取消,不再觸發Timeout。 當我們的異步任務完成,并且定時超時任務未完成的時候,就是我們取消的時機。因此我們可以通過whenComplete(BiConsumer action)來完成。

Canceller就是一個BiConsumer的實現。其持有了delayer.schedule(command, delay, unit)返回的定時超時任務,accept(Object ignore, Throwable ex)實現了定時超時任務未完成后,執行cancel(boolean mayInterruptIfRunning)取消任務的操作。

JDK 8

如果我們使用的是 JDK 9 或以上,我們可以直接用 JDK 的實現來完成異步超時操作。那么 JDK 8 怎么辦呢?

其實我們也可以根據上述邏輯簡單實現一個工具類來輔助。

以下是我們營銷自己的工具類以及用法,貼出來給大家作為參考,大家也可以自己寫的更優雅一些~

調用方式:

CompletableFutureExpandUtils.orTimeout(異步任務, 超時時間, 時間單位);

工具類源碼:

package com.jd.jr.market.reduction.util;
import com.jdpay.market.common.exception.UncheckedException;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
/**
 * CompletableFuture 擴展工具
 *
 * @author zhangtianci7
 */
public class CompletableFutureExpandUtils {
    /**
     * 如果在給定超時之前未完成,則異常完成此 CompletableFuture 并拋出 {@link TimeoutException} 。
     *
     * @param timeout 在出現 TimeoutException 異常完成之前等待多長時間,以 {@code unit} 為單位
     * @param unit    一個 {@link TimeUnit},結合 {@code timeout} 參數,表示給定粒度單位的持續時間
     * @return 入參的 CompletableFuture
     */
    public static  CompletableFuture orTimeout(CompletableFuture future, long timeout, TimeUnit unit) {
        if (null == unit) {
            throw new UncheckedException("時間的給定粒度不能為空");
        }
        if (null == future) {
            throw new UncheckedException("異步任務不能為空");
        }
        if (future.isDone()) {
            return future;
        }
        return future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit)));
    }
    /**
     * 超時時異常完成的操作
     */
    static final class Timeout implements Runnable {
        final CompletableFuture future;
        Timeout(CompletableFuture future) {
            this.future = future;
        }
        public void run() {
            if (null != future && !future.isDone()) {
                future.completeExceptionally(new TimeoutException());
            }
        }
    }
    /**
     * 取消不需要的超時的操作
     */
    static final class Canceller implements BiConsumer {
        final Future future;
        Canceller(Future future) {
            this.future = future;
        }
        public void accept(Object ignore, Throwable ex) {
            if (null == ex && null != future && !future.isDone()) {
                future.cancel(false);
            }
        }
    }
    /**
     * 單例延遲調度器,僅用于啟動和取消任務,一個線程就足夠
     */
    static final class Delayer {
        static ScheduledFuture delay(Runnable command, long delay, TimeUnit unit) {
            return delayer.schedule(command, delay, unit);
        }
        static final class DaemonThreadFactory implements ThreadFactory {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("CompletableFutureExpandUtilsDelayScheduler");
                return t;
            }
        }
        static final ScheduledThreadPoolExecutor delayer;
        static {
            delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
            delayer.setRemoveOnCancelPolicy(true);
        }
    }
}

參考資料JEP 266: JDK 9 并發包更新提案

以上就是Java CompletableFuture 異步超時實現深入研究的詳細內容,更多關于Java CompletableFuture 異步超時的資料請關注腳本之家其它相關文章!

關鍵詞:

相關內容

消費
產業
寧波首次設立標準創新貢獻獎管理辦法 最高獎勵50萬元|天天資訊 上證報中國證券網訊據寧波發布消息,從寧波市市場監管局獲悉,《寧波市
北京:老舊小區改造 如何落實“人民城市為人民”? 北京:老舊小區改造如何落實“人民城市為人民”?
TikTok Shop馬來西亞更新基于購買的獎勵商品上架要求 TikTokShop馬來西亞站宣布從2023年7月4日起,賣家在上架基于購買的獎勵
全球消息!陸豐警方:在加油站點燃摩托車油箱并阻止滅火,一男子被刑拘 經查,嫌疑人吳某因付費問題與加油站工作人員發生口角,期間情緒失控,
基金
老司机91精品网站在线观看_久久69精品久久久久久hb_成人欧美在线观看_免费一级日本c片完整版

      色综合中文字幕| 国产精品视频在线看| 一区二区三区毛片| 国产制服丝袜一区| 国产日韩v精品一区二区| 天天做天天摸天天爽国产一区| 欧美成人伊人久久综合网| 亚洲精品成人a在线观看| 国产一区二区久久| 国产精品久久久久久久久久久免费看 | 亚洲精品一线二线三线| 亚洲高清免费视频| 成人高清免费在线播放| 亚洲一区二区三区在线| 精品国产一区二区三区四区四 | 91首页免费视频| 香蕉加勒比综合久久 | 99国产精品久久久| 亚洲成人动漫在线观看| 国产欧美日韩中文久久| 黄一区二区三区| 亚洲欧洲日韩一区二区三区| 欧美一区二区黄色| 亚洲va欧美va国产va天堂影院| 91视频在线观看免费| 在线观看欧美日本| 亚洲欧美视频在线观看| 成人av片在线观看| 91成人在线精品| 中文字幕亚洲视频| 福利91精品一区二区三区| 亚洲国产精品一区二区尤物区| 久久精品人人爽人人爽| 精品综合久久久久久8888| 最新欧美精品一区二区三区| 精品嫩草影院久久| 久久国产精品第一页| 亚洲精品一二三四区| 久久精品日韩一区二区三区| 国产主播一区二区三区| 亚洲一区二区三区四区五区中文| 久久精品网站免费观看| 国产激情一区二区三区| 色综合天天综合色综合av| 日韩理论电影院| 26uuu欧美| 欧美一区二区私人影院日本| 青青草成人在线观看| 中文字幕一区av| 久久精子c满五个校花| 国产成人午夜精品5599| 色综合色狠狠天天综合色| 一区二区在线免费| 亚洲国产精品二十页| 欧美精品一区二区三区高清aⅴ | 亚洲精品视频在线看| 久久九九全国免费| 欧美成人一区二区三区片免费 | 成人综合婷婷国产精品久久蜜臀 | 成人av在线资源网站| 欧美日韩卡一卡二| 日本一区中文字幕| 亚洲综合成人在线视频| 亚洲三级理论片| 国产欧美日韩另类一区| 久久精品在这里| 成人高清免费在线播放| 69堂亚洲精品首页| 狠狠色伊人亚洲综合成人| 一本大道综合伊人精品热热| 亚洲宅男天堂在线观看无病毒| 国产精品另类一区| 中文乱码免费一区二区| 91美女蜜桃在线| 精品福利av导航| 成人一区二区三区在线观看| 欧美美女bb生活片| 九色综合狠狠综合久久| 在线中文字幕一区二区| 婷婷成人激情在线网| 一级女性全黄久久生活片免费| 亚洲欧美日韩久久| 国产精品家庭影院| 《视频一区视频二区| 国产精品进线69影院| 成人免费视频在线观看| 国产精品视频你懂的| 国产精品欧美一级免费| 国产精品一区二区黑丝| 一区二区三区久久久| 亚洲美女视频在线观看| 国产精品久久毛片a| 国产精品大尺度| 中文字幕第一区综合| 国产精品高潮久久久久无| xf在线a精品一区二区视频网站| 精品国一区二区三区| 91玉足脚交白嫩脚丫在线播放| 欧美精品一区二区三区蜜桃| 99精品黄色片免费大全| 久久精品在这里| 国产欧美日韩综合| 国产精品卡一卡二| 国产精品久久久久9999吃药| 亚洲男帅同性gay1069| 亚洲丝袜制服诱惑| 亚洲国产综合视频在线观看| 亚洲h动漫在线| 美女一区二区久久| 欧美色区777第一页| 国产精品99久久久久久宅男| 日韩欧美国产一区二区三区| 99国产精品国产精品久久| 国产性做久久久久久| 国产精品你懂的在线| 一区二区欧美在线观看| 性久久久久久久久久久久| 久久国产综合精品| 91精品国产欧美一区二区| 成人免费视频视频在线观看免费| 精品成人一区二区三区四区| 国产婷婷色一区二区三区四区| 亚洲丝袜制服诱惑| 亚洲成a人在线观看| 极品少妇xxxx精品少妇偷拍| 日韩天堂在线观看| 久久久精品一品道一区| 亚洲精品网站在线观看| 污片在线观看一区二区| 国模一区二区三区白浆| 精品国产在天天线2019| 国产精品美日韩| 亚洲成av人片一区二区梦乃| 欧美少妇一区二区| a级精品国产片在线观看| 日本一区二区三区dvd视频在线| 亚洲色图一区二区三区| 青娱乐精品视频| 日韩视频一区二区三区| 欧美国产日本视频| 午夜亚洲国产au精品一区二区| 欧美日韩一区二区三区四区| 97久久精品人人做人人爽50路| 亚洲三级电影网站| 色噜噜偷拍精品综合在线| www.亚洲在线| 亚洲激情自拍偷拍| 欧美少妇一区二区| 91免费版pro下载短视频| 一区二区三区在线播放| 欧美色偷偷大香| 久久久亚洲精品石原莉奈| 艳妇臀荡乳欲伦亚洲一区| 欧美日韩视频在线观看一区二区三区 | 337p日本欧洲亚洲大胆色噜噜| 亚洲欧美综合另类在线卡通| 免费观看91视频大全| 精品国产sm最大网站| 亚洲精品成人悠悠色影视| 国产在线精品免费| 国产精品色噜噜| 色婷婷亚洲综合| 91色在线porny| 亚洲一二三区在线观看| 欧美一区二区三区四区久久| 国产精品国产三级国产普通话三级 | 中文字幕中文字幕一区| 在线免费亚洲电影| 久久婷婷国产综合精品青草| 日日夜夜免费精品视频| 337p粉嫩大胆色噜噜噜噜亚洲| 一片黄亚洲嫩模| 波多野洁衣一区| 亚洲v精品v日韩v欧美v专区| 精品少妇一区二区三区视频免付费| 亚洲乱码国产乱码精品精可以看 | 国产精品一线二线三线| 最新国产の精品合集bt伙计| 欧美美女直播网站| 一色屋精品亚洲香蕉网站| 国产精品一区二区你懂的| 亚洲欧美色图小说| 69久久99精品久久久久婷婷| 亚洲三级电影网站| 成人激情小说网站| 亚洲成人免费在线观看| 26uuu亚洲综合色欧美| 色呦呦一区二区三区| 国产午夜精品一区二区三区视频| 蜜桃av噜噜一区二区三区小说| 亚洲国产精品二十页| 欧美日韩一二三区| 亚洲欧美一区二区久久| av中文字幕一区| 日韩黄色免费网站| 亚洲欧洲www| 日韩欧美成人激情| 色综合天天综合网天天狠天天| 欧美激情在线一区二区| 国产精品综合网|