星屿浅奈 ✧ Saneko

Back

基于QThread的多线程任务管理Blur image

在开发一个 PyQt 应用程序时,我们经常需要执行一些耗时的操作,例如文件读写网络请求复杂的数据处理长时间的计算。如果这些操作在主线程中执行,会导致用户界面(UI)冻结,用户无法与应用程序进行交互,影响用户体验。为了解决这个问题,我们需要将这些耗时操作放在单独的线程中执行。

然而,直接使用多线程会带来一些复杂性,例如线程的创建管理信号槽机制的使用,以及在多线程环境下处理异常和结果的传递等。为了简化这些操作,因此开发了 TaskTaskManager 类。

功能实现#

方法解释#

  • 线程执行Task 类继承自 QThread,用于将函数封装在独立线程中执行。通过 start() 方法在后台运行耗时操作,避免阻塞主线程。

  • 信号机制:使用 PyQt 的 pyqtSignal 机制,Task 类定义了三个信号:

    • success_signal:任务成功完成时触发
    • fail_signal:任务执行异常时触发
    • finished_signal:任务完成时触发(无论成功或失败)
  • 回调函数连接:通过 onSuccessonFailonFinished 方法连接回调函数,实现任务成功时更新 UI、失败时显示错误、完成时执行清理等操作。

  • 任务生命周期管理

    • create_task:创建任务并添加到 _tasks 字典
    • run_task:启动单个任务,完成后自动移除
    • run_tasks:同时运行多个任务
    • remove_task:移除已完成的任务,避免内存泄漏
    • is_idle / is_all_idle:检查任务是否处于空闲状态

使用示例#

def getDeviceInfo():
    # 一些耗时操作...
    return "模拟设备数据"

task = TaskManager.create_task(getDeviceInfo).onSuccess(lambda deviceInfo: print(deviceInfo)).onFail(lambda e: print(e)).onFinished(lambda: print("任务结束"))
TaskManager.run_task(task)
python
基于QThread的多线程任务管理
https://saneko.me/blog/84544b8bdbe9
Author Saneko
Published at January 14, 2025
Buy me a cup of coffee ☕.
Comment seems to stuck. Try to refresh?✨