Skip to content

进程线程协程

进程(Process)、线程(Thread)、协程(Coroutine)

1 进程(Process)

进程是操作系统分配资源的基本单位,每个进程拥有独立的内存空间。一个程序运行后至少会有一个主进程,主进程可以派生出多个子进程。

参考:https://bbs.huaweicloud.com/blogs/289316

特点:

  • 独立性:每个进程有自己独立的内存空间,不会与其他进程共享数据;
  • 多核并行:可以充分利用多核 CPU 进行真正的并行计算;
  • 隔离性好:进程之间互相隔离,崩溃的进程不会影响其他进程;

缺点:

  • 创建开销大:进程创建的开销比线程和协程更大,因为需要分配独立的内存空间;
  • 进程间通信复杂:由于进程间不共享内存,需要通过 IPC (Inter-Process Communication) 进行通信,比如管道 (Pipe)、消息队列 ( Queue)、共享内存 (Shared Memory) 等;

使用场景:

  • CPU 密集型任务,如图像处理、视频编码等需要多核并行计算的任务;

代码示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import multiprocessing


def worker(num):
    print(f'Worker: {num}')


if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

在 Python 3 中,multiprocessing 模块提供了丰富的 API 来进行进程管理和进程间通信。以下是常用的进程使用用例,包括进程池 ( Pool)、子进程 (Process) 以及进程间通信 (Queue, Pipe, Manager)。

官方 3.12.6:https://docs.python.org/zh-cn/3/library/multiprocessing.html

1.1 子进程 (Process)

在 Python 中,当你运行一个脚本时,该脚本的执行进程就是所谓的“主进程”。当主进程使用 multiprocessing 模块创建子进程时,子进程会作为主进程的子进程运行。这种父子关系意味着主进程负责管理和控制子进程的生命周期。

主进程与子进程的关系:

  • 主进程:这是运行脚本时操作系统分配的最初进程。主进程负责创建和管理子进程,并等待子进程完成或终止它们;
  • 等待子进程完成 (join()),终止子进程 (terminate()),也可以不等待子进程而继续执行其他任务(不使用 join())
  • 子进程:由主进程创建的进程,子进程在独立的内存空间中运行,并且可以执行与主进程不同的代码片段。子进程运行在自己的上下文中,有自己独立的进程 ID;

Process 对象配置参数:
在 multiprocessing 模块中,Process 类用于创建子进程。创建子进程时,可以通过以下参数配置子进程的行为:

  • target:这是子进程要执行的目标函数。在创建子进程时,target 参数指定函数的引用,子进程启动后将执行该函数的内容;
  • args:这是传递给目标函数的参数。它需要是一个元组,即使目标函数只有一个参数,也需要在参数后加一个逗号,例如 args=(i,);
  • kwargs:可以传递给目标函数的关键字参数,使用字典格式;
  • name:为子进程指定一个名字。默认情况下,进程会自动分配一个名字,如 Process-1;
  • daemon:设定是否为守护进程。守护进程会在主进程结束时自动终止。daemon=True 将进程设为守护进程;

子进程方法:

  • start():启动子进程,调用 start() 方法后,子进程会在后台运行,执行指定的 target 函数。
  • join():阻塞主进程,直到调用该方法的子进程结束。join() 方法常用于确保主进程等待所有子进程完成后再继续执行。join() 可以设置 timeout 参数来指定等待时间。
  • is_alive():检查子进程是否还在运行,返回 True 或 False。
  • terminate():立即终止子进程。这通常用于在异常情况下强制停止子进程。

示例说明:

以下是一个使用 multiprocessing 创建和管理子进程的简单示例,包含对上述方法的应用:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import multiprocessing
import os
import time


def worker(num):
    print(
        f'Worker {num} started, Process ID: {multiprocessing.current_process().pid}, Name:{multiprocessing.current_process().name}')
    time.sleep(2)
    print(f'Worker {num} finished')


if __name__ == '__main__':
    print(f'Main Process, Process ID:{os.getpid()}, Name:{multiprocessing.current_process().name}')
    num_processes = multiprocessing.cpu_count()  # 获取CPU核心数量
    processes = []
    for i in range(num_processes):
        # 创建子进程,每个子进程执行 worker 函数
        # p = multiprocessing.Process(target=worker, args=(i,), daemon=True)
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        # 使用 p.start() 方法启动子进程,子进程开始在后台运行
        p.start()

    for p in processes:
        # p.join() 方法阻塞主进程,直到对应的子进程执行完毕。这确保了主进程在所有子进程结束后再继续运行后续代码。
        p.join()  # 主进程将在此处等待所有子进程完成
    # 所有子进程执行完后,主进程输出 "All processes finished"
    print('All processes finished')

输出:主进程将等待子进程结束后输出 “All processes finished”

Text Only
1
2
3
4
5
6
7
8
Main Process, Process ID:43716, Name:MainProcess
Worker 0 started, Process ID: 43717, Name:Process-1
Worker 1 started, Process ID: 43718, Name:Process-2
Worker 2 started, Process ID: 43719, Name:Process-3
Worker 0 finished
Worker 1 finished
Worker 2 finished
All processes finished

控制台进程查看:可以看到上述输出进程 PID 与终端输出一致,包括主进程 & 子进程总共运行 4 个进程。

Text Only
1
2
3
4
jpzhang     43716  0.4  0.0  23520 13488 ?        S    10:01   0:00 /home/jpzhang/workspace/py-env/py3.12-dev-env/bin/python3.12 /home/jpzhang/workspace/examples/python-tricks/src/process_demo20_01.py
jpzhang     43717  0.0  0.0  23520  9668 ?        S    10:01   0:00 /home/jpzhang/workspace/py-env/py3.12-dev-env/bin/python3.12 /home/jpzhang/workspace/examples/python-tricks/src/process_demo20_01.py
jpzhang     43718  0.0  0.0  23520  9668 ?        S    10:01   0:00 /home/jpzhang/workspace/py-env/py3.12-dev-env/bin/python3.12 /home/jpzhang/workspace/examples/python-tricks/src/process_demo20_01.py
jpzhang     43719  0.0  0.0  23520  9668 ?        S    10:01   0:00 /home/jpzhang/workspace/py-env/py3.12-dev-env/bin/python3.12 /home/jpzhang/workspace/examples/python-tricks/src/process_demo20_01.py

若注释 p.join() 相关代码,主进程将不会等待子进程结束,而是继续运行,输出类似如下:

Text Only
1
2
3
4
5
6
7
8
Main Process, Process ID:44231, Name:MainProcess
All processes finished
Worker 0 started, Process ID: 44232, Name:Process-1
Worker 1 started, Process ID: 44233, Name:Process-2
Worker 2 started, Process ID: 44234, Name:Process-3
Worker 0 finished
Worker 1 finished
Worker 2 finished

若将子进程设置为守护进程:multiprocessing.Process(target=worker, args=(i,), daemon=True) ,则主进程结束时子进程自动终止(前提是 "注释 p.join() 相关代码",避免主进程等待子进程结束)。

Text Only
1
2
3
4
5
Main Process, Process ID:44367, Name:MainProcess
Worker 0 started, Process ID: 44368, Name:Process-1
All processes finished

进程已结束,退出代码为 0

主进程结束,守护进程自动终止。

1.2 进程池 (Pool)

1.2.1 进程池 (multiprocessing.Pool)

在 Python 中,multiprocessing.Pool 提供了一种简单的方式来管理进程池,以并发地执行任务。进程池允许你预先创建一组工作进程,并通过这些进程来执行多个任务,避免频繁地创建和销毁进程所带来的开销。Pool 对象支持多种方法来分发任务,包括同步和异步方式。

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import multiprocessing
import os


def worker(num):
    print(f'Worker: {num}, PID: {os.getpid()}')
    return num * num


if __name__ == '__main__':
    # 创建一个进程池,最多允许 3 个进程同时执行
    with multiprocessing.Pool(processes=3) as pool:
        # map(worker, range(5)):将任务分配给进程池中的工作进程并行执行,map 方法返回一个列表,其中包含 worker 函数的返回值。
        results = pool.map(worker, range(5))
    print(results)

以下通过示例分别介绍 Pool 多种分发任务方式。

1.2.1.1 apply & apply_async

创建进程池:

Python
1
2
3
4
from multiprocessing import Pool

# 创建一个进程池,指定池中的进程数量
pool = Pool(processes=4)

【注意】processes 参数指定了进程池中的进程数。如果省略,默认会使用 os.cpu_count() 来确定。

  • apply 和 apply_async
  • apply(func, args=(), kwargs={}):同步执行指定的函数 func,并传入 args 和 kwargs。执行完毕后,返回函数的结果。这个方法类似于直接调用函数,但是在进程池中执行;
  • apply_async(func, args=(), kwargs={}, callback=None, error_callback=None):异步执行指定的函数 func。它立即返回一个 ApplyResult 对象,可以通过 get() 方法来获取结果。callback 是一个可选的回调函数,用于处理结果,error_callback 用于处理异常。

示例代码:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import multiprocessing
import time


def square(x):
    time.sleep(20)
    return x * x


# __name__ == '__main__' 确保只有在直接运行脚本时才会执行主程序块。这是 Python 中防止在 Windows 平台上重复创建进程的必要条件
if __name__ == '__main__':
    # 创建一个包含 4 个工作进程的进程池(Pool 对象),进程池将管理和分发任务到这些进程中。
    pool = multiprocessing.Pool(processes=4)

    # 同步调用:
    # apply 方法将任务 square(10) 分派给一个进程,主进程会等待任务完成后再继续执行
    # 由于 square 函数内部有 time.sleep(20),因此主进程会等待 20 秒
    # result 保存 square(10) 的返回值,也就是 100
    result = pool.apply(square, args=(10,))
    print(f'Synchronous result: {result}')

    # 异步调用:
    # apply_async 方法将任务 square(10) 分派给一个进程,但不会阻塞主进程,主进程可以继续执行后续代码
    # async_result 是一个 ApplyResult 对象,表示异步调用的返回值
    async_result = pool.apply_async(square, args=(10,))
    # async_result.get() 会阻塞主进程,直到异步任务完成并返回结果
    # 因为 square(10) 需要 20 秒来完成,所以主进程在这行代码上会等待任务完成并获取结果 100
    print(f'Asynchronous result: {async_result.get()}')

    # 调用 join 之前,先调用 close 函数,否则会出错。执行完 close 后不会有新的进程加入到 pool,join 函数等待所有子进程结束
    # 关闭进程池,表示不再接受新的任务:当进程池 close 的时候并未关闭进程池,只是会把状态改为不可再插入元素的状态
    pool.close()
    # 等待进程池中所有任务完成。在此之后,主进程才会继续执行后面的代码(如果有的话)
    pool.join()

# 输出:
# Synchronous result: 100
# Asynchronous result: 100

本示例,异步调用不会阻塞主进程,但在 get() 方法调用时还是会等待任务完成。如果希望在等待结果的同时执行其他任务,可以在调用 get() 之前执行更多的代码。

apply_async 与 apply 示例如下:

apply_async(func, args=(), kwargs={}, callback=None, error_callback=None):

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("End Process")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = f"Hello {i}"
        pool.apply_async(func, (msg,))
    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()
    print("Sub-process(es) done.")
# 输出:
# Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
# msg: Hello 0
# msg: Hello 1
# msg: Hello 2
# End Process
# msg: Hello 3
# End Process
# End Process
# End Process
# Sub-process(es) done.

apply_async 函数允许你传递 callback 和 error_callback 参数。callback 参数用于指定任务成功完成后的回调函数,而 error_callback 参数用于处理任务执行过程中出现的异常。下面是一个使用 apply_async 的 callback 和 error_callback 参数的示例。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import multiprocessing


def safe_divide(x, y):
    # 这个函数接收两个参数 x 和 y,并返回 x / y 的结果。如果 y 为 0,则抛出 ValueError 异常
    if y == 0:
        raise ValueError("Division by zero!")
    return x / y


def on_success(result):
    # 当 apply_async 成功完成任务时,on_success 回调函数会被调用,并接收任务的结果作为参数
    print(f'Success! Result: {result}')


def on_error(e):
    # 当 apply_async 在任务执行过程中遇到异常时,on_error 回调函数会被调用,并接收异常信息作为参数
    print(f'Error: {e}')


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)

    # 使用 apply_async 调用函数,传递成功回调和错误回调
    pool.apply_async(safe_divide, args=(10, 2), callback=on_success,
                     error_callback=on_error)  # 传递 (10, 2) 作为参数,任务成功后,on_success 会被调用
    pool.apply_async(safe_divide, args=(10, 0), callback=on_success,
                     error_callback=on_error)  # 传递 (10, 0) 作为参数,由于 y 为 0,safe_divide 函数会抛出异常,on_error 回调函数会被调用

    pool.close()
    pool.join()

    print("Main process finished.")

# 输出:
# Success! Result: 5.0
# Error: Division by zero!
# Main process finished.
  • callback:当任务成功完成时,callback 函数会被调用,并接收任务的返回值;
  • error_callback:当任务执行过程中出现异常时,error_callback 函数会被调用,并接收异常对象;

apply(func, args=(), kwargs={}):

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import multiprocessing
import time


def func(msg):
    print("Msg:", msg)
    time.sleep(3)
    print("End Process")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = f"Hello, {i}"
        # #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        pool.apply(func, (msg,))
    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

# 输出:
# Msg: Hello, 0
# End Process
# Msg: Hello, 1
# End Process
# Msg: Hello, 2
# End Process
# Msg: Hello, 3
# End Process
# Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
# Sub-process(es) done.

总结:

  • 同步调用:主进程在 apply 方法上会阻塞,直到任务完成,适用于不需要并行的简单任务;
  • 异步调用:主进程不会在 apply_async 方法上阻塞,而是继续执行后续代码,适用于需要并行处理的复杂任务;

1.2.1.2 map & map_async

  • map(func, iterable, chunksize=None):同步调用,将 iterable 中的每一个元素作为参数,依次传递给函数 func,以并行的方式计算,并返回结果列表。map 是阻塞的,即主进程会等待所有子进程完成。
  • func:要应用到每个元素的函数;
  • iterable:要迭代的对象,每个元素都会作为参数传递给 func;
  • chunksize(可选):将 iterable 切分为更小的块来分发给进程池中的进程,有助于优化性能;
  • map_async(func, iterable, chunksize=None, callback=None, error_callback=None):map 的异步版本。立即返回 ApplyResult 对象,结果可以通过 get() 获取。
  • func:要应用到每个元素的函数;
  • iterable:要迭代的对象;
  • chunksize(可选):将 iterable 切分为更小的块;
  • callback(可选):任务完成时的回调函数,该函数接受一个包含结果的列表作为参数;
  • error_callback(可选):任务失败时的回调函数,该函数接受一个异常对象作为参数;

示例代码:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import multiprocessing
import time


def square(param):
    time.sleep(3)
    print(f"My name is: {param['name']}")
    return param['mark'] * param['mark']


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    # 使用 map:同步调用,将 square 函数应用到列表 [{"name":"Stars","mark":1},{"name":"Active","mark":2},{"name":"Absurd","mark":3},{"name":"Fairy","mark":4}] 上
    results = pool.map(square,
                       [{"name": "Stars", "mark": 1},
                        {"name": "Active", "mark": 2},
                        {"name": "Absurd", "mark": 3},
                        {"name": "Fairy", "mark": 4}])
    print(f'Synchronous map result: {results}')  # 输出: [1, 4, 9, 16]

    # 使用 map_async:异步调用
    async_results = pool.map_async(square, [{"name": "Stars", "mark": 1},
                                            {"name": "Active", "mark": 2},
                                            {"name": "Absurd", "mark": 3},
                                            {"name": "Fairy", "mark": 4}])
    print(f'Asynchronous map result: {async_results.get()}')  # 输出: [1, 4, 9, 16]
    pool.close()
    pool.join()

# 输出:
# My name is: Stars
# My name is: Active
# My name is: Absurd
# My name is: Fairy
# Synchronous map result: [1, 4, 9, 16]
# My name is: Stars
# My name is: Active
# My name is: Absurd
# My name is: Fairy
# Asynchronous map result: [1, 4, 9, 16]
  • map:同步调用,square 函数接受参数 [{"name":"Stars","mark":1},{"name":"Active","mark":2},{"name":"Absurd","mark":3},{"name":"Fairy","mark":4}] ,返回 [1, 4, 9, 16];
  • map_async:异步调用,结果同样是 [1, 4, 9, 16],但可以通过 get() 方法获取结果;

map_async 的 callback 参数允许你指定一个回调函数,该函数会在所有异步任务完成后自动调用,并且会接收到任务的结果列表作为参数。下面是一个使用 map_async 的回调函数的示例。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import multiprocessing
import time


def square(x):
    # 模拟一个耗时操作
    time.sleep(3)
    return x * x


def collect_result(result):
    # 回调函数
    # 当 map_async 的所有任务完成后,collect_result 函数会自动被调用,并接收 map_async 的结果作为参数
    print(f"Results collected: {result}")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    # 异步调用 map_async,传递回调函数 collect_result,当所有任务完成时,collect_result 回调函数会被调用,并打印出所有结果
    async_results = pool.map_async(square, [1, 2, 3, 4], callback=collect_result)
    # 在这里可以做其他的事情, 因为 map_async 是异步的,这个 print 可能在 collect_result 之前或之后执行
    print("Doing other work while waiting for results...")
    # 等待所有的子进程完成
    pool.close()
    pool.join()
    print("Main process finished.")

# 由于 map_async 是异步调用,print("Doing other work while waiting for results...") 可能会在回调函数执行前打印,说明主进程并不会因为等待 map_async 的结果而被阻塞。
# 例如,注释 square 函数: time.sleep(3), time.sleep(3) 添加到 async_results = pool.map_async() 之后,模拟主进程处理其他任务
# 输出:
# Doing other work while waiting for results...
# Results collected: [1, 4, 9, 16]
# Main process finished.

1.2.1.3 starmap & starmap_async

  • starmap(func, iterable, chunksize=None):同步调用,类似于 map,但 iterable 中的元素是元组,它们会被拆包并作为多个参数传递给函数 func;
  • starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None):starmap 的异步版本。返回 ApplyResult 对象,结果可以通过 get() 方法获取;

starmap 示例

starmap 是同步的,阻塞主进程直到所有任务完成,它将每个参数元组解包,然后传递给目标函数。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import multiprocessing


def add(x, y):
    return x + y


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    # starmap 将参数元组列表 (1, 2)、(3, 4) 和 (5, 6) 传递给 add 函数,返回 [3, 7, 11]
    results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
    pool.close()
    pool.join()
    print(f'Synchronous starmap results: {results}')  # 输出:Synchronous starmap results: [3, 7, 11]

starmap_async 示例

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import multiprocessing


def safe_divide(x, y):
    # 尝试除法运算,并在除数为零时抛出 ValueError
    if y == 0:
        raise ValueError("Division by zero!")
    return x / y


def on_success(results):
    # 当所有任务都成功完成时,这个回调函数会被调用,接收结果列表
    print(f'Success! Results: {results}')


def on_error(e):
    # 当任何任务发生异常时,错误回调函数会被调用,并接收异常对象
    print(f'Error: {e}')


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)

    # 使用 starmap_async 将参数元组 (10, 2)、(8, 4)、(5, 0) 和 (7, 1) 异步传递给 safe_divide 函数
    async_result = pool.starmap_async(safe_divide, [(10, 2), (8, 4), (5, 0), (7, 1)],
                                      callback=on_success,
                                      error_callback=on_error)

    pool.close()
    pool.join()

    print("Main process finished.")

# 输出:
# Error: Division by zero!
# Main process finished.

【重要提示】:callback 仅在所有任务成功完成时才会调用,而 error_callback 在任一任务失败时触发。因此,不能同时期望 callback 和 error_callback 在同一批任务中都被调用。

1.2.1.4 总结

  • 同步 vs 异步:
  • apply, map, starmap 是同步的,主进程会等待任务完成;
  • apply_async, map_async, starmap_async 是异步的,主进程可以继续其他任务;

  • 单个 vs 多个参数:

  • apply, apply_async 处理单个参数;
  • map, map_async 处理单个参数的 iterable;
  • starmap, starmap_async 处理多个参数的 iterable,每个元素是参数元组;

回调函数:

  • 异步方法可以接受 callback 和 error_callback,处理任务成功或失败后的操作;

1.2.2 进程池 (concurrent.futures.ProcessPoolExecutor)

TODO

1.3 进程间通信(Inter-Process Communication, IPC)

进程间通信(Inter-Process Communication, IPC)是指不同进程之间交换数据的机制。在 Python 的 multiprocessing 模块中,常用的 IPC 方式包括 Queue、Pipe 和 Manager,它们可以帮助不同进程之间安全、有效地传递数据。下面详细介绍它们的使用和特点。

1.3.1 队列 (Queue)

Queue 是一种线程和进程安全的队列,用于在线程和进程之间传递数据。它基于先进先出(FIFO)的原则工作,支持多个生产者和多个消费者。 在进程模式下,Python 提供了 multiprocessing.Queue,它是通过底层的管道(Pipe)和锁(Lock)机制实现的安全队列,可以在不同的进程间传递数据。

特点:

  • 线程(queue.Queue,下文介绍线程在展开说明)和进程(multiprocessing.Queue)安全:Queue 是线程和进程安全的,支持多生产者和多消费者模式;
  • 阻塞与非阻塞操作:Queue 的 get() 和 put() 方法可以设置为阻塞或非阻塞模式;
  • 容量限制:可以设置 Queue 的最大容量,默认无限制;

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from multiprocessing import Process, Queue
import time


# 定义一个函数,向队列中添加数据
def producer(queue):
    for i in range(10):
        item = f'Item {i}'
        print(f'生产者正在生产: {item}')
        queue.put(item)  # 将数据放入队列, 如果队列已满,将阻塞
        time.sleep(1)  # 模拟生产过程中的延迟


# 定义一个函数,从队列中取出数据
def consumer(queue):
    while True:
        item = queue.get()  # 从队列中获取数据
        if item is None:
            break  # 如果获取到 None,则退出循环
        print(f'消费者正在消费: {item}')
        time.sleep(2)  # 模拟消费过程中的延迟


if __name__ == '__main__':
    # 创建一个进程间通信且最大长度为 3 的队列
    queue = Queue(maxsize=3)

    # 创建生产者进程
    producer_process = Process(target=producer, args=(queue,))

    # 创建消费者进程
    consumer_process = Process(target=consumer, args=(queue,))

    # 启动进程
    producer_process.start()
    # 等待生产者产生数据,模拟队列阻塞
    time.sleep(5)
    consumer_process.start()

    # 等待生产者进程结束
    producer_process.join()

    # 生产结束后向队列中放入 None,用于通知消费者结束
    queue.put(None)

    # 等待消费者进程结束
    consumer_process.join()

    print('生产和消费过程完成。')

1). 代码说明

  • Queue 初始化:

    Python
    1
    queue = Queue()
    

Queue() 创建了一个共享的队列,可以在多个进程间传递数据。

  • 生产者函数 producer:

    Python
    1
    2
    3
    4
    5
    6
    def producer(queue):
        for i in range(5):
            item = f'Item {i}'
            print(f'生产者正在生产: {item}')
            queue.put(item)
            time.sleep(1)
    

该函数向队列中添加 5 个项目。每生产一个项目,程序会暂停 1 秒,以模拟生产过程中的延迟。

  • 消费者函数 consumer:

    Python
    1
    2
    3
    4
    5
    6
    7
    def consumer(queue):
        while True:
            item = queue.get()
            if item is None:
                break
            print(f'消费者正在消费: {item}')
            time.sleep(2)
    

该函数不断从队列中读取数据,并消费数据。读取数据后,程序暂停 2 秒以模拟消费过程中的延迟。如果读取到 None,则退出循环。

  • 进程启动和同步:

    Python
    1
    2
    3
    4
    5
    6
    producer_process.start()
    consumer_process.start()
    
    producer_process.join()
    queue.put(None)
    consumer_process.join()
    
  • start() 用于启动进程;

  • join() 等待进程完成;
  • queue.put(None) 传递一个 None 值,告诉消费者数据已经全部处理完毕,可以停止消费;
  • consumer_process.join() 放置在 queue.put(None) 之前则消费者进程(consumer_process)一直不会结束,一直在等待进程完成(进程完成需要队列中获取 None)。

  • 输出: 运行该代码时,生产者和消费者进程会并行工作,生产者生成的数据会被消费者消耗,直到所有数据都处理完毕。

  • 其他说明
  • 线程安全:multiprocessing.Queue 是线程和进程安全的,意味着在多线程或多进程环境下使用时,不需要额外的同步机制;
  • 数据序列化:Queue 会自动序列化和反序列化数据,所以可以传递任何可以被 pickle 模块序列化的数据类型;

2). 参数及方法说明

  • multiprocessing.Queue(maxsize)(设置队列长度)
    通过 multiprocessing.Queue(maxsize) 可以设置队列的最大长度。这个参数限制了队列中能存放的最大项数,防止队列无限制地增长导致内存耗尽。
  • maxsize: 用于指定队列的最大长度。默认值为 0,表示队列大小不受限制,允许存储无限数量的数据。当队列达到 maxsize 时,put() 操作将阻塞,直到队列有空间。

  • put(item, block=True, timeout=None)
    put() 方法将数据放入队列中。它支持阻塞模式,意味着如果队列已满,则可以等待直到队列有空余空间。

  • 参数:
    • item: 需要放入队列的数据;
    • block(默认值为 True):如果设置为 True,当队列已满时,该方法会阻塞,直到有空闲空间。如果为 False,则在队列满时会立即抛出 queue.Full 异常;
    • timeout(可选):等待队列有空闲空间的时间。如果超过这个时间还没有空闲空间,会抛出 queue.Full 异常;
  • 用法:

    Python
    1
    queue.put("Hello, world!", block=True, timeout=None)
    
  • 示例:

    Python
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    import multiprocessing
    import time
    
    def producer(queue):
        # 往队列产生5条数据
        for i in range(5):
            item = f"Putting item {i} in queue"
            print(item)
            # queue.put(item)
            # 队列阻塞,抛出 queue.Full 异常
            queue.put(item, block=False)
            # 模拟任务耗时
            time.sleep(1)
    
    if __name__ == "__main__":
        # 创建一个长度为 3 的队列
        q = multiprocessing.Queue(maxsize=3)
        p = multiprocessing.Process(target=producer, args=(q,))
        p.start()  # 启动子进程,子进程开始在后台运行
        p.join()  # 等待子进程结束
    
    # 输出:
    # Putting item 0 in queue
    # Putting item 1 in queue
    # Putting item 2 in queue
    # Putting item 3 in queue
    # Process Process-1:
    # Traceback (most recent call last):
    #   File "/usr/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    #     self.run()
    #   File "/usr/lib/python3.12/multiprocessing/process.py", line 108, in run
    #     self._target(*self._args, **self._kwargs)
    #   File "/home/bolean/workspace/examples/python-tricks/src/process_queue_demo20_04.py", line 61, in producer
    #     queue.put(item, block=False)
    #   File "/usr/lib/python3.12/multiprocessing/queues.py", line 90, in put
    #     raise Full
    # queue.Full
    

    队列阻塞,抛出 queue.Full 异常。

  • get(block=True, timeout=None)
    get() 方法从队列中取出数据。如果队列为空时,它可以阻塞进程,直到有数据可取。

  • 参数:
    • block(默认值为 True):如果设置为 True,当队列为空时,该方法会阻塞,直到有数据可获取。如果设置为 False,队列为空时会立即抛出 queue.Empty 异常;
    • timeout(可选):阻塞等待的最大时间。如果超过此时间仍然无法获取到数据,会抛出 queue.Empty 异常;
  • 用法:

    Python
    1
    item = queue.get()  # 阻塞模式,等待直到队列中有数据
    
  • 示例:

    Python
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    import multiprocessing
    import time
    
    
    def consumer(queue):
        while True:
            # item = queue.get()
            item = queue.get(block=False)  # 设置为 False,队列为空时会立即抛出 queue.Empty 异常
            if item is None:  # 当获取到 None 时,停止循环
                break
            print(f"Got item: {item}")
            # time.sleep(2)
    
    
    if __name__ == "__main__":
        q = multiprocessing.Queue(maxsize=3)
        p = multiprocessing.Process(target=consumer, args=(q,))
        p.start()
        for i in range(5):
            q.put(i)
            time.sleep(2)  # 模拟队列为空情况
        q.put(None)  # 向队列发送结束信号
        p.join()
    
    # 输出:
    # Got item: 0
    # Process Process-1:
    # Traceback (most recent call last):
    #   File "/usr/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    #     self.run()
    #   File "/usr/lib/python3.12/multiprocessing/process.py", line 108, in run
    #     self._target(*self._args, **self._kwargs)
    #   File "/home/bolean/workspace/examples/python-tricks/src/process_queue_demo20_04.py", line 99, in consumer
    #     item = queue.get(block=False)  # 设置为 False,队列为空时会立即抛出 queue.Empty 异常
    #            ^^^^^^^^^^^^^^^^^^^^^^
    #   File "/usr/lib/python3.12/multiprocessing/queues.py", line 116, in get
    #     raise Empty
    # _queue.Empty
    

    队列为空,抛出 queue.Empty 异常。

  • empty()
    empty() 方法,如果队列为空返回 True ,否则返回 False。需要注意的是,在多进程环境下,由于进程间的竞争,该方法的结果可能不完全可靠。

Python
1
2
if queue.empty():
    print("队列为空")

见 full() 示例。

  • full()
    full() 方法,如果队列已满返回 True ,否则返回 False。与 empty() 类似,在多进程环境下,这个方法的结果也可能不是完全可靠的。
Python
1
2
if queue.full():
    print("Queue is full")

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import multiprocessing
import time

def producer(queue):
    for i in range(5):
        if queue.empty():
            print("队列为空!")
        if queue.full():
            print("队列已满!")
        print(i)
        queue.put(i)
        # time.sleep(2)

if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=3)
    p = multiprocessing.Process(target=producer, args=(q,))
    p.start()
    p.join()

# 输出:
# 队列为空!
# 0
# 队列为空!
# 1
# 2
# 队列已满!
# 3

# time.sleep(2)
# 暂停,则程序正常输出:
# 队列为空!
# 0
# 1
# 2
# 队列已满!
# 3

这里为什么会输出 2 次“队列为空!”?(注,环境差异,输出或有不同) * 初始队列为空: 当 producer 函数刚开始运行时,队列是空的,因此第一次调用 queue.empty() 时,输出“队列为空!”; * 队列为空的并发条件: 虽然在循环中不断向队列中添加元素,但 queue.empty() 是基于队列的当前状态检查是否为空。当调用 queue.empty() 时,如果这时 CPU 正好切换到另一个线程,导致元素还未真正放入队列,可能会再次检查到队列为空。因此有可能出现第二次输出“队列为空!”的情况;

#### 【扩展知识点】:queue.empty() 和 queue.full() 非可靠性

  • multiprocessing.Queue 的工作原理
    multiprocessing.Queue 是一个进程安全的队列,它支持在多个进程间安全地传递数据。通过 put() 方法往队列中插入元素,通过 get() 方法从队列中读取元素。队列是有状态的——它可能是空的,也可能是满的。
    但是,调用 queue.empty() 和 queue.full() 检查队列的状态时,它们并不一定是瞬时精确的,特别是在多进程环境下。这是因为进程调度器的运行时行为可能影响队列状态与执行流的同步。
  • 多进程环境中的竞争条件
    多进程中的竞争条件是指,多个进程对共享资源(如队列)进行读写操作时,可能发生时间竞争,导致状态检查与实际操作不同步。
    例如,在上述代码中,队列初始是空的,producer 函数在向队列中 put() 数据之前,先检查队列是否为空。如果队列为空,则输出 " 队列为空"。
    然而,由于 queue.empty() 检查队列的时间与队列实际写入数据之间存在时间差(微秒级),这时可能发生以下情况:
    • 第一次检查:队列刚创建时确实是空的,因此在第一次 queue.empty() 调用时,返回 True,并输出 "队列为空!";
    • 队列写入的时机:接下来,程序通过 queue.put(i) 将数据放入队列。然而,在向队列写入数据时,CPU 会对两个操作进行调度(检查队列是否为空与向队列写入数据)。操作顺序未必是严格按代码执行顺序完成的——例如,即便 queue.put() 已经被调用,数据可能还未完全写入队列,而是处于处理中或在系统缓冲区。这种情况是操作系统调度器或底层系统资源的延迟。
    • 第二次检查:在队列还未真正放入数据之前,再次调用 queue.empty() 时,队列可能仍然报告为空。尤其是在没有任何同步机制(如锁或条件变量)来确保这些操作的顺序时,这种现象更容易发生;
  • queue.empty()queue.full() 的局限性 根据 Python 文档,queue.empty() 和 queue.full() 并不是绝对可靠的检查函数,尤其在多进程或多线程环境中。这是因为这些方法仅仅是对队列状态的一个瞬时检查,而队列状态在检查后的时刻可能已经被另一个进程修改了。 两次“队列为空”的输出,很可能是因为以下两点:
    • 首次调用:在 put() 之前,队列确实为空;
    • 第二次调用:尽管在逻辑上应该向队列写入了数据,但由于多进程的调度机制或者 put() 操作的延迟,队列状态仍然被认为是空的(即使可能已经在写入数据的过程中);
  • 操作系统调度与 CPU 资源竞争
    在多进程环境中,操作系统调度器负责分配 CPU 时间给不同的进程。如果两个进程之间存在资源竞争(如共享队列),系统可能会在某个关键操作(如 put() 或 empty() 检查)之前或之后切换到其他进程;
    因此,程序的执行顺序可能不是你想象中的严格按照代码顺序执行的,而是受到系统调度器的影响;
  • 如何避免这种情况? 要避免这种竞争条件,可以使用如下方式:
    • 锁:使用 multiprocessing.Lock 来保证在一个进程修改队列时,其他进程不能同时访问队列;
    • 条件变量:使用 multiprocessing.Condition 来实现更复杂的同步机制,确保队列的状态检查和更新保持一致;
    • 信号量:使用 multiprocessing.Semaphore 来控制对共享资源的访问,确保某一时刻只有固定数量的进程可以操作队列;
  • 调整后的代码示例
    尝试添加锁来同步操作,确保队列的状态检查和写入操作不会出现竞争条件。

    Python
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    import multiprocessing
    import time
    
    
    def producer(queue, lock):
        for i in range(5):
            with lock:
                if queue.empty():
                    print("队列为空!")
                if queue.full():
                    print("队列已满!")
                queue.put(i)
                print(f"放入 {i} 到队列")
                time.sleep(0.1)  # 模拟一些延迟
    
    
    if __name__ == "__main__":
        lock = multiprocessing.Lock()
        q = multiprocessing.Queue(maxsize=3)
        p = multiprocessing.Process(target=producer, args=(q, lock))
        p.start()
        p.join()
    # 输出:
    # 队列为空!
    # 放入 0 到队列
    # 放入 1 到队列
    # 放入 2 到队列
    # 队列已满!
    

    在这个例子中,with lock: 确保了每次对队列的检查和修改都是同步进行的,从而避免竞争条件。这种方式可以确保程序按照预期行为执行,不会输出两次“队列为空!”。
    小结: * queue.empty() 和 queue.full() 的非可靠性:多进程环境中,队列状态检查不是瞬时精确的。 * 多进程调度和资源竞争:多进程程序的执行顺序并不严格按照代码顺序执行,系统调度器和资源竞争可能导致队列状态不一致。 * 避免竞争条件:可以通过使用锁或条件变量等同步机制,确保对队列的操作顺序正确。

  • qsize()
    qsize() 返回队列中当前未被获取(即 消费 .get())的数据项的数量。这个方法在 Unix 系统中可以正常使用,但在 Windows 上通常不可用(会抛出 NotImplementedError)。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import multiprocessing


def producer(queue):
    for i in range(5):
        queue.put(i)
        print(f"Queue size: {queue.qsize()}")


if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=3)
    p = multiprocessing.Process(target=producer, args=(q,))
    p.start()
    p.join()

# 输出:
# Queue size: 1
# Queue size: 2
# Queue size: 3
  • close() 用于关闭队列,不会影响已经在队列中的数据处理。close() 的作用是防止再向队列中添加新的数据,但队列中已经存在的数据依然可以被读取和处理。
  • 关闭队列的作用:当调用 Queue.close() 后,队列将不允许再向其中添加新的数据(即不能再调用 put() 方法)。这个动作相当于通知队列 "不要再接受新任务";
  • 对已提交数据的处理:close() 并不会清空或影响已经在队列中的数据,这些数据依然会按照正常流程被消费者进程读取(通过 get() 方法)并处理。因此,队列关闭后,所有已提交的数据仍可以被继续处理直至队列为空;
  • 与 join() 配合:在关闭队列后,通常会使用 join() 方法来确保所有已提交的数据都被正确处理。join() 会等待队列中的所有数据都被处理完,之后程序才会继续执行;

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import multiprocessing
import time


def worker(queue):
    while True:
        data = queue.get()
        if data is None:  # 检测到 None 作为结束信号,退出循环
            break
        print(f"Processed data: {data}")
        time.sleep(1)  # 模拟处理时间


if __name__ == "__main__":
    queue = multiprocessing.Queue()

    # 启动一个消费者进程
    process = multiprocessing.Process(target=worker, args=(queue,))
    process.start()

    # 向队列中放入数据
    for i in range(5):
        queue.put(i)

    # 将所有数据(包括 None 结束信号)放入队列,然后再调用 queue.close(),确保不会因关闭队列而无法放入数据。
    queue.put(None)

    # 关闭队列底层的通信管道
    queue.close()

    # 该位置放入结束信号无效,队列不允许再向其中添加新的数据,导致 process 进程一直等待结束信号,主进程未继续往下执行打印:“All tasks processed.”
    # queue.put(None)

    # 等待消费者进程处理完所有数据
    process.join()

    print("All tasks processed.")

# 输出:
# Processed data: 0
# Processed data: 1
# Processed data: 2
# Processed data: 3
# Processed data: 4
# All tasks processed.

假设 queue.put(None) 代码下移至 queue.close() 之后,则抛出:ValueError: Queue is closed 异常,消费者进程收不到 queue.put(None) 的结束信号,进程阻塞。 * 为什么不能在 queue.close() 之后调用 queue.put()? * queue.put() 是用来将数据放入队列的操作; * queue.close() 是关闭队列底层通信管道的操作。一旦队列关闭,任何尝试往队列中放数据的操作都会抛出 ValueError: Queue is closed 异常; * 子进程未退出的原因: * 子进程在 worker 函数中一直等待从队列中获取数据。如果没有收到 None 作为结束信号,它会一直阻塞,等待新的数据; * 当主进程调用 queue.close() 后再试图放入 None 时,抛出 ValueError,并且 None 没有成功放入队列,导致子进程永远等不到结束信号,进而阻塞; * 如果需要当主进程被终止时,守护进程(子进程)也被终止,可以设置进程:multiprocessing.Process(target=worker, args=( queue,), daemon=True),则主进程抛出 ValueError: Queue is closed 异常,相应的子进程自动被终止执行;

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import multiprocessing
import time

def worker(queue):
    while True:
        data = queue.get()
        if data is None:  # 检测到 None 作为结束信号,退出循环
            break
        print(f"Processed data: {data}")
        time.sleep(1)  # 模拟处理时间

if __name__ == "__main__":
    queue = multiprocessing.Queue()

    # 启动一个消费者进程
    process = multiprocessing.Process(target=worker, args=(queue,))
    process.start()

    # 向队列中放入数据
    for i in range(5):
        queue.put(i)

    queue.put(None)

    # 关闭队列底层的通信管道
    queue.close()

    # 等待消费者进程处理完所有数据
    process.join()

    print("All tasks processed.")

    queue.put("New")
    process2 = multiprocessing.Process(target=worker, args=(queue,))
    process2.start()
    queue.put(None)
    process2.join()

# 输出:
# Processed data: 0
# Processed data: 1
# Processed data: 2
# Processed data: 3
# Processed data: 4
# Traceback (most recent call last):
#   File "/home/bolean/workspace/examples/python-tricks/src/process_queue_demo20_04.py", line 321, in <module>
#     queue.put("New")
#   File "/usr/lib/python3.12/multiprocessing/queues.py", line 88, in put
#     raise ValueError(f"Queue {self!r} is closed")
# ValueError: Queue <multiprocessing.queues.Queue object at 0x7fef3ca1bb00> is closed
# All tasks processed.

注释 queue.close() 队列可接受新的数据,可由后面消费者进程处理。

#### 【扩展知识点】

  • process.join():这个方法会阻塞主进程,直到对应的子进程结束。当 join() 调用返回时,子进程已经终止;
  • 如果你在调用 process.join() 之后再试图调用 queue.put() 放入数据,子进程(消费者)已经不再运行,因此这些数据无法被处理或接收;

  • join_thread()cancel_join_thread()

  • 1). join_thread()

    • 功能: join_thread() 用于等待队列的后台线程结束。在队列的生命周期中,Python 维护着一个后台线程来管理进程之间的数据通信。如果你调用了 queue.close(),你可以显式调用 join_thread() 来确保所有数据都已经通过队列的底层通信管道被发送出去,避免数据丢失。
    • 适用场景:
    • 当你需要确保进程在退出之前,所有排入队列的数据都已经被处理完(例如,传输至消费者进程)时,可以使用 join_thread();
    • 适合用在你不希望丢失数据,确保所有任务都已经被完全处理的场景;
    • 使用要求:
    • 必须在 queue.close() 之后才能调用 join_thread(),否则会抛出异常;
    • 这将阻塞调用它的进程,直到后台线程完成所有队列数据的处理并终止;
    • 代码示例:
    Python
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
      import multiprocessing
      import time
    
    
      def worker(queue):
          while True:
              data = queue.get()
              if data is None:  # 检测到结束信号
                  break
              print(f"Processed: {data}")
              time.sleep(1)
    
    
      if __name__ == "__main__":
          queue = multiprocessing.Queue()
    
          process = multiprocessing.Process(target=worker, args=(queue,))
          process.start()
    
          # 放入数据
          for i in range(5):
              queue.put(i)
    
          # 放入 None 作为结束信号
          queue.put(None)
    
          # 关闭队列并等待后台线程结束
          queue.close()
          queue.join_thread()  # 等待队列处理完所有数据
    
          # 等待子进程退出
          process.join()
          print("All tasks processed.")
    
  • 2). cancel_join_thread()

    • 功能: cancel_join_thread() 用于取消 join_thread() 的阻塞行为,防止进程在退出时等待后台线程结束。这意味着当你调用 cancel_join_thread() 后,主进程会立即退出,而不会等待队列中的数据被完全处理。
    • 适用场景:
    • 当你需要立即结束进程,而不关心队列中尚未处理的数据,或者不在乎数据丢失时,可以使用 cancel_join_thread();
    • 通常用于极少数的紧急情况下,比如当你希望程序在某些条件下快速终止时;
    • 注意事项:
    • 使用 cancel_join_thread() 可能导致队列中尚未发送的数据丢失,因为它会跳过等待数据完全写入底层管道的过程;
    • 这个方法仅适用于你不关心数据丢失的情况,因此它在实际开发中不常见
    • 代码示例:为了展示 cancel_join_thread() 导致数据丢失的场景,主进程将不会等待队列中的数据被完全处理,会在放入一部分数据后立即退出,剩余数据会丢失。

    ```python import multiprocessing import time

    def worker(queue):
        while True:
            data = queue.get()
            if data is None:
                print("Received termination signal. Worker exiting.")
                break
            print(f"Worker processing: {data}")
            time.sleep(1)  # 模拟数据处理的时间
    
    if __name__ == "__main__":
        queue = multiprocessing.Queue()
    
        # 启动消费者进程
        process = multiprocessing.Process(target=worker, args=(queue,))
        process.start()
    
        # 向队列中放入数据
        for i in range(5):
            queue.put(i)
    
        # 放入结束信号 None
        queue.put(None)
    
        # 调用 cancel_join_thread() 取消等待后台线程处理队列数据
        queue.cancel_join_thread()
    
        # 关闭队列
        queue.close()
    
        # 模拟主进程提前退出
        time.sleep(1)  # 让部分数据来得及处理,主进程快速退出
    
        process.terminate()  # 强制终止子进程
    
        print("Main process exiting early.")
    
    # 输出:
    # Worker processing: 0
    # Main process exiting early.
    

    ````

    解释: * 在主进程中调用 cancel_join_thread() 后,主进程不再等待后台线程处理完所有数据,并在部分数据处理后强制终止子进程; * 由于主进程提前退出,消费者进程只处理了部分数据,并未收到结束信号 None; * 剩余数据未被处理,数据丢失;

    通过强制终止子进程 (process.terminate()),可以更明显地展示 cancel_join_thread() 可能导致的数据丢失场景。

  • 3). 两者的区别

    • join_thread():阻塞主进程,直到队列的后台线程处理完所有数据并退出。通常用于确保队列中的所有数据都被处理完毕,适合严谨的数据处理场景;
    • cancel_join_thread():允许主进程直接退出,不会等待后台线程处理队列中的数据。适用于无需等待数据处理完成或不担心数据丢失的场景;
  • 4). 重要性和使用建议
    • 何时使用 join_thread():
      当你有多个进程使用 multiprocessing.Queue 进行通信时,通常需要确保在进程退出前所有数据都已被发送或接收,此时使用 join_thread() 是一种可靠的方式来确保数据完整性。
    • 何时使用 cancel_join_thread():
      如果你需要在不等待后台线程的情况下立即退出进程,并且不担心队列中尚未处理的数据丢失,那么可以使用 cancel_join_thread()。但这种情况非常罕见,通常建议避免使用,除非你明确知道其影响。

1.3.2 管道 (Pipe)

在 Python 中,multiprocessing.Pipe 是用于进程间简单高效的通信工具。与 Queue 不同,Pipe 提供了一个单一的双向通信通道(duplex 参数配置),由两个连接点(端点)组成。每个端点可以用来发送或接收数据,两个进程可以通过 Pipe 进行相互通信。

  • 工作原理:
  • multiprocessing.Pipe() 返回一对连接对象 (conn1, conn2),每个连接对象都有 send() 和 recv() 方法,可以分别用于发送和接收数据;
  • 通常,Pipe 的两个连接对象分别在不同的进程中使用,一个用于发送数据,另一个用于接收数据;

  • 参数说明: multiprocessing.Pipe(duplex=True/False)
    duplex(默认值为 True):决定管道是否为双向通信。

  • True(默认):允许双向通信,两端都可以发送和接收数据;
  • False:管道为单向通信,意味着一端只能发送数据,另一端只能接收数据;

  • 行为:

  • send(obj):将 obj 发送到连接的另一端;
    • obj: 需要发送的对象,这个对象必须是可序列化的(可以通过 pickle 模块进行序列化);
  • recv():接收通过 send() 发送的对象。如果没有数据,会阻塞直到接收到数据;
  • close():关闭管道,禁止进一步的发送或接收操作。调用此方法后,尝试使用 send() 或 recv() 会引发异常;
  • poll([timeout]):检查是否有数据可供接收,如果有返回 True,否则返回 False;
    • timeout(可选):设置超时时间,等待管道中是否有数据可接收。如果不传递该参数,poll() 将立即返回 True 或 False。如果设置了超时时间(单位为秒),poll() 会阻塞指定的时间,直到有数据或超时;
  • 优缺:
  • 优点:
    • Pipe 提供了简单且高效的双向通信机制,非常适合轻量级的通信需求;
    • 在性能上,Pipe 通常比 Queue 更快,因为 Queue 基于底层的锁机制,而 Pipe 则是基于文件描述符的轻量机制;
  • 缺点:
    • Pipe 只允许两个进程之间通信,不像 Queue 那样适合多进程通信。如果需要多个进程之间进行通信,可以使用 Queue;
  • Pipe 与 Queue 的比较
  • Pipe 更适合双进程之间的快速通信,提供更轻量的通信机制;
  • Queue 适合在多个进程之间共享数据,但性能较低,因为它需要处理更多的并发控制和锁;
  • 使用 Pipe 的场景
  • 双向通信:两个进程之间需要相互通信,例如客户端-服务器模式;
  • 单向通信:只需要一个进程发送数据,另一个进程接收并处理数据,使用 duplex=False 的 Pipe 能减少复杂性和不必要的操作;
  • 示例:

示例 1:单向通信(duplex=False)

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import multiprocessing


def worker(conn):
    # 子进程接收来自主进程的消息
    while True:
        msg = conn.recv()
        if msg == "END":
            print("Worker received termination signal.")
            break
        print(f"Worker received: {msg}")


if __name__ == "__main__":
    # 创建单向管道(第二个连接用来发送,第一个用来接收)
    recv_conn, send_conn = multiprocessing.Pipe(duplex=False)

    # 启动子进程
    process = multiprocessing.Process(target=worker, args=(recv_conn,))
    process.start()

    # 主进程发送消息
    send_conn.send("Hello Child")

    # 发送结束信号
    send_conn.send("END")
    process.join()

# 输出:
# Worker received: Hello Child
# Worker received termination signal.

在 multiprocessing.Pipe(duplex=False) 中,返回两个连接对象 (conn1 和 conn2),但在单向通信中,它们的作用是固定的:

  • conn1(recv_conn):用于接收数据;
  • conn2(send_conn):用于发送数据;

如果尝试用 send_conn.recv() 或 recv_conn.send(),将会导致错误,因为 duplex=False 限制了通信的方向。

示例 2:双向通信(默认:duplex=True)

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import multiprocessing


def worker(conn):
    # 子进程接收来自主进程的消息
    msg = conn.recv()
    print(f"Worker received: {msg}")
    # 子进程发送响应消息
    conn.send("Message received by worker")
    # 关闭连接
    conn.close()
    # OSError: handle is closed
    # conn.send("Test")


if __name__ == "__main__":
    # 创建一个双向管道:默认:duplex=True
    conn1, conn2 = multiprocessing.Pipe(duplex=True)
    # 启动子进程
    process = multiprocessing.Process(target=worker, args=(conn2,))
    process.start()
    # 主进程发送消息
    conn1.send("Hello,World!")
    # 接受来自子进程的消息
    msg = conn1.recv()
    print(f"Parent received: {msg}")
    process.join()

# 输出:
# Worker received: Hello,World!
# Parent received: Message received by worker

双向管道允许主进程和子进程相互发送和接收消息,conn1 和 conn2 都可以用 send() 和 recv()。

示例 3:综合示例

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import multiprocessing
import time


def worker(conn):
    # 子进程通过 conn 发送数据
    for i in range(5):
        conn.send(i)
        print(f"Worker sent: {i}")
        time.sleep(1)
    # 关闭连接
    conn.send(None)  # 发送结束信号
    conn.close()


if __name__ == "__main__":
    # 创建管道
    parent_conn, child_conn = multiprocessing.Pipe()

    # 启动子进程
    process = multiprocessing.Process(target=worker, args=(child_conn,))
    process.start()

    # 主进程从管道中接收数据
    while True:
        # 检查管道中是否有数据
        if parent_conn.poll(2):  # 最多等待2秒
            data = parent_conn.recv()
            if data is None:  # 如果收到结束信号,则退出循环
                print("Received termination signal. Exiting.")
                break
            print(f"Parent received: {data}")
        else:
            print("No data received within the timeout.")

    # 等待子进程结束
    process.join()

    print("All tasks processed.")

1.3.3 管理器(Manager)

管理器(Manager)提供了一种创建共享数据的方法,从而可以在不同进程中共享,甚至可以通过网络跨机器共享数据。管理器维护一个用于管理共享对象的服务。其他进程可以通过代理访问这些共享对象。

1.3.3.1 multiprocessing.Manager()

multiprocessing.Manager() 会返回一个已经启动的 SyncManager 对象,该对象提供各种方法来创建共享数据对象,如共享的列表、字典、队列等。Manager 使用进程间的代理模式:每个进程都能通过代理对象访问这些共享数据,但实际的数据保存在由 Manager 启动的独立进程中。
在 Manager 被垃圾回收或父进程退出时,管理器的进程也会立即终止。为了启动 Manager,需要调用 start() 方法。
multiprocessing.Manager 提供的共享数据类型(如 list、dict、Namespace、Lock、Queue 等)是进程安全的。它们通过管理器进程进行通信和同步,以确保多个进程对共享数据的操作不会发生竞争条件。所有由 Manager 创建的共享对象都是通过代理访问的,代理对象会自动管理并发访问,提供必要的锁定和同步机制。

Manager 提供了一些常用的共享数据结构,可以通过管理器实例的方法创建:

  • manager.list(): 共享的列表对象,允许多个进程同时访问和修改。操作时自动进行锁定,确保进程间的同步;
  • manager.dict(): 共享的字典对象,进程间可安全地读取和修改键值对;
  • manager.Queue(): 共享的队列,用于在进程之间传递消息或数据。底层实现了同步机制,可以在多个进程中安全地使用;
  • manager.Value() 和 manager.Array(): 共享的基本数据类型和数组,允许多个进程安全地读写数值和数组;
  • manager.Namespace(): 一个可以存储任意属性的对象,属性可以在进程间安全地共享和修改;
  • manager.Lock() 和 manager.RLock(): 提供进程间同步机制,确保某些资源的独占访问权;
  • manager.Event(): 用于跨进程的事件通知; 所有这些数据类型都是通过一个后台的 Manager 服务进程管理的,每个共享对象在不同进程中的操作都通过代理进行,因此可以确保进程安全。

为什么 multiprocessing.Manager 是进程安全的?
multiprocessing.Manager 提供的共享数据类型(如 list, dict, Namespace 等)通过代理对象的方式与底层的管理进程通信,因此,它们的操作(如 append() 或 update())会被同步到管理进程上。这种设计确保了多个进程并发访问这些共享对象时,不会发生低层次的竞争条件。

可能的竞争问题
虽然 Manager 提供了基础操作的同步,但复杂操作(多个操作组成的逻辑)依然会引发竞争关系。例如,如果两个进程同时尝试对同一个共享列表进行读取、修改、写入的组合操作(如检查值是否存在后再插入新值),仍然可能发生不一致的情况,因为 Manager 只保护单个操作是进程安全的,不能保证多个操作的原子性。

示例代码
以下代码展示了 multiprocessing.Manager 共享列表的基本使用,并且由于操作是单步的,因此不会出现竞争关系:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import multiprocessing


def worker(shared_list):
    # 每个进程向共享列表中添加数据
    for i in range(5):
        shared_list.append(i)


if __name__ == "__main__":
    # 创建 Manager 对象
    manager = multiprocessing.Manager()

    # 创建共享列表
    shared_list = manager.list()

    # 启动多个进程
    processes = []
    for _ in range(3):
        p = multiprocessing.Process(target=worker, args=(shared_list,))
        processes.append(p)
        p.start()

    # 等待所有进程结束
    for p in processes:
        p.join()

    # 打印共享列表的最终结果
    print(f"Shared list: {list(shared_list)}")

竞争关系的例子
假设有以下场景:先检查某个值是否在列表中,然后再执行一些操作。如果这两步操作没有作为一个整体进行同步,那么就会出现逻辑竞争:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import multiprocessing


def worker(shared_list, value):
    # 检查是否已经存在 value,若不存在则添加
    if value not in shared_list:
        shared_list.append(value)


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    shared_list = manager.list()

    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(shared_list, 10))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Shared list: {list(shared_list)}")

# 输出:Shared list: [10, 10]

在上述例子中,多个进程可能会同时检查 shared_list 中是否有 10,如果都发现没有,则它们都可能向列表中插入 10,导致逻辑竞争。

如何避免复杂逻辑中的竞争关系?

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import multiprocessing


def worker(shared_list, value, lock):
    with lock:  # 使用锁保护复杂操作
        if value not in shared_list:
            shared_list.append(value)


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    shared_list = manager.list()
    lock = multiprocessing.Lock()  # 创建锁

    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(shared_list, 10, lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Shared list: {list(shared_list)}")

# 输出:Shared list: [10]

通过使用锁,可以确保每个进程在执行检查和插入操作时不会被其他进程打断,从而避免竞争关系。

  • 基础操作是安全的:multiprocessing.Manager 确保对共享对象(如列表、字典等)的基础操作(如 append() 或 setitem())是进程安全的。
  • 复杂操作需要锁:如果涉及多个步骤的复杂操作(如检查和修改),需要使用 Lock 等同步机制来避免逻辑上的竞争条件。
  • 合理使用锁:尽量缩小加锁的范围,以避免性能上的损失。

性能注意事项

虽然 multiprocessing.Manager 提供了方便的进程间安全机制,但由于其底层通过 IPC(进程间通信)机制进行同步,性能可能会比直接使用共享内存结构(如 multiprocessing.Array、multiprocessing.Value)稍慢。如果在高并发场景中性能是关键因素,可能需要使用更高效的原生数据共享类型。

以下是关于 multiprocessing.Manager 的使用示例:

  • 示例 1:共享字典和列表
    在这个例子中,多个进程可以同时修改共享的字典和列表,并且这些修改对其他进程可见。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import multiprocessing


def worker(shared_dict, shared_list, value):
    shared_dict[value] = value * 2
    shared_list.append(value)


if __name__ == "__main__":
    # 创建一个 Manager 对象
    manager = multiprocessing.Manager()

    # 创建共享的字典和列表
    shared_dict = manager.dict()
    shared_list = manager.list()

    # 启动多个子进程,每个进程向共享对象中添加不同的数据
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(shared_dict, shared_list, i))
        processes.append(p)
        p.start()

    # 等待所有进程结束
    for p in processes:
        p.join()

    # 打印最终的共享对象
    print(f"Shared Dictionary: {shared_dict}")
    print(f"Shared List: {shared_list}")

# 输出:
# Shared Dictionary: {1: 2, 2: 4, 3: 6, 0: 0, 4: 8}
# Shared List: [1, 2, 3, 0, 4]
  • 每个进程都修改了共享的字典 shared_dict 和共享的列表 shared_list;
  • Manager 确保了这些对象可以在进程间安全地共享;
  • 示例 2:使用 Namespace 在进程之间共享简单的变量
    Namespace 允许多个进程共享简单的命名空间变量,适合共享单一值而不是复杂的数据结构。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import multiprocessing

def worker(namespace, value):
    namespace.result += value  # 修改共享的命名空间变量

if __name__ == "__main__":
    # 创建 Manager 对象
    manager = multiprocessing.Manager()

    # 创建一个共享的命名空间对象
    namespace = manager.Namespace()
    namespace.result = 0  # 初始化共享变量

    # 启动多个进程,每个进程修改命名空间中的共享变量
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(namespace, i))
        processes.append(p)
        p.start()

    # 等待所有进程结束
    for p in processes:
        p.join()

    # 打印命名空间中的最终结果
    print(f"Namespace result: {namespace.result}")

# 输出:
# Namespace result: 6

上述用例,可以看到 result: 6 并不是准确的累加值,同时输出结果也不固定一致。

原因: += 是非原子操作,导致竞争问题,最终结果可能不正确。 += 实际上涉及多个步骤: * 读取当前值; * 进行加法操作; * 将新值写回变量;
在多进程环境中,如果两个进程同时执行 += 操作,它们可能都读取相同的旧值,在各自的进程中计算出新值,并将其写回,从而导致最终的结果没有累加所有进程的加法操作。
举个例子: 多个进程可能会按照以下步骤同时操作同一个变量 namespace.result: * 进程 A 读取 namespace.result(值为 0); * 进程 B 读取 namespace.result(值为 0); * 进程 A 将 namespace.result 修改为 0 + 1 = 1; * 进程 B 将 namespace.result 修改为 0 + 2 = 2;
最终结果是 namespace.result == 2,而实际上我们希望它等于 3(即所有操作都能累加上去)。这种情况就是竞争条件,并非 Namespace 自身的问题,而是并发环境中多个进程同时操作共享资源所导致的问题。

锁:
锁 (Lock) 是用来防止这种竞争条件的。锁能确保只有一个进程能够访问共享变量并执行操作,其他进程必须等待这个进程释放锁后才能访问共享变量。这可以避免多个进程同时读取和修改共享变量的情况,从而保证操作的原子性。
修改后代码:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import multiprocessing


def worker(namespace, value, lock):
    with lock:  # 使用锁确保对命名空间变量的修改是原子操作
        namespace.result += value  # 修改共享的命名空间变量


if __name__ == "__main__":
    # 创建 Manager 对象
    manager = multiprocessing.Manager()

    # 创建一个共享的命名空间对象
    namespace = manager.Namespace()
    namespace.result = 0  # 初始化共享变量

    # 创建一个进程锁
    lock = multiprocessing.Lock()

    # 启动多个进程,每个进程修改命名空间中的共享变量
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(namespace, i, lock))
        processes.append(p)
        p.start()

    # 等待所有进程结束
    for p in processes:
        p.join()

    # 打印命名空间中的最终结果
    print(f"Namespace result: {namespace.result}")

# 输出:
# Namespace result: 10
  • multiprocessing.Manager() 确保多个进程可以安全地共享对象,但并不保证多个进程同时修改对象时的操作原子性。
  • 锁 (Lock) 确保每次对共享对象的修改是原子的,即一个进程的修改在另一个进程开始修改之前完成。这样可以防止数据被并发修改时产生的不一致问题。

【扩展知识点】
竞争条件(Race Condition)是指在并发或多线程、多进程程序中,多个线程或进程对共享资源的访问或修改顺序不确定,导致程序的结果不一致或不可预期的情况。
当多个进程或线程并发地操作同一个共享资源(如变量、文件或内存区域)时,如果它们的执行顺序或操作顺序没有得到适当的同步控制,结果可能会因为不同的操作顺序而产生不同的结果。竞争条件是并发编程中常见的问题,特别是在没有使用适当的锁或同步机制的情况下。

竞争条件的发生条件: * 多个进程或线程:至少两个以上的线程或进程同时运行; * 共享资源:这些线程或进程必须访问或修改同一个共享的资源(如变量、数据结构、文件等); * 无同步机制:没有适当的锁或同步机制来控制对共享资源的访问顺序;

举个例子:
假设我们有两个线程 T1 和 T2,它们都想修改一个共享的变量 x,初始值为 0:

```
# 初始值
x = 0

def increment():
    global x
    x += 1

```

现在,线程 T1 和 T2 同时执行 increment(),理想情况下,x 应该最终等于 2,因为两个线程各自将 x 增加了 1。然而,由于竞争条件,可能发生如下情况: * 线程 T1 读取 x 的值(此时 x = 0); * 线程 T2 也读取 x 的值(x 仍然是 0); * 线程 T1 将 x 修改为 1; * 线程 T2 将 x 也修改为 1(覆盖了线程 T1 的修改);

最终,x 的值是 1,而不是预期的 2。这就是典型的竞争条件:多个线程同时访问共享资源,且它们的操作顺序没有得到适当的控制。

如何避免竞争条件
为了避免竞争条件,通常需要使用一些同步机制来控制多个进程或线程对共享资源的访问。例如: * 锁(Lock):确保每次只有一个线程或进程能够访问共享资源,其他线程或进程必须等待锁被释放; * 信号量(Semaphore):控制多个线程对资源的访问,可以允许多个线程同时访问某个资源,但限制同时访问的数量; * 条件变量(Condition Variable):允许线程在等待某个条件时释放锁,避免资源死锁;

竞争条件的危害 * 数据不一致:竞争条件会导致数据的不可预测性,最终的数据状态可能与预期不符,造成错误的计算结果; * 难以调试:由于竞争条件往往是依赖于执行顺序的随机性,这使得调试和发现问题变得困难,问题可能只在特定的运行条件下才会显现;

竞争条件是在并发环境下,由于多个进程或线程争用共享资源并且没有适当的同步机制,导致程序结果不确定或不一致的现象。为了避免竞争条件,需要使用适当的同步机制来确保共享资源的访问顺序是可控的。

  • 示例 3:使用 Queue 在进程之间通信

multiprocessing.Manager.Queue() 是 multiprocessing.Queue() 的一种变体,它通过 Manager 机制创建一个共享队列,可以在不同的进程甚至网络中的机器间共享。它与标准的 multiprocessing.Queue() 一样是进程间通信的工具,允许将数据传递到不同的进程中。然而,Manager.Queue() 是通过 SyncManager 代理来管理的,这使得它不仅可以用于本地进程间通信,还可以通过网络实现远程进程间通信。

特点: * 进程安全:Manager.Queue() 是进程安全的,类似于 multiprocessing.Queue(),但它通过管理器代理实现,所以更加通用,可以跨机器使用; * 网络共享:Manager.Queue() 可以在不同机器间通过网络共享数据,这点区别于标准的 multiprocessing.Queue(); * 竞争关系:Manager.Queue() 通过代理机制来管理数据操作,类似于 list.append()、dict.update() 之类的操作,管理器确保操作的进程安全性,因此能够避免竞争条件;

方法与属性: * put(item):将 item 放入队列中; * get():从队列中取出一个项(如果队列为空,会阻塞直到有数据为止); * qsize():返回队列中的项目数(有时无法准确保证,尤其在跨机器使用时); * empty():检查队列是否为空; * full():检查队列是否已满; * put_nowait(item):非阻塞地将 item 放入队列中,如果队列已满,则抛出 queue.Full 异常; * get_nowait():非阻塞地从队列中取出一项,如果队列为空,则抛出 queue.Empty 异常; * close():关闭队列,防止进一步的 put 操作;

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import multiprocessing
import time


def producer(queue, items):
    for item in items:
        print(f"Producer adding: {item}")
        queue.put(item)
        time.sleep(1)


def consumer(queue):
    while True:
        item = queue.get()
        if item is None:  # 检测到 None 作为结束信号
            break
        print(f"Consumer processing: {item}")


if __name__ == "__main__":
    manager = multiprocessing.Manager()

    # 创建共享的队列
    queue = manager.Queue()

    # 启动生产者进程
    items_to_produce = [1, 2, 3, 4, 5]
    producer_process = multiprocessing.Process(target=producer, args=(queue, items_to_produce))

    # 启动消费者进程
    consumer_process = multiprocessing.Process(target=consumer, args=(queue,))

    # 启动生产者和消费者
    producer_process.start()
    consumer_process.start()

    # 等待生产者完成
    producer_process.join()

    # 发送结束信号给消费者
    queue.put(None)

    # 等待消费者完成
    consumer_process.join()

    print("All tasks processed.")

# 输出:
# Producer adding: 1
# Consumer processing: 1
# Producer adding: 2
# Consumer processing: 2
# Producer adding: 3
# Consumer processing: 3
# Producer adding: 4
# Consumer processing: 4
# Producer adding: 5
# Consumer processing: 5
# All tasks processed.
  • 生产者进程 不断向队列中放入数据;
  • 消费者进程 从队列中读取数据并处理,直到检测到 None 作为结束信号;
  • Manager.Queue() 保证了进程间队列通信的安全性;

竞争关系的说明:
由于 Manager.Queue() 通过代理对象来管理队列,它的 put() 和 get() 操作是原子的,因此在多个进程同时执行时不会出现竞争条件。这一点与 Namespace 中的非原子操作(如 +=)不同。
举个例子,如果两个进程同时调用 queue.put(),代理对象会确保每次操作完整进行,因此数据不会丢失或混乱。类似地,queue.get() 也是进程安全的,如果两个进程同时从队列中取数据,代理对象会保证每个进程获取的数据都是唯一的。

进程安全锁的使用示例:
尽管 Manager.Queue() 本身是进程安全的,但有时候我们可能需要对队列外的操作进行锁定,避免多个进程同时修改非队列的共享数据。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import multiprocessing


def worker(queue, value, lock):
    with lock:  # 使用锁保护
        queue.put(value)  # 向队列中添加数据
        print(f"Worker {value} put data in queue.")


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    queue = manager.Queue()  # 创建一个共享队列
    lock = multiprocessing.Lock()  # 创建锁

    processes = []

    # 启动多个进程,向队列中放入数据
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(queue, i, lock))
        processes.append(p)
        p.start()

    # 等待所有进程结束
    for p in processes:
        p.join()

    # 从队列中取出所有数据
    while not queue.empty():
        data = queue.get()
        print(f"Main process got data: {data}")

# 输出:
# Worker 0 put data in queue.
# Worker 1 put data in queue.
# Worker 2 put data in queue.
# Worker 3 put data in queue.
# Worker 4 put data in queue.
# Main process got data: 0
# Main process got data: 1
# Main process got data: 2
# Main process got data: 3
# Main process got data: 4
  • 示例 4:使用 Lock & RLock 进程间同步和控制访问
    通过 manager.Lock() 和 manager.RLock() 来实现进程间的同步和控制访问。
  • manager.Lock()
    manager.Lock() 是用于进程间同步的标准互斥锁,它确保只有一个进程在同一时刻可以访问共享资源。其他进程在尝试获取该锁时,如果锁已被持有,它们将被阻塞,直到锁被释放。

    常见方法: * acquire(block=True, timeout=None):获取锁,如果 block 为 True(默认),则阻塞直到锁被释放;否则立即返回。如果设置了 timeout,则最多等待 timeout 秒; * release():释放锁,使其他等待的进程可以获取该锁;

    Lock 示例:

    Python
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    import multiprocessing
    
    def worker(lock, shared_list, value):
        with lock:  # 使用锁来同步对共享资源的访问
            shared_list.append(value)
            print(f"Process {value} added to list.")
    
    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        lock = manager.Lock()  # 创建一个 Manager 提供的锁
        shared_list = manager.list()  # 创建一个共享列表
    
        processes = []
        for i in range(5):
            p = multiprocessing.Process(target=worker, args=(lock, shared_list, i))
            processes.append(p)
            p.start()
    
        for p in processes:
            p.join()
    
        print("Final list:", list(shared_list))
    
    # 输出:
    # Process 0 added to list.
    # Process 1 added to list.
    # Process 2 added to list.
    # Process 3 added to list.
    # Process 4 added to list.
    # Final list: [0, 1, 2, 3, 4]
    

    在这个例子中,每个进程在访问共享列表之前都要先获取锁,从而避免多个进程同时访问列表并导致竞态条件。只有一个进程在某个时刻可以修改共享列表。

  • manager.RLock()
    manager.RLock() 是可重入锁,它允许同一个进程多次获取锁而不会发生死锁。与 Lock() 不同,RLock() 允许持有锁的线程或进程再次获取锁,而不会导致自己阻塞,直到调用相同次数的 release() 以释放锁。

    常见方法: * acquire(block=True, timeout=None):与 Lock 类似,但允许同一进程多次获取锁; * release():与 Lock 类似,需要与获取锁的次数一致,调用多少次 acquire() 就需要相同次数的 release() 才能完全释放锁;

    Lock 示例:

    Python
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
        import multiprocessing
    
    
    def worker(rlock, shared_dict, key, value):
        with rlock:  # 获取可重入锁
            if key not in shared_dict:
                print(f"Adding {key} to shared dict.")
                shared_dict[key] = value
            with rlock:  # 再次获取锁,测试重入
                print(f"Updating {key} to {value * 2}.")
                shared_dict[key] = value * 2
    
    
    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        rlock = manager.RLock()  # 创建一个 Manager 提供的可重入锁
        shared_dict = manager.dict()  # 创建一个共享字典
    
        processes = []
        for i in range(5):
            p = multiprocessing.Process(target=worker, args=(rlock, shared_dict, i, i * 10))
            processes.append(p)
            p.start()
    
        for p in processes:
            p.join()
    
        print("Final dict:", dict(shared_dict))
    
    
    # 输出:
    # Adding 1 to shared dict.
    # Updating 1 to 
    # Adding 0 to shared dict.
    # Updating 0 to 0.
    # Adding 2 to shared dict.
    # Updating 2 to 40.
    # Adding 3 to shared dict.
    # Updating 3 to 60.
    # Adding 4 to shared dict.
    # Updating 4 to 80.
    # Final dict: {1: 20, 0: 0, 2: 40, 3: 60, 4: 80}
    

    在这个例子中,每个进程可以多次获取 RLock() 锁而不会发生死锁。在第一次获取锁后,进程会尝试再次获取锁,确保同一个进程可以重入。

  • Lock 和 RLock 的区别:

    • Lock 是普通的锁,只有一个进程或线程可以持有,其他试图获取锁的进程会阻塞。任何进程只能调用一次 acquire(),然后必须调用一次 release() 才能释放锁;
    • RLock 是可重入锁,同一进程或线程可以多次获取该锁,但需要调用相同次数的 release() 才能完全释放锁。这对于递归函数或需要多次调用共享资源的情况下非常有用;
  • 竞争关系与进程同步:
    在使用 Lock 或 RLock 时,锁的目的是防止多个进程同时访问和修改共享资源,从而避免出现竞争条件(例如多个进程同时修改一个变量的值,导致结果不正确)。

    • manager.Lock() 确保在某一时刻只有一个进程能够修改共享对象。这对于简单的修改操作非常有用,比如向共享列表中添加元素;
    • manager.RLock() 更加灵活,适用于需要递归调用或同一进程多次锁定资源的情况;
  • 进程同步的注意事项:
    在进程间同步时,锁的使用能够保证数据的一致性,但是也要注意锁的使用可能会影响性能,尤其是在大量进程同时等待锁时。因此,应尽量将锁的作用范围控制在最小的代码区域中,减少锁的持有时间。

  • 【扩展知识点】
    在 Python 中,Lock 和 RLock 都可以使用 with 语句来管理锁的获取和释放。这是通过上下文管理器(Context Manager)来实现的。with 语句在处理锁时,自动处理了锁的 acquire() 和 release(),使得代码更加简洁和安全。具体来说:

    • with lock: 是上下文管理器的一部分,进入 with 语句块时,自动调用 lock.acquire();
    • 当退出 with 语句块时,无论是正常退出还是发生异常,都会自动调用 lock.release();

    手动调用 acquire() 和 release():

    通常,使用锁的标准做法是手动调用 acquire() 来获取锁,并在完成操作后手动调用 release() 来释放锁:

    Text Only
    1
    2
    3
    4
    5
    lock.acquire()
    try:
    # 对共享资源进行操作
    finally:
        lock.release()  # 确保锁在任何情况下都能释放
    

    手动调用的风险在于,如果程序出现异常而没有执行 release(),锁将一直被持有,导致其他进程无法获取该锁,从而可能导致死锁或程序无法继续。

    使用 with 管理锁
    with 语句相当于为 acquire() 和 release() 创建了一种自动化的机制,使得即使在出现异常时,锁也能够被正确释放。它让代码更加简洁和易于维护。等价的代码如下:

    Text Only
    1
    2
    with lock:
    # 对共享资源进行操作
    

    当进入 with 代码块时,lock.acquire() 被调用,执行代码块中的操作。当离开 with 代码块时,无论是正常离开还是异常离开,都会调用 lock.release(),确保锁被释放。 * 简洁性:减少代码量,不需要手动调用 acquire() 和 release(),且代码更加直观。 * 安全性:避免忘记释放锁的风险,即使出现异常也能确保锁被正确释放,防止死锁。 * 自动化管理:with 语句自动管理资源的获取和释放,遵循上下文管理器的机制,使代码更加 Pythonic。
    * 示例 5:Manager 中的 Value 和 Array
    multiprocessing.Manager().Value 和 multiprocessing.Manager().Array 是 multiprocessing 模块中用于进程间共享数据的高级工具。它们通过管理器对象提供了进程间安全数据共享方式。与 multiprocessing.Value 和 multiprocessing.Array 的共享内存实现不同,Manager().Value 和 Manager().Array 通过代理模式使用,这使得它们不仅可以在同一台机器上共享,还可以通过网络在不同机器之间共享数据。 * Manager().Value(typecode, value):创建一个用于共享的值对象。 * typecode:表示值的类型,例如 'i' 表示整数,'d' 表示双精度浮点数; * value:初始化时的值; * Manager().Array(typecode, sequence):创建一个用于共享的数组对象。 * typecode:表示数组元素的类型,例如 'i' 表示整数,'d' 表示双精度浮点数; * sequence:用于初始化数组的序列;

示例:Manager().Value 的用法

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
  import multiprocessing

  def worker(shared_value, lock):
      with lock:
          shared_value.value += 1  # 在共享变量上执行加法操作

  if __name__ == "__main__":
      manager = multiprocessing.Manager()
      shared_value = manager.Value('i', 0)  # 共享整数变量,初始值为0
      lock = manager.Lock()  # 用于同步的锁

      processes = []
      for _ in range(5):
          p = multiprocessing.Process(target=worker, args=(shared_value, lock))
          processes.append(p)
          p.start()

      for p in processes:
          p.join()

      print(f"Final Value: {shared_value.value}")  # 输出应该为 5
  • manager.Value('i', 0) 创建了一个初始值为 0 的共享整数;
  • 使用 lock 来确保对共享变量的操作是原子的(防止竞争条件);
  • 每个进程都会对共享变量执行 +1 操作,因此最终值为 5;

示例:Manager().Array 的用法

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import multiprocessing

def worker(shared_array, lock):
    with lock:
        for i in range(len(shared_array)):
            shared_array[i] += 1  # 增加每个数组元素的值

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    shared_array = manager.Array('i', [1, 2, 3])  # 创建一个共享数组
    lock = manager.Lock()

    processes = []
    for _ in range(3):
        p = multiprocessing.Process(target=worker, args=(shared_array, lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Final Array: {shared_array[:]}")  # 输出应该为 [4, 5, 6]  
  • manager.Array('i', [1, 2, 3]) 创建了一个初始为 [1, 2, 3] 的共享数组;
  • 每个进程都会将数组中的每个元素加 1;
  • 最终数组的结果是 [4, 5, 6];

Manager().Value 和 Manager().Array 与 Namespace 的区别: * 共享机制: * Manager().Value 和 Manager().Array 是通过代理模式进行通信的,支持跨进程和跨机器的数据共享; * Namespace 提供一个简单的共享空间,但它是一个更为通用的数据容器,并且需要手动使用锁来避免竞争条件; * 线程/进程安全: * Manager().Value 和 Manager().Array 默认在一定程度上提供了进程间安全的共享机制,但对于复杂的并发访问场景,仍建议使用显式锁来避免数据竞争。 * Namespace 并没有内置锁机制,必须手动加锁来保证数据操作的安全性。

* 示例 6:Manager().Barrier
Barrier 用于同步多个进程,确保所有进程在某个点上等待彼此。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import multiprocessing
import time

def worker(barrier, id):
    print(f"Process {id} is doing some work...")
    time.sleep(2)  # 模拟工作
    print(f"Process {id} is ready at the barrier.")
    barrier.wait()  # 等待其他进程到达此点
    print(f"Process {id} has crossed the barrier.")

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    barrier = manager.Barrier(3)  # 设置需要等待的进程数量

    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(barrier, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()  # 等待所有进程结束

# 输出:
# Process 1 is doing some work...
# Process 0 is doing some work...
# Process 2 is doing some work...
# Process 1 is ready at the barrier.
# Process 0 is ready at the barrier.
# Process 2 is ready at the barrier.
# Process 2 has crossed the barrier.
# Process 0 has crossed the barrier.
# Process 1 has crossed the barrier.
  • 每个进程在完成工作后会调用 barrier.wait(),这会使其阻塞,直到所有进程都到达这个点;
  • 当所有进程都到达后,它们将一起继续执行;

  • 示例 7:Manager().BoundedSemaphore BoundedSemaphore 用于限制同时访问的进程数。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import multiprocessing
import time

def worker(sem, id):
    with sem:  # 获取信号量
        print(f"Process {id} is accessing the resource.")
        time.sleep(2)  # 模拟访问资源
        print(f"Process {id} has released the resource.")

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    semaphore = manager.BoundedSemaphore(2)  # 允许同时两个进程访问

    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(semaphore, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()  # 等待所有进程结束

# 输出:
# Process 1 is accessing the resource.
# Process 3 is accessing the resource.
# Process 1 has released the resource.
# Process 0 is accessing the resource.
# Process 3 has released the resource.
# Process 2 is accessing the resource.
# Process 0 has released the resource.
# Process 4 is accessing the resource.
# Process 2 has released the resource.
# Process 4 has released the resource.
  • BoundedSemaphore 限制最多只有两个进程可以同时访问资源,其他进程需要等待;
  • 使用 with sem: 自动管理信号量的获取和释放;

  • 示例 8:Manager().Condition Condition 允许进程在某个条件下等待,并可以被其他进程通知。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import multiprocessing
import time

def worker(condition):
    with condition:
        print("Worker is waiting for the condition.")
        condition.wait()  # 等待条件被通知
        print("Worker has been notified and is continuing work.")

def notifier(condition):
    time.sleep(2)  # 模拟一些工作
    with condition:
        print("Notifier is notifying the worker.")
        condition.notify()  # 通知等待的进程

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    condition = manager.Condition()

    worker_process = multiprocessing.Process(target=worker, args=(condition,))
    notifier_process = multiprocessing.Process(target=notifier, args=(condition,))

    worker_process.start()
    notifier_process.start()

    worker_process.join()
    notifier_process.join()

# 输出:
# Worker is waiting for the condition.
# Notifier is notifying the worker.
# Worker has been notified and is continuing work.
  • worker 进程在等待条件时会阻塞,直到被 notifier 进程通知;
  • condition.notify() 通知所有等待的进程继续执行;

  • 示例 9:Manager().Event Event 允许一个或多个进程等待某个事件的发生。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import multiprocessing
import time

def worker(event):
    print("Worker is waiting for the event to be set.")
    event.wait()  # 等待事件被设置
    time.sleep(2)
    print("Worker has received the event.")

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    event = manager.Event()

    worker_process = multiprocessing.Process(target=worker, args=(event,))
    worker_process.start()

    time.sleep(2)  # 主进程做一些工作
    print("Main process is setting the event.")
    event.set()  # 设置事件,通知工作进程,主进程继续执行
    print("Main process continue.")
    worker_process.join()  # 等待工作进程结束

# 输出:
# Worker is waiting for the event to be set.
# Main process is setting the event.
# Main process continue.
# Worker has received the event.
  • worker 进程会阻塞在 event.wait(),直到主进程调用 event.set();
  • 一旦事件被设置,工作进程将继续执行;

  • 示例 10:Manager().Semaphore Semaphore 控制对特定资源的访问,允许一定数量的进程同时访问。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import multiprocessing
import time

def worker(sem, id):
    with sem:  # 获取信号量
        print(f"Process {id} is accessing the resource.")
        time.sleep(2)  # 模拟访问资源
        print(f"Process {id} has released the resource.")

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    semaphore = manager.Semaphore(2)  # 允许同时两个进程访问

    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(semaphore, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()  # 等待所有进程结束
  • Semaphore 允许最多两个进程同时访问资源,其他进程需等待;
  • 使用 with sem: 语句简化了信号量的管理;

  • 示例 11:Manager().Semaphore() 和 Manager().BoundedSemaphore() 区别
  • Manager().Semaphore()
    • 功能:允许你创建一个信号量,可以控制同时访问某个资源的进程数;
    • 特点:没有上限,只要信号量的计数大于 0,进程就可以获取信号量。即使当前进程已经释放信号量,计数可以超过初始值;
  • Manager().BoundedSemaphore()
    • 功能:也是用于控制并发访问的信号量,但它具有一个上限;
    • 特点:确保信号量的计数不会超过初始值。若超过该值,调用 release() 将引发 ValueError;

示例对比:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
  import multiprocessing


  # 使用 Semaphore
  def semaphore_worker(sem, id):
      sem.acquire()
      print(f"Semaphore Worker {id} acquired semaphore.")
      sem.release()
      print(f"Semaphore Worker {id} released semaphore.")


  # 使用 BoundedSemaphore
  def bounded_semaphore_worker(bsem, id):
      bsem.acquire()
      print(f"BoundedSemaphore Worker {id} acquired bounded semaphore.")
      bsem.release()
      print(f"BoundedSemaphore Worker {id} released bounded semaphore.")


  if __name__ == "__main__":
      manager = multiprocessing.Manager()

      # Semaphore 示例
      sem = manager.Semaphore(2)
      semaphore_processes = []
      for i in range(3):
          p = multiprocessing.Process(target=semaphore_worker, args=(sem, i))
          semaphore_processes.append(p)
          p.start()

      # 等待 Semaphore 进程结束
      for p in semaphore_processes:
          p.join()

      # BoundedSemaphore 示例
      bsem = manager.BoundedSemaphore(2)
      bounded_semaphore_processes = []
      for i in range(3):
          p = multiprocessing.Process(target=bounded_semaphore_worker, args=(bsem, i))
          bounded_semaphore_processes.append(p)
          p.start()

      # 等待 BoundedSemaphore 进程结束
      for p in bounded_semaphore_processes:
          p.join()
  • Semaphore: 你创建的 Semaphore 允许最多 2 个进程同时访问。如果同时有超过 2 个进程试图调用 acquire() ,后续的进程将被阻塞,直到信号量被释放;
  • BoundedSemaphore: 也有相同的行为,限制最多 2 个进程同时访问。与普通信号量不同的是,如果你尝试释放一个信号量而计数已经达到初始值,会抛出 ValueError; 如果你需要限制并发访问且想要防止超过最大限制,使用 BoundedSemaphore;如果不需要这种限制,可以使用普通的 Semaphore。

1.3.3.2 自定义管理器

要创建一个自定义的管理器,需要新建一个 multiprocessing.managers.BaseManager 的子类,然后使用这个管理器类上的 register() 类方法将新类型或者可调用方法注册上去。例如:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from multiprocessing.managers import BaseManager


class MathsClass:
    # MathsClass 提供了两个方法:add 用于加法,mul 用于乘法
    def add(self, x, y):
        return x + y

    def mul(self, x, y):
        return x * y


# 定义自定义管理器
class MyManager(BaseManager):
    # MyManager 继承自 BaseManager,并可用于注册共享对象
    pass


# 使用 register 方法将 MathsClass 注册为共享对象,命名为 'Maths'
MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    # 启动管理器,这样可以自动处理资源的释放
    with MyManager() as manager:
        #  创建 MathsClass 的共享实例
        maths = manager.Maths()
        # 通过调用 maths.add(4, 3) 和 maths.mul(7, 8) 分别计算并输出结果
        print(maths.add(4, 3))  # prints 7
        print(maths.mul(7, 8))  # prints 56

1.3.3.3 使用远程管理器

可以将管理器服务运行在一台机器上,然后使用客户端从其他机器上访问。(假设它们的防火墙允许)

服务器端代码: 运行下面的代码可以启动一个服务,包含了一个共享队列,允许远程客户端访问

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
# 创建一个共享队列
>>> queue = Queue()
# 定义一个管理器类
>>> class QueueManager(BaseManager): pass
# 注册共享队列,使得可以通过名称获取它
>>> QueueManager.register('get_queue', callable=lambda:queue)
# 启动管理器,绑定到特定地址和端口
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
# 创建并启动服务器
>>> s = m.get_server()
>>> s.serve_forever()
  • 导入模块:导入 BaseManager 和 Queue;
  • 创建队列:实例化一个 Queue 对象,用于在进程之间共享数据;
  • 定义管理器:创建一个自定义的 QueueManager 继承自 BaseManager;
  • 注册队列:通过 register 方法将共享队列注册到管理器,允许远程客户端访问;
  • 启动管理器:设置管理器的地址(在本地运行,监听所有接口)和端口(50000),以及授权密钥;
  • 启动服务器:调用 get_server() 方法并使服务器进入无限循环,等待客户端连接;

客户端代码(PUT 数据):远程客户端可以通过下面的方式访问服务

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
>>> from multiprocessing.managers import BaseManager
# 定义管理器类
>>> class QueueManager(BaseManager): pass
# 注册共享队列
>>> QueueManager.register('get_queue')
# 连接到远程管理器
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
# 获取共享队列
>>> queue = m.get_queue()
# 向队列中放入数据
>>> queue.put('hello')
  • 导入模块:同样导入 BaseManager;
  • 定义管理器:重新定义 QueueManager 类;
  • 注册队列:注册共享队列,使客户端能够访问;
  • 连接管理器:通过提供远程地址和端口连接到服务器;
  • 获取队列:获取远程共享队列的代理;
  • 放入数据:使用 put 方法向队列中添加数据;

客户端代码(GET 数据):也可以通过下面的方式

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
>>> from multiprocessing.managers import BaseManager
# 定义管理器类
>>> class QueueManager(BaseManager): pass
# 注册共享队列
>>> QueueManager.register('get_queue')
# 连接到远程管理器
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
# 获取共享队列
>>> queue = m.get_queue()
# 从队列中获取数据
>>> queue.get()
  • 重复定义和注册:与前面的客户端代码相似,定义管理器和注册共享队列;
  • 连接和获取:连接到远程管理器并获取队列;
  • 获取数据:使用 get 方法从队列中读取数据;

本地进程也可以访问这个队列,利用上面的客户端代码通过远程方式访问:

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
# 定义工作进程
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         # 向队列放入数据
...         self.q.put('local hello')
...
# 创建本地队列
>>> queue = Queue()
# 启动工作进程
>>> w = Worker(queue)
>>> w.start()
# 启动管理器
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
  • 工作进程类:定义一个 Worker 类,继承自 Process,用于将数据放入共享队列;
  • 创建本地队列:实例化一个本地 Queue 对象;
  • 启动工作进程:创建并启动工作进程;
  • 启动管理器:重新定义并注册共享队列;
  • 运行服务器:启动服务器并等待连接,允许本地进程和远程客户端访问共享队列;

1.3.4 共享 ctypes 对象

ctypes 是 Python 的外部函数库,它提供了与 C 兼容的数据类型,并允许调用 DLL 或共享库中的函数。结合 multiprocessing 模块,我们可以利用 ctypes 来创建共享的 C 类型对象,这些对象可以在多个进程之间共享内存。这种共享机制避免了在不同进程之间复制数据,能够提升性能。

常见共享的 ctypes 对象

  • multiprocessing.Value:用于创建一个共享的单个值,类似于 C 的标量类型(如 int,float);
  • multiprocessing.Array:用于创建共享数组,类似于 C 的数组类型;

这些对象是托管在共享内存中的,多个进程可以同时访问它们,且需要通过同步原语(如锁)来避免数据竞争。

示例代码

  • 示例 1:使用 multiprocessing.Value
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import multiprocessing
import time
# 或者
# import ctypes


def worker(shared_value, lock):
    for _ in range(5):
        time.sleep(0.1)
        with lock:  # 获取锁
            shared_value.value += 1


if __name__ == "__main__":
    # 创建一个共享内存的 Value
    # 这里使用的是字符串 'i',这是 multiprocessing 提供的一种简化表示方式,表示一个 有符号整数。这种符号化表示来自 Python 的 struct 模块的格式化字符。'i' 相当于 ctypes.c_int,但写法上更简洁。multiprocessing 支持多种类似的符号化字符来表示不同的数据类型。
    shared_value = multiprocessing.Value('i', 0)  # 'i' 表示整型
    # 或者: 这里显式地使用 ctypes.c_int 来指定 Value 对象的数据类型,即创建一个存储在共享内存中的 ctypes 整数类型 (c_int),使用 ctypes 提供的类型时,可以更明确地表示你想要使用的具体 C 数据类型
    # shared_value = multiprocessing.Value(ctypes.c_int, 0)
    lock = multiprocessing.Lock()  # 创建锁

    # 创建多个进程
    processes = []
    for _ in range(3):  # 启动3个进程
        process = multiprocessing.Process(target=worker, args=(shared_value, lock))
        processes.append(process)
        process.start()

    # 等待所有进程完成
    for process in processes:
        process.join()

    print(f'Final value: {shared_value.value}')
  • 创建锁:使用 multiprocessing.Lock() 创建一个锁对象;
  • 获取和释放锁:在访问共享内存时,使用 with lock: 语句来自动获取锁和释放锁。这可以确保在锁被持有时,其他进程无法访问该资源;
  • 多个进程:在这个示例中,我们启动了多个进程同时对共享变量进行增值操作,通过加锁确保数据的一致性;

  • 示例 2:共享数组 multiprocessing.Array

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import multiprocessing
import time
# 或者
# import ctypes


def worker(shared_array, lock):
    for i in range(len(shared_array)):
        time.sleep(0.1)
        with lock:  # 获取锁
            shared_array[i] += 1


if __name__ == "__main__":
    # 创建一个共享内存的 Array
    shared_array = multiprocessing.Array('i', [0, 1, 2, 3, 4])  # 'i' 表示整型数组
    # 或者
    # shared_array = multiprocessing.Array(ctypes.c_int, [0,1, 2, 3, 4])
    lock = multiprocessing.Lock()  # 创建锁

    # 创建多个进程
    processes = []
    for _ in range(3):  # 启动3个进程
        process = multiprocessing.Process(target=worker, args=(shared_array, lock))
        processes.append(process)
        process.start()

    # 等待所有进程完成
    for process in processes:
        process.join()

    print(f'Final array: {[shared_array[i] for i in range(len(shared_array))]}')

# 输出:
# Final array: [3, 4, 5, 6, 7]
  • multiprocessing.Valuemultiprocessing.Array 会将数据放在共享内存中,因此多个进程可以读写这些数据;
  • 如果多个进程需要并发读写共享数据,建议使用同步原语(如 multiprocessing.Lock)来避免数据竞争;
  • ctypes 提供了对多种 C 类型的支持,如 c_int、c_float、c_double 等;
  • 【说明】
  • 'i':有符号整数(相当于 ctypes.c_int);
  • 'I':无符号整数(相当于 ctypes.c_uint);
  • 'f':浮点数(相当于 ctypes.c_float);
  • 'd':双精度浮点数(相当于 ctypes.c_double);
  • 'b':有符号字节(相当于 ctypes.c_byte);
  • 'B':无符号字节(相当于 ctypes.c_ubyte);

1.3.5 同步原语

在 Python 的多进程编程中,进程同步是指协调多个进程之间的执行顺序,以避免竞争条件和数据不一致的问题。Python 的 multiprocessing 模块提供了多种同步原语:Lock、RLock、Semaphore、Event、Condition、Barrier。通常来说同步原语在多进程环境中并不像它们在多线程环境中那么必要。

  • Lock(锁)
    锁用于确保同一时刻只有一个进程可以访问共享资源。可以使用 with 语句自动管理锁的获取和释放。
  • acquire(block=True, timeout=None):用于请求锁的获取。
    • block(默认值为 True):如果设置为 True,则当锁被其他进程持有时,当前进程会阻塞,直到锁可用。如果设置为 False,则当前进程不会阻塞,如果锁不可用,则立即返回 False;
    • timeout(默认值为 None):指定阻塞等待锁的最长时间(以秒为单位)。如果在超时时间内未获取到锁,方法将返回 False。如果不设置或设置为 None,则会无限期等待;
  • release():用于释放锁,使其他进程可以获取该锁。调用 release 必须在锁被当前进程持有的情况下,否则会引发 RuntimeError。

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import multiprocessing
import time


def worker(lock, num):
    with lock:  # 获取锁
        print(f'Process {num} is running.')
        time.sleep(1)  # 模拟处理时间
    print(f'Process {num} is success.')


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    processes = []

    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(lock, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
  • RLock(递归锁)
    递归锁必须由持有进程亲自释放,如果某个进程拿到了递归锁,这个进程可以再次拿到这个锁而不需要等待,但是这个进程拿锁操作和释放锁操作的次数必须相同。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import multiprocessing
import time


def worker(rlock, num):
    with rlock:  # 获取锁
        print(f'Process {num} is running.')
        time.sleep(1)
        with rlock:  # 重新获取锁
            print(f'Process {num} is still running.')


if __name__ == "__main__":
    rlock = multiprocessing.RLock()
    processes = []

    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(rlock, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

# 输出:
# Process 0 is running.
# Process 0 is still running.
# Process 1 is running.
# Process 1 is still running.
# Process 2 is running.
# Process 2 is still running.
  • Semaphore(信号量)
    一种信号量对象: 允许一定数量的进程同时访问某个资源。通过初始化信号量的计数,可以控制访问的数量。类似于 threading.Semaphore 不同在于,它的 acquire 方法的第一个参数名是和 Lock.acquire() 一样的 block
    【备注】
  • 在 macOS 上,不支持 sem_timedwait ,所以,调用 acquire() 时如果使用 timeout 参数,会通过循环 sleep 来模拟这个函数的行为;
  • 这个包的某些功能依赖于宿主机系统的共享信号量的实现,如果系统没有这个特性, multiprocessing.synchronize 会被禁用,尝试导入这个模块会引发 ImportError 异常;
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import multiprocessing
import time


def worker(sem, num):
    with sem:  # 使用 with 语句自动获取和释放信号量
        print(f'Process {num} is running.')
        time.sleep(2)  # 模拟处理时间
    print(f'Process {num} is end.')


# def worker(sem, num):
#     sem.acquire()  # 获取信号量
#     print(f'Process {num} is running.')
#     time.sleep(2)  # 模拟处理时间
#     sem.release()  # 释放信号量

if __name__ == "__main__":
    sem = multiprocessing.Semaphore(2)  # 允许最多2个进程同时访问
    processes = []

    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(sem, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

# 输出:
# Process 0 is running.
# Process 1 is running.
# Process 0 is end.
# Process 2 is running.
# Process 1 is end.
# Process 2 is end.
  • with sem: 会在进入块时调用 sem.acquire(),在退出块时调用 sem.release(),即使在块中发生异常,也会确保信号量被释放;
  • 这样使用可以减少显式调用 acquire() 和 release() 的错误风险,并使代码更易于维护;

  • BoundedSemaphore(信号量)
    BoundedSemaphore 类似于 Semaphore,但它限制了信号量的最大计数,防止过度释放信号量。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import multiprocessing
import time


def worker(sem, num):
    with sem:
        print(f'Worker {num} is running.')
        time.sleep(1)  # 模拟工作


if __name__ == "__main__":
    sem = multiprocessing.BoundedSemaphore(2)  # 最大计数为2
    processes = []

    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(sem, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

# 输出:
# Worker 0 is running.
# Worker 1 is running.
# Worker 4 is running.
# Worker 2 is running.
# Worker 3 is running.
  • Condition(条件)
    允许一个或多个进程等待某个条件的发生,通常用于复杂的同步场景,比如生产者-消费者模型。

  • acquire(): 获取条件的锁;

  • release(): 释放条件的锁;
  • wait(): 释放锁并等待其他进程发出通知;
  • wait_for(): 重复地调用 wait() 直到满足判断式或者发生超时;
  • notify(n=1): 唤醒一个等待该条件的进程;
  • notify_all(): 唤醒所有等待该条件的进程; [参考]:(https://docs.python.org/zh-cn/3/library/threading.html#threading.Condition.acquire)

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import multiprocessing
import time


def producer(cond):
    # 在获取锁后模拟生产,完成后调用 notify() 通知消费者
    with cond:  # 获取条件的锁
        print('Producing...')
        time.sleep(2)  # 模拟生产时间
        cond.notify()  # 通知消费者
        print('Producer has notified the consumer.')


def consumer(cond):
    # 先打印等待消息,然后在获取锁后调用 wait() 等待通知
    print('Waiting for producer...')
    with cond:  # 获取条件的锁
        cond.wait()  # 等待生产者的通知
        print('Consumer has been notified.')


if __name__ == "__main__":
    condition = multiprocessing.Condition()

    # 创建并启动消费者进程
    consumer_process = multiprocessing.Process(target=consumer, args=(condition,))
    consumer_process.start()

    # 确保消费者先运行
    time.sleep(1)

    # 创建并启动生产者进程
    producer_process = multiprocessing.Process(target=producer, args=(condition,))
    producer_process.start()

    # 等待两个进程完成
    consumer_process.join()
    producer_process.join()

# 输出:
# Waiting for producer...
# Producing...
# Producer has notified the consumer.
# Consumer has been notified.  

主进程先启动消费者,确保它在生产者之前等待。可以使用 time.sleep(1) 暂停一下,确保消费者先运行。

  • Event(事件)
    Event 用于在进程之间进行简单的信号传递,一个进程可以设置事件,其他进程可以等待该事件的发生。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import multiprocessing
import time


def worker(event):
    print('Worker is waiting for event.')
    event.wait()  # 等待事件
    print('Worker is proceeding.')


if __name__ == "__main__":
    event = multiprocessing.Event()
    p = multiprocessing.Process(target=worker, args=(event,))
    p.start()

    time.sleep(2)  # 模拟处理时间
    event.set()  # 触发事件
    print('Event has been set.')

    p.join()
  • Barrier Barrier 用于协调多个进程的执行,确保所有进程在到达某个点后一起继续执行。
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import multiprocessing
import time


def worker(barrier, id):
    print(f'Process {id} is doing some work...')
    time.sleep(2)  # 模拟工作
    print(f'Process {id} has reached the barrier.')

    # 等待其他进程到达
    barrier.wait()

    print(f'Process {id} is continuing after the barrier.')


if __name__ == "__main__":
    num_processes = 3
    barrier = multiprocessing.Barrier(num_processes)

    processes = []
    for i in range(num_processes):
        p = multiprocessing.Process(target=worker, args=(barrier, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
  • 创建 Barrier:使用 multiprocessing.Barrier(num_processes) 创建一个 Barrier 对象,指定需要同步的进程数量;
  • 工作函数: 每个进程执行 worker 函数,模拟一些工作,然后调用 barrier.wait(),在这里进程将会被阻塞,直到指定进程数都到达此点;
  • 进程启动: 主进程创建并启动多个子进程,最后通过 join() 等待它们完成;

1.3.6 跨进程直接访问内存共享

multiprocessing.shared_memory 模块是在 Python 3.8 中引入的,用于在多个进程之间直接共享内存,避免了进程间数据的复制,这对于需要高效共享大块数据的应用非常有用。

关键类与方法

  • multiprocessing.shared_memory.SharedMemory: 创建一个 SharedMemory 类的实例用来新建一个共享内存块或关联到一个已存在的共享内存块;
Python
1
class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0, *, track=True)

参数: * name (str | None):指定请求共享内存名称,以字符串形式指定。“None”(默认值),则随机生成一个新名称; * create (bool):指定新建共享内存块 (True) 还是关联到已有的共享内存块 (False); * size (int):新建共享内存块所请求的字节数。由于某些平台会选择根据平台的内存页大小来分配内存块,因此共享内存块的实际大小可能会大于等于所请求的大小。 当关联到已有的共享内存块时,size 形参将被忽略; * track (bool):当为 True 时,将在 OS 不会自动为共享内存块注册资源跟踪器进程的平台上执行注册操作;

方法: * SharedMemory.create(size): 创建指定大小的共享内存块; * SharedMemory.attach(name): 附加到一个已有的共享内存块; * SharedMemory.close(): 关闭共享内存,但不删除它; * SharedMemory.unlink(): 删除共享内存,使其不可用;

  • ShareableList: 这是一个类似 Python 列表的对象,可以跨进程共享;

  • 示例:跨进程共享 NumPy 数组

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import multiprocessing
import numpy as np
from multiprocessing import shared_memory

# worker 函数: 这个函数在子进程中运行,用来访问和修改共享内存中的数据。
def worker(shared_name, shape, dtype):
    # shared_name: 共享内存的名称。
    # shape 和 dtype: 用于恢复共享的 NumPy 数组的形状和数据类型。
    # 附加到共享内存块,附加到由主进程创建的共享内存块,通过 name 参数,子进程可以识别和连接到相同的共享内存块。
    shm = shared_memory.SharedMemory(name=shared_name)

    # 恢复 NumPy 数组,使用共享内存缓冲区 shm.buf 恢复 NumPy 数组。shape 和 dtype 用来正确地重建数组。
    array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)

    # 修改数组的内容,修改共享内存中的数组,具体来说,将数组的第一个元素设置为 999。
    array[0] = 999
    # 关闭共享内存对象,但不删除它,因为其他进程可能仍在使用它。
    shm.close()

if __name__ == "__main__":
    # 创建 NumPy 数组:创建了一个 NumPy 数组 array,它包含 5 个元素,数据类型为 64 位整数(np.int64)。
    array = np.array([1, 2, 3, 4, 5], dtype=np.int64)

    # 分配共享内存:create=True: 表示正在创建一个新的共享内存块。
    # 为了确保共享内存足够大,我们根据数组的字节大小来分配共享内存(array.nbytes 是数组的字节大小)。
    shm = shared_memory.SharedMemory(create=True, size=array.nbytes)

    # 将 NumPy 数组的内容复制到共享内存
    # 使用共享内存的缓冲区 shm.buf,创建与原始数组相同形状和数据类型的 NumPy 数组 shared_array。
    shared_array = np.ndarray(array.shape, dtype=array.dtype, buffer=shm.buf)

    # 将原始数组 array 的所有内容复制到共享内存中的 shared_array。
    shared_array[:] = array[:]

    # 启动进程,修改共享内存中的数据
    # 创建一个新进程,目标函数是 worker。通过 args 参数传递共享内存的名称 shm.name、数组的形状 array.shape 以及数据类型 array.dtype。
    process = multiprocessing.Process(target=worker, args=(shm.name, array.shape, array.dtype))
    process.start()
    process.join()

    # 检查共享内存中的数据
    print("Main process array:", shared_array)

    # 清理共享内存
    # 关闭主进程中的共享内存对象。
    shm.close()
    # 删除共享内存块,释放它的资源。unlink 是必须的,因为共享内存不会自动删除。
    shm.unlink()

# 输出:
# Main process array: [999   2   3   4   5]
  • 示例:使用 ShareableList

除了使用共享内存直接处理 NumPy 数组,还可以通过 ShareableList 共享简单的 Python 列表。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import multiprocessing
from multiprocessing import shared_memory


def worker(shared_list):
    shared_list[0] = 42  # 修改共享列表中的值


if __name__ == "__main__":
    # 创建共享列表
    shared_list = shared_memory.ShareableList([1, 2, 3, 4, 5])

    # 启动进程,修改共享列表中的数据
    process = multiprocessing.Process(target=worker, args=(shared_list,))
    process.start()
    process.join()

    # 查看修改后的共享列表
    print("Main process list:", shared_list)

    # 清理共享内存
    shared_list.shm.close()
    shared_list.shm.unlink()

# 输出:
# Main process list: ShareableList([42, 2, 3, 4, 5], name='psm_4ef9f08d')

参考官网:https://docs.python.org/zh-cn/3/library/multiprocessing.shared_memory.html

2 线程 (Thread)

线程是 CPU 调度的基本单位,一个进程可以包含多个线程,线程之间共享进程的内存空间。

参考:https://bbs.huaweicloud.com/blogs/289314

特点:

  • 轻量级:相比于进程,线程的创建和销毁成本较低;
  • 共享内存:线程之间可以共享内存空间,这使得线程间通信更加便捷;

缺点:

  • GIL 限制:在 CPython 解释器中,线程受到全局解释器锁(GIL)的限制,导致 Python 的多线程在 CPU 密集型任务中无法利用多核并行执行;
  • 安全问题:由于线程共享内存,多个线程同时修改共享数据时,可能会发生竞争条件,需要使用锁 (Lock) 等同步机制来防止数据竞争;

使用场景:

  • I/O 密集型任务,如文件读写、网络请求等;

代码示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import threading
import time


def worker(num):
    time.sleep(1)
    print(f'Worker: {num}')


threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

2.1 threading.Thread

threading.Thread 是 Python 中的线程类,用于创建和管理线程。通过指定参数,可以自定义线程的行为。

Text Only
1
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

参数介绍

  • group: 目前默认始终为 None,保留参数,保留给将来实现 ThreadGroup 类的扩展使用;
  • target: 线程执行目标函数(用于 run() 方法调用的可调用对象),默认是 None,表示不需要调用任何方法;
  • name: 线程名称,默认情况下,以 "Thread-N" 形式构造唯一名称,其中 N 为一个较小的十进制数值,或是 "Thread-N (target)" 的形式,其中 "target" 为 target.__name__(前提指定了 target 参数);
  • args: 调用目标函数的参数列表或元组,默认为 ()
  • kwargs: 调用目标函数的关键字参数字典,默认是 {}
  • daemon: 是否将线程设置为守护线程:守护线程在主线程结束时会被强制终止;非守护线程会让主线程等待它执行完。 默认值(None),线程将继承当前线程的守护模式属性(主线程默认是非守护线程),如果不是 None,daemon 参数将显式地设置该线程是否为守护模式。

常用方法

  • start(): 启动线程,调用 run() 方法(同一线程里最多只能被调用一次,否则抛出 RuntimeError),线程将在后台开始执行;
  • run(): 线程启动时调用的方法,可以在子类里重载该方法来定制线程执行的逻辑。如果指定了 target 参数,则默认调用 target(*args, **kwargs);
Text Only
1
2
3
4
5
6
7
>>> from threading import Thread
>>> t = Thread(target=print, args=[1])
>>> t.run()
1
>>> t = Thread(target=print, args=(1,))
>>> t.run()
1
  • join(timeout=None): 阻塞当前线程,等待该线程终止,timeout 是可选的超时时间,指定秒数后超时返回;
  • name: 只用于识别的字符串,它没有语义,多个线程可以赋予相同的名称,初始名称由构造函数设置;
    getName()setName() 3.10 版本弃用,改为直接以特征属性(name)方式使用它;
Text Only
1
2
3
4
5
6
>>> t = Thread(target=print,args=[1])
>>> t.name
'Thread-2 (print)'
>>> t.name='Thread-2 (print)-xxx'
>>> t.name
'Thread-2 (print)-xxx'
  • ident: 线程的 '线程标识符',如果线程尚未开始则为 None。它是一个非零的整数,它的值没有直接含义,主要是用作 magic cookie,比如作为含有线程相关数据的字典的索引。线程标识符可能会在线程退出,新线程创建时被复用;
Text Only
1
2
3
4
5
>>> t.ident
>>> t.start()
1
>>> t.ident
140461852206848
  • native_id: 此线程的线程 ID (TID),由 OS (内核) 分配。 这是一个非负整数,或者如果线程还未启动则为 None。它的值可被用来在整个系统中唯一地标识这个特定线程(直到线程终结,在那之后该值可能会被 OS 回收再利用);
Text Only
1
2
>>> t.native_id
51645
  • is_alive(): 返回线程是否存活。当 run() 方法刚开始直到 run() 方法刚结束,这个方法返回 True 。threading.enumerate() 返回当前所有存活的 Thread 对象的列表;
  • daemon: 布尔值,表示这个线程是否是一个守护线程(True)或不是(False)。 这个值必须在调用 start() 之前设置,否则会引发 RuntimeError 。它的初始值继承自创建线程,主线程不是一个守护线程,因此所有在主线程中创建的线程默认为 daemon = False (当没有存活的非守护线程时,整个Python程序才会退出);
Text Only
1
2
>>> t.daemon
False

isDaemon()setDaemon(),自 3.10 版本弃用的 daemon 的取值/设值 API,改为直接以特征属性(daemon)方式使用它。

Thread 类示例

  • 示例 1: 使用 target、args 和 kwargs 参数
    创建两个线程,分别传递不同的参数给目标函数:
Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import threading
import time

def worker(message: str, delay: int):
    """
    线程执行目标函数
    """
    for _ in range(3):
        # 返回当前对应调用者控制线程的 Thread 对象
        print(f"{threading.current_thread().name}: {message}")
        time.sleep(delay)

if __name__ == '__main__':
    # 创建 2 个线程
    # 通过位置参数为 worker 函数传递 "Hello from Thread 1" 和延迟 1 秒
    thread1 = threading.Thread(target=worker, args=("Hello from Thread 1", 1), name="worker-1")
    # 通过关键字参数传递不同的参数
    thread2 = threading.Thread(target=worker, kwargs={"message": "Hello from Thread 2", "delay": 2}, name="worker-2")

    # 启动线程
    thread1.start()
    thread2.start()

    # 等待线程结束
    thread1.join()
    thread2.join()

    print("Main thread finished.")

# 输出:
# worker-1: Hello from Thread 1
# worker-2: Hello from Thread 2
# worker-1: Hello from Thread 1
# worker-2: Hello from Thread 2
# worker-1: Hello from Thread 1
# worker-2: Hello from Thread 2
# Main thread finished.  
  • 示例 2: 自定义线程类 继承 threading.Thread 类创建自定义线程类,重写 run() 方法:
Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import threading
import time


class CustomThread(threading.Thread):

    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay

    def run(self):
        """重载 run 方法,定义线程的执行逻辑"""
        for i in range(3):
            print(f"{self.name} is running. Count: {i}")
            time.sleep(self.delay)


if __name__ == "__main__":
    # 创建 2 个自定义线程
    thread1 = CustomThread(name="Custom-Thread-1", delay=1)
    thread2 = CustomThread(name="Custom-Thread-2", delay=2)

    # 启动线程
    thread1.start()
    thread2.start()

    # 等待线程结束
    thread1.join()
    thread2.join()

    print("Main thread finished.")

# 输出:
# Custom-Thread-1 is running. Count: 0
# Custom-Thread-2 is running. Count: 0
# Custom-Thread-1 is running. Count: 1
# Custom-Thread-1 is running. Count: 2
# Custom-Thread-2 is running. Count: 1
# Custom-Thread-2 is running. Count: 2
# Main thread finished.  
  • 示例 3: 守护线程
    守护线程在主线程结束时会自动停止,不管它是否完成了它的任务:
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import threading
import time


def worker():
    # 线程任务,死循环模拟守护线程
    while True:
        print(f"{threading.current_thread().name} running in the background")
        time.sleep(1)


if __name__ == "__main__":
    # 创建守护线程:daemon=True,将线程设置为守护线程
    daemon_thread = threading.Thread(target=worker, name="", daemon=True)

    # 启动守护线程
    daemon_thread.start()

    # 主线程等待 3 秒:守护线程会在主线程结束时自动停止,因此在主线程等待 3 秒后,守护线程也会自动结束
    time.sleep(3)
    print("Main thread finished.")

# 输出:
# Thread-1 (worker) running in the background
# Thread-1 (worker) running in the background
# Thread-1 (worker) running in the background
# Main thread finished.  
  • 示例 4: 非守护线程
    非守护线程的特点是主线程会等待所有非守护线程执行完毕后才会终止:
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading
import time


def worker():
    for i in range(5):
        print(f"{threading.current_thread().name} task {i}\n")
        time.sleep(1)


if __name__ == "__main__":
    # 创建非守护线程
    non_daemon_thread = threading.Thread(target=worker, name="Non-Daemon-Thread", daemon=False)

    # 启动非守护线程
    non_daemon_thread.start()

    print("Main thread finished. Waiting for non-daemon thread to complete.\n")
    non_daemon_thread.join()  # 主线程等待非守护线程结束
    print("Non-daemon thread finished.\n")

# 输出:
# Non-Daemon-Thread task 0
# Main thread finished. Waiting for non-daemon thread to complete.
# Non-Daemon-Thread task 1
# Non-Daemon-Thread task 2
# Non-Daemon-Thread task 3
# Non-Daemon-Thread task 4
# Non-daemon thread finished.

# non_daemon_thread.join() 注释后输出:
# Non-Daemon-Thread task 0
# Main thread finished. Waiting for non-daemon thread to complete.
# Non-daemon thread finished.
# Non-Daemon-Thread task 1
# Non-Daemon-Thread task 2
# Non-Daemon-Thread task 3
# Non-Daemon-Thread task 4

2.2 线程池

Python 3 concurrent.futures 模块提供了一个 ThreadPoolExecutor 类,用于实现线程池。ThreadPoolExecutorExecutor 的子类,它使用线程池来异步执行调用。

Text Only
1
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

不建议将 ThreadPoolExecutor 用于长期运行的任务:由于 ThreadPoolExecutor 会确保线程池中的所有线程在程序退出之前被合并,如果线程池中的任务是长期运行的(例如无限循环或长时间阻塞的任务),这可能会导致程序在退出时被卡住,因为它要等待这些长期运行的线程完成工作。因此,不建议将 ThreadPoolExecutor 用于长期运行的任务。如果你需要处理长期任务,应该使用其他的解决方案,比如守护线程(daemon threads),或者自己实现更复杂的线程管理机制,来确保程序能够在需要时正常退出,而不是等待长期运行的线程完成。

  • max_workers(可选):指定线程池中最大的工作线程数量。如果提交的任务超过了这个数量,多余的任务会排队等待有空闲线程。
  • 如果 max_workers 为 None 或没有指定,将默认为机器处理器的个数,假如 ThreadPoolExecutor 侧重于I/O操作而不是CPU运算,那么可以乘以 5 ,同时工作线程的数量可以比 ProcessPoolExecutor 的数量高;
  • 在 3.8 版本: max_workers 的默认值已改为 min(32, os.cpu_count() + 4)。 这个默认值会保留至少 5 个工作线程用于 I/O 密集型任务。 对于那些释放了 GIL 的 CPU 密集型任务,它最多会使用 32 个 CPU 核心。这样能够避免在多核机器上不知不觉地使用大量资源;
  • 在 3.13 版本: max_workers 的默认值已改为 min(32, (os.process_cpu_count() or 1) + 4);
  • thread_name_prefix (可选): 允许用户控制由线程池创建的 threading.Thread 工作线程名称以方便调试。如果不设置,线程名称会是类似 ThreadPoolExecutor-0_0 这样的格式;
  • initializer (可选):每个线程启动时都会执行的初始化函数。这个函数会在每个线程开始处理任务前运行一次,常用于初始化线程特定的资源或环境(例如数据库连接、文件句柄等);
  • initargs (可选):传递给 initializer 函数的参数,作为一个元组传入,默认 ()

ThreadPoolExecutor 常用方法

ThreadPoolExecutorExecutor 的子类,Executor 抽象类提供异步执行调用方法,要通过它的子类调用,即 ThreadPoolExecutor 常用方法:

  • submit(fn, /, *args, **kwargs):向线程池提交一个可调用对象(函数:),并立即返回一个 Future 对象,通过 Future 对象可以获取任务的执行结果;
Text Only
1
2
3
with ThreadPoolExecutor(max_workers=1) as executor:
  future = executor.submit(pow, 323, 1235)
  print(future.result())
  • map(fn, *iterables, timeout=None, chunksize=1):将 function 应用于 iterable 的每一项,并产生其结果的迭代器,timeout 超时时间(可以是整数或浮点数),如果 timeout 未指定或为 None,则不限制等待时间;
  • shutdown(wait=True, *, cancel_futures=False): 停止线程池的工作;
  • wait=True:则此方法只有在所有待执行的 future 对象完成执行且释放已分配的资源后才会返回;
  • wait=False:方法立即返回,所有待执行的 future 对象完成执行后会释放已分配的资源。不管 wait 的值是什么,整个 Python 程序将等到所有待执行的 future 对象完成执行后才退出;
  • cancel_futures=True:此方法将取消所有执行器还未开始运行的挂起的 Future。无论 cancel_futures 的值是什么,任何已完成或正在运行的 Future 都不会被取消;
  • cancel_futures=True,wait=True:已开始运行的所有 Future 将在此方法返回之前完成。 其余的 Future 会被取消;
  • 如果使用 with 语句,可以避免显式调用这个方法,它将会停止 Executor (就好像 Executor.shutdown() 调用时 wait 设为 True 一样等待):

    Text Only
    1
    2
    3
    4
    5
    6
    import shutil
    with ThreadPoolExecutor(max_workers=4) as e:
        e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
        e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
        e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
        e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
    

【注意】:死锁 当可调用对象已关联了一个 Future 然后在等待另一个 Future 的结果,会导致死锁情况,例如:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time
# 线程池
from concurrent.futures import ThreadPoolExecutor

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b 永远不会结束因为它在等待 a。
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a 永远不会结束因为它在等待 b。
    return 6

executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)

# 或者

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # 这将永远不会完成因为只有一个工作线程
    # 并且它正在执行此函数。
    print(f.result())


executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(wait_on_future)
result = future.result()
print(result)

示例:

  • 示例 1:使用 submit() 提交单个任务
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import time
from concurrent.futures import ThreadPoolExecutor


def task(name):
    print(f"Task {name} is running")
    time.sleep(2)
    return f"Task {name} completed"


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=3) as executor:
        future = executor.submit(task, 'jpzhang')  # 向线程池提交任务并立即返回 Future 对象
        result = future.result()  # 阻塞等待任务执行完成并获取结果
        print(result)
  • 示例 2:使用 map() 并行处理多个任务
Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import time
from concurrent.futures import ThreadPoolExecutor


def task(n):
    print(f"Processing {n}")
    time.sleep(1)
    return n * n


if __name__ == '__main__':
    # 创建线程池
    with ThreadPoolExecutor(max_workers=3) as executor:
        nums = [1, 2, 3, 4, 5]
        results = executor.map(task, nums)  # 并行处理多个任务
        for result in results:
            print(result)

# 输出:
# Processing 1
# Processing 2
# Processing 3
# Processing 4
# 1Processing 5
#
# 4
# 9
# 16
# 25
  • executor.map(task, nums) 对列表 nums 中的每个元素都并行执行了 task() 函数,结果是平方数的迭代器;
  • map() 方法返回的是一个结果迭代器,可以使用 for 循环遍历;

  • 示例 3:处理多个异步任务并等待完成

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def task(name):
    print(f"Task {name} is running")
    time.sleep(2)
    return f"Task {name} completed"


if __name__ == '__main__':
    # 创建线程池
    with ThreadPoolExecutor(max_workers=3) as executor:
        # 提交多个任务
        tasks = [executor.submit(task, f"Task-{i}") for i in range(5)]
        for future in as_completed(tasks):  # 等待每个任务完成
            print(future.result())  # 获取每个任务的结果

# 输出:
# Task Task-0 is running
# Task Task-1 is running
# Task Task-2 is running
# Task Task-3 is running
# Task Task-0 completed
# Task Task-4 is running
# Task Task-1 completed
# Task Task-2 completed
# Task Task-3 completed
# Task Task-4 completed
  • executor.submit(task, ...) 提交了多个任务;
  • as_completed() 返回一个迭代器,在每个任务完成时会返回 Future 对象;
  • future.result() 用于获取每个任务的执行结果;

  • 示例 4:线程池中的异常处理
    当任务执行过程中抛出异常时,submit() 返回的 Future 对象可以捕获这些异常。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from concurrent.futures import ThreadPoolExecutor


def task(num):
    if num == 2:
        raise ValueError("Task encountered an error!")
    return num * num


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        for future in futures:
            try:
                print(future.result())
            except Exception as e:
                print(f"Task raised an exception: {e}")

# 输出:
# 0
# 1
# Task raised an exception: Task encountered an error!
# 9
# 16
  • 当 task(2) 抛出异常时,future.result() 会捕获并抛出该异常,可以在 try-except 块中处理。

  • 示例 5:手动关闭线程池

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time
from concurrent.futures import ThreadPoolExecutor


def task(name):
    print(f"Task {name} is running")
    time.sleep(1)
    return f"Task {name} completed"


if __name__ == "__main__":
    # 创建线程池
    executor = ThreadPoolExecutor(max_workers=3)
    futures = [executor.submit(task, f"Task-{i}") for i in range(5)]

    # 手动关闭线程池
    executor.shutdown(wait=True)  # 等待所有任务完成
    print("All tasks completed!")

# 输出:
# Task Task-0 is running
# Task Task-1 is running
# Task Task-2 is running
# Task Task-3 is running
# Task Task-4 is running
# All tasks completed!

# executor.shutdown(wait=False) ,立即返回
# Task Task-0 is running
# Task Task-1 is running
# Task Task-2 is runningAll tasks completed!
#
# Task Task-3 is running
# Task Task-4 is running
  • executor.shutdown() 关闭线程池,并等待所有提交的任务完成。
  • wait=True 表示调用 shutdown() 后会阻塞,直到所有任务完成,否则,立即返回,任务依旧会继续完成。

  • 示例 6:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistant-subdomain.python.org/']

# 获取一个页面并报告其 URL 和内容
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# 我们可以使用一个 with 语句来确保线程被迅速清理
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # 开始加载操作并以每个 Future 对象的 URL 对其进行标记
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

2.3 GIL(Global Interpreter Lock,全局解释器锁)

在 Python 3 中的多线程主要是并发而不是并行,特别是在使用标准的 CPython 解释器时。这主要是由于“全局解释器锁(GIL)”的存在。

参考:https://zhuanlan.zhihu.com/p/75780308

GIL(Global Interpreter Lock, 全局解释器锁)

  • 什么是 GIL? GIL 是 CPython 解释器中的一个机制,用于保证同一时间只有一个线程执行 Python 字节码。它使得 Python 的多线程在处理 CPU 密集型任务时不能真正实现并行。
  • 影响:即便系统有多个 CPU 核心,Python 多线程在执行 Python 代码时仍然只能同时运行一个线程(这是因为 GIL 会让每个线程轮流获取执行权限)。这导致了多线程在 CPython 中无法充分利用多核 CPU 进行并行计算,而更多地表现为并发。

并发 vs 并行

  • 并发:并发指的是多个任务在同一时间段内交替进行执行。在一个多线程环境中,线程轮流获取 CPU 时间片执行任务,给人一种同时执行的错觉。CPython 中的多线程是这种情况,尤其是在 CPU 密集型任务中。
  • 并行:并行指的是多个任务在同一时刻真正同时执行,通常依赖于多核 CPU 来并行处理多个任务。在 Python 中,要真正实现并行的计算,一般使用多进程(如 multiprocessing 模块)而不是多线程。

多线程的适用场景 虽然 GIL 限制了 Python 多线程的并行能力,但在某些场景下,多线程仍然非常有用:

  • I/O 密集型任务:例如网络请求、文件 I/O 等操作。在这些任务中,线程会等待外部资源的响应,而不需要大量 CPU 计算,因此在等待期间可以切换到其他线程进行工作,充分提高程序的执行效率。
  • 轻量级的任务切换:通过多线程实现任务的交替执行,尤其适合处理大量短小的任务。

如何实现真正的并行 如果程序需要真正的并行计算(特别是在 CPU 密集型任务中),通常有以下几个选择:

  • 使用 multiprocessing 模块:这个模块允许创建多个进程,每个进程拥有自己的 GIL,因此可以充分利用多核 CPU 进行并行计算。
  • 使用 C 扩展模块:对于需要高性能的部分,可以使用 C/C++ 编写扩展模块,然后通过 Python 调用。GIL 对非 Python 代码不生效,因此可以通过这种方式实现并行。
  • 使用 concurrent.futures.ProcessPoolExecutor:这个工具通过多进程来实现真正的并行,适合需要执行大量 CPU 密集型任务的场景。

2.4 线程同步

多个线程共享资源或数据,如果没有合理的同步机制,可能会导致数据竞争、死锁等问题。因此,线程同步是确保多个线程安全地访问共享资源的关键。

2.4.1 Lock (锁)

Lock 是最简单的一种锁机制,表示一个互斥锁。锁的状态可以是“锁定”或“非锁定”。当一个线程获取锁时,其他线程必须等待该线程释放锁才能获取它。这可以确保只有一个线程可以访问共享资源,从而避免竞争条件。

Text Only
1
class threading.Lock

锁对象类,一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放,任何线程都可以释放它。

【备注】

  • Lock 锁支持上下文管理协议,因此推荐使用 with 而不是手动调用 acquire() 和 release() 来针对一个代码块处理锁的获取和释放。
  • 在 3.13 版本发生变更: 现在 Lock 是一个类。 在更早的 Python 版本中,Lock 是一个返回下层私有锁类型的实例的工厂函数。

基础方法:

  • acquire(blocking=True, timeout=-1):阻塞或非阻塞获得锁。
  • blocking=True(默认值),阻塞直到锁被释放,然后将锁锁定并返回 True;
  • blocking=False,非阻塞,如果调用时锁未释放,则立即返回 False;否则,将锁锁定并返回 True;
  • timeout:阻塞超时时间,超时未获得锁则返回 False,-1 无限制;
  • release():释放一个锁,这个方法可以在任何线程中调用,不单指获得锁的线程;
  • locked():当锁被获取时,返回 True;

示例 1:acquire/release

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading
import time

# 共享资源
counter = 0
lock = threading.Lock()


def increment_counter():
    global counter
    # 获取锁
    lock.acquire()
    try:
        local_counter = counter
        time.sleep(0.1)  # 模拟其他操作
        local_counter += 1
        counter = local_counter
    finally:
        # 释放锁,任务出错,锁也能被释放,防止死锁
        lock.release()


if __name__ == "__main__":
    # 创建线程
    threads = []
    for i in range(5):
        thread = threading.Thread(target=increment_counter)
        threads.append(thread)
        thread.start()

    # 等待所有线程完成
    for thread in threads:
        thread.join()

    print(f"Final counter value: {counter}")

# 输出:
# Final counter value: 5

示例 2:with

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading

lock = threading.Lock()
counter = 0

def task():
    global counter
    with lock:  # 使用 with 自动获取和释放锁
        local_counter = counter
        local_counter += 1
        counter = local_counter
        print(f"Counter value: {counter}")

if __name__ == "__main__":
    threads = []
    for _ in range(5):
        t = threading.Thread(target=task)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print(f"Final counter value: {counter}")

# 输出:
# Counter value: 1
# Counter value: 2
# Counter value: 3
# Counter value: 4
# Counter value: 5
# Final counter value: 5

2.4.2 RLock (递归锁)

RLock 表示可重入锁(reentrant lock),允许同一个线程多次获取同一把锁而不会导致死锁。每次 RLock.acquire() 的调用必须匹配一次 RLock.release(),只有在所有的锁定请求都被释放后,锁才会被真正释放(在 “锁定/非锁定” 状态上附加了 "所属线程" 和 "递归等级" 的概念)。

Text Only
1
class threading.RLock

此类实现了重入锁对象,重入锁必须由获取它的线程释放,一旦线程获得了重入锁,同一个线程再次获取它将不阻塞;需要注意的是 RLock 其实是一个工厂函数,返回平台支持的具体递归锁类中最有效的版本的实例。

【备注】

  • RLock 锁支持上下文管理协议,因此推荐使用 with 而不是手动调用 acquire() 和 release() 来针对一个代码块处理锁的获取和释放。

基础方法:

  • acquire(blocking=True, timeout=-1):阻塞或非阻塞获得锁。
  • blocking = True (默认值):
    • 如无任何线程持有锁,则获取锁并立即返回;
    • 如有其他线程持有锁,则阻塞执行直至能够获取锁,或直至 timeout,如果将其设为一个正浮点数值的话;
    • 如同一线程持有锁,则再次获取该锁,并立即返回。 这是 Lock 和 RLock 之间的区别;Lock 将以与之前相同的方式处理此情况,即阻塞执行直至能够获取锁;
  • blocking = False:
    • 如无任何线程持有锁,则获取锁并立即返回;
    • 如有其他线程持有锁,则立即返回;
    • 如同一线程持有锁,则再次获取该锁并立即返回;

如果被多次调用,则未能调用相同次数的 release() 可能导致死锁。考虑将 RLock 用作上下文管理器而不是直接调用 acquire/release。

  • release(): 释放锁,自减递归等级。如果减到零,则将锁重置为非锁定状态(不被任何线程拥有),并且,如果其他线程正被阻塞着等待锁被解锁,则仅允许其中一个线程获得锁。如果自减后,递归等级仍然不是零,则锁保持锁定,仍由调用线程拥有。只有在调用方线程持有锁时才能调用此方法。如果在未获取锁的情况下调用此方法则会引发 RuntimeError。

示例 1:acquire/release

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import threading
import time
# 共享资源
counter = 0
rlock = threading.RLock()


def increment():
    global counter
    rlock.acquire()
    try:
        local_counter = counter
        time.sleep(0.1)
        local_counter += 1
        counter = local_counter
    finally:
        rlock.release()


def increment_counter():
    global counter
    # 使用 RLock
    rlock.acquire()
    try:
        increment()  # 递归调用
    finally:
        rlock.release()


if __name__ == "__main__":
    # 创建线程
    threads = []
    for i in range(5):
        thread = threading.Thread(target=increment_counter)
        threads.append(thread)
        thread.start()

    # 等待所有线程完成
    for thread in threads:
        thread.join()

    print(f"Final counter value: {counter}")

# 输出:
# Final counter value: 5

示例 2:with

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading

lock = threading.Lock()
counter = 0

def task():
    global counter
    with lock:  # 使用 with 自动获取和释放锁
        local_counter = counter
        local_counter += 1
        counter = local_counter
        print(f"Counter value: {counter}")

if __name__ == "__main__":
    threads = []
    for _ in range(5):
        t = threading.Thread(target=task)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print(f"Final counter value: {counter}")

# 输出:
# Counter value: 1
# Counter value: 2
# Counter value: 3
# Counter value: 4
# Counter value: 5
# Final counter value: 5

LockRLock 的区别

特性 Lock RLock
获取锁 只能获取一次,必须释放后才能再次获取 可以多次获取,适用于递归调用
适用场景 简单互斥操作 复杂递归或重复锁定的场景
是否支持多次获取
获取锁行为 acquire()release() 必须一一对应 每次调用 acquire() 后可多次调用 release()

2.4.3 Condition (条件对象)

threading.Condition 是 Python 中用于线程同步的条件变量,常用于多个线程之间协调某个共享资源的访问。通过 Condition,线程可以等待某个条件的发生,直到另一个线程通知它们该条件已被满足,线程才会继续执行。这通常用于生产者-消费者模型或类似的并发模式。

Text Only
1
class threading.Condition(lock=None)
  • lock: 一个可选的锁(如 Lock 或 RLock)对象。如果不指定,Condition 会内部创建一个 RLock 作为锁对象。

Condition 对象结合了锁机制和条件变量的功能,使用时它必须与锁结合。当线程需要等待某个条件时,它首先会获取锁,然后调用 wait() 方法挂起自己,释放锁,直到其他线程调用 notify() 或 notify_all() 来通知它条件已经满足。

常用方法

  • wait(timeout=None): 当前线程等待,直到其他线程通知条件变量,或直到可选的 timeout 时间结束为止。调用 wait() 时,线程会释放锁,并在被唤醒或超时后重新获得锁。
  • wait_for(predicate, timeout=None):等待特定条件(predicate: 可调用对象而且它的返回值可被解释为一个布尔值)为 True。它提供了比 wait() 更加智能的等待机制,可以通过一个条件函数(predicate)来判断是否满足条件,而不是单纯依赖于外部 notify() 信号。
  • notify(n=1): 通知等待条件的线程,默认唤醒一个线程。如果 n 大于 1,则唤醒指定数量的线程。只有在持有锁的情况下才能调用。如果没有线程在等待,这是一个空操作。
  • notify_all(): 唤醒所有等待该条件的线程。
  • acquire(), release(): 获取和释放与条件关联的锁,通常在使用 with 语句时自动处理锁的获取和释放。
  • 代码片段:
Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# 消费一个条目
with cv:
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# 生产一个条目
with cv:
    make_an_item_available()
    cv.notify()

wait_for():

Text Only
1
2
3
4
# 消费一个条目
with cv:
    cv.wait_for(an_item_is_available)
    get_an_available_item()

完整示例: 生产者-消费者

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import threading
import time

class ProducerConsumer:
    def __init__(self):
        self.items = []  # 用于存储共享数据
        self.condition = threading.Condition()  # 创建条件变量
        self.producer_done = False  # 标志生产者是否完成

    def producer(self):
        for i in range(1, 6):
            with self.condition:  # 每次生产商品时获取条件的锁
                print(f"Producing item {i}")
                self.items.append(i)  # 生产一个商品
                self.condition.notify_all()  # 通知消费者有新商品
            time.sleep(0.5)  # 模拟生产过程
        # 生产完成,标记生产者已完成
        with self.condition:
            self.producer_done = True
            self.condition.notify_all()  # 通知所有消费者,生产结束

    def consumer(self):
        while True:
            with self.condition:
                # 消费者线程被唤醒后,检查是否有商品可消费
                while not self.items and not self.producer_done:  # 如果 items 为空,等待生产者通知
                    print(f"{threading.current_thread().name} - No items to consume, waiting...")
                    self.condition.wait()  # 等待被生产者唤醒
                if not self.items and self.producer_done:  # 生产者完成且没有商品可消费
                    print(f"{threading.current_thread().name} - All items consumed, exiting...")
                    break
                # 消费商品
                item = self.items.pop(0)
                print(f"{threading.current_thread().name} - Consumed item {item}")
            # 这里消费完商品后,已经释放锁,再进行模拟消费的延时
            time.sleep(0.1)  # 模拟消费过程

if __name__ == "__main__":
    pc = ProducerConsumer()

    # 创建多个消费者线程
    consumers = [threading.Thread(target=pc.consumer, name=f"Consumer-{i+1}") for i in range(2)]

    # 创建一个生产者线程
    producer_thread = threading.Thread(target=pc.producer)

    # 启动所有消费者线程
    for consumer in consumers:
        consumer.start()

    time.sleep(0.5)  # 让消费者先启动,模拟现实中的消费等待
    producer_thread.start()

    # 等待所有线程结束
    producer_thread.join()
    for consumer in consumers:
        consumer.join()

使用 while 循环检查所要求的条件成立与否是有必要的,因为 wait() 方法可能要经过不确定长度的时间后才会返回,而此时导致 notify() 方法调用的那个条件可能已经不再成立。这是多线程编程所固有的问题。 wait_for() 方法可自动化条件检查,并简化超时计算。

wait_for():示例

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import threading
import time


class ProducerConsumer:
    def __init__(self):
        self.items = []  # 共享资源列表
        self.condition = threading.Condition()

    def producer(self):
        with self.condition:
            for i in range(1, 6):
                print(f"Producing item {i}")
                self.items.append(i)  # 生产物品
                self.condition.notify()  # 通知消费者
                time.sleep(1)

    def consumer(self):
        def item_available():  # 谓词函数:检测 items 是否有元素
            return len(self.items) > 0


        with self.condition:
            self.condition.wait_for(item_available)  # 等待 items 不为空
            item = self.items.pop(0)  # 消费物品
            print(f"Consumed item {item}")


if __name__ == "__main__":
    pc = ProducerConsumer()

    # 创建消费者线程
    consumer_thread = threading.Thread(target=pc.consumer)
    # 创建生产者线程
    producer_thread = threading.Thread(target=pc.producer)

    consumer_thread.start()
    time.sleep(0.5)  # 模拟消费者等待
    producer_thread.start()

    consumer_thread.join()
    producer_thread.join()

2.4.4 Semaphore (信号量对象)

Semaphore 用于控制同时进行的线程数量,确保资源不会被超过设定数量的线程同时使用。一个信号量管理一个内部计数器,该计数器因 acquire() 方法的调用而递减,因 release() 方法的调用而递增。 计数器的值永远不会小于零;当 acquire() 方法发现计数器为零时,将会阻塞,直到其它线程调用 release() 方法。

Text Only
1
class threading.Semaphore(value=1)
  • Semaphore 是信号量的一个实现,用于保护有限资源,防止多个线程同时访问,或者限制某个代码段的并发执行数量;
  • 参数 value: 代表信号量的初始值。value 的默认值为 1,意味着它像一个互斥锁(mutex)一样,仅允许一个线程访问资源。如果设置为大于 1 的值,则表示可以有多个线程同时访问受保护的资源。当 value 为 0 时,表示完全阻塞,所有线程需要等待信号量的释放才能继续执行;
  • 信号量对象也支持上下文管理协议(with);

示例 信号量通常用于保护数量有限的资源,例如数据库服务器。在资源数量固定的任何情况下,都应该使用有界信号量。在生成任何工作线程前,应该在主线程中初始化信号量。

Text Only
1
2
3
maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

工作线程生成后,当需要连接服务器时,这些线程将调用信号量的 acquire 和 release 方法:

Text Only
1
2
3
4
5
6
with pool_sema:
    conn = connectdb()
    try:
        # ... 使用连接 ...
    finally:
        conn.close()

使用 Semaphore 控制最多允许 3 个线程同时访问一个共享资源:

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import threading
import time

semaphore = threading.Semaphore(2)

def access_resource():
    with semaphore:
        print(f"{threading.current_thread().name} accessing resource.")
        time.sleep(2)

threads = []
for i in range(4):
    t = threading.Thread(target=access_resource)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

2.4.5 Event (事件对象)

Event 对象通过内部的一个布尔标志来实现线程间的协调和通信,当布尔标志为 True 时,所有等待的线程都会被唤醒;当布尔标志为 False 时,所有等待的线程都会被阻塞。

class threading.Event:实现事件对象的类。事件对象管理一个内部标识,调用 set() 方法可将其设置为true。调用 clear() 方法可将其设置为 false 。调用 wait() 方法将进入阻塞直到标识为true。这个标识初始时为 false 。

主要方法

  • set():将内部的标志设为 True,表示事件发生,并唤醒所有等待此事件的线程;
  • clear():将内部的标志重置为 False,使得后续调用 wait() 的线程进入等待状态;
  • is_set():返回内部标志的状态,如果标志为 True 则返回 True,否则返回 False;
  • wait(timeout=None):阻塞当前线程,直到内部标志为 True 时唤醒线程。如果设置了 timeout 参数,线程会等待指定的秒数,如果超时还没有被唤醒,则返回 False;

工作流程

  • 当 Event 对象的内部标志为 False 时,调用 wait() 的线程会被阻塞,直到其他线程调用 set() 方法将标志设为 True;
  • 一旦标志被设为 True,所有调用 wait() 的线程都会被唤醒,并可以继续执行;

示例

示例 1:生产者与消费者之间的事件同步

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading
import time

# 创建一个事件对象
event = threading.Event()

def producer():
    print("Producer is working...")
    time.sleep(3)  # 模拟生产过程
    print("Producer has produced an item!")
    event.set()  # 生产完成,触发事件

def consumer():
    print("Consumer is waiting for an item...")
    event.wait()  # 等待事件被触发
    print("Consumer has consumed the item!")

if __name__ == "__main__":
    # 创建生产者线程和消费者线程
    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)

    # 启动线程
    consumer_thread.start()
    producer_thread.start()

    # 等待线程结束
    producer_thread.join()
    consumer_thread.join()

    print("All done!")

# 输出:
# Consumer is waiting for an item...
# Producer is working...
# Producer has produced an item!
# Consumer has consumed the item!
# All done!
  • 消费者线程首先调用 event.wait(),进入等待状态;
  • 生产者线程经过 3 秒钟后,调用 event.set(),将事件标志设为 True,这时消费者线程被唤醒并继续执行;
  • 生产者和消费者同步完成后,程序正常退出;

示例 2:定时触发的事件

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
import time

# 创建事件对象
event = threading.Event()


def worker():
    while not event.is_set():
        # 每隔 2 秒检查一次事件是否已触发
        print("Waiting for the event to be set...")
        event.wait(2)
    print("Event has been set, continuing work...")


def trigger_event():
    time.sleep(5)
    print("Setting the event...")
    event.set()


if __name__ == "__main__":
    # 启动工作线程
    worker_thread = threading.Thread(target=worker)
    worker_thread.start()

    # 启动触发事件的线程
    trigger_thread = threading.Thread(target=trigger_event)
    trigger_thread.start()

    # 等待所有线程结束
    worker_thread.join()
    trigger_thread.join()

# 输出:
# Waiting for the event to be set...
# Waiting for the event to be set...
# Waiting for the event to be set...
# Setting the event...
# Event has been set, continuing work...

2.4.6 Timer (定时器对象)

线程定时器对象 (threading.Timer) 是 threading 模块中的一种高级线程工具,专门用于在特定的时间后执行某个操作。Timer 类继承自 Thread 类,本质上是一个延时启动的线程,允许在指定时间间隔后调用某个函数。

class threading.Timer(interval, function, args=None, kwargs=None):创建一个定时器,经过 “interval” 秒的间隔时间后,将会用参数 “args” 和关键字参数 “kwargs” 调用 “function”。如果 “args” 为 “None” (默认值),则会使用一个空列表。如果 “kwargs” 为 “None” (默认值),则会使用一个空字典。

例如:

Text Only
1
2
3
4
5
def hello():
    print("hello, world")

t = Timer(30.0, hello)
t.start()  # 30 秒之后,将打印 "hello, world"

方法:

  • start(): 启动定时器,开始倒计时,倒计时结束后执行目标函数;
  • cancel(): 如果定时器仍在倒计时中,调用此方法可以取消定时器,避免目标函数的执行;

示例:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading
import time


def worker(name, message):
    print(f"Hello, {name}! {message}")


if __name__ == '__main__':
    # 创建一个 3 秒后执行的定时器,并传递参数
    timer = threading.Timer(3, worker, args=("Alice", "How are you today?"))

    print("Timer started!")
    timer.start()  # 启动定时器

    # 继续执行主线程中的任务
    for i in range(5):
        print(f"Main thread working... {i + 1}")
        time.sleep(1)

# 输出:
# Timer started!
# Main thread working... 1
# Main thread working... 2
# Main thread working... 3
# Hello, Alice! How are you today?
# Main thread working... 4
# Main thread working... 5

取消定时器:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading
import time

def say():
    print("Hello, world!")



if __name__ == '__main__':
    # 创建一个 5 秒后执行的定时器
    timer = threading.Timer(5, say)

    print("Timer started!")
    timer.start()  # 启动定时器

    # 主线程等待 2 秒后取消定时器
    time.sleep(2)
    timer.cancel()
    print("Timer canceled!")

# 输出:
# Timer started!
# Timer canceled!

2.4.7 Barrier (栅栏对象)

在 Python 3 中,线程栅栏对象 (threading.Barrier) 是一种线程同步机制,能够协调一组线程并确保它们在执行的某个阶段上达到同步。栅栏的作用类似于一个路障,要求所有线程在同一个点等待,直到某个条件满足,然后才能继续执行。

class threading.Barrier(parties, action=None, timeout=None)

  • parties:必须参数,表示需要等待的线程数。只有指定数量的线程都到达栅栏时,这些线程才会继续执行;
  • action:可选参数,当最后一个线程到达栅栏时,执行的可选操作(回调函数);
  • timeout:可选参数,定义一个栅栏的默认超时时间。如果超过这个时间,等待栅栏的线程会抛出 BrokenBarrierError 异常;

常用方法

  • wait(timeout=None):所有线程调用此方法后,会进入等待状态,直到指定数量的线程都调用了 wait()。当所有线程都到达栅栏后,线程可以继续执行。如果提供了 timeout 参数,则超时后线程抛出 BrokenBarrierError;
  • 函数返回值是一个整数,取值范围在0到 parties -- 1,在每个线程中的返回值不相同。可用于从所有线程中选择唯一的一个线程执行一些特别的工作。例如:

    Text Only
    1
    2
    3
    4
    i = barrier.wait()
    if i == 0:
        # 只有一个线程需要打印此文本
        print("passed the barrier")
    
  • reset():重置栅栏,允许重新使用栅栏,但当前处于等待的线程会抛出 BrokenBarrierError;

  • abort():中止栅栏,任何在等待中的线程都会抛出 BrokenBarrierError;
  • broken:检查栅栏是否已被打破(返回布尔值 True 或 False);
  • parties:需要同步的线程的数量;
  • n_waiting:当前已达到栅栏,并处于等待状态的线程数量;

示例

使用 Barrier 同步线程:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading
import time

def worker(barrier, worker_id):
    print(f"Worker {worker_id} is waiting at the barrier.")
    worker_ready = barrier.wait()  # 等待其他线程到达栅栏
    if worker_ready == 0:  # 选择唯一的一个线程打印如下内容
        print(f"Worker {worker_id} is the last to reach the barrier!")
    print(f"Worker {worker_id} is proceeding.")

if __name__ == "__main__":
    num_threads = 4  # 定义线程数
    barrier = threading.Barrier(num_threads)  # 创建栅栏,线程数为 4

    threads = []
    for i in range(num_threads):
        thread = threading.Thread(target=worker, args=(barrier, i))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    print("All workers have passed the barrier.")

设置回调函数:通过 action 参数,可以在最后一个线程到达栅栏(Barrier)时执行一个额外的动作,例如记录日志或进行某种特定操作。

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading
import time

def barrier_action():
    print("All threads have reached the barrier. Executing action!")

def worker(barrier, worker_id):
    print(f"Worker {worker_id} is waiting at the barrier.")
    barrier.wait()  # 等待其他线程到达栅栏
    print(f"Worker {worker_id} is proceeding.")

if __name__ == "__main__":
    num_threads = 4
    # 创建栅栏,指定 action 为 barrier_action
    barrier = threading.Barrier(num_threads, action=barrier_action)

    threads = []
    for i in range(num_threads):
        thread = threading.Thread(target=worker, args=(barrier, i))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    print("All workers have passed the barrier.")

3 协程 (Coroutine)

协程是 Python 中的一种轻量级并发模型,通过异步编程实现任务的切换。协程与进程和线程不同,它不是由操作系统调度,而是通过程序本身来控制。

详见 21 章节,或者官网:https://docs.python.org/zh-cn/3/library/asyncio.html

概念:

  • 协程是一种特殊的生成器,能够在其执行中暂停和恢复。它们可以用 async def 定义,使用 await 关键字在协程内调用其他协程;
  • 与传统线程不同,协程不需要多线程的上下文切换,因此开销较小,性能更高;

协程的定义与运行:
定义协程:协程使用 async def 关键字定义

Text Only
1
2
3
4
5
async def worker():
    print("Start coroutine")
    # 模拟异步 I/O 操作
    await asyncio.sleep(5)
    print("End coroutine")

运行协程:可以使用 asyncio 模块中的事件循环来运行协程

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import asyncio

async def main():
    # 调用协程
    await worker()


if __name__ == '__main__':
    # 运行事件循环
    asyncio.run(main())

示例:使用协程进行异步编程
下面是一个示例,演示如何使用协程来并发处理多个任务:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import asyncio


async def get_data(num):
    """
    定义了一个异步函数,模拟从网络获取数据的过程,使用 await asyncio.sleep(2) 模拟耗时的操作
    """
    print(f"Task {num}: Start fetching data...")
    await asyncio.sleep(2)  # 模拟耗时的网络请求
    print(f"Task {num}: Data fetched!")
    return f"Data {num}"


async def main():
    """
    创建多个协程任务并通过 asyncio.gather() 并发运行它们。gather 会等待所有协程完成,并返回结果
    """
    tasks = [get_data(i) for i in range(1, 4)]  # 创建多个协程任务
    results = await asyncio.gather(*tasks)  # 并发运行所有协程
    print("All tasks completed.")
    print(results)


# 运行事件循环
if __name__ == "__main__":
    """
    运行 main() 协程并启动事件循环
    """
    asyncio.run(main())

# 输出:
# Task 1: Start fetching data...
# Task 2: Start fetching data...
# Task 3: Start fetching data...
# Task 1: Data fetched!
# Task 2: Data fetched!
# Task 3: Data fetched!
# All tasks completed.
# ['Data 1', 'Data 2', 'Data 3']

或者:

Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio


async def task_1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 completed")


async def task_2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 completed")


async def main():
    # 并发执行多个任务
    await asyncio.gather(task_1(), task_2())


# 启动事件循环
asyncio.run(main())

或者:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio


async def task_1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")


async def task_2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")


async def main():
    task1 = asyncio.create_task(task_1())
    task2 = asyncio.create_task(task_2())

    await task1
    await task2


asyncio.run(main())


# 输出: task_1 需要 2 秒完成,但 task_2 只需要 1 秒,因此它会先完成
# Task 1 started
# Task 2 started
# Task 2 finished
# Task 1 finished

常见 asyncio 函数

  • asyncio.create_task(coro): 将协程封装为 Task,可以让事件循环并发地执行多个任务;
  • asyncio.gather(*coros): 并发地运行多个协程,并收集结果;
  • asyncio.sleep(seconds): 模拟异步等待,通常用于测试;
  • asyncio.run(coro): 运行协程,自动处理事件循环的创建和关闭;

协程优势

  • 非阻塞:在等待 I/O 操作时,协程不会阻塞其他任务的执行,可以有效提高程序的并发能力;
  • 轻量级:协程比线程占用的内存更少,创建和切换协程的开销更小;
  • 简洁:通过 async/await 语法,异步代码更易于阅读和理解;

适用场景

  • 高并发 I/O 密集型任务(如网络请求、文件读写等);
  • 需要频繁进行异步操作的应用(如 Web 服务器、爬虫等);

注意事项

  • 协程只能在异步上下文中运行,不能直接在普通函数中调用;
  • 需要使用 asyncio.run() 或者创建一个事件循环来调度协程的执行;

4 并行 & 并发

在计算机科学中,“并行”和“并发”是两种不同的概念,它们虽然都涉及到多任务处理,但在执行任务的方式上有着明显的区别。下面分别对这两个概念进行解释:

4.1 并行 (Parallelism)

并行指的是多个任务同时在多个处理器或 CPU 核心上运行。并行任务的执行方式是同时进行的,多个任务真正地同时执行,以此来提高程序的执行效率。

特点:

  • 物理并行:需要多个 CPU 核心或处理器来实现,因为每个任务会在不同的核心上同时运行;
  • 同时执行:所有任务在同一时刻运行,并行计算可以提高程序执行的速度,尤其是在多核 CPU 环境下;
  • 适用于 CPU 密集型任务:并行更适合需要大量计算的任务,如科学计算、图像处理等;

举例:
在多核处理器中,一个核心执行任务 A,另一个核心同时执行任务 B,这就是并行。比如图像渲染或视频编码时,每个帧可以分配给不同的处理器核心。

图示:

Text Only
1
2
3
4
5
任务 A |=====| 
任务 B |=====| 
任务 C |=====| 

同一时刻多个任务同时执行。

4.2 并发 (Concurrency)

并发是指在一个时间段内,有多个任务可以交替执行。并发不一定要求多个任务同时进行,而是多个任务在多个时间片内切换执行。它的目的是提高系统的响应性,而不是直接加快任务的执行速度。

特点:

  • 任务交替执行:即使只有一个 CPU 核心,多个任务也可以通过切换时间片的方式交替执行,达到 “同时处理多个任务” 的效果;
  • 逻辑并行:在单核处理器上,并发任务实际上是按顺序执行的,但由于任务切换速度非常快,用户感觉像是同时执行的;
  • 适用于 I/O 密集型任务:并发适合需要等待 I/O 操作的任务,如网络请求、文件读写等,因为任务在等待时可以让出 CPU 资源,让其他任务继续执行;

举例:
一个操作系统同时处理多个任务,比如用户在浏览网页的同时,音乐播放器在后台播放音乐。这些任务实际上是交替运行的,操作系统快速切换任务,使得用户感觉它们是在同时进行。

图示:

Text Only
1
2
3
4
5
任务 A |==    ==    ==|
任务 B |  ==    ==    ==|
任务 C |    ==    ==    ==|

任务 A、B、C 轮流执行,在切换时间片时交替运行。

4.3 并行与并发的区别

特性 并行 (Parallelism) 并发 (Concurrency)
核心概念 多个任务真正同时执行 多个任务交替执行
依赖硬件 依赖多核 CPU 或多处理器 可以在单核或多核 CPU 上实现
适用场景 CPU 密集型任务 (如计算、渲染) I/O 密集型任务 (如网络请求、文件读写)
执行效率 提高任务执行的速度 提高系统响应性
执行方式 物理上同时运行 逻辑上同时运行
资源共享 每个任务可能运行在独立的资源上 任务之间共享资源或切换使用资源
代表模型 多进程 (Multiprocessing) 多线程、协程

总结:

  • 并行是多任务真正同时在不同的处理器上运行,需要硬件支持多核 CPU;
  • 并发是在同一时间段内交替执行多个任务,通常不需要多个核心,而是通过任务切换实现任务的同时处理效果;

可以根据程序的需求来选择并行或并发。例如,如果需要处理大量计算,可以选择并行;如果任务涉及大量 I/O 操作,选择并发会更合适。

5 进程、线程、协程对比

特性 进程 线程 协程
并行/并发 并行 并发 并发
内存空间 独立 共享 共享
创建开销
通信方式 IPC (管道等) 共享变量 事件循环
GIL 限制 有 (CPython)
适用场景 CPU 密集型任务 I/O 密集型任务 I/O 密集型任务

总之,进程适合 CPU 密集型任务,而线程和协程适合 I/O 密集型任务。协程是目前在 Python 中实现高并发的首选方式,尤其适用于网络应用和异步操作。 可以根据具体的应用场景选择适合的并发模型。进程是指在系统中正在运行的一个应用程序,是CPU的最小工作单元,一个进程可以有一个或多个线程,一个线程可以有很多协程。