Skip to content

异步

asyncio 异步 I/O

Python 的 asyncio 模块是用于编写并发代码的库,它使用 async/await 语法来实现异步 I/O 操作。异步 I/O 允许在等待 I/O 操作完成时执行其他任务,从而提高程序的性能,特别是在 I/O 密集型任务(如网络请求、文件读写)中效果显著。

核心概念:

  • 事件循环 (Event Loop): asyncio 的核心,负责调度和执行协程。事件循环会不断运行,监听 I/O 操作完成后触发的事件;
  • 协程 (Coroutine): 是可以被挂起和恢复的函数,通常使用 async def 定义,并通过 await 关键字挂起协程直到异步操作完成;
  • 任务 (Task): Task 是由事件循环执行的协程。通过将协程封装为任务,事件循环可以并发地运行多个任务;
  • Future: 类似于 JavaScript 中的 Promise,表示一个将来完成的操作。通常你不需要直接操作 Future,而是通过协程和任务来使用它;

示例:

Python
1
2
3
4
5
6
7
8
import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

asyncio.run(main())

在上述例子中,main 是一个异步协程,调用 asyncio.sleep(1) 会暂停 1 秒,并让出控制权给事件循环。之后,事件循环继续执行下一个任务,1 秒后再恢复这个协程的执行。

asyncio REPL
在 Python 中运行 python -m asyncio 启动 asyncio 调试模式。启动后,进入到 asyncio 的交互环境,方便进行一些异步函数的简单测试。

可以在 REPL 中尝试使用 asyncio 并发上下文:

Text Only
1
2
3
4
5
6
7
$ python -m asyncio
asyncio REPL 3.12.3 (main, Apr 27 2024, 19:00:26) [GCC 9.4.0] on linux
Use "await" directly instead of "asyncio.run()".
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio
>>> await asyncio.sleep(10, result='hello')
'hello'

1 运行 asyncio 程序

  • asyncio.run(coro, *, debug=None, loop_factory=None)

此函数会运行传入的协程(coroutine) coro 并返回结果,负责管理 asyncio 事件循环,终结异步生成器,并关闭执行器。当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。

  • coro:要运行的协程,必须是一个协程对象,通常是由 async def 定义的函数;
  • debug:如果设置为 True,事件循环将运行于调试模式。可以通过传递 True 或 False 来启用或禁用调试功能,也可以设置为 None(默认值),将沿用全局 Debug 模式设置;
  • loop_factory:工厂函数,用于创建自定义事件循环。如果传递了该参数,asyncio.run() 会使用该工厂函数生成事件循环,默认 loop_factory 为 None,则使用 asyncio.new_event_loop() 并通过 asyncio.set_event_loop() 将其设置为当前事件循环; 假设使用 uvloop 作为 asyncio 的事件循环:

    Python
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    import asyncio
    import uvloop
    
    async def task():
        print("Task started")
        await asyncio.sleep(1)
        print("Task finished")
    
    # 使用自定义的事件循环工厂
    asyncio.run(task(), loop_factory=uvloop.new_event_loop)
    
  • class asyncio.Runner(*, debug=None, loop_factory=None)
    在相同上下文中,多个异步函数调用可通过上下文管理器进行简化。debugloop_factoryasyncio.run 一样。 通过上下文管理器重写 asyncio.run() 示例:

Python
1
2
3
4
5
6
7
8
9
import asyncio

async def main():
    await asyncio.sleep(1)
    print('hello')


with asyncio.Runner() as runner:
    runner.run(main())

示例:

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import asyncio
import uvloop


async def task():
    print("Task started")
    await asyncio.sleep(1)
    print("Task finished")


# 创建 Runner 实例,使用自定义的事件循环工厂
runner = asyncio.Runner(loop_factory=uvloop.new_event_loop)

try:
    runner.run(task())
finally:
    runner.close()

asyncio.Runner 方法:
* run(coro, *, context=None):运行一个协程,返回协程的结果或者引发其异常。该方法类似于 asyncio.run(),但需要在 Runner 实例的上下文中调用。 * 参数 context 允许指定一个自定义 contextvars.Context 用作 coro 运行所在的上下文。 如果为 None 则会使用运行器的默认上下文。 * runner.close():关闭事件循环,释放相关资源。 * get_loop():返回运行器实例的事件循环。

2 协程与任务

2.1 协程

通过 async/await 语法声明协程,是编写 asyncio 应用的推荐方式。

例如:

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
In [1]: import asyncio

In [2]: async def main():
   ...:     print('hello')
   ...:     await asyncio.sleep(1)
   ...:     print('world')
   ...: 

In [3]: main()
Out[3]: <coroutine object main at 0x7fe01b04a680>

In [4]: asyncio.run(main())
hello
world

In [5]: await main()
hello
world

如上,简单通过 main() 调用并不会执行协程,要运行一个协程,asyncio 提供以下机制:

  • asyncio.run()
Text Only
1
asyncio.run(main())
  • await
Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# 输出:
# started at 14:00:29
# hello
# world
# finished at 14:00:32

运行时间 3 秒。 * asyncio.create_task()
asyncio.create_task(coro, *, name=None, context=None):将 coro 协程封装为一个 Task 并调度其执行,返回 Task 对象。
并发运行多个协程,如,修改以上示例,并发运行两个 say_after 协程:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")

    # 等待直到两个任务都完成
    # (会花费约 2 秒钟。)
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# 输出:
# started at 14:08:53
# hello
# world
# finished at 14:08:55

并发起作用,运行时间 2 秒,快上 1 秒。 * asyncio.TaskGroup:
任务分组异步上下文管理器,可以使用 create_task() 将任务添加到分组中。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(say_after(1, 'hello'))
        task2 = tg.create_task(say_after(2, 'world'))
        print(f"started at {time.strftime('%X')}")

    # 当存在上下文管理器时 await 是隐式执行的。
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# 输出:
# started at 14:24:42
# hello
# world
# finished at 14:24:44

2.2 任务

2.2.1 可等待对象

如果一个对象可以在 await 语句中使用,那么它就是可等待对象。可等待对象有三种主要类型:协程, 任务 和 Future。

  • 协程
    Python 协程属于可等待对象,因此可以在其他协程中被等待:
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import asyncio

async def nested():
    return 42

async def main():
    # nested()  # Raise:RuntimeWarning
    print(await nested())  # 输出 "42"

asyncio.run(main())

注意: * 协程函数: 定义形式为 async def 的函数; * 协程对象: 调用 协程函数 所返回的对象;

  • Futures
    Future 对象表示一个异步操作的最终结果。可以看作一个占位符,表示稍后会提供结果的操作。可以用于等待异步操作的完成或从异步操作中获取结果。在 asyncio 中,Future 对象通常由事件循环创建和管理。asyncio 的协程会隐式地与 Future 对象交互,最终由事件循环将结果设置到 Future 上。主要用于低层次的操作,例如等待某个异步操作的结果。开发者通常不会直接创建 Future 对象,而是通过 await 来等待异步操作。

核心方法和属性 * future.result():获取异步操作的结果,如果结果尚未准备好,会抛出异常; * future.set_result(value):手动设置 Future 对象的结果; * future.set_exception(exception):手动为 Future 设置一个异常; * future.done():返回 True,表示 Future 已经完成(无论是正常结束还是异常); * future.add_done_callback(fn):当 Future 完成时,注册一个回调函数 fn 来处理结果;

示例:手动创建和完成 Future

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio


async def set_future_value(fut):
    await asyncio.sleep(1)  # 模拟异步操作
    fut.set_result("Future result")  # 设置 Future 的结果


async def main():
    # 创建 Future 对象
    fut = asyncio.Future()

    # 启动异步任务来设置 Future 的值
    await asyncio.create_task(set_future_value(fut))

    # 获取 Future 的结果
    print(fut.result())  # 输出: Future result


# 使用 asyncio.run 运行协程
asyncio.run(main())
  • 任务
    任务(Tasks)是 asyncio 中的一种高级对象,它封装了一个协程并允许将其加入事件循环中执行。与 Future 不同,Task 本质上是一个 Future,但它主要用于调度和运行协程。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import asyncio

async def nested():
    return 42

async def main():
    task = asyncio.create_task(nested())
    result = await task
    print(result)
asyncio.run(main())

2.2.2 创建任务

asyncio.create_task(coro, *, name=None, context=None): 是 asyncio 提供的一个函数,用于将协程(coroutine)封装为 Task 并将其调度到事件循环中执行。这个函数可以让协程以异步的方式执行,而不需要阻塞当前的代码。 返回值是一个 Task 对象,即 asyncio.Task 实例。任务一旦创建,事件循环将会立即开始执行它,直到任务完成或被取消。

  • coro(必须):要封装并调度的协程对象,它是一个异步函数的调用,该协程会立即被调度,加入到事件循环中执行;
  • name=None(可选):任务的名称。可以为任务设置一个字符串名称,便于调试和日志记录,任务名称可以帮助你在调试时更容易识别任务,如果不设置,默认任务没有名字;
  • context=None(可选):传递给 Task 的上下文(Context),允许在不同的 Task 之间共享上下文对象,它提供了异步任务的上下文管理,方便追踪状态、异常等,不常用,默认使用当前上下文;

任务的调度:调用 asyncio.create_task() 后,协程 coro 不会同步阻塞当前的函数,它会被放入事件循环中,并且一旦事件循环准备好执行该任务,任务将自动开始。 与 await 的区别:await 是同步等待协程执行完的结果,而 create_task() 是异步的,不会阻塞当前函数,会让协程并行执行。

  • 示例 1:基本用法
Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio


async def task():
    print("Task started")
    await asyncio.sleep(2)
    print("Task finished")


async def main():
    # 创建并调度任务
    task1 = asyncio.create_task(task())

    # 主函数不会等待任务完成,继续执行
    print("Task is running in the background")

    # 等待任务完成(可选的)
    await task1

asyncio.run(main())

# 输出:
# Task is running in the background
# Task started
# Task finished
  • 示例 2:设置任务名称
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import asyncio


async def my_named_task():
    await asyncio.sleep(1)
    print("Named Task completed")


async def main():
    # 创建并命名任务
    task = asyncio.create_task(my_named_task(), name="MyUniqueTask")
    print(f"Task Name: {task.get_name()}")  # 输出任务的名称
    await task


asyncio.run(main())
  • 示例 3:任务与上下文(context)

对于 context 的使用,涉及 contextvars 模块,可以在异步任务之间传递上下文信息,一个典型的例子是追踪请求的状态或 ID。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import contextvars

# 创建一个上下文变量
request_id = contextvars.ContextVar('request_id')


async def task_with_context():
    # 在任务中读取上下文变量
    print(f"Task request_id: {request_id.get()}")
    await asyncio.sleep(1)


async def main():
    # 创建新的上下文
    ctx = contextvars.copy_context()

    # 设置上下文变量的值
    ctx.run(request_id.set, "12345")

    # 使用设置好的上下文来创建任务
    task = asyncio.create_task(task_with_context(), context=ctx)

    await task


# 运行主协程
asyncio.run(main())

# 输出:
# Task request_id: 12345
  • contextvars.ContextVar:这是一个可以在协程之间共享的变量类型。在不同的协程中可以有独立的值,并且不会相互干扰;
  • contextvars.copy_context():这个函数用于复制当前上下文,生成一个新的上下文。在新上下文中可以进行变量设置并传递给任务;
  • ctx.run(request_id.set, "12345"):使用上下文的 run 方法来设置 request_id 变量的值;
  • asyncio.create_task(..., context=ctx):将创建的上下文 ctx 传递给 create_task,使得这个任务在执行过程中使用这个上下文;

通过上下文管理,可以在异步任务之间共享状态或变量,不会造成并发数据污染。

  • 示例 4:取消任务
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio


async def my_task():
    print("Task started")
    try:
        await asyncio.sleep(5)  # 模拟长时间运行的任务
    except asyncio.CancelledError:
        print("Task was cancelled!")
        raise
    print("Task finished")


async def main():
    task = asyncio.create_task(my_task())

    await asyncio.sleep(2)  # 让任务执行一段时间
    task.cancel()  # 取消任务

    try:
        await task  # 等待任务完成并捕获取消异常
    except asyncio.CancelledError:
        print("Task has been cancelled")


# 使用 asyncio.run 运行协程
asyncio.run(main())

# 输出:
# Task started
# Task was cancelled!
# Task has been cancelled

2.2.3 任务组

asyncio.TaskGroup 是 Python 3.11 引入的新特性,提供了一种结构化的方式来管理异步任务组。它旨在简化多个任务的创建和管理,增强了异常处理机制,使得在并发编程中处理任务更安全和高效。与传统的 asyncio.gather() 和 asyncio.wait() 不同,TaskGroup 通过上下文管理器的方式来确保所有任务的生命周期都在同一个范围内被创建和管理。

  • create_task(coro, *, name=None):该方法与 asyncio.create_task() 类似,在任务组中创建并调度协程。返回一个 Task 对象;
  • aenter() 和 aexit():支持上下文管理,当退出上下文时会确保所有任务都已完成或者已经被取消;
  • 异常传播:如果某个任务抛出异常,任务组会自动取消其他所有任务,并将异常抛出给调用者;

示例

  • 示例 1:基本用法示例
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio

async def task(num):
    await asyncio.sleep(1)
    print(f"Task {num} done")
    return f"Result {num}"

async def main():
    # 使用 TaskGroup 管理任务组
    async with asyncio.TaskGroup() as tg:
        # 创建并调度任务
        task1 = tg.create_task(task(1))
        task2 = tg.create_task(task(2))
        task3 = tg.create_task(task(3))

    # 在 TaskGroup 上下文结束后,所有任务已经完成
    print(f"Task 1: {task1.result()},Task 2: {task2.result()},Task 3: {task3.result()}")

# 运行主协程
asyncio.run(main())
# 输出:
# Task 1 done
# Task 2 done
# Task 3 done
# Task 1: Result 1,Task 2: Result 2,Task 3: Result 3
  • 示例 2:异常处理
    asyncio.TaskGroup 提供了自动化的异常处理机制。如果任务组中的某个任务抛出异常,TaskGroup 会自动取消其他任务,并且该异常会传播到调用者。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio


async def task(num):
    await asyncio.sleep(num)
    if num == 2:
        raise ValueError("Task 2 encountered an error!")
    print(f"Task {num} done")
    return f"Result {num}"


async def main():
    try:
        # 使用 TaskGroup 管理任务组
        async with asyncio.TaskGroup() as tg:
            # 创建并调度任务
            task1 = tg.create_task(task(1))
            task2 = tg.create_task(task(2))  # 这个任务会抛出异常
            task3 = tg.create_task(task(3))
    except Exception as e:
        print(f"An exception occurred: {e}")

    # 在 TaskGroup 上下文结束后,所有任务已经完成
    print(f"Task 1: {task1.result()},Task 2: {task2.result()},Task 3: {task3.result()}")


# 运行主协程
asyncio.run(main())
  • 当 task2 抛出异常时,TaskGroup 自动捕获该异常并取消 task3,因此 task3 不会完成;
  • TaskGroup 会自动传播异常给调用者,因此外层的 try/except 块能够捕获并处理该异常;

或:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio
from asyncio import TaskGroup


class TerminateTaskGroup(Exception):
    """Exception raised to terminate a task group."""


async def force_terminate_task_group():
    """Used to force termination of a task group."""
    raise TerminateTaskGroup()


async def job(task_id, sleep_time):
    print(f'Task {task_id}: start')
    await asyncio.sleep(sleep_time)
    print(f'Task {task_id}: done')


async def main():
    try:
        async with TaskGroup() as group:
            # spawn some tasks
            group.create_task(job(1, 0.5))
            group.create_task(job(2, 1.5))
            # sleep for 1 second
            await asyncio.sleep(1)
            # add an exception-raising task to force the group to terminate
            group.create_task(force_terminate_task_group())
    except* TerminateTaskGroup:
        pass


asyncio.run(main())
  • 示例 3:自动取消任务示例
    如果某个任务抛出异常,TaskGroup 会立即取消其他所有正在运行的任务,这有助于避免在错误条件下无用的任务继续运行。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio

async def task_1():
    await asyncio.sleep(1)
    print("Task 1 done")
    return "Result 1"

async def task_2():
    await asyncio.sleep(2)
    raise ValueError("Task 2 failed!")

async def task_3():
    try:
        await asyncio.sleep(3)
        print("Task 3 done")
    except asyncio.CancelledError:
        print("Task 3 was cancelled!")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task_1())
            tg.create_task(task_2())  # This will raise an exception
            tg.create_task(task_3())
    except Exception as e:
        print(f"An exception occurred: {e}")

# 运行主协程
asyncio.run(main())

# 输出:
# Task 1 done
# Task 3 was cancelled!
# An exception occurred: unhandled errors in a TaskGroup (1 sub-exception)

当 task_2 发生异常时,TaskGroup 取消了所有其他任务,因此 task_3 被取消并捕获到了 CancelledError。

2.2.4 休眠

coroutine asyncio.sleep(delay, result=None) 阻塞 delay 指定的秒数,如果指定了 result,则当协程完成时将其返回给调用者。

以下协程示例运行 5 秒,每秒显示一次当前日期:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())
  • asyncio.get_running_loop():获取当前正在运行的事件循环。如果没有正在运行的事件循环抛出 RuntimeError;
  • loop.time():事件循环中的一个高精度时钟,用来获取事件循环的当前时间。与 time.time() 不同,loop.time() 返回的是相对时间,并且不会受系统时间修改的影响,因此非常适合用于异步编程中的定时任务;

2.2.5 并发运行任务

asyncio.gather(*aws, return_exceptions=False), 并发运行 aws 序列中的可等待对象。

参数说明:

  • *aws:传递多个可等待对象(协程、任务、Future 等),这些任务将会并发运行,asyncio.gather() 会等待所有的任务完成,并返回每个任务的结果,结果的顺序与传入的任务顺序相同。
  • return_exceptions(可选,默认为 False):
  • 当 False 时,任何一个任务抛出的异常会被立即传播并中止 gather,未完成的任务会被取消;
  • 当 True 时,异常不会立即传播,而是作为结果的一部分返回,这样即使某些任务失败,gather 仍然会等待其他任务的结果,异常对象将作为返回值列表中的一部分返回;

返回值:

  • 如果所有任务成功完成,asyncio.gather() 会返回一个包含每个任务结果的列表,顺序与传入的任务相同;
  • 如果设置 return_exceptions=True,并且某些任务抛出异常,则返回的列表会包含异常对象;

示例:

  • 示例 1:基本用法
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio

async def task(num):
    await asyncio.sleep(num)
    print(f"Task {num} completed")
    return f"Num {num}"

async def main():
    # gather 可以并行运行多个任务
    tasks = []
    for i in range(1,4):
        tasks.append(task(i))
    results = await asyncio.gather(*tasks)
    print(f"Results: {results}")

asyncio.run(main())

# 输出:
# Task 1 completed
# Task 2 completed
# Task 3 completed
# Results: ['Num 1', 'Num 2', 'Num 3']
  • 示例 2:异常示例(return_exceptions = False)
Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio


async def task(num):
    await asyncio.sleep(num)
    if num == 2:
        # 模拟异常处理
        raise ValueError(f"Task {num} failed!")
    print(f"Task {num} completed")
    return f"Num {num}"


async def main():
    # gather 可以并行运行多个任务
    tasks = []
    for i in range(1,4):
        tasks.append(task(i))
    try:
        results = await asyncio.gather(*tasks)
        print(f"Results: {results}")
    except Exception as e:
        print(f"An error occurred: {e}")

asyncio.run(main())

# 输出:
# Task 1 completed
# An error occurred: Task 2 failed!
  • 示例 3:异常示例(return_exceptions = True)
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio


async def task(num):
    await asyncio.sleep(num)
    if num == 2:
        # 模拟异常处理
        raise ValueError(f"Task {num} failed!")
    print(f"Task {num} completed")
    return f"Num {num}"


async def main():
    # gather 可以并行运行多个任务
    tasks = []
    for i in range(1, 4):
        tasks.append(task(i))
    try:
        results = await asyncio.gather(*tasks, return_exceptions=True)
        print(f"Results: {results}")
    except Exception as e:
        print(f"An error occurred: {e}")


asyncio.run(main())

# 输出:
# Task 1 completed
# Task 3 completed
# Results: ['Num 1', ValueError('Task 2 failed!'), 'Num 3']
  • 示例 4:结合 I/O 操作的并发下载 假设有多个 URL,需要并发下载它们,可以用 asyncio.gather() 来并发执行多个下载任务:
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        print(f"Fetching: {url}")
        return await response.text()


async def main():
    urls = [
        "https://example.com",
        "https://httpbin.org/get",
        "https://jsonplaceholder.typicode.com/posts"
    ]

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(fetch(session, url) for url in urls)
        )
        for i, content in enumerate(results):
            print(f"Content {i + 1} length: {len(content)}")


asyncio.run(main())

使用 aiohttp 库来并发下载多个 URL 的内容,并使用 asyncio.gather() 来并发运行这些下载任务。gather 会等所有的下载任务完成,然后返回每个下载任务的结果。

2.2.6 任务 “立即执行”

asyncio.create_eager_task_factory()asyncio.eager_task_factory() 提供了一种任务的主动调度,而不是被动等待。 通常情况下,像 asyncio.create_task() 方法创建的任务虽然被加入事件循环,但它们的实际执行依赖于事件循环的调度顺序。而 eager_task_factory 提供的 API 强调的是任务被立即执行,即在可能的情况下尽早进入执行状态。

  • asyncio.eager_task_factory(loop, coro, *, name=None, context=None)
    用于立即创建并调度任务。当使用这个工厂函数时 (通过 loop.set_task_factory(asyncio.eager_task_factory)),协程将在 Task 构造期间同步地开始执行。任务仅会在它们阻塞时被加入事件循环上的计划任务。这可以达成性能提升因为对同步完成的协程来说可以避免循环调度的开销。

参数说明
* loop:事件循环实例,任务会在这个事件循环中被调度; * coro:需要执行的协程对象; * name(可选):为任务指定一个名称,有助于调试; * context(可选):任务的上下文对象,用于处理任务运行时的上下文;

返回值:
* 返回一个 Task 对象,该任务立即调度并开始运行;

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import asyncio

async def task():
    await asyncio.sleep(1)
    print("Task completed")

async def main():
    loop = asyncio.get_running_loop()
    # 立即创建并调度任务
    task1 = asyncio.eager_task_factory(loop, task(), name="MyTask")
    await task1

asyncio.run(main())

性能对比:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time


async def light_coro():
    pass


async def main():
    print('Before running task group!')
    time0 = time.time()
    asyncio.get_event_loop().set_task_factory(asyncio.eager_task_factory)
    async with asyncio.TaskGroup() as tg:
        for _ in range(1000000):
            tg.create_task(light_coro())
    print(f'It took {time.time() - time0} to run!')
    print('After running task group with eager task factory!')

asyncio.run(main())

# 输出:
# Before running task group!
# It took 1.201669692993164 to run!
# After running task group with eager task factory!

注释:asyncio.get_event_loop().set_task_factory(asyncio.eager_task_factory)

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time


async def light_coro():
    pass


async def main():
    print('Before running task group!')
    time0 = time.time()
    # asyncio.get_event_loop().set_task_factory(asyncio.eager_task_factory)
    async with asyncio.TaskGroup() as tg:
        for _ in range(1000000):
            tg.create_task(light_coro())
    print(f'It took {time.time() - time0} to run!')
    print('After running task group with eager task factory!')

asyncio.run(main())

# 输出:
# Before running task group!
# It took 8.283295392990112 to run!
# After running task group with eager task factory!  

可以看到性能提升接近 7 倍。

  • asyncio.create_eager_task_factory(custom_task_constructor)
    该函数用于创建一个任务工厂,该工厂能够使用自定义的任务构造函数。通常在默认的任务工厂之外,你可以使用这个方法来创建更多可控的任务。

参数说明:
* custom_task_constructor:这是一个自定义的任务构造函数,该构造函数通常会接受事件循环和协程作为输入,并返回一个任务对象;

返回值:
* 返回一个工厂函数,可以用于生成任务对象;

示例: TODO: 代补充

2.2.7 避免任务 “取消”

asyncio.shield(aw) 是一个用于保护协程或 Future 对象的工具,它可以防止被取消。当一个协程或任务被 await 时,如果外部取消它(例如,由其他任务或超时机制触发),那么该协程将被中断。 但通过 asyncio.shield(aw),你可以保护协程 aw 免受取消影响,从而确保它可以继续执行。

以下语句:

Text Only
1
2
task = asyncio.create_task(something())
res = await shield(task)

相当于:

Text Only
1
res = await something()

不同之处在于如果包含它的协程被取消,在 something() 中运行的任务不会被取消。从 something() 的角度看来,取消操作并没有发生。然而其调用者已被取消,因此 "await" 表达式仍然会引发 CancelledError。 如果通过其他方式取消 something() (例如在其内部操作) 则 shield() 也会取消。 如果希望完全忽略取消操作 (不推荐) 则 shield() 函数需要配合一个 try/except 代码段,如下所示:

Text Only
1
2
3
4
5
task = asyncio.create_task(something())
try:
    res = await shield(task)
except CancelledError:
    res = None

工作机制:

  • 外部取消:当调用 asyncio.shield() 保护的协程时,外部的取消请求不会直接影响协程的运行。任务会被包裹在一个屏障内,直到其完成;
  • 内部取消:如果协程或任务内部自己被取消(例如在代码逻辑中显式调用取消),则该协程依然会被中断;

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio

async def example_task():
    print("Task started")
    await asyncio.sleep(2)  # 模拟长时间运行的任务
    print("Task completed")
    return "Task result"

async def cancel_task(task):
    await asyncio.sleep(1)
    print("Cancelling task...")
    task.cancel()  # 外部取消任务

async def main():
    # 创建一个任务
    task = asyncio.create_task(example_task())

    # 将任务保护在 shield 中,防止取消
    protected_task = asyncio.shield(task)

    # 启动一个任务来取消我们创建的任务
    asyncio.create_task(cancel_task(protected_task))

    try:
        result = await protected_task
        print(f"Protected task finished with result: {result}")
    except asyncio.CancelledError:
        print("Protected task was cancelled")

asyncio.run(main())

# 输出:
# Task started
# Cancelling task...
# Protected task was cancelled

2.2.8 任务超时

  • asyncio.timeout(delay)

asyncio.timeout(delay) 是 Python 3.11 引入的一个新特性,它提供了一种更简便的方式来为异步任务设置超时。在指定的时间 delay 内,如果异步操作未完成,它将抛出 asyncio.TimeoutError 异常。 参数 delay 可以为 None,或是一个表示等待秒数的浮点数/整数。 如果 delay 为 None,将不会应用时间限制;如果当创建上下文管理器时无法确定延时则此设置将很适用。

示例:

Text Only
1
2
3
async def main():
  async with asyncio.timeout(10):
      await long_running_task()

如果 long_running_task 耗费 10 秒以上完成,该上下文管理器将取消当前任务并在内部处理所引发的 asyncio.CancelledError,将其转化为可被捕获和处理的 TimeoutError。 【备注】asyncio.timeout() 上下文管理器负责将 asyncio.CancelledError 转化为 TimeoutError,这意味着 TimeoutError 只能在该上下文管理器 之外 被捕获。

捕获 TimeoutError 的示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio

async def example_task():
    print("Task started")
    await asyncio.sleep(3)  # 模拟一个长时间任务(3秒)
    print("Task completed")
    return "Task result"

async def main():
    try:
        # 设置超时时间为2秒,超过则抛出TimeoutError
        async with asyncio.timeout(2):
            result = await example_task()
            print(f"Task finished with result: {result}")
    except asyncio.TimeoutError:
        print("Task took too long and was cancelled due to timeout.")

asyncio.run(main())

# 输出:
# Task started
# Task took too long and was cancelled due to timeout.

asyncio.timeout(None) 示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio


async def long_running_task():
    print("Task started")
    await asyncio.sleep(5)
    print("Task completed")


async def main():
    try:
        # 不知道超时时间,所以传递 `None`
        async with asyncio.timeout(None) as cm:
            # 知道超时时间,重新设置超时
            new_deadline = asyncio.get_running_loop().time() + 3
            cm.reschedule(new_deadline)
            await long_running_task()
    except TimeoutError:
        pass

    if cm.expired():
        print("Looks like we haven't finished on time.")


asyncio.run(main())
  • asyncio.timeout_at(when)

类似于 asyncio.timeout(),不同之处在于 when 是停止等待的绝对时间,或者为 None,而不是基于相对的延迟(如 asyncio.timeout() 使用的延迟时间)。

参数: * when:这是一个浮点数,表示 Unix 时间戳(从1970年1月1日开始计算的秒数)。可以通过 loop.time() 或 time.time() 获得当前时间,再加上需要的延迟,来生成一个目标时间戳。

示例:

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio


async def example_task():
    print("Task started")
    await asyncio.sleep(3)  # 模拟长时间任务
    print("Task completed")
    return "Task result"


async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()

    # 设置任务的绝对超时时间为当前时间 + 2 秒
    when = loop.time() + 2

    try:
        # 使用 asyncio.timeout_at 设置绝对超时时间
        async with asyncio.timeout_at(when):
            result = await example_task()
            print(f"Task finished with result: {result}")
    except asyncio.TimeoutError:
        print("Task was cancelled due to timeout.")


asyncio.run(main())

# 输出:
# Task started
# Task was cancelled due to timeout.

使用场景:
* 当你需要在某个特定的时间点上(例如一个全局的超时时间点)对多个异步任务进行控制时,asyncio.timeout_at() 是非常有用的。 * 例如,你可以设定多个异步操作都必须在某个固定的时间之前完成,或者你可以与其他时间同步机制结合,确保异步任务在特定时间窗口内完成。

  • coroutine asyncio.wait_for(aw, timeout) 类似于 asyncio.timeout(),但 asyncio.timeout() 使用方式更加简单。

asyncio.timeout 与 asyncio.wait_for() 的区别: * 代码风格:asyncio.timeout 使用上下文管理器风格,更加简洁自然,而 asyncio.wait_for() 则是函数调用风格; * 语义清晰:asyncio.timeout 仅负责超时保护,而 asyncio.wait_for() 是一个包装函数,功能类似,但可能显得冗长;

参数:
* aw:这个参数是一个可等待的对象,通常是一个协程或 Future 对象,它代表异步任务; * timeout:这是一个浮点数,表示等待任务完成的最长时间。如果设置为 None,则不会有超时限制; * 返回值:它返回 aw 的结果,如果 aw 在指定时间内完成; * 异常:如果 aw 在 timeout 时间内没有完成,抛出 asyncio.TimeoutError 异常;

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

async def example_task():
    print("Task started")
    await asyncio.sleep(3)  # 模拟一个耗时3秒的任务
    print("Task completed")
    return "Task result"

async def main():
    try:
        # 使用 asyncio.wait_for 设置超时为2秒
        result = await asyncio.wait_for(example_task(), timeout=2.0)
        print(f"Task finished with result: {result}")
    except asyncio.TimeoutError:
        print("Task took too long and was cancelled due to timeout.")

asyncio.run(main())

# 输出:
# Task started
# Task took too long and was cancelled due to timeout.

2.2.9 任务等待

  • coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED) asyncio.wait() 是 Python 的 asyncio 库中用于并发运行多个异步任务的函数。aws 可迭代对象中的 Future 和 Task 实例,并根据指定的条件返回结果。此函数可以用于并行处理多个异步任务,并在某个条件满足时返回结果。 用法:
Text Only
1
done, pending = await asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

参数: * aws:可迭代对象 Future 和 Task 实例; * timeout:可选参数,表示等待的最长时间(以秒为单位)。如果在指定的时间内没有完成,函数将返回已完成的任务和未完成的任务,与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象; * return_when:指定何时返回结果,可以是以下几个常量之一: * asyncio.FIRST_COMPLETED:函数将在任意可等待对象结束或取消时返回; * asyncio.FIRST_EXCEPTION:该函数将在任何 future 对象通过引发异常而结束时返回。 如果没有任何 future 对象引发引发那么它将等价于 ALL_COMPLETED; * asyncio.ALL_COMPLETED(默认):函数将在所有可等待对象结束或取消时返回; 返回值:
* 返回两个 Task/Future 集合: 其中 done 是已完成的任务,pending 是尚未完成的任务;

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import asyncio


async def example_task(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} completed")
    return name


async def main():
    # 使用 asyncio.create_task() 创建任务
    tasks = [
        asyncio.create_task(example_task("A", 1)),
        asyncio.create_task(example_task("B", 2)),
        asyncio.create_task(example_task("C", 3)),
    ]

    # 使用 asyncio.wait 等待所有任务完成,设置超时为2秒
    done, pending = await asyncio.wait(tasks, timeout=2, return_when=asyncio.ALL_COMPLETED)

    print("\nResults:")
    for task in done:
        print(f"{task.result()} completed")

    if pending:
        print("Pending tasks:")
        for task in pending:
            print(f"{task}")


asyncio.run(main())

# 输出:
# Task A started
# Task B started
# Task C started
# Task A completed
# 
# Results:
# A completed
# Pending tasks:
# <Task pending name='Task-3' coro=<example_task() running at /home/bolean/workspace/examples/python-tricks/src/asyncio_demo21_01.py:661> wait_for=<Future finished result=None>>
# <Task pending name='Task-4' coro=<example_task() running at /home/bolean/workspace/examples/python-tricks/src/asyncio_demo21_01.py:661> wait_for=<Future pending cb=[Task.task_wakeup()]>>
# Task B completed  # 不会自动取消未完成的任务
  • asyncio.as_completed(aws, *, timeout=None)
    asyncio.as_completed() 是一个异步迭代器,它可以在多个任务(aws,即一组可等待对象)中逐个获取已完成任务的结果。与 asyncio.gather() 不同,asyncio.as_completed() 按任务完成的顺序返回结果,而不是按它们在列表中的顺序。

参数:
* aws:一组可等待对象(通常是协程或 Task),传递的是任务列表,在异步迭代期间,将为不属于 Task 或 Future 对象的可等待对象产出隐式创建的任务。 * timeout(可选):如果设置了超时,当某些任务在超时前未完成时,它们将不会被返回,并会抛出 asyncio.TimeoutError;

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio

async def example_task(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} completed")
    return name

async def main():
    # 使用 asyncio.create_task 创建任务
    tasks = [
        asyncio.create_task(example_task("A", 3)),
        asyncio.create_task(example_task("B", 1)),
        asyncio.create_task(example_task("C", 2)),
    ]
    # 使用 asyncio.as_completed 以任务完成的顺序处理任务
    for completed_task in asyncio.as_completed(tasks):
        result = await completed_task  # 获取已完成任务的结果
        print(f"Task {result} has finished")

asyncio.run(main())

# 输出:
# Task A started
# Task B started
# Task C started
# Task B completed
# Task B has finished
# Task C completed
# Task C has finished
# Task A completed
# Task A has finished

带 timeout 参数的示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio

async def example_task(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} completed")
    return name

async def main():
    # 使用 asyncio.create_task 创建任务
    tasks = [
        asyncio.create_task(example_task("A", 3)),
        asyncio.create_task(example_task("B", 5)),
        asyncio.create_task(example_task("C", 1)),
    ]
    # 使用 asyncio.as_completed 并设置超时
    try:
        for completed_task in asyncio.as_completed(tasks, timeout=4):
            result = await completed_task
            print(f"Task {result} has finished")
    except asyncio.TimeoutError:
        print("Some tasks did not complete within the timeout period")

asyncio.run(main())

# 输出:
# Task A started
# Task B started
# Task C started
# Task C completed
# Task C has finished
# Task A completed
# Task A has finished
# Some tasks did not complete within the timeout period

2.2.10 线程化异步任务

coroutine asyncio.to_thread(func, /, *args, **kwargs) 在不同的线程中异步地运行函数 func,此函数提供的任何 *args 和 **kwargs 会被直接传给 func。 并且,当前 contextvars.Context 会被传播,允许在不同的线程中访问来自事件循环的上下文变量。返回一个可被等待以获取 func 的最终结果的协程。这个协程函数主要是用于执行在其他情况下会阻塞事件循环的 IO 密集型函数/方法。

例如:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import time


def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")


async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# 输出:
# started main at 18:17:14
# start blocking_io at 18:17:14
# blocking_io complete at 18:17:15
# finished main at 18:17:15

在任何协程中直接调用 blocking_io() 将会在调用期间阻塞事件循环,导致额外的 1 秒运行时间。 但是,通过改用 asyncio.to_thread(),我们可以在单独的线程中运行它从而不会阻塞事件循环。因此只需要 1 秒。

【备注】 由于 GIL 的存在,asyncio.to_thread() 通常只能被用来将 IO 密集型函数变为非阻塞的。 但是,对于会释放 GIL 的扩展模块或无此限制的替代性 Python 实现来说,asyncio.to_thread() 也可被用于 CPU 密集型函数。

示例:将多个阻塞任务异步执行,最长耗时任务作为脚本花费时间

假设有多个阻塞操作需要并发执行,可以使用 asyncio.gather() 与 asyncio.to_thread() 结合运行多个同步任务。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import time


def blocking_io(name, delay):
    print(f"Task {name} start blocking_io at {time.strftime('%X')}")
    time.sleep(delay)
    print(f"Task {name} blocking_io complete at {time.strftime('%X')}")


async def main():
    print(f"started main at {time.strftime('%X')}")
    # asyncio.gather():允许多个异步任务同时运行
    await asyncio.gather(
        # asyncio.to_thread():将每个阻塞任务放入不同的线程,以并发的方式执行多个阻塞操作
        asyncio.to_thread(blocking_io, 'A', 1),
        asyncio.to_thread(blocking_io, 'B', 3),
        asyncio.to_thread(blocking_io, 'C', 2))

    print(f"finished main at {time.strftime('%X')}")

asyncio.run(main())

# 输出:
# started main at 11:19:16
# Task A start blocking_io at 11:19:16
# Task B start blocking_io at 11:19:16
# Task C start blocking_io at 11:19:16
# Task A blocking_io complete at 11:19:17
# Task C blocking_io complete at 11:19:18
# Task B blocking_io complete at 11:19:19
# finished main at 11:19:19

2.2.11 跨线程调度

asyncio.run_coroutine_threadsafe(coro, loop) 允许从一个非事件循环线程中,将协程提交给另一个线程中的事件循环执行(线程安全),并返回一个 concurrent.futures.Future 对象,用于跟踪协程的状态或结果。

参数:

  • coro:要运行的协程对象;
  • loop:协程要运行的事件循环;

返回值:
返回一个 concurrent.futures.Future 对象,可用于同步地检查协程执行的结果,通过 Future.result() 可以获取执行结果,Future.exception() 用于捕获异常。

特点:

  • 线程安全:允许在非事件循环线程中调用,主要用于跨线程将协程提交到特定的事件循环;
  • 适合多线程环境:例如在 GUI 应用程序或多线程后台任务中,跨线程调用 asyncio 协程;

示例代码

Text Only
1
2
3
4
5
6
7
8
# 创建一个协程,该协程等待 1 秒后返回值 3
coro = asyncio.sleep(1, result=3)

# 将协程提交给指定的事件循环 loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# 等待协程的结果,设置一个可选的超时参数(timeout)
assert future.result(timeout) == 3

代码片段解释:

  • coro = asyncio.sleep(1, result=3): 创建一个协程 coro,使用 asyncio.sleep() 来模拟等待 1 秒钟,并指定 result=3。
    asyncio.sleep() 是一个常用的异步函数,通常用于延迟执行。在这个例子中,它会在 1 秒后返回 3。

  • future = asyncio.run_coroutine_threadsafe(coro, loop): 使用 asyncio.run_coroutine_threadsafe() 函数,将 coro 提交到指定的事件循环 loop 中运行。
    run_coroutine_threadsafe() 会返回一个 concurrent.futures.Future 对象,称为 future,它可以同步地跟踪协程的状态。
    future 对象允许在其他非事件循环线程中访问协程的执行结果。

  • assert future.result(timeout) == 3: 调用 future.result(timeout) 方法等待协程的执行结果,timeout 参数(可选)定义了最长等待时间。
    future.result() 是一个同步方法,会阻塞当前线程直到获得结果或超时。
    assert 语句确保返回的结果为 3,否则会引发 AssertionError。

如果在协程内产生了异常,通知返回的 Future 对象。可被用来取消事件循环中的任务:

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
try:
    # 等待 future 的结果,设置一个超时时间
    result = future.result(timeout)
except TimeoutError:
    # 如果在指定的 timeout 时间内没有获得结果,则捕获 TimeoutError
    print('The coroutine took too long, cancelling the task...')
    # 取消 future,以避免不必要的等待
    future.cancel()
except Exception as exc:
    # 如果协程执行中出现了其他异常,则捕获该异常
    print(f'The coroutine raised an exception: {exc!r}')
else:
    # 如果协程在规定时间内正常执行完毕并返回结果
    print(f'The coroutine returned: {result!r}')

完整示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import asyncio
import threading

# 定义一个示例协程
async def example_coroutine():
    print("Coroutine started")
    await asyncio.sleep(2)  # 模拟一个耗时 2 秒的任务
    print("Coroutine finished")
    return "Task result"

# 运行事件循环的函数
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


# 创建一个新的事件循环,并在单独的线程中运行它
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=start_loop, args=(loop,))
loop_thread.start()

# 使用 asyncio.run_coroutine_threadsafe 提交协程任务到事件循环中
coro = example_coroutine()
future = asyncio.run_coroutine_threadsafe(coro, loop)

# 处理 future 结果的代码
try:
    # 等待协程的结果,指定超时时间 3 秒
    result = future.result(3)
except asyncio.TimeoutError:
    # 超时处理
    print('The coroutine took too long, cancelling the task...')
    future.cancel()  # 尝试取消任务
except Exception as exc:
    # 其他异常处理
    print(f'The coroutine raised an exception: {exc!r}')
else:
    # 正常返回结果
    print(f'The coroutine returned: {result!r}')

# 停止事件循环
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()

# 输出:
# Coroutine started
# Coroutine finished
# The coroutine returned: 'Task result'

2.2.12 检查 & 管理协程/任务执行状态或类型

  • asyncio.current_task(loop=None)

描述:返回当前正在运行的 Task 实例。如果当前没有运行的任务,则返回 None;

参数: * loop(可选):事件循环实例,如果 loop 为 None 则会使用 get_running_loop() 获取当前事件循环;

用途:在异步代码块中检查当前任务的状态或属性;
示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import asyncio

async def task():
    # 获取当前正在运行的任务
    current_task = asyncio.current_task()
    print(f"Current task: {current_task}")

    # 模拟异步工作
    await asyncio.sleep(1)
    print("Coroutine finished")

async def main():
    await task()

asyncio.run(main())
  • asyncio.current_task(loop=None)
    返回事件循环所运行的未完成的 Task 对象的集合。如果 loop 为 None,则会使用 get_running_loop() 获取当前事件循环。
    示例:
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio


async def task(name):
    print(f"{name} started")
    await asyncio.sleep(2)
    print(f"{name} finished")


async def main():
    task1 = asyncio.create_task(task("Task 1"))
    task2 = asyncio.create_task(task("Task 2"))

    # 获取所有未完成的任务
    tasks = asyncio.all_tasks()
    print(f"All tasks: {tasks}")

    await task1
    await task2

asyncio.run(main())
  • asyncio.iscoroutine(obj) 检查对象 obj 是否是协程对象,如果是,则返回 True,否则返回 False。

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import asyncio

async def sample_coroutine():
    await asyncio.sleep(1)

# 检查是否为协程对象
coro = sample_coroutine()
print(asyncio.iscoroutine(coro))  # True

# 检查普通函数
def regular_function():
    pass

print(asyncio.iscoroutine(regular_function))  # False

2.2.13 Task 对象

class asyncio.Task(coro, *, loop=None, name=None, context=None, eager_start=False) 是 asyncio 的基本任务对象,用于包装并运行协程,它将协程转换为一个任务,允许协程在事件循环中异步执行。

参数:

  • coro: 必需参数,执行的协程对象;
  • loop: (可选) 指定任务所属的事件循环,若不传入,则使用默认事件循环;
  • name: (可选) 任务的名称,用于调试或日志记录,默认为 Task- 格式;
  • context: (可选) contextvars.Context 对象,控制任务执行时的上下文;
  • eager_start: (可选) 默认值为 False。当设置为 True 时,任务在创建时立即同步执行,直到遇到第一个 await 表达式,设置 eager_start 可以减少事件循环的调度开销,提高执行性能,适用于无需等待的短期任务;
    工作原理:eager_start=True 时,任务在创建时就开始执行协程代码,并在遇到第一个阻塞操作(如 await 表达式)时暂停,将任务放入事件循环继续调度。这对于短期任务可以提升性能,但对于长时间运行的任务应谨慎使用,以避免阻塞主线程;
    使用建议:eager_start=True 适合短时、同步完成的任务,避免事件循环开销,对于长时间运行的任务,建议保持 eager_start=False,将任务交由事件循环管理,以避免阻塞主线程;

支持函数

  • Task.cancel(): 取消任务的执行,调用后,任务会引发 asyncio.CancelledError 异常; task.cancel() # 尝试取消任务
  • Task.done(): 检查任务是否已完成,完成时返回 True,否则返回 False;

    Text Only
    1
    2
    if task.done():
        print("Task is complete.")
    
  • Task.result(): 获取任务执行过程中抛出异常,若任务未完成或未引发异常,则返回 None,用于检查任务是否在执行过程中遇到错误;

    Text Only
    1
    2
    if task.done() and task.exception():
        print(f"Task raised an exception: {task.exception()}")
    
  • Task.get_name(): 返回任务的名称,默认为 Task-,可以用于调试;

  • Task.set_name(name): 设置任务的名称,便于调试和日志记录;
  • Task.add_done_callback(callback): 为任务添加完成回调函数。任务完成后会自动调用该函数,并传递 task 实例作为参数。在任务结束后执行特定操作。

    Python
    1
    2
    3
    4
    def on_task_done(task):
        print("Task completed!")
    
    task.add_done_callback(on_task_done)
    
  • Task.cancelled(): 判断任务是否已被取消,任务被取消时返回 True,否则返回 False。

    Python
    1
    2
    if task.cancelled():
        print("Task was cancelled.")
    
  • Task.get_loop(): 获取当前任务所属的事件循环。

    Text Only
    1
    2
    loop = task.get_loop()
    print(f"Task loop: {loop}")
    

3 asyncio streams(流)

官方文档:https://docs.python.org/zh-cn/3.13/library/asyncio-stream.html#examples

asyncio 提供了一组用于处理网络流(streams)的高效异步 API,主要包括 StreamReader 和 StreamWriter,用于处理 TCP 或 UDP 等 I/O 流。asyncio streams API 能够帮助用户在事件循环中实现非阻塞的 I/O 操作,并简化网络应用的开发。

核心组件

  • asyncio.StreamReader:用于读取数据流(例如,从网络套接字接收数据);
  • asyncio.StreamWriter:用于写入数据流(例如,将数据发送到网络套接字);
  • asyncio.open_connection(host, port):用于创建 TCP 连接,返回一个 (reader, writer) 元组;

主要方法和用法

  • 创建一个 TCP 客户端连接
    coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None) 方法可用于创建一个 TCP 连接,返回一个包含 StreamReader 和 StreamWriter 的元组。
  • host: 目标主机地址;
  • port: 目标端口号;
  • limit: 确定返回的 StreamReader 实例使用的缓冲区大小限制,默认情况下,limit 设置为 64 KiB 。

    示例:

    Python
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    import asyncio
    
    async def tcp_echo_client(message):
        # 创建一个 TCP 连接,返回一个包含 StreamReader 和 StreamWriter 的元组
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)
    
        print(f'Send: {message!r}')
        # 将字节数据写入流
        writer.write(message.encode())
        # 刷新写入的缓冲区,确保数据发送
        await writer.drain()
    
        # 从流中读取 n 个字节的数据。若 n 未指定,则会读取所有可用数据。
        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')
    
        print('Close the connection')
        # 关闭流
        writer.close()
        # 等待流完全关闭
        await writer.wait_closed()
    
    asyncio.run(tcp_echo_client('Hello World!'))
    
  • 创建一个 TCP 服务器 coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True) 方法用于启动一个 TCP 套接字服务器,当客户端连接时会调用回调函数 client_connected_cb。

    当一个新的客户端连接被建立时,回调函数 client_connected_cb 会被调用,该函数会接收到一对参数 (reader, writer) ,reader是类 StreamReader 的实例,而writer是类 StreamWriter 的实例。

  • client_connected_cb:一个回调函数,用于处理客户端连接,即可以是普通的可调用对象也可以是一个协程函数,如果它是一个协程函数,它将自动作为 Task 被调度。

  • host:服务器主机地址;
  • port:服务器端口号;
  • limit:确定返回的 StreamReader 实例使用的缓冲区大小限制,默认情况下,limit 设置为 64 KiB;

    示例:

    实现了一个简单的 TCP 服务器,监听在 127.0.0.1:8888 ,当客户端连接并发送数据时,服务器将读取数据,将其回显给客户端并关闭连接。

    Python
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    import asyncio
    
    async def handle_echo(reader, writer):
        """
        异步协程函数,每当有客户端连接到服务器时,就会创建一个新的任务来运行这个函数,该函数负责处理客户端的每个连接请求。
        :param reader: StreamReader 对象,用于从客户端读取数据
        :param writer: StreamWriter 对象,用于向客户端写数据
        :return:
        """
        # 从客户端读取最多 100 字节的数据
        data = await reader.read(100)
        # 将接收到的字节数据解码为字符串
        message = data.decode()
        # 获取客户端的地址信息
        addr = writer.get_extra_info('peername')
        # 接收到的消息和客户端地址打印到控制台
        # {!r}就是使用format语法时候的%r,因此,只需要关注 %r 就好
        # %r 表示的用 repr() 处理;类似于的%s表示用str()处理一样
        print(f"Received {message!r} from {addr!r}")
    
        print(f"Send: {message!r}")
        # 将收到的数据回写给客户端
        writer.write(data)
        # 确保数据已完全写入(刷新写缓冲区)
        await writer.drain()
    
        print("Close the connection")
        # 关闭连接,并调用 await writer.wait_closed() 等待连接完全关闭
        writer.close()
        await writer.wait_closed()
    
    async def main():
        """
        用于启动 TCP 服务器
        :return:
        """
        # 创建了一个 TCP 服务器,该服务器监听 127.0.0.1:8888 并调用 handle_echo 处理客户端连接
        # asyncio.start_server:异步地创建并启动服务器
        # handle_echo:指定每个客户端连接时的处理函数
        # '127.0.0.1' 和 8888:监听的地址和端口
        server = await asyncio.start_server(
            handle_echo, '127.0.0.1', 8888)
        # server.sockets 是服务器所使用的套接字列表,通过 getaddrinfo() 获取其地址并打印到控制台
        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'Serving on {addrs}')
    
        # async with server 上下文管理器用于安全管理服务器生命周期
        async with server:
            # 将服务器保持运行,直到手动停止
            await server.serve_forever()
    # 是 Python 3.7 引入的便捷方法,自动创建事件循环并启动 main() 协程,在事件循环中,服务器会保持监听状态,准备处理连接请求。
    asyncio.run(main())
    
  • Unix 套接字
    coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
    建立一个 Unix 套接字连接并返回 (reader, writer) 这对返回值,与 open_connection() 相似,但是是在 Unix 套接字上的操作。

    coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)
    启动一个 Unix 套接字服务,与 start_server() 相似,但是是在 Unix 套接字上的操作。

StreamReader 介绍

  • await reader.read(n):从 IO 流中读取 n 个字节的数据。若 n 未指定或设为 -1,则会读取所有可用数据,一直读取到 EOF,然后返回所读取的全部 byte,如果 n 为 0,则立即返回一个空 bytes 对象;
  • await reader.readline():读取一行数据,直到换行符,“行”指的是以 \n 结尾的字节序列;
  • await reader.readexactly(n):精确读取 n 个字节的数据,若未读取到则会引发 IncompleteReadError;
  • readuntil(separator=b'\n'):读取数据直至遇到 separator;

StreamWriter 介绍

  • writer.write(data):将字节数据写入流,此方法应当与 drain() 方法一起使用;
  • await writer.drain():刷新写入的缓冲区,确保数据发送;
  • await writelines(data): 将一个字节串列表(或任何可迭代对象)写入流;
  • writer.close():关闭流;
  • await writer.wait_closed():等待流完全关闭;
  • get_extra_info(name, default=None):访问可选的传输信息,详情参见 BaseTransport.get_extra_info()
  • start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None):将现有基于流的连接升级到 TLS;
  • is_closing: 如果流已被关闭或正在被关闭则返回 True;

示例

  • 示例一:

client.py:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import asyncio

async def tcp_client():
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    print("Connection established.")

    writer.write(b'Hello, server!')
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()}')

    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_client())

server.py:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

async def handle_client(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    print(f"Received {message} from client.")

    writer.write(b"Hello, client!")
    await writer.drain()

    writer.close()
    await writer.wait_closed()

async def tcp_server():
    server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
    print("Server running on 127.0.0.1:8888")

    async with server:
        await server.serve_forever()

asyncio.run(tcp_server())
  • 示例二:获取 HTTP 标头
    查询命令行传入 URL 的 HTTP 标头。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()
    await writer.wait_closed()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

用法:

Text Only
1
2
3
4
5
python example.py http://example.com/path/page.html

或使用 HTTPS:

python example.py https://example.com/path/page.html
  • 示例三:注册一个打开的套接字以等待使用流的数据
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import socket

async def wait_for_data():
    # 获取一个指向当前事件循环的引用
    # 因为我们想要访问低层级的 API。
    loop = asyncio.get_running_loop()

    # 创建一对已接连的套接字。
    rsock, wsock = socket.socketpair()

    # 注册打开的套接字以等待数据。
    reader, writer = await asyncio.open_connection(sock=rsock)

    # 模拟从网络接收数据
    loop.call_soon(wsock.send, 'abc'.encode())

    # 等待数据
    data = await reader.read(100)

    # 获得数据,我们已完成:关闭套接字
    print("Received:", data.decode())
    writer.close()
    await writer.wait_closed()

    # 关闭第二个套接字
    wsock.close()

asyncio.run(wait_for_data())

4 同步原语

官方文档:https://docs.python.org/zh-cn/3.13/library/asyncio-sync.html

在 asyncio 中,同步原语主要用于管理协程之间的并发控制,以确保数据一致性、协调协程的执行顺序或对共享资源的访问。asyncio 提供了一些关键的同步原语,类似于多线程编程中的锁、事件和条件变量,但它们都是异步的,适合异步编程。

4.1 asyncio.Lock

异步锁,用于确保协程对共享资源的独占访问。锁只能被一个协程持有,其他试图获取锁的协程会被阻塞,直到锁被释放。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio

async def worker(lock, name):
    async with lock:  # 加锁
        print(f'{name} 获得了锁')
        await asyncio.sleep(1)
        print(f'{name} 释放了锁')

async def main():
    lock = asyncio.Lock()
    await asyncio.gather(
        worker(lock, 'Worker 1'),
        worker(lock, 'Worker 2')
    )

asyncio.run(main())

# 输出:
# Worker 1 获得了锁
# Worker 1 释放了锁
# Worker 2 获得了锁
# Worker 2 释放了锁

这里,Worker 1 和 Worker 2 按顺序获得锁,确保不会同时访问资源。

4.2 asyncio.Event

异步事件,用于在协程之间发送信号,事件初始状态为未设置,协程可以等待事件,直到事件被其他协程设置。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import asyncio

async def waiter(event):
    print('等待事件设置...')
    await event.wait()
    print('事件已设置!')

async def main():
    event = asyncio.Event()
    asyncio.create_task(waiter(event))
    await asyncio.sleep(2)  # 模拟一些其他操作
    print('设置事件')
    event.set()  # 设置事件

asyncio.run(main())

# 输出:
# 等待事件设置...
# 设置事件
# 事件已设置!

这里,waiter 协程在等待事件设置。当 main 函数调用 event.set() 时,waiter 继续执行。

4.3 asyncio.Condition

异步条件变量,可以与锁配合使用,以便在某些条件成立时唤醒等待的协程。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio

async def consumer(condition):
    async with condition:
        print('等待条件满足...')
        await condition.wait()  # 等待条件满足
        print('条件满足,开始消费数据')

async def producer(condition):
    await asyncio.sleep(1)  # 模拟其他操作
    async with condition:
        print('条件已满足,通知消费者')
        condition.notify()  # 通知等待的协程

async def main():
    condition = asyncio.Condition()
    await asyncio.gather(consumer(condition), producer(condition))

asyncio.run(main())

# 输出:
# 等待条件满足...
# 条件已满足,通知消费者
# 条件满足,开始消费数据

在这里,consumer 等待 condition 满足,直到 producer 调用 notify 以唤醒 consumer。

4.4 asyncio.Semaphore 和 asyncio.BoundedSemaphore

信号量,用于限制并发访问资源的协程数量。Semaphore 可以限制同时运行的协程数,而 BoundedSemaphore 会在超过最大值时抛出异常。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio

async def worker(sem, name):
    async with sem:# 信号量限制
        print(f'{name} 获得了访问权限')
        await asyncio.sleep(1)
        print(f'{name} 释放了访问权限')


async def main():
    # 同时允许最多 2 个协程
    semaphore = asyncio.Semaphore(2)
    await asyncio.gather(
        worker(semaphore, "Worker 1"),
        worker(semaphore, "Worker 2"),
        worker(semaphore, "Worker 3")
    )

asyncio.run(main())

# 输出:
# Worker 1 获得了访问权限
# Worker 2 获得了访问权限
# Worker 1 释放了访问权限
# Worker 2 释放了访问权限
# Worker 3 获得了访问权限
# Worker 3 释放了访问权限

这里,通过信号量 semaphore 限制最多两个协程可以同时访问共享资源。

4.5 asyncio.Barrier

Barrier 是一个同步原语,允许一组协程在特定同步点之前等待,直到所有协程都到达该点。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio

async def worker(barrier, name):
    print(f'{name} 到达屏障')
    await barrier.wait()  # 等待所有协程到达
    print(f'{name} 跨过屏障')

async def main():
    barrier = asyncio.Barrier(3)  # 设置屏障为3个协程
    await asyncio.gather(
        worker(barrier, 'Worker 1'),
        worker(barrier, 'Worker 2'),
        worker(barrier, 'Worker 3')
    )

asyncio.run(main())

# 输出:
# Worker 1 到达屏障
# Worker 2 到达屏障
# Worker 3 到达屏障
# Worker 3 跨过屏障
# Worker 1 跨过屏障
# Worker 2 跨过屏障

这里,所有协程在 await barrier.wait() 处等待,直到达到屏障的数量,然后一起继续。

5 子进程

在 Python 3.13 中,asyncio 提供了多个用于管理和操作子进程的工具,主要通过异步方式启动、通信和管理子进程,以便与异步任务无缝集成。子进程允许我们运行外部命令、执行脚本或利用多进程并行处理能力,同时仍然保持非阻塞的异步操作。

如下例子,演示如何用 asyncio 运行一个 shell 命令并获取其结果:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio

async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

asyncio.run(run('ls ./'))


# 输出:
# ['ls ./' exited with 0]
# [stdout]
# adapter_pattern_demo19.py
# aop_demo12.py

由于所有 asyncio 子进程函数都是异步的并且 asyncio 提供了许多工具用来配合这些函数使用,因此并行地执行和监视多个子进程十分容易。

修改上面例子同时运行多个命令:

Text Only
1
2
3
4
5
6
async def main():
    await asyncio.gather(
        run('ls ./'),
        run('sleep 1; echo "hello"'))

asyncio.run(main())

5.1 创建子进程

  • coroutine asyncio.create_subprocess_exec(program, *args, stdin=None, stdout=None, stderr=None, limit=None, **kwds)
    通过 exec 方法创建子进程,*args 是要执行的命令及其参数,使用这种方式传入能避免 shell 注入的安全问题。

参数说明
* program: 要执行的程序名称或路径,例如:'python3' 或其他可执行文件路径; * args: 可选参数,程序的其他命令行参数,例如:'-m'、'http.server'; * stdin: 可选,用于指定标准输入流的行为,可以设置为 asyncio.subprocess.PIPE(用于与子进程进行交互)、None(继承当前进程的标准输入)或 asyncio.subprocess.DEVNULL(不使用标准输入); * stdout: 可选,用于指定标准输出流的行为,可以设置为 asyncio.subprocess.PIPE(用于读取子进程输出)、None(继承当前进程的标准输出)或 asyncio.subprocess.DEVNULL(不使用标准输出); * stderr: 与 stdout 类似,控制标准错误流的行为; * limit: 指定流的缓冲区大小,适用于数据量较大的场景; * kwds: 其他关键字参数,这些参数将直接传递给 subprocess.Popen;

该方法返回一个 Process 对象,允许通过此对象与子进程进行通信、获取其状态以及等待其退出。

示例:
使用 asyncio.create_subprocess_exec 启动一个子进程,并异步地从标准输出读取数据。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio

async def run_subprocess():
    # 启动一个子进程并执行 "ls -l" 命令
    process = await asyncio.create_subprocess_exec(
        'ls', '-l','./test',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # 异步读取标准输出和标准错误
    stdout, stderr = await process.communicate()

    print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

    # 打印子进程的退出状态码
    print(f'Return code: {process.returncode}')

# 启动事件循环并执行子进程
asyncio.run(run_subprocess())

# 输出:
# [stdout]
# 总用量 0
# -rw-rw-r-- 1 bolean bolean 0 Nov 11 15:06 test.py
# 
# Return code: 0
  • 标准输入输出交互:设置 stdin=asyncio.subprocess.PIPE 后,可以使用 process.stdin.write() 和 await process.stdin.drain() 与子进程交互;类似地,使用 stdout=asyncio.subprocess.PIPE 后可以通过 await process.stdout.read() 读取子进程输出。
  • 错误处理:通过捕获异常来处理子进程的错误,例如,如果子进程失败,可以通过 process.returncode 来检查非零退出码。

  • coroutine asyncio.create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, limit=None, **kwds)
    通过 shell 方法创建子进程,接受完整的命令字符串 cmd,可以直接运行像 ls -l | grep .py 这样的复杂命令。

参数说明
* cmd: 一个字符串,包含要执行的命令及其参数,例如:"ls -l | grep py"; * stdin: 控制标准输入流的行为,可以设置为 asyncio.subprocess.PIPE(用于与子进程进行输入输出交互),None(继承当前进程的标准输入)或 asyncio.subprocess.DEVNULL(不使用标准输入); * stdout: 控制标准输出流的行为,可以设置为 asyncio.subprocess.PIPE(用于读取子进程的输出),None(继承当前进程的标准输出)或 asyncio.subprocess.DEVNULL(不使用标准输出); * stderr: 与 stdout 类似,控制标准错误流的行为; * limit: 指定流的缓冲区大小,在处理大量数据时有用; * kwds: 其他关键字参数,直接传递给 subprocess.Popen;

返回一个 Process 对象,可以通过它与子进程进行通信,等待其完成或获取退出状态。
示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio

async def run_shell_command():
    # 启动一个子进程并执行命令
    process = await asyncio.create_subprocess_shell(
        'ls -l | grep test',  # 这是 shell 命令
        stdout=asyncio.subprocess.PIPE,  # 读取标准输出
        stderr=asyncio.subprocess.PIPE   # 读取标准错误
    )

    # 异步地获取标准输出和标准错误
    stdout, stderr = await process.communicate()

    # 输出结果
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

    # 打印子进程的退出状态码
    print(f'Return code: {process.returncode}')

# 启动事件循环并执行子进程
asyncio.run(run_shell_command())

# 输出:
# [stdout]
# drwxrwxr-x 2 bolean bolean  4096 Nov 11 15:06 test
# 
# Return code: 0

常量解释
asyncio.subprocess.PIPE:可以传递给 stdin, stdout 或 stderr 形参,如果 PIPE 传递给 stdin 参数,则 Process.stdin 属性将会指向一个 StreamWriter 实例;如果 PIPE 传递给 stdout 或 stderr 参数,则 Process.stdout 和 Process.stderr 属性将会指向 StreamReader 实例; asyncio.subprocess.STDOUT: 可以用作 stderr 参数的特殊值,表示标准错误被重定向到标准输出;
asyncio.subprocess.DEVNULL:可以用作 stdin, stdout 或 stderr 参数,将子进程的输入或输出流重定向到空设备(devnull),使得主进程忽略这些数据;

常量 用途 示例应用场景
asyncio.subprocess.PIPE 允许主进程与子进程通信 读取子进程的输出或向子进程发送输入
asyncio.subprocess.STDOUT 将子进程的标准错误合并到标准输出中 合并输出流,简化数据处理
asyncio.subprocess.DEVNULL 丢弃子进程的 I/O 数据 忽略子进程的所有输入和输出,比如执行任务后不关心结果的情况

5.2 更多示例

  • 示例一:
    使用 Process 类控制子进程并用 StreamReader 类从其标准输出读取信息。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import sys


async def get_date():
    code = 'import datetime; print(datetime.datetime.now())'

    # 创建子进程,重定向标准输出,至一个管道中
    # sys.executable:返回当前Python解释器的完整路径:executable = '/home/bolean/workspace/py-env/py3.12-dev-env/bin/python3.12'
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', code,
        stdout=asyncio.subprocess.PIPE)

    # 读取一行输出
    data = await proc.stdout.readline()
    # 删除 string 字符串末尾的指定字符,默认为空白符,包括空格、换行符、回车符、制表符
    line = data.decode('ascii').rstrip()

    # 等待子进程退出
    await proc.wait()
    return line


date = asyncio.run(get_date())
print(f"Current date: {date}")

# 输出:
# Current date: 2024-11-12 15:40:09.591518
  • 示例二:使用 asyncio.create_subprocess_exec 启动子进程
    使用 create_subprocess_exec 运行一个简单的命令并获取输出。
Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio

async def run_command():
    # 创建子进程运行 'echo Hello, World!'
    process = await asyncio.create_subprocess_exec(
        'echo', 'Hello, World!',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # 等待结束,读取输出和错误输出
    stdout, stderr = await process.communicate()

    # 打印输出和错误输出
    print(f'[stdout] {stdout.decode().strip()}')
    if stderr:
        print(f'[stderr] {stderr.decode().strip()}')

    # 打印返回码
    print(f'Return code: {process.returncode}')

asyncio.run(run_command())

# 输出:
# [stdout] Hello, World!
# Return code: 0  
  • 示例三:使用 asyncio.create_subprocess_shell 启动带有管道的子进程
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio

async def run_shell_command():
    # 运行带有管道的 shell 命令
    process = await asyncio.create_subprocess_shell(
        'echo "Hello, World!" | grep World',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # 异步读取输出和错误输出
    stdout, stderr = await process.communicate()

    print(f'[stdout] {stdout.decode().strip()}')
    if stderr:
        print(f'[stderr] {stderr.decode().strip()}')

    print(f'Return code: {process.returncode}')

asyncio.run(run_shell_command())

# [stdout] Hello, World!
# Return code: 0
  • 示例四:与子进程进行异步交互
    向子进程发送输入,并异步读取它的输出。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio

async def interact_with_subprocess():
    # 启动 Python 解释器子进程
    process = await asyncio.create_subprocess_exec(
        'python3', '-i',  # 交互模式
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # 向 Python 解释器发送命令
    process.stdin.write(b'print("Hello from child process")\n')
    await process.stdin.drain()  # 确保命令被发送

    # 读取子进程的输出
    stdout_line = await process.stdout.readline()
    print(f'[stdout] {stdout_line.decode().strip()}')

    # 关闭子进程的标准输入,并等待它完成
    process.stdin.close()
    await process.wait()
    print(f'Return code: {process.returncode}')

asyncio.run(interact_with_subprocess())

# 输出:
# [stdout] Hello from child process
# Return code: 0
  • 示例五:异步处理多个子进程 异步处理多个子进程,并等待所有子进程完成。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import asyncio

async def run_command(cmd):
    process = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()
    return cmd, process.returncode, stdout.decode().strip(), stderr.decode().strip()

async def main():
    commands = [
        'echo "Hello, World!"',
        'sleep 1; echo "Task 1 completed"',
        'sleep 2; echo "Task 2 completed"'
    ]

    tasks = [run_command(cmd) for cmd in commands]
    results = await asyncio.gather(*tasks)

    for cmd, returncode, stdout, stderr in results:
        print(f'\n[Command: {cmd}]')
        print(f'Return code: {returncode}')
        print(f'[stdout] {stdout}')
        if stderr:
            print(f'[stderr] {stderr}')

asyncio.run(main())

# 输出:
# [Command: echo "Hello, World!"]
# Return code: 0
# [stdout] Hello, World!
#
# [Command: sleep 1; echo "Task 1 completed"]
# Return code: 0
# [stdout] Task 1 completed
#
# [Command: sleep 2; echo "Task 2 completed"]
# Return code: 0
# [stdout] Task 2 completed

6 更多接口说明

(1)事件循环
(2)Futures
(3)传输和协议
(4)策略