使用 Asyncio 创建和管理任务

2024-08-15 0 298

使用 Asyncio 创建和管理任务

asyncio 允许开发者轻松地用 Python 编写异步程序。该模块还提供了多种异步任务的方法,并且由于执行方法多种多样,因此可能会让人困惑于使用哪一种。

在本文中,我们将讨论使用 asyncio 创建和管理任务的多种方法。

什么是异步任务?

在 asyncio 中,task 是一个包装协程并安排其在事件循环内运行的对象。简而言之,任务是一种与其他任务同时运行协程的方式。创建任务后,事件循环将运行它,并根据需要暂停和恢复它以允许其他任务运行。

创建和管理 asyncio 任务的方法

现在,我们可以讨论创建和管理任务的方法。首先,要使用 asyncio 在 python 中创建任务,请使用 asyncio.create_task 方法,该方法采用以下参数:

  • coro(必填):要调度的协程对象。这是你想要异步运行的函数。

  • 名称(可选):可用于调试或日志记录目的的任务名称。您可以为该参数分配一个字符串

    • 您还可以稍后使用 task.set_name(name) 和 task.get_name() 设置或获取名称。
  • context(可选):在python 3.11中引入,用于为任务设置上下文变量,从而启用任务本地存储。它类似于线程本地存储,但用于异步任务。

    • 这个参数并不常用,除非你正在处理需要上下文管理的高级场景。

这里是asyncio.create_task的使用示例:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

import asyncio

# define a coroutine

async def greet(name):

    await asyncio.sleep(1)  # simulate an i/o-bound operation

    print(f"hello, {name}!")

async def main():

    # create tasks

    task1 = asyncio.create_task(greet("alice"), name="greetingalice")

    task2 = asyncio.create_task(greet("bob"), name="greetingbob")

    # check task names

    print(f"task 1 name: {task1.get_name()}")

    print(f"task 2 name: {task2.get_name()}")

    # wait for both tasks to complete

    await task1

    await task2

# run the main function

asyncio.run(main())

创建任务时,可以执行很多方法,例如:

  • .cancel(): 取消任务。

  • .add_done_callback(cb): 添加任务完成时运行的回调函数。

  • .done(): 检查任务是否完成。

  • .result():任务完成后检索结果。

现在我们了解了如何创建任务,让我们看看如何处理等待一个任务或多个任务。

等待任务完成

在本节中,我们将讨论如何等待一个或多个任务的任务完成。异步编程基于这样一个事实:如果正在运行异步任务,我们可以继续执行程序。有时您可能想要更好地控制流程,并希望确保在安全地继续执行程序之前获得可以使用的结果。

要等待单个任务完成,可以使用asyncio.wait_for。它需要两个参数:

  • awaiTable(必需):这是您想要等待的协程、任务或未来。它可以是任何可以等待的对象,例如协程函数调用、asyncio.task 或 asyncio.future。

  • timeout(可选):这指定等待 aw 完成的最大秒数。如果达到超时并且等待尚未完成,asyncio.wait_for 会引发 timeouterror。如果超时设置为 none,该函数将无限期地等待等待完成。

这是使用此方法的示例:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

import asyncio

async def slow_task():

    print("task started...")

    await asyncio.sleep(5)  # simulating a long-running task

    print("task finished!")

    return "completed"

async def main():

    try:

        # wait for slow_task to finish within 2 seconds

        result = await asyncio.wait_for(slow_task(), timeout=2)

        print(result)

    except asyncio.timeouterror:

        print("the task took too long and was canceled!")

asyncio.run(main())

上面的代码中,slow_task() 是一个协程,通过休眠 5 秒来模拟长时间运行的任务。 asyncio.wait_for(slow_task(), timeout=2) 行等待任务完成,但将等待时间限制为 2 秒,从而导致超时,因为任务需要更长的时间。当超过超时时,会引发 timeouterror,任务被取消,并通过打印一条指示任务花费太长时间的消息来处理异常。

我们还可以等待多个或一组任务完成。这可以使用 asyncio.wait、asyncio.gather 或 asyncio.as_completed 来实现。让我们探索每种方法。

异步等待

asyncio.wait 方法等待一组任务并返回两组:一组用于已完成的任务,一组用于待处理的任务。它需要以下参数:

  • aws(必需,可迭代):您想要等待的协程对象、任务或 future 的集合。

  • timeout(float 或 none,可选):等待的最大秒数。如果没有提供,它将无限期地等待。

  • return_when (常量,可选):指定 asyncio.wait 何时返回。选项包括:

    • asyncio.all_completed(默认):当所有任务完成时返回。
    • asyncio.first_completed:第一个任务完成时返回。
    • asyncio.first_exception:当第一个任务引发异常时返回。

让我们看看它是如何在示例中使用的。

1

2

3

4

5

6

7

8

9

10

11

12

import asyncio

import random

async def task():

    await asyncio.sleep(random.unifORM(1, 3))

async def main():

    tasks = [asyncio.create_task(task()) for _ in range(3)]

    done, pending = await asyncio.wait(tasks, return_when=asyncio.first_completed)

    print(f"done tasks: {len(done)}, pending tasks: {len(pending)}")

asyncio.run(main())

在上面的代码中,asyncio.wait 等待一组任务并返回两组:一组包含已完成的任务,另一组包含仍待处理的任务。您可以控制它何时返回,例如第一个任务完成后或所有任务完成后。在示例中,asyncio.wait 在第一个任务完成时返回,将其余任务留在待处理集中。

异步收集

asyncio.gather 方法同时运行多个可等待对象并返回其结果列表,可以选择处理异常。让我们看看它所需要的论据。

  • *aws(必需,多个可等待):并发运行的可变数量的可等待对象(如协程、任务或 future)。

  • return_exceptions(bool,可选):如果为 true,任务中的异常将作为结果列表的一部分返回,而不是引发。

让我们看看如何在示例中使用它。

1

2

3

4

5

6

7

8

9

10

11

12

import asyncio

import random

async def task(id):

    await asyncio.sleep(random.uniform(1, 3))

    return f"task {id} done"

async def main():

    results = await asyncio.gather(task(1), task(2), task(3))

    print(results)

asyncio.run(main())

在上面的代码中,asyncio.gather 同时运行多个可等待对象,并按照传入的顺序返回结果列表。如果 return_exceptions 设置为 true,它允许您优雅地处理异常。在示例中,三个任务同时运行,所有任务完成后,它们的结果将在列表中返回。

asyncio.as_completed 已完成

asyncio.as_completed 方法用于返回一个迭代器,该迭代器在任务完成时生成任务,从而允许立即处理结果。它需要以下参数:

  • aws(可迭代的可等待对象):协程对象、任务或 future 的集合。

  • timeout(float 或 none,可选):等待任务完成的最大秒数。如果没有提供,它将无限期地等待。

例子

1

2

3

4

5

6

7

8

9

10

11

12

13

14

import asyncio

import random

async def task(id):

    await asyncio.sleep(random.uniform(1, 3))

    return f"task {id} done"

async def main():

    tasks = [task(i) for i in range(3)]

    for coro in asyncio.as_completed(tasks):

        result = await coro

        print(result)

asyncio.run(main())

在上面的示例中,asyncio.as_completed 返回一个迭代器,该迭代器在每个任务完成时生成结果,使您可以立即处理它们。当您希望在结果可用时立即对其进行处理,而不是等待所有任务完成时,这非常有用。在示例中,任务同时运行,并且在每个任务完成时按完成顺序打印其结果。

因此,为了进行总结,您可以使用:

  • asyncio.wait:当您需要处理多个任务并想要跟踪哪些任务已完成、哪些任务仍待处理时。当您分别关心每个任务的状态时,它很有用。

  • asyncio.gather:当你想要同时运行多个任务并需要列表中的结果时,特别是当结果的顺序很重要或者你需要优雅地处理异常时。

  • asyncio.as_completed:当您想要在每个任务完成后立即处理结果,而不是等待所有任务完成时。这对于按可用顺序处理结果很有用。

但是,这些方法不采用带有内置错误处理的原子任务管理。在下一节中,我们将了解 asyncio.taskgroup 以及如何使用它来管理一组任务。

异步任务组

asyncio.taskgroup 是 python 3.11 中引入的上下文管理器,可简化对多个任务作为一个组的管理。它确保如果组内的任何任务失败,所有其他任务都会被取消,从而提供了一种通过强大的错误处理来处理复杂任务管理的方法。该类有一个名为created_task 的方法,用于创建任务并将其添加到任务组中。您将协程传递给此方法,它会返回一个由组管理的 asyncio.task 对象。

这是如何使用它的示例:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

import asyncio

async def task1():

    await asyncio.sleep(1)

    return "Task 1 done"

async def task2():

    await asyncio.sleep(2)

    return "Task 2 done"

async def task_with_error():

    await asyncio.sleep(1)

    raise ValueError("An error occurred")

async def main():

    try:

        async with asyncio.TaskGroup() as tg:

            task1 = tg.create_task(task1())

            task2 = tg.create_task(task2())

            error_task = tg.create_task(task_with_error())

    except Exception as e:

        print(f"Error: {e}")

    # Print results from completed tasks

    print("Task 1 result:", task1.result())

    print("Task 2 result:", task2.result())

asyncio.run(main())

asyncio.taskgroup 管理多个任务,并确保如果任何任务失败,组中的所有其他任务都将被取消。在示例中,出现错误的任务会导致整个组被取消,并且只打印已完成任务的结果。

此用途可以用于网络抓取。您可以使用 asyncio.taskgroup 来处理多个并发 API 请求,并确保如果任何请求失败,则所有其他请求都被取消,以避免数据不完整。

我们到了文章的最后,我们已经了解了 asyncio 提供的多种创建和管理任务的方法。方法总结如下:

  • asyncio.wait_for:等待超时的任务。

  • asyncio.wait:等待多个任务,完成条件灵活。

  • asyncio.gather:将多个任务聚合到一个等待中。

  • asyncio.as_completed:处理完成的任务。

  • asyncio.taskgroup:管理一组任务,并在失败时自动取消。

结论

异步编程可以改变您在 python 中处理并发任务的方式,使您的代码更加高效且响应迅速。在本文中,我们浏览了 asyncio 提供的各种方法来创建和管理任务,从简单的超时到复杂的任务组。了解何时以及如何使用每种方法(asyncio.wait_for、asyncio.wait、asyncio.gather、asyncio.as_completed 和 asyncio.taskgroup)将帮助您充分利用异步编程的潜力,使您的应用程序更加健壮和可扩展。

要更深入地了解异步编程和更多实际示例,请在此处浏览我们的详细指南。

如果您喜欢这篇文章,请考虑订阅我的时事通讯,这样您就不会错过未来的更新。

快乐编码!

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

免责声明
1. 本站所有资源来源于用户上传和网络等,如有侵权请邮件联系本站整改team@lcwl.fun!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系本站工作人员处理!
6. 本站资源售价或VIP只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,请使用WINRAR解压,如遇到无法解压的请联系管理员!
8. 因人力时间成本问题,部分源码未能详细测试(解密),不能分辨部分源码是病毒还是误报,所以没有进行任何修改,大家使用前请进行甄别!
9.本站所有源码资源都是经过本站工作人员人工亲测可搭建的,保证每个源码都可以正常搭建,但不保证源码内功能都完全可用,源码属于可复制的产品,无任何理由退款!

网站搭建学习网 Python 使用 Asyncio 创建和管理任务 https://www.xuezuoweb.com/13153.html

常见问题
  • 本站所有的源码都是经过平台人工部署搭建测试过可用的
查看详情
  • 购买源码资源时购买了带主机的套餐是指可以享受源码和所选套餐型号的主机两个产品,在本站套餐里开通主机可享优惠,最高免费使用主机
查看详情

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务

Fa快捷助手
手机编程软件开发

在手机上用手点一点就能轻松做软件

去做软件
链未云主机
免备案香港云主机

开通主机就送域名的免备案香港云主机

去使用
链未云服务器
免备案香港云服务器

支持售后、超低价、稳定的免备案香港云服务器

去使用