最近在開發(fā) 延保服務(wù) 頻道頁(yè)時(shí),為了提高查詢效率,使用到了多線程技術(shù)。為了對(duì)多線程方案設(shè)計(jì)有更加充分的了解,在業(yè)余時(shí)間讀完了《圖解 Java 多線程設(shè)計(jì)模式》這本書,覺(jué)得收獲良多。本篇文章將介紹其中提到的 Future 模式,以及在實(shí)際業(yè)務(wù)開發(fā)中對(duì)該模式的應(yīng)用,而這些內(nèi)容對(duì)于本書來(lái)說(shuō)只是冰山一角,還是推薦大家有時(shí)間去閱讀原書。
1. Future 模式:“先給您提貨單”
我們先來(lái)看一個(gè)場(chǎng)景:假如我們?nèi)サ案獾曩I蛋糕,下單后,店員會(huì)遞給我們提貨單并告知“請(qǐng)您傍晚來(lái)取蛋糕”。到了傍晚我們拿著提貨單去取蛋糕,店員會(huì)先和我們說(shuō)“您的蛋糕已經(jīng)做好了”,然后將蛋糕拿給我們。
如果將下單蛋糕到取蛋糕的過(guò)程抽象成一個(gè)方法的話,那么意味著這個(gè)方法需要花很長(zhǎng)的時(shí)間才能獲取執(zhí)行結(jié)果,與其一直等待結(jié)果,不如先拿著一張“提貨單”,到我們需要取貨的時(shí)候,再通過(guò)它去取,而獲取“提貨單”的過(guò)程是幾乎不耗時(shí)的,而這個(gè)提貨單對(duì)象就被稱為 Future,后續(xù)便可以通過(guò)它來(lái)獲取方法的返回值。用 Java 來(lái)表示這個(gè)過(guò)程的話,需要使用到 FutureTask 和 Callable 兩個(gè)類,如下:
public class Example {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 預(yù)定蛋糕,并定義“提貨單”
System.out.println("我:預(yù)定蛋糕");
FutureTask future = new FutureTask?>(() -> {
System.out.println("店員:請(qǐng)您傍晚來(lái)取蛋糕");
Thread.sleep(2000);
System.out.println("店員:您的蛋糕已經(jīng)做好了");
return "Holiland";
});
// 開始做蛋糕
new Thread(future).start();
// 去做其他事情
Thread.sleep(1000);
System.out.println("我:忙碌中...");
// 取蛋糕
System.out.println("我:取蛋糕 " + future.get());
}
}
// 運(yùn)行結(jié)果:
// 我:預(yù)定蛋糕
// 店員:請(qǐng)您傍晚來(lái)取蛋糕
// 我:忙碌中...
// 店員:您的蛋糕已經(jīng)做好了
// 我:取蛋糕 Holiland
方法的調(diào)用者可以將任務(wù)交給其他線程去處理,無(wú)需阻塞等待方法的執(zhí)行,這樣調(diào)用者便可以繼續(xù)執(zhí)行其他任務(wù),并能通過(guò) Future 對(duì)象獲取執(zhí)行結(jié)果。
它的運(yùn)行原理如下:創(chuàng)建 FutureTask 實(shí)例時(shí),Callable 對(duì)象會(huì)被傳遞給構(gòu)造函數(shù),當(dāng)線程調(diào)用 FutureTask 的 run 方法時(shí),Callable 對(duì)象的 call 方法也會(huì)被執(zhí)行。調(diào)用 call 方法的線程會(huì)同步地獲取結(jié)果,并通過(guò) FutureTask 的 set 方法來(lái)記錄結(jié)果對(duì)象,如果 call 方法執(zhí)行期間發(fā)生了異常,則會(huì)調(diào)用 setException 方法記錄異常。最后,通過(guò)調(diào)用 get 方法獲取方法的結(jié)果,注意這里可能會(huì)拋出方法執(zhí)行時(shí)產(chǎn)生的異常。
public void run() {
// ...
try {
// “提貨任務(wù)”
Callable c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 調(diào)用 callable 的 call 方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 捕獲并設(shè)置異常
setException(ex);
}
if (ran)
// 為結(jié)果賦值
set(result);
}
} finally {
// ...
}
}
protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 將結(jié)果賦值給 outcome 全局變量,供 get 時(shí)獲取
outcome = v;
// 修改狀態(tài)為 NORMAL
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 將異常賦值給 outcome 變量,供 get 時(shí)拋出
outcome = t;
// 修改狀態(tài)為 EXCEPTIONAL
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 未完成時(shí)阻塞等一等
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
// 正常結(jié)束的話能正常獲取到結(jié)果
if (s == NORMAL)
return (V)x;
// 否則會(huì)拋出異常,注意如果執(zhí)行中出現(xiàn)異常,調(diào)用 get 時(shí)會(huì)被拋出
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
現(xiàn)在對(duì) Future 模式 已經(jīng)有了基本的了解:它通過(guò) Future 接口來(lái)表示未來(lái)的結(jié)果,實(shí)現(xiàn) 調(diào)用者與執(zhí)行者之間的解耦,提高系統(tǒng)的吞吐量和響應(yīng)速度,那在實(shí)踐中對(duì)該模式是如何使用的呢?
2. 對(duì) Future 模式的實(shí)踐
因?yàn)?延保服務(wù) 頻道頁(yè)訪問(wèn)量大且對(duì)接口性能要求較高,單線程處理并不能滿足性能要求,所以應(yīng)用了 Future 模式 來(lái)提高查詢效率,但是并沒(méi)有借助上文所述的 FutureTask 來(lái)實(shí)現(xiàn),而是使用了 CompletableFuture 工具類,它們的實(shí)現(xiàn)原理基本一致,但是后者提供的方法和對(duì) 鏈?zhǔn)?a target="_blank">編程 的支持使代碼更加簡(jiǎn)潔,實(shí)現(xiàn)更加容易(相關(guān) API 參考見(jiàn)文末)。
如下是使用 CompletableFuture 異步多線程查詢訂單列表的邏輯,根據(jù)配置的 pageNo 分多條線程查詢各頁(yè)的訂單數(shù)據(jù):
List result = new ArrayList?>();
// 并發(fā)查詢訂單列表
List>> futureList = new ArrayList?>();
try {
// 配置需要查詢的頁(yè)數(shù) pageNo,并發(fā)查詢不同頁(yè)碼的訂單
for (int i = 1; i <= pageNo; i++) {
int curPageNo = i;
CompletableFuture> future = CompletableFuture.supplyAsync(
() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor);
futureList.add(future);
}
// 等待所有線程處理完畢,并封裝結(jié)果值
for (CompletableFuture> future : futureList) {
result.addAll(future.get());
}
} catch (Exception e) {
log.error("并發(fā)查詢用戶訂單信息異常", e);
}
這段代碼中對(duì)異常的處理能進(jìn)行優(yōu)化:第 15 行代碼,如果某條線程查詢訂單列表時(shí)發(fā)生異常,那么在調(diào)用 get 方法時(shí)會(huì)拋出該異常,被 catch 后返回空結(jié)果,即使有其他線程查詢成功,這些訂單結(jié)果值也會(huì)被忽略掉,可以針對(duì)這一點(diǎn)進(jìn)行優(yōu)化,如下:
List result = new ArrayList?>();
// 并發(fā)查詢訂單列表
List>> futureList = new ArrayList?>();
try {
// 配置需要查詢的頁(yè)數(shù) pageNo,并發(fā)查詢不同頁(yè)碼的訂單
for (int i = 1; i <= pageNo; i++) {
int curPageNo = i;
CompletableFuture> future = CompletableFuture
.supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor)
// 添加異常處理
.exceptionally(e -> {
log.error("查詢用戶訂單信息異常", e);
return Collections.emptyList();
});
futureList.add(future);
}
// 等待所有線程處理完畢,并封裝結(jié)果值
for (CompletableFuture> future : futureList) {
result.addAll(future.get());
}
} catch (Exception e) {
log.error("并發(fā)查詢用戶訂單信息異常", e);
}
優(yōu)化后針對(duì)查詢發(fā)生異常的任務(wù)打印異常日志,并返回空集合,這樣即使單線程查詢失敗,也不會(huì)影響到其他線程查詢成功的結(jié)果。
CompletableFuture 還提供了 allOf 方法,它返回的 CompletableFuture 對(duì)象在所有 CompletableFuture 執(zhí)行完成時(shí)完成,相比于對(duì)每個(gè)任務(wù)都調(diào)用 get 阻塞等待任務(wù)完成的實(shí)現(xiàn)可讀性更好,改造后代碼如下:
List result = new ArrayList?>();
// 并發(fā)查詢訂單列表
CompletableFuture>[] futures = new CompletableFuture[pageNo];
// 配置需要查詢的頁(yè)數(shù) pageNo,并發(fā)查詢不同頁(yè)碼的訂單
for (int i = 1; i <= pageNo; i++) {
int curPageNo = i;
CompletableFuture> future = CompletableFuture
.supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor)
// 添加異常處理
.exceptionally(e -> {
log.error("查詢用戶訂單信息異常", e);
return Collections.emptyList();
});
futures[i - 1] = future;
}
try {
// 等待所有線程處理完畢
CompletableFuture.allOf(futures).get();
for (CompletableFuture> future : futures) {
List orderInfoList = future.get();
if (CollectionUtils.isEmpty(orderInfoList)) {
result.addAll(orderInfoList);
}
}
} catch (Exception e) {
log.error("處理用戶訂單結(jié)果信息異常", e);
}
Tips:
CompletableFuture的設(shè)計(jì)初衷是支持異步編程,所以應(yīng)盡量避免在CompletableFuture鏈中使用get()/join()方法,因?yàn)檫@些方法會(huì)阻塞當(dāng)前線程直到CompletableFuture完成,應(yīng)該在必須使用該結(jié)果值時(shí)才調(diào)用它們。
相關(guān)的模式:命令模式
命令模式能將操作的調(diào)用者和執(zhí)行者解耦,它能很容易的與 Future 模式 結(jié)合,以查詢訂單的任務(wù)為例,我們可以將該任務(wù)封裝為“命令”對(duì)象的形式,執(zhí)行時(shí)為每個(gè)線程提交一個(gè)命令,實(shí)現(xiàn)解耦并提高擴(kuò)展性。在命令模式中,命令對(duì)象需要 支持撤銷和重做,那么這便在查詢出現(xiàn)異常時(shí),提供了補(bǔ)償處理的可能,命令模式類圖關(guān)系如下:

3.《圖解Java多線程設(shè)計(jì)模式》書籍推薦
我覺(jué)得本書算得上是一本老書:05 年出版的基于 JDK1.5 的Java多線程書籍,相比于目前我們常用的 JDK1.8 和時(shí)髦的 JDK21,在讀之前總會(huì)讓人覺(jué)得有一種過(guò)時(shí)的感覺(jué)。但是當(dāng)我讀完時(shí),發(fā)現(xiàn)其中的模式能對(duì)應(yīng)上代碼中的處理邏輯:對(duì) CompletableFuture 的使用正對(duì)應(yīng)了其中的 Future 模式(異步獲取其他線程的執(zhí)行結(jié)果)等等,所以我覺(jué)得模式的應(yīng)用不會(huì)局限于技術(shù)的新老,它是在某種情況下,研發(fā)人員共識(shí)或通用的解決方案,在知曉某種模式,采用已有的技術(shù)實(shí)現(xiàn)它是容易的,而反過(guò)來(lái)在只掌握技術(shù)去探索模式是困難且沒(méi)有方向的。
同時(shí),我也在考慮一個(gè)問(wèn)題:對(duì)于新人學(xué)習(xí)多線程技術(shù)來(lái)說(shuō),究竟適不適合直接從模式入門呢?因?yàn)槲覍?duì)設(shè)計(jì)模式有了比較多的實(shí)踐經(jīng)驗(yàn),所以對(duì)“模式”相關(guān)的內(nèi)容足夠敏感,如果新人沒(méi)有這些經(jīng)驗(yàn)的話,這對(duì)他們來(lái)說(shuō)會(huì)不會(huì)更像是一個(gè)個(gè)知識(shí)點(diǎn)的堆砌呢?好在的是,本書除了模式相關(guān)的內(nèi)容,對(duì)基礎(chǔ)知識(shí)也做足了鋪墊,而且提出的關(guān)于多線程編程的思考點(diǎn)也是非常值得參考和學(xué)習(xí)的,以線程互斥和協(xié)同為例,書中談到:在對(duì)線程進(jìn)行互斥處理時(shí)需要考慮 “要保護(hù)的東西是什么”,這樣便能夠 清晰的確定鎖的粒度;對(duì)于線程的協(xié)同,書中提到的是需要考慮 “放在中間的東西是什么”,直接的拋出這個(gè)觀點(diǎn)是不容易理解的,“中間的東西”是在多線程的 生產(chǎn)者和消費(fèi)者模式 中提出的,部分線程負(fù)責(zé)生產(chǎn),生產(chǎn)完成后將對(duì)象放在“中間”,部分線程負(fù)責(zé)消費(fèi),消費(fèi)時(shí)取的便是“中間”的對(duì)象,而合理規(guī)劃這些中間的東西便能 消除生產(chǎn)者和消費(fèi)者之間的速度差異,提高系統(tǒng)的吞吐量和響應(yīng)速度。而再深入考慮這兩個(gè)角度時(shí),線程的互斥和協(xié)同其實(shí)是內(nèi)外統(tǒng)一的:為了讓線程協(xié)調(diào)運(yùn)行,必須執(zhí)行互斥處理,以防止共享的內(nèi)容被破壞,而線程的互斥是為了線程的協(xié)調(diào)運(yùn)行才進(jìn)行的必要操作。
附:CompletableFuture 常用 API
使用 supplyAsync 方法異步執(zhí)行任務(wù),并返回 CompletableFuture 對(duì)象
如下代碼所示,調(diào)用 CompletableFuture.supplyAsync 靜態(tài)方法異步執(zhí)行查詢邏輯,并返回一個(gè)新的 CompletableFuture 對(duì)象
CompletableFuture> future = CompletableFuture.supplyAsync(() -> doQuery(), executor);
使用 join 方法阻塞獲取完成結(jié)果
如下代碼所示,在封裝結(jié)果前,調(diào)用 join 方法阻塞等待獲取結(jié)果
futureList.forEach(CompletableFuture::join);
它與 get 方法的主要區(qū)別在于,join 方法拋出的是未經(jīng)檢查的異常 CompletionException,并將原始異常作為其原因,這意味著我們可以不需要在方法簽名中聲明它或在調(diào)用 join 方法的地方進(jìn)行異常處理,而 get 方法會(huì)拋出 InterruptedException 和 ExecutionException 異常,我們必須對(duì)它進(jìn)行處理,get 方法源碼如下:
public T get() throws InterruptedException, ExecutionException {
Object r;
if ((r = result) == null)
r = waitingGet(true);
return (T) reportGet(r);
}
用 thenApply(Function) 和 thenAccept(Consumer) 等回調(diào)函數(shù)處理結(jié)果
如下是使用 thenApply() 方法對(duì) CompletableFuture 的結(jié)果進(jìn)行轉(zhuǎn)換的操作:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(greeting -> greeting + " World");
使用 exceptionally() 處理 CompletableFuture 中的異常
CompletableFuture 提供了exceptionally() 方法來(lái)處理異常,這是一個(gè)非常重要的步驟。如果在 CompletableFuture 的運(yùn)行過(guò)程中拋出異常,那么這個(gè)異常會(huì)被傳遞到最終的結(jié)果中。如果沒(méi)有適當(dāng)?shù)漠惓L幚?,那么在調(diào)用 get() 或 join() 方法時(shí)可能會(huì)拋出異常。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Exception occurred");
}
return "Hello, World!";
}).exceptionally(e -> "An error occurred");
使用 allOf() 和 anyOf() 處理多個(gè) CompletableFuture
如果有多個(gè) CompletableFuture 需要處理,可以使用 CompletableFuture.allOf() 或者 CompletableFuture.anyOf()。allOf() 在所有的 CompletableFuture 完成時(shí)完成,而 anyOf() 則會(huì)在任意一個(gè) CompletableFuture 完成時(shí)完成。
complete()、completeExceptionally()、cancel() 方法
CompletableFuture 的運(yùn)行是在調(diào)用了 complete()、completeExceptionally()、cancel() 等方法后才會(huì)被標(biāo)記為完成。如果沒(méi)有正確地完成 CompletableFuture,那么在調(diào)用 get() 方法時(shí)可能會(huì)永久阻塞。這三個(gè)方法在 Java 并發(fā)編程中有著重要的應(yīng)用。以下是這三個(gè)方法的常見(jiàn)使用場(chǎng)景:
complete(T value): 此方法用于顯式地完成一個(gè) CompletableFuture,并設(shè)置它的結(jié)果值。這在你需要在某個(gè)計(jì)算完成時(shí),手動(dòng)設(shè)置 CompletableFuture 的結(jié)果值的場(chǎng)景中非常有用。例如,你可能在一個(gè)異步操作完成時(shí),需要設(shè)置 CompletableFuture 的結(jié)果值。
CompletableFuture future = new CompletableFuture?>();
// Some asynchronous operation
future.complete("Operation Result");
completeExceptionally(Throwable ex): 此方法用于顯式地以異常完成一個(gè) CompletableFuture。這在你需要在某個(gè)計(jì)算失敗時(shí),手動(dòng)設(shè)置 CompletableFuture 的異常的場(chǎng)景中非常有用。例如,你可能在一個(gè)異步操作失敗時(shí),需要設(shè)置 CompletableFuture 的異常。
CompletableFuture future = new CompletableFuture?>();
// Some asynchronous operation
future.completeExceptionally(new RuntimeException("Operation Failed"));
cancel(boolean mayInterruptIfRunning): 此方法用于取消與 CompletableFuture 關(guān)聯(lián)的計(jì)算。這在你需要取消一個(gè)長(zhǎng)時(shí)間運(yùn)行的或者不再需要的計(jì)算的場(chǎng)景中非常有用。例如,你可能在用戶取消操作或者超時(shí)的情況下,需要取消 CompletableFuture 的計(jì)算。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
// Long running operation
});
// Some condition
future.cancel(true);
這些方法都是線程安全的,可以從任何線程中調(diào)用。
使用 thenCompose() 處理嵌套的 CompletableFuture
如果在處理 CompletableFuture 的結(jié)果時(shí)又創(chuàng)建了新的CompletableFuture,那么就會(huì)產(chǎn)生嵌套的 CompletableFuture。這時(shí)可以使用 thenCompose() 方法來(lái)避免 CompletableFuture 的嵌套,如下代碼所示:
CompletableFuture completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
使用 thenCombine() 處理兩個(gè) CompletableFuture 的結(jié)果
CompletableFuture completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2);
審核編輯 黃宇
-
JAVA
+關(guān)注
關(guān)注
20文章
3001瀏覽量
116419 -
API
+關(guān)注
關(guān)注
2文章
2368瀏覽量
66752 -
多線程
+關(guān)注
關(guān)注
0文章
279瀏覽量
21025
發(fā)布評(píng)論請(qǐng)先 登錄
Cortex-M3工作模式及異常
【瑞薩RA × Zephyr評(píng)測(cè)】多線程和看門狗
解析Linux的進(jìn)程、線程和協(xié)程
多線程的系統(tǒng)
Linux多線程對(duì)比單線程的優(yōu)勢(shì)
rt-thread studio 如何進(jìn)行多線程編譯?
rtt中建兩個(gè)線程a和b,怎么確保線程a執(zhí)行完立刻切到線程b?
【HZ-T536開發(fā)板免費(fèi)體驗(yàn)】—— linux創(chuàng)建線程
【RA4E2開發(fā)板評(píng)測(cè)】LED1及LED2輪流點(diǎn)亮并同時(shí)亮8秒,體驗(yàn)FreeRTOS多線程
從底層解讀labview的TDMS高級(jí)異步寫入的工作原理
緩存之美:萬(wàn)文詳解 Caffeine 實(shí)現(xiàn)原理(上)
多線程的安全注意事項(xiàng)
工控一體機(jī)多線程任務(wù)調(diào)度優(yōu)化:聚徽分享破解工業(yè)復(fù)雜流程高效協(xié)同密碼
一種實(shí)時(shí)多線程VSLAM框架vS-Graphs介紹
從多線程設(shè)計(jì)模式到對(duì) CompletableFuture 的應(yīng)用
評(píng)論