CPU密集型任務(wù)開發(fā)指導(dǎo)

2024-02-16 13:45 更新

CPU密集型任務(wù)是指需要占用系統(tǒng)資源處理大量計算能力的任務(wù),需要長時間運行,這段時間會阻塞線程其它事件的處理,不適宜放在主線程進行。例如圖像處理、視頻編碼、數(shù)據(jù)分析等。

基于多線程并發(fā)機制處理CPU密集型任務(wù)可以提高CPU利用率,提升應(yīng)用程序響應(yīng)速度。

當進行一系列同步任務(wù)時,推薦使用Worker;而進行大量或調(diào)度點較為分散的獨立任務(wù)時,不方便使用8個Worker去做負載管理,推薦采用TaskPool。接下來將以圖像直方圖處理以及后臺長時間的模型預(yù)測任務(wù)分別進行舉例。

使用TaskPool進行圖像直方圖處理

  1. 實現(xiàn)圖像處理的業(yè)務(wù)邏輯。

  2. 數(shù)據(jù)分段,將各段數(shù)據(jù)通過不同任務(wù)的執(zhí)行完成圖像處理。

    創(chuàng)建Task,通過execute()執(zhí)行任務(wù),在當前任務(wù)結(jié)束后,會將直方圖處理結(jié)果同時返回。

  3. 結(jié)果數(shù)組匯總處理。

  1. import taskpool from '@ohos.taskpool';
  2. @Concurrent
  3. function imageProcessing(dataSlice: ArrayBuffer) {
  4. // 步驟1: 具體的圖像處理操作及其他耗時操作
  5. return dataSlice;
  6. }
  7. function histogramStatistic(pixelBuffer: ArrayBuffer) {
  8. // 步驟2: 分成三段并發(fā)調(diào)度
  9. let number = pixelBuffer.byteLength / 3;
  10. let buffer1 = pixelBuffer.slice(0, number);
  11. let buffer2 = pixelBuffer.slice(number, number * 2);
  12. let buffer3 = pixelBuffer.slice(number * 2);
  13. let task1 = new taskpool.Task(imageProcessing, buffer1);
  14. let task2 = new taskpool.Task(imageProcessing, buffer2);
  15. let task3 = new taskpool.Task(imageProcessing, buffer3);
  16. taskpool.execute(task1).then((ret: ArrayBuffer[]) => {
  17. // 步驟3: 結(jié)果處理
  18. });
  19. taskpool.execute(task2).then((ret: ArrayBuffer[]) => {
  20. // 步驟3: 結(jié)果處理
  21. });
  22. taskpool.execute(task3).then((ret: ArrayBuffer[]) => {
  23. // 步驟3: 結(jié)果處理
  24. });
  25. }
  26. @Entry
  27. @Component
  28. struct Index {
  29. @State message: string = 'Hello World'
  30. build() {
  31. Row() {
  32. Column() {
  33. Text(this.message)
  34. .fontSize(50)
  35. .fontWeight(FontWeight.Bold)
  36. .onClick(() => {
  37. let data: ArrayBuffer;
  38. histogramStatistic(data);
  39. })
  40. }
  41. .width('100%')
  42. }
  43. .height('100%')
  44. }
  45. }

使用Worker進行長時間數(shù)據(jù)分析

本文通過某地區(qū)提供的房價數(shù)據(jù)訓(xùn)練一個簡易的房價預(yù)測模型,該模型支持通過輸入房屋面積和房間數(shù)量去預(yù)測該區(qū)域的房價,模型需要長時間運行,房價預(yù)測需要使用前面的模型運行結(jié)果,因此需要使用Worker。

  1. DevEco Studio提供了Worker創(chuàng)建的模板,新建一個Worker線程,例如命名為“MyWorker”。

  2. 在主線程中通過調(diào)用ThreadWorker的constructor()方法創(chuàng)建Worker對象,當前線程為宿主線程。

    1. import worker from '@ohos.worker';
    2. const workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
  3. 在宿主線程中通過調(diào)用onmessage()方法接收Worker線程發(fā)送過來的消息,并通過調(diào)用postMessage()方法向Worker線程發(fā)送消息。

    例如向Worker線程發(fā)送訓(xùn)練和預(yù)測的消息,同時接收Worker線程發(fā)送回來的消息。

    1. // 接收Worker子線程的結(jié)果
    2. workerInstance.onmessage = function(e) {
    3. // data:Worker線程發(fā)送的信息
    4. let data = e.data;
    5. console.info('MyWorker.ts onmessage');
    6. }
    7. workerInstance.onerror = function (d) {
    8. // 接收Worker子線程的錯誤信息
    9. }
    10. // 向Worker子線程發(fā)送訓(xùn)練消息
    11. workerInstance.postMessage({ 'type': 0 });
    12. // 向Worker子線程發(fā)送預(yù)測消息
    13. workerInstance.postMessage({ 'type': 1, 'value': [90, 5] });
  4. 在MyWorker.ts文件中綁定Worker對象,當前線程為Worker線程。

    1. import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
    2. let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
  5. 在Worker線程中通過調(diào)用onmessage()方法接收宿主線程發(fā)送的消息內(nèi)容,并通過調(diào)用postMessage()方法向宿主線程發(fā)送消息。

    例如在Worker線程中定義預(yù)測模型及其訓(xùn)練過程,同時與主線程進行信息交互。

    1. import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
    2. let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
    3. // 定義訓(xùn)練模型及結(jié)果
    4. let result;
    5. // 定義預(yù)測函數(shù)
    6. function predict(x) {
    7. return result[x];
    8. }
    9. // 定義優(yōu)化器訓(xùn)練過程
    10. function optimize() {
    11. result = {};
    12. }
    13. // Worker線程的onmessage邏輯
    14. workerPort.onmessage = function (e: MessageEvents) {
    15. let data = e.data
    16. // 根據(jù)傳輸?shù)臄?shù)據(jù)的type選擇進行操作
    17. switch (data.type) {
    18. case 0:
    19. // 進行訓(xùn)練
    20. optimize();
    21. // 訓(xùn)練之后發(fā)送主線程訓(xùn)練成功的消息
    22. workerPort.postMessage({ type: 'message', value: 'train success.' });
    23. break;
    24. case 1:
    25. // 執(zhí)行預(yù)測
    26. const output = predict(data.value);
    27. // 發(fā)送主線程預(yù)測的結(jié)果
    28. workerPort.postMessage({ type: 'predict', value: output });
    29. break;
    30. default:
    31. workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
    32. break;
    33. }
    34. }
  6. 在Worker線程中完成任務(wù)之后,執(zhí)行Worker線程銷毀操作。銷毀線程的方式主要有兩種:根據(jù)需要可以在宿主線程中對Worker線程進行銷毀;也可以在Worker線程中主動銷毀Worker線程。

    在宿主線程中通過調(diào)用onexit()方法定義Worker線程銷毀后的處理邏輯。

    1. // Worker線程銷毀后,執(zhí)行onexit回調(diào)方法
    2. workerInstance.onexit = function() {
    3. console.info("main thread terminate");
    4. }

    方式一:在宿主線程中通過調(diào)用terminate()方法銷毀Worker線程,并終止Worker接收消息。

    1. // 銷毀Worker線程
    2. workerInstance.terminate();

    方式二:在Worker線程中通過調(diào)用close()方法主動銷毀Worker線程,并終止Worker接收消息。

    1. // 銷毀線程
    2. workerPort.close();
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號