python 多进程浅谈

什么是进程

进程是计算机中执行程序的活动实体,它是计算机系统中资源管理和调度的基本单位。一个进程可以看作是一个独立的程序执行流,具有自己的内存空间、数据栈、程序计数器、寄存器等 system-level 的控制结构。进程能够并行运行,互相之间相互独立,且不会相互干扰。

在操作系统中,每个进程都被赋予了一个唯一的标识符(PID),用于标识该进程的身份和状态。操作系统通过 PID 来管理和调度进程的资源,如内存空间、CPU 时间和 I/O 等。

每个进程都由一个或多个线程组成,线程是进程中的执行单元,用于实现程序的并发执行。在多线程编程中,我们通常会对一个进程中的多个线程进行协同调度,以便让它们高效地利用计算机的资源来完成任务。

进程还可以通过进程间通信的方式来实现数据共享和协同处理。常见的进程间通信方式包括管道、消息队列、共享内存和信号量等。

总之,进程是计算机系统中资源管理和调度的基本单位,它是程序执行的载体,同时也是计算机性能和可扩展性的关键因素之一。

进程的状态

在操作系统中,进程有以下几种状态:

  1. 创建状态(New):当一个新的进程被创建,但还没有分配到任何资源时,它处于创建状态。

  2. 就绪状态(Ready):在进程被分配到足够的资源后,它等待 CPU 调度执行的过程中,它处于就绪状态。

  3. 运行状态(Running):CPU 正在执行该进程的指令时,该进程处于运行状态。

  4. 阻塞状态(Blocked):当一个进程正在等待某个外部事件发生时,如等待 I/O 操作完成或等待信号量释放,它处于阻塞状态。在阻塞状态中,进程无法执行任何指令,等待外部事件结束后会重新变为就绪状态。

  5. 终止状态(Terminated):当进程完成其任务或被操作系统强制终止时,该进程处于终止状态。

总的来说,进程可以根据其所处的状态来进行调度和管理。进程状态的转换通常是由操作系统负责管理和控制的。

进程与线程的区别

进程和线程是操作系统中的两个核心概念,它们都可以实现程序的并发执行,但二者之间有以下几个区别:

  1. 资源占用:进程是操作系统资源分配和调度的基本单位,它拥有自己的地址空间、堆栈、文件句柄等系统资源,因此占用的资源较多。而线程共享进程的资源,包括地址空间、文件句柄和其他系统资源。因此创建和销毁线程的开销较小。

  2. 并发性:由于进程之间相互独立,所以它们可以并发地执行。不同进程之间可以通过进程间通信的方式来实现数据共享和协同处理。而线程共享进程的地址空间和系统资源,因此需要进行同步和互斥来保证执行的正确性。

  3. 切换开销:在同一进程中的线程之间切换的开销相对较小,因为它们共享同一个地址空间。而进程之间切换的开销相对较大,因为每个进程都拥有自己的地址空间。

  4. 执行效率:在某些情况下,多线程的执行效率会比多进程高,因为线程之间的切换和调度开销相对较小。但是多线程也存在诸多问题,如数据共享和同步、死锁等。

总的来说,进程和线程的主要区别在于资源占用、并发性、切换开销和执行效率等方面。选择适当的并发编程模型需要根据实际需求和资源情况进行权衡和评估。

GIL全局解释器锁(扩展)

  • io密集型:涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很 少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)

  • cup密集型:cup密集型也称为计算密集型,任务的特点是要进行大量的计算,消耗CPU资 源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力

GIL(全局解释器锁)是 Python 解释器中的一种机制,用于保证在同一时间只有一个线程可以执行 Python 代码。换句话说,GIL 限制了 Python 的多线程并发性能。

对于并发和并行,它们都是指多个任务同时执行的情况。并发指的是多个任务交替执行,每个任务都获得一定的时间片后再让出 CPU 给其他任务。而并行指的是多个任务真正同时执行,每个任务都获得一个独立的 CPU 核心。

对于多线程和多进程,它们都是实现并发和并行的方式。多线程指的是在同一进程内同时运行多个线程,各个线程之间共享进程资源。而多进程指的是创建多个进程来并行执行不同的任务,在各自的地址空间内运行,相互之间相互独立。

与 GIL 相关的问题主要影响多线程编程,因为在 Python 中,一个进程中只有一个主线程能够直接执行 Python 代码,其他线程必须通过在主线程中运行 Python 程序来获取 GIL 并进行计算。这就导致了当多个线程需要竞争同一资源时,会相互阻塞,从而限制了线程并发的效率。

相比之下,多进程则不受 GIL 的限制,因为每个进程都有自己独立的解释器和资源空间,可以在完全独立的进程中运行,相互之间没有阻塞限制,从而提高了多进程的并行效率。但是,多进程的创建和销毁等过程会占用更多的系统资源,因此需要根据实际情况进行选择。

总的来说,GIL 对于 Python 多线程编程存在着一定的限制,因此在需要进行大量 CPU 密集型计算的场景下,建议使用多进程实现并发和并行。当然,在其它一些 I/O 密集型的场景下(如网络通信等),仍然可以通过多线程等方式实现高效并发。(线程释放GIL锁的情况: 在IO操作等可能会引起阻塞的system call之前,可以暂时释放GIL, 但在执行完毕后,必须重新获取GIL Python 3.x使用计时器(执行时间达到阈值后,当前线程 释放GIL)或Python 2.x,tickets计数达到100

在 Python 中,多线程并不能真正实现并行。

如果需要实现并行,可以考虑使用多进程、协程等方式。多进程创建独立的进程空间,也就意味着每个进程都有自己独立的解释器和资源空间,可以在完全独立的进程中运行,相互之间没有阻塞限制,从而提高了并行效率。而协程则是在单个线程中实现多个任务之间切换,避免了线程之间的上下文切换开销和资源竞争问题,从而提高了并发效率。

multiprocessing

使用 multiprocessing 模块实现多进程:这是 Python 官方推荐的实现多进程的方法之一,multiprocessing 可以同样在 Windows 和 Unix/Linux 等操作系统中运行。

import multiprocessing

def worker():
    print('I am worker process with pid %d' % os.getpid())

if __name__ == '__main__':
    print('I am main process with pid %d' % os.getpid())
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

在这个示例中,我们使用 multiprocessing.Process 类创建一个新进程,并指定要执行的代码段是 worker() 函数。然后,我们使用 start() 方法启动子进程,使用 join() 方法等待子进程执行完毕后再退出。

multiprocessing 的使用

Process([group [, target [, name [, args [, kwargs]]]]])multiprocessing.Process 类的构造函数,用于创建一个新的进程对象。该函数的参数介绍如下:

  • group:指定进程组,通常用于在一组进程之间进行批量操作,默认为 None

  • target:指定要执行的任务函数,该函数必须是可调用对象并且不带任何参数。

  • name:指定进程的名称,默认为随机生成的唯一字符串。

  • args:要传递给 target 函数的位置参数,必须以元组的形式指定。

  • kwargs:要传递给 target 函数的关键字参数,必须以字典的形式指定。

Process创建的实例对象的常用方法:

Process 类创建的进程实例对象拥有下列常用方法:

  • start():启动进程并执行任务函数。

  • join([timeout]):阻塞当前进程,等待子进程结束。如果设置了 timeout,则最多等待 timeout 秒;如果 timeout 超时,则会抛出 TimeoutError 异常。

  • terminate():强制终止进程,不会进行清理操作。

  • is_alive():判断进程是否还在执行。

  • close():释放进程所占用的资源。

以下是一个使用 Process 类创建新进程的示例:

import multiprocessing
import time


def worker(name):
    print(f"{name} is running in process {multiprocessing.current_process().name}")
    if name == 'p1':
        time.sleep(5)
        print(f"{name} finished")
    else:
        print(f"{name} finished")


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=worker, args=('p1',))
    p2 = multiprocessing.Process(target=worker, args=('p2',))

    p1.start()
    p2.start()

    p1.join(2)

    if p1.is_alive():
        p1.terminate()

    p2.join()

    print("All processes finished")


# 输出
p1 is running in process Process-1
p2 is running in process Process-2
p2 finished
All processes finished

这段代码使用了 Python 的 multiprocessing 模块实现了多进程编程。具体来说,它创建了两个子进程 p1p2,并分别启动它们运行 worker 函数。

worker 函数用于模拟进程执行任务的过程。在这个函数中,我们首先输出了进程名和进程 ID,以便于在输出中区分不同的进程。然后,如果当前进程名是 p1,就让进程休眠 5 秒钟,然后输出进程已经完成任务的信息。

在主函数中,我们首先调用 p1.start()p2.start() 方法启动两个进程。然后,我们使用 p1.join(2) 方法等待 2 秒钟并检查 p1 进程是否结束,如果没有结束,则调用 p1.terminate() 方法强制终止该进程。

最后,我们使用 p2.join() 方法等待 p2 进程完成,然后输出一条 "All processes finished" 的信息,说明整个程序已经正常结束。

总的来说,这段代码演示了如何使用 Python 的 multiprocessing 模块进行多进程编程,并且介绍了一些常用的多进程编程技巧和注意事项,例如使用进程间通信、在进程中捕获异常、合理处理进程退出等。

使用 p1.is_alive()p1.terminate() 可以增强程序的健壮性,避免进程异常卡死导致整个程序无法运行。当然,在实际开发中,是否需要使用这两个方法还要根据具体情况进行判断,如果进程运行时间较短或者进程之间没有太多的关联,那么这两个方法可能就没必要使用了。

Process创建的实例对象的常用属性:

  1. name: 进程的名称,可以通过构造函数指定或者默认生成。可以使用 my_process.name 获取当前进程的名称。

  2. pid: 进程的 ID,是一个整数。在 Unix/Linux 系统上,可以使用系统工具如 ps 来查看进程 ID。在 Windows 上,需要使用第三方工具如 Process Explorer 或者 Task Manager。

  3. daemon: 是否为守护进程。如果设为 True,那么该进程会随着主进程的退出而自动终止。可以使用 my_process.daemon 获取当前进程是否为守护进程。

  4. exitcode: 进程的退出码,是一个整数。如果进程成功结束,那么退出码通常为 0。如果进程异常退出,那么退出码可能为其他值。可以使用 my_process.exitcode 获取当前进程的退出码。

import multiprocessing
import time


def worker(name):
    print(f"{name} is running in process {multiprocessing.current_process().name}")
    if name == 'p1':
        time.sleep(5)
        print(f"{name} finished")
    else:
        print(f"{name} finished")


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=worker, args=('p1',), name='Process 1')
    p2 = multiprocessing.Process(target=worker, args=('p2',), name='Process 2')

    p1.start()
    p2.start()

    print(f"{p1.name} with pid {p1.pid} is daemon: {p1.daemon}")
    print(f"{p2.name} with pid {p2.pid} is daemon: {p2.daemon}")

    p1.join(2)

    if p1.is_alive():
        p1.terminate()

    p2.join()

    print(f"{p1.name} exit code: {p1.exitcode}")
    print(f"{p2.name} exit code: {p2.exitcode}")
    print("All processes finished")


# 输出
Process 1 with pid 11688 is daemon: False
Process 2 with pid 7868 is daemon: False
p1 is running in process Process 1
p2 is running in process Process 2
p2 finished
Process 1 exit code: None
Process 2 exit code: 0
All processes finished

在这个例子中,我们使用 name 参数指定了进程的名称,并通过 my_process.name 获取了当前进程的名称。我们还打印了进程的 ID (pid) 和是否为守护进程 (daemon) 的信息。

在程序运行结束后,我们使用 exitcode 属性获取了两个进程的退出码,并打印了所有进程的信息。

进程间全局变量共享问题

多个进程之间不能共享全局变量,这是因为多个进程之间的内存是相互独立的,各自有自己的地址空间,因此在一个进程中定义的变量,在另一个进程中并没有定义,所以不能直接共享。

from multiprocessing import Process
import os
import time

nums = [11, 22]


def work1():
    """子进程要执行的代码"""
    print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
    for i in range(3):
        nums.append(i)
        time.sleep(1)
        print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))


def work2():
    """子进程要执行的代码"""
    print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))


if __name__ == '__main__':
    p1 = Process(target=work1)
    p1.start()
    p1.join()
    p2 = Process(target=work2)
    p2.start()

# 输出
in process1 pid=14940 ,nums=[11, 22]
in process1 pid=14940 ,nums=[11, 22, 0]
in process1 pid=14940 ,nums=[11, 22, 0, 1]
in process1 pid=14940 ,nums=[11, 22, 0, 1, 2]
in process2 pid=7204 ,nums=[11, 22]

以上代码可以看出来两个进程之间是不共享数据的,然而,Python 提供了很多方式来实现进程间的数据共享,比如使用 multiprocessing.Manager 类,它提供了一种进程间通信的方式,可以让多个进程之间共享数据。Manager 实例支持常见的容器类型(如列表、字典等),可以使用它们来存储多个进程共享的数据。

from multiprocessing import Process, Manager
import os
import time


def work1(nums):
    """子进程要执行的代码"""
    print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
    for i in range(3):
        nums.append(i)
        time.sleep(1)
        print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))


def work2(nums):
    """子进程要执行的代码"""
    print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))


if __name__ == '__main__':
    manager = Manager()
    nums = manager.list([11, 22])

    p1 = Process(target=work1, args=(nums,))
    p1.start()
    p1.join()

    p2 = Process(target=work2, args=(nums,))
    p2.start()
    p2.join()

# 输出
in process1 pid=8804 ,nums=[11, 22]
in process1 pid=8804 ,nums=[11, 22, 0]
in process1 pid=8804 ,nums=[11, 22, 0, 1]
in process1 pid=8804 ,nums=[11, 22, 0, 1, 2]
in process2 pid=11032 ,nums=[11, 22, 0, 1, 2]

进程间通信方式

由于多个进程之间内存空间相互独立,不能直接进行数据共享。因此,Python 提供了多种进程间通信的机制,以实现进程间的数据交换和协作。

队列(queue):队列是一种先进先出的数据结构,可以在多个进程之间进行安全的数据交换。在 Python 中,可以使用 multiprocessing.Queue 创建一个队列对象,多个进程可以通过这个队列进行数据的读写。

进程中Queue的使用

  • 可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示一下Queue的工作原理:在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

  • 注意点进程之间的queue要当做参数传进去(不共享全局变量)。

from multiprocessing import Process, Queue
"""
# 10个进程去完成100个任务
# a.每次需要处理任务时,会创建一个新的进程去处理任务
# b.一次性创建10个进程,每次需要处理任务时,就去取出空闲的进程去执行任务
"""


# 第一个子进程将队列中的数据加5000000次
def task1(queue_obj: Queue):
    d = queue_obj.get()
    for _ in range(5000000):
        d += 1
    print(f'task1中的数据为:{d}')
    queue_obj.put(d)


# 第二个子进程将队列中的数据加2000000次
def task2(queue_obj: Queue):
    d = queue_obj.get()
    for _ in range(2000000):
        d += 1
    print(f'task2中的数据为:{d}')
    queue_obj.put(d)


# 定义主程序
def main():
    # 1.创建队列对象
    queue_obj = Queue()

    # 2.往队列中添加数据
    queue_obj.put(0)

    # 3.创建两个子进程
    p1 = Process(target=task1, args=(queue_obj, ), name='子进程1')
    p2 = Process(target=task2, args=(queue_obj, ), name='子进程2')
    p1.start()
    p2.start()

    p1.join()
    p2.join()
    data = queue_obj.get()
    print(f'在主进程中,取出的数据为:{data}')


if __name__ == '__main__':
    main()

# 输出 
task1中的数据为:5000000
task2中的数据为:7000000
在主进程中,取出的数据为:7000000

这段代码创建了一个队列对象 queue_obj,往队列中添加了一个初始值为 0 的数据。然后创建了两个子进程 p1p2,分别执行 task1()task2() 函数。这两个函数分别从队列中取出数据,将其加 5000000 和 2000000 次,最后再将计算结果放回到队列中。然后等待子进程结束后,主进程从队列中取出最终的结果并输出。

需要注意的是,在这个程序中使用了多个进程对共享数据进行修改,因此需要确保读写数据的线程之间的同步和互斥。在这里,由于使用了 multiprocessing.Queue 对象,它已经内置了线程同步和互斥的机制,因此不需要额外考虑这些问题。

另外,这个程序的执行过程可能会比较耗时,因为两个子进程都需要对同一个数据进行大量的计算。如果要优化程序的性能,可以考虑使用多个进程同时对不同的数据进行计算,避免竞争和等待。

进程池

ProcessPoolExecutorconcurrent.futures 模块提供的一个进程池实现类,它支持上下文管理器协议,能够自动管理进程池中的子进程生命周期以及关闭并清理进程池资源。除此之外,ProcessPoolExecutor 还提供了以下几个方法:

submit(fn, args, *kwargs)

submit 方法用于向进程池中提交任务,返回一个 Future 对象,可以通过该对象获取任务的返回值、状态等信息。其中,参数 fn 表示要执行的函数,*args**kwargs 表示要传递给函数的参数。

下面是一个使用 submit 方法提交任务的示例代码:

from concurrent.futures import ProcessPoolExecutor

def worker(num):
    """子进程要执行的任务"""
    print(f"Worker {num} is running")
    return num * num

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as pool:
        # 提交 10 个任务给进程池,并获取相应的 Future 对象
        futures = [pool.submit(worker, i) for i in range(10)]

    # 获取所有任务的执行结果
    results = [f.result() for f in futures]
    print("Results:", results)

在上面的示例中,我们使用 submit 方法向进程池中提交了 10 个任务,并分别获取了对应的 Future 对象。然后在进程池中的所有任务执行完成之后,通过遍历 Future 对象列表获取每个任务的执行结果。

map(fn, *iterables, timeout=None, chunksize=1)

map 方法类似于 Python 内置函数 map,用于将一个函数应用于多个可迭代对象的元素上。与内置函数 map 不同的是,该方法会利用进程池的并发能力,将函数应用于多个元素的计算分配给多个进程并发执行,从而加快程序的运行速度。

下面是一个使用 map 方法提交任务的示例代码:

from concurrent.futures import ProcessPoolExecutor

def worker(num):
    """子进程要执行的任务"""
    print(f"Worker {num} is running")
    return num * num

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as pool:
        # 将 worker 方法应用到 range(10) 中的每个元素上
        results = pool.map(worker, range(10))

    # 获取所有任务的执行结果
    print("Results:", list(results))

在上面的示例中,我们使用 map 方法将 worker 函数应用于 range(10) 中的每个元素,并获取了每个任务的执行结果。

需要注意的是,map 方法返回一个迭代器,因此需要使用 list 函数将其转换为列表以获取所有任务的执行结果。另外,可以通过参数 timeoutchunksize 分别指定任务的超时时间和分块大小,以更好地控制任务的执行效率和质量。

参数 timeoutchunksizeProcessPoolExecutor.map() 方法中的可选参数,用于分别控制任务的超时时间和分块大小。

  • timeout: 该参数设置单个任务的最长等待时间。如果一个任务在指定的时间内没有执行完成,那么该任务将被终止并抛出 concurrent.futures.TimeoutError 异常。默认为 None,表示没有超时限制。 例如:

from concurrent.futures import ProcessPoolExecutor, TimeoutError
import time

def worker(num):
    """子进程要执行的任务"""
    time.sleep(num)
    return num * num

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = pool.map(worker, range(10), timeout=2)

上面代码中,我们使用 map 方法提交了 10 个任务,并设置了 timeout 参数为 2 秒。即使某个任务需要执行的秒数超过 2 秒,它也会被强制终止并抛出超时异常。

  • chunksize: 该参数用于指定每次向子进程发送的批处理任务数量。默认为 1,表示每次只发送一个任务;如果设置为一个较大的值,可以更好地利用进程池的并行能力,同时也可能会增加任务的计算负载和通信开销。 例如:

from concurrent.futures import ProcessPoolExecutor
import time

def worker(num):
    """子进程要执行的任务"""
    time.sleep(num)
    return num * num

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = pool.map(worker, range(10), chunksize=3)

上面代码中,我们使用 map 方法提交了 10 个任务,并设置了 chunksize 参数为 3。即进程池中每个子进程会一次性获取 3 个任务,以此来提高计算效率。

需要注意的是,参数 timeoutchunksize 都是可选参数,可以根据实际情况灵活设置,从而获得最佳的性能和效果。同时,如果同时使用这两个参数,也需要注意它们之间的相互影响关系。

shutdown(wait=True)

shutdown 方法用于关闭进程池,等待所有任务执行完成并清理进程池资源。该方法接受一个 wait 参数,默认为 True,表示在等待所有任务执行完成之前阻塞程序的执行;如果设置为 False,则立即关闭进程池并返回。

下面是一个使用 shutdown 方法关闭进程池的示例代码:

import time
from concurrent.futures import ProcessPoolExecutor

def worker(num):
    """子进程要执行的任务"""
    print(f"Worker {num} is running")
    time.sleep(num)
    return num * num

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as pool:
        # 提交 4 个任务给进程池
        futures = [pool.submit(worker, i) for i in range(4)]

        # 立即关闭进程池,并不会等待任务执行完成
        pool.shutdown(wait=False)

    # 输出所有任务的执行结果
    results = [f.result() for f in futures]
    print("Results:", results)

在上面的示例中,我们向进程池中提交了 4 个任务,并使用 shutdown 方法立即关闭了进程池,然后获取了每个任务的执行结果。

需要注意的是,由于进程池内的任务可能会以不同的速度执行,因此在使用 shutdown 方法关闭进程池时,可能需要根据实际情况设置 wait 参数来控制程序的执行逻辑。

进程池的Queue

如果要使用进程池创建进程,则需要使用 multiprocessing.Manager() 中的 Queue(),而不是 multiprocessing.Queue()

使用 multiprocessing.Queue() 创建的队列只能被同一进程内的线程访问和修改,而不能被多个进程同时访问。这是因为 multiprocessing.Queue() 底层是通过共享内存和进程锁来实现进程通信的,而这些底层机制只适用于同一进程内的线程之间。

相比之下,使用 multiprocessing.Manager() 创建的队列则可以被多个进程同时访问和修改。这是因为 multiprocessing.Manager() 底层也是通过共享内存和进程锁来实现进程通信,但是它使用了一个专门的进程来管理所有的共享对象(比如队列、字典、列表等),从而避免了同一进程内的竞争和冲突。

因此,在使用进程池创建进程时,我们通常会选择使用 multiprocessing.Manager().Queue() 来创建进程通信队列,以确保多个进程之间的顺畅通信。


from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager


def write_data(queue):
    for i in range(5):
        data = f"数据-{i}"
        queue.put(data)
        print(f"写入数据:{data}")


def read_data(queue):
    while True:
        if not queue.empty():
            data = queue.get()
            print(f"读取数据:{data}")
        else:
            break


if __name__ == "__main__":
    print(f'开始执行')
    # 创建进程池
    with ProcessPoolExecutor(max_workers=4) as pool:

        # 创建进程通信队列
        queue = Manager().Queue()

        # 向进程池提交任务
        future1 = pool.submit(write_data, queue)
        future2 = pool.submit(read_data, queue)

        # 等待所有任务执行完毕
        future1.result()
        future2.result()

    print(f'结束执行')

# 输出
开始执行
写入数据:数据-0
写入数据:数据-1
写入数据:数据-2
写入数据:数据-3
写入数据:数据-4
读取数据:数据-0
读取数据:数据-1
读取数据:数据-2
读取数据:数据-3
读取数据:数据-4
结束执行

上述代码中,我们使用 with ProcessPoolExecutor(max_workers=4) as pool: 的语法创建了一个进程池,并使用 pool.submit() 方法向进程池中提交了两个任务。在程序结束时,由于使用了 with 语法,Python 会自动调用 shutdown 方法,等待所有任务执行完毕。

在主进程中,我们通过分别调用 future1.result()future2.result() 方法等待两个任务执行完毕,并输出了开始和结束执行的提示信息。

ProcessPoolExecutor 中,submit() 方法向进程池中提交任务,并立即返回一个 Future 对象,表示异步执行的结果。这个 Future 对象可以用来等待并获取异步任务的执行结果。因此,在这段代码中,通过调用 future1.result()future2.result() 方法实现了等待两个任务执行完毕并获取结果。

具体来说,result() 方法会阻塞当前线程,直到异步任务执行结束并返回结果。如果异步任务未能正常执行(例如因为抛出了异常),result() 方法会重新抛出相应异常。如果使用 result() 方法时没有设置超时时间,则会一直阻塞当前线程直到任务完成。

总结:multiprocessing.Queue() 和 queue.Queue与multiprocessing.Manager().Queue

  • multiprocessing.Queue()queue.Queue() 是跨进程和跨线程通信机制,而 multiprocessing.Manager().Queue()(以下简称 Manager().Queue())是 multiprocessing 模块提供的基于共享内存的跨进程通信机制。

  • 具体来说,使用 Manager().Queue() 可以实现多进程之间共享数据,并通过队列的方式进行读写操作。这是因为 Manager() 返回一个能够代理进程间通信的管理器对象,可以从中获取各种共享变量(如数组、字典、队列等)。

  • multiprocessing.Queue() 是基于共享内存的进程间通信机制,适用于多进程环境。

  • multiprocessing.Manager().Queue 是基于 multiprocessing.Manager() 提供的代理对象实现的进程间通信,也适用于多进程环境。

  • 另外,由于 Manager().Queue() 实现了跨进程通信,因此其效率可能不如 multiprocessing.Queue()queue.Queue()。同时,在使用 Manager().Queue() 时需要注意共享变量的锁定机制,以避免出现多进程竞争的问题,这对编程提出了更高的要求。

  • 需要注意的是,由于 Manager().Queue() 基于共享内存实现,因此会占用更多的内存资源,如果需要处理大量数据,建议使用其他更加高效的共享变量,如 Manager().Value()Manager().Array() 等。

  • 总之,与 multiprocessing.Queue()queue.Queue() 相比,Manager().Queue() 更加通用和灵活,可以满足多个进程之间共享数据的需求,但其使用和管理也更加复杂。