HarmonyOS CPU与I/O密集型任务开发指导

news/2024/7/21 10:52:58 标签: 华为, HarmonyOS

一、CPU密集型任务开发指导

CPU密集型任务是指需要占用系统资源处理大量计算能力的任务,需要长时间运行,这段时间会阻塞线程其它事件的处理,不适宜放在主线程进行。例如图像处理、视频编码、数据分析等。

基于多线程并发机制处理CPU密集型任务可以提高CPU利用率,提升应用程序响应速度。

当进行一系列同步任务时,推荐使用Worker;而进行大量或调度点较为分散的独立任务时,不方便使用8个Worker去做负载管理,推荐采用TaskPool。接下来将以图像直方图处理以及后台长时间的模型预测任务分别进行举例。

使用TaskPool进行图像直方图处理

  1. 实现图像处理的业务逻辑。
  2. 数据分段,将各段数据通过不同任务的执行完成图像处理。

创建Task,通过execute()执行任务,在当前任务结束后,会将直方图处理结果同时返回。

     3.结果数组汇总处理。

import taskpool from '@ohos.taskpool';

@Concurrent
function imageProcessing(dataSlice: ArrayBuffer) {
  // 步骤1: 具体的图像处理操作及其他耗时操作
  return dataSlice;
}

function histogramStatistic(pixelBuffer: ArrayBuffer) {
  // 步骤2: 分成三段并发调度
  let number = pixelBuffer.byteLength / 3;
  let buffer1 = pixelBuffer.slice(0, number);
  let buffer2 = pixelBuffer.slice(number, number * 2);
  let buffer3 = pixelBuffer.slice(number * 2);

  let task1 = new taskpool.Task(imageProcessing, buffer1);
  let task2 = new taskpool.Task(imageProcessing, buffer2);
  let task3 = new taskpool.Task(imageProcessing, buffer3);

  taskpool.execute(task1).then((ret: ArrayBuffer[]) => {
    // 步骤3: 结果处理
  });
  taskpool.execute(task2).then((ret: ArrayBuffer[]) => {
    // 步骤3: 结果处理
  });
  taskpool.execute(task3).then((ret: ArrayBuffer[]) => {
    // 步骤3: 结果处理
  });
}

@Entry
@Component
struct Index {
  @State message: string = 'Hello World'

  build() {
    Row() {
      Column() {
        Text(this.message)
          .fontSize(50)
          .fontWeight(FontWeight.Bold)
          .onClick(() => {
            let data: ArrayBuffer;
            histogramStatistic(data);
          })
      }
      .width('100%')
    }
    .height('100%')
  }
}

使用Worker进行长时间数据分析

本文通过某地区提供的房价数据训练一个简易的房价预测模型,该模型支持通过输入房屋面积和房间数量去预测该区域的房价,模型需要长时间运行,房价预测需要使用前面的模型运行结果,因此需要使用Worker。

  1. DevEco Studio提供了Worker创建的模板,新建一个Worker线程,例如命名为“MyWorker”。

       2.在主线程中通过调用ThreadWorker的constructor()方法创建Worker对象,当前线程为宿主线程。

import worker from '@ohos.worker';

const workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');

     3.在宿主线程中通过调用onmessage()方法接收Worker线程发送过来的消息,并通过调用postMessage()方法向Worker线程发送消息。

例如向Worker线程发送训练和预测的消息,同时接收Worker线程发送回来的消息。

// 接收Worker子线程的结果
workerInstance.onmessage = function(e) {
  // data:主线程发送的信息
  let data = e.data;
  console.info('MyWorker.ts onmessage');
  // 在Worker线程中进行耗时操作
}

workerInstance.onerror = function (d) {
  // 接收Worker子线程的错误信息
}

// 向Worker子线程发送训练消息
workerInstance.postMessage({ 'type': 0 });
// 向Worker子线程发送预测消息
workerInstance.postMessage({ 'type': 1, 'value': [90, 5] });

     4.在MyWorker.ts文件中绑定Worker对象,当前线程为Worker线程。

import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';

let workerPort: ThreadWorkerGlobalScope = worker.workerPort;

     5.在Worker线程中通过调用onmessage()方法接收宿主线程发送的消息内容,并通过调用postMessage()方法向宿主线程发送消息。

如在Worker线程中定义预测模型及其训练过程,同时与主线程进行信息交互。

import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';

let workerPort: ThreadWorkerGlobalScope = worker.workerPort;

// 定义训练模型及结果 
let result;

// 定义预测函数
function predict(x) {
  return result[x];
}

// 定义优化器训练过程
function optimize() {
  result = {};
}

// Worker线程的onmessage逻辑
workerPort.onmessage = function (e: MessageEvents) {
  let data = e.data
  // 根据传输的数据的type选择进行操作
  switch (data.type) {
    case 0:
    // 进行训练
      optimize();
    // 训练之后发送主线程训练成功的消息
      workerPort.postMessage({ type: 'message', value: 'train success.' });
      break;
    case 1:
    // 执行预测
      const output = predict(data.value);
    // 发送主线程预测的结果
      workerPort.postMessage({ type: 'predict', value: output });
      break;
    default:
      workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
      break;
  }
}

       6.在Worker线程中完成任务之后,执行Worker线程销毁操作。销毁线程的方式主要有两种:根据需要可以在宿主线程中对Worker线程进行销毁;也可以在Worker线程中主动销毁Worker线程。

在宿主线程中通过调用onexit()方法定义Worker线程销毁后的处理逻辑。

// Worker线程销毁后,执行onexit回调方法
workerInstance.onexit = function() {
  console.info("main thread terminate");
}

方式一:在宿主线程中通过调用terminate()方法销毁Worker线程,并终止Worker接收消息。

// 销毁Worker线程
workerInstance.terminate();

方式二:在Worker线程中通过调用close()方法主动销毁Worker线程,并终止Worker接收消息。

// 销毁线程
workerPort.close();

二、I/O密集型任务开发指导

使用异步并发可以解决单次I/O任务阻塞的问题,但是如果遇到I/O密集型任务,同样会阻塞线程中其它任务的执行,这时需要使用多线程并发能力来进行解决。

I/O密集型任务的性能重点通常不在于CPU的处理能力,而在于I/O操作的速度和效率。这种任务通常需要频繁地进行磁盘读写、网络通信等操作。此处以频繁读写系统文件来模拟I/O密集型并发任务的处理。

  1. 定义并发函数,内部密集调用I/O能力。
import fs from '@ohos.file.fs';

// 定义并发函数,内部密集调用I/O能力
@Concurrent
async function concurrentTest(fileList: string[]) {
  // 写入文件的实现
  async function write(data, filePath) {
    let file = await fs.open(filePath, fs.OpenMode.READ_WRITE);
    await fs.write(file.fd, data);
    fs.close(file);
  }
  // 循环写文件操作
  for (let i = 0; i < fileList.length; i++) {
    write('Hello World!', fileList[i]).then(() => {
      console.info(`Succeeded in writing the file. FileList: ${fileList[i]}`);
    }).catch((err) => {
      console.error(`Failed to write the file. Code is ${err.code}, message is ${err.message}`)
      return false;
    })
  }
  return true;
}

       2.使用TaskPool执行包含密集I/O的并发函数:通过调用execute()方法执行任务,并在回调中进行调度结果处理。示例中的filePath1和filePath2的获取方式请参见获取应用文件路径

import taskpool from '@ohos.taskpool';

let filePath1 = ...; // 应用文件路径
let filePath2 = ...;

// 使用TaskPool执行包含密集I/O的并发函数
// 数组较大时,I/O密集型任务任务分发也会抢占主线程,需要使用多线程能力
taskpool.execute(concurrentTest, [filePath1, filePath2]).then((ret) => {
  // 调度结果处理
  console.info(`The result: ${ret}`);
})


http://www.niftyadmin.cn/n/5052788.html

相关文章

8. 基于消影点进行相机内参(主点)的标定

目录 1. ocam模型2. 消影点3. 基于消影点进行相机主点标定3.1 基于ocam模型的主点标定 感谢大家的阅读。 1. ocam模型 可以参考我的另一篇博客ocam模型。 这里简单提一下ocam模型&#xff1a; 这个模型将中心折反射相机和鱼眼相机统一在一个通用模型下&#xff0c;也称为泰勒模…

目标检测性能评价指标

文章目录 一.常见的模型评价术语True positives(TP)False positives(FP)False negatives(FN)True negatives(TN)P-R curveP-R曲线APmAPIOU&#xff08;交并比&#xff09;NMS(非极大抑制)检测速度 一.常见的模型评价术语 现在假设我们的分类目标只有两类&#xff0c;计为正例&…

2023 年解锁网络安全即服务

在当今快速发展的数字世界中&#xff0c;强大的网络安全机制的重要性怎么强调都不为过。对于越来越多地发现自己成为网络威胁焦点的小型企业来说尤其如此。 那么&#xff0c;“网络安全即服务”到底是什么&#xff1f;为什么它对小型企业至关重要&#xff1f; 网络安全即服务…

继苹果、联发科后,传高通下一代5G芯片将由台积电以3纳米代工

台积电3纳米又有重量级客户加入。市场传出&#xff0c;继苹果、联发科之后&#xff0c;手机芯片大厂高通下一代5G旗舰芯片也将交由台积电以3纳米生产&#xff0c;最快将于10月下旬发表&#xff0c;成为台积电3纳米第三家客户。 针对相关传闻&#xff0c;至昨日&#xff08;25日…

(c语言)整形提升

#include<stdio.h> //整形提升 int main() { char a 5; //字符型的内存大小为8个比特位&#xff0c;故在进行加法之类的线性运算时需要整形提升 //00000000000000000000000000000101->5 因为字符型的内存大小不足&#xff0c;故在存放整形时需要裁切 …

邮件功能-python中的SMTP协议邮件发送

文章目录 一、SMTP协议邮件准备二、smtplib模块1.使用smtplib封装一个邮件类2.发送邮件 补充 一、SMTP协议邮件准备 需要一个smtp服务器 二、smtplib模块 smtplib模块是python自带的模块 1.使用smtplib封装一个邮件类 import smtplib import logging # 加入日志&#xff…

Rocky linux8.8系统通过packstack安装OpenStack yoga版本

目录 材料准备环境配置关闭防火墙和selinux主机网络chrony时间同步安装openstack选择你想安装的openstack版本并安装安装packstackall in one一键部署openstack(不推荐,新手适用)通过应答文件部署附录:CentOS Steam 9配置阿里yum源附录:rocky换中科大源材料准备 Rocky lin…

Ceph入门到精通-netstat -s|grep “dropped“

“104081 SYNs to LISTEN sockets dropped” 是一个网络错误消息&#xff0c;通常出现在服务器日志或网络设备日志中。SYN 是 TCP 握手过程中的一个步骤&#xff0c;用于建立连接。而 LISTEN sockets 是服务器用于接收新连接的状态。 这个错误消息表示服务器接收到了 104081 个…