python 的队列简述

在 Python 的队列模块中,一般常用的队列类有 QueueLifoQueuePriorityQueue。除了这些队列类,队列模块还提供了一些常用的方法,下面将会简单介绍它们的作用以及举例说明。

Queue

Queue 是一个线程安全的先进先出队列。下面是 Queue 常用的方法:

  • Queue.put(item[, block[, timeout]]): 向队列中添加元素。

    • 参数 item 表示要添加到队列的元素;

    • 参数 block(可选)表示操作阻塞时的行为,默认为 True。若设置为 False,则当队列满时会抛出异常而不是阻塞线程;

    • 参数 timeout(可选)表示阻塞超时时间,默认为 None 表示一直阻塞直到操作成功或发生异常。若设置了该参数,但是操作超时时也会抛出异常。

  • Queue.get([block[, timeout]]): 从队列中获取元素,并将其从队列中移除。

    • 参数 block(可选)表示操作阻塞时的行为,默认为 True。若设置为 False,则当队列为空时会抛出异常而不是阻塞线程;

    • 参数 timeout(可选)表示阻塞超时时间,默认为 None 表示一直阻塞直到操作成功或发生异常。若设置了该参数,但是操作超时时也会抛出异常。

  • Queue.qsize(): 返回队列中当前元素的个数,该值并不一定准确,因为多线程环境下其可能正在进行操作。

  • Queue.empty(): 判断队列是否为空。

  • Queue.full(): 判断队列是否已满。

  • Queue.task_done(): 通知队列一个任务已经被处理完毕。

  • Queue.join(): 阻塞当前线程,直到队列中所有的元素都被处理完毕。

  • put_nowait(item)queue.Queue 类的一个方法,它用于将元素添加到队列中,但是如果队列已满,则会立即引发 queue.Full 异常。

    在这个方法的实现中,它简单地调用了 put(item, block=False) 方法,并将 block 参数设置为 False。因此,如果队列已满,则 put() 方法会立即返回一个 queue.Full 异常,而 put_nowait() 方法也会立即将该异常抛出。

  • Queue 类中的 get_nowait() 方法与 get() 方法类似,但是它不会阻塞当前线程,如果队列为空,则立即抛出 queue.Empty 异常。因此,我们通常需要在代码中使用异常处理机制来处理此异常。

下面是 Queue 的举例代码:

from queue import Queue

# 创建一个最大容量为 3 的队列
q = Queue(maxsize=3)

# 向队列中添加元素
q.put(1)
q.put(2)
q.put(3)

# 获取队列中的元素
print(q.get())  # 输出:1
print(q.get())  # 输出:2

# 判断队列是否为空和是否已满
print(q.empty())  # 输出:False
print(q.full())   # 输出:False

# 等待队列中所有元素都被处理完毕
q.join()

# 输出
1
2
False
False

join() 方法阻塞当前线程,直到队列中所有的元素都被处理完毕。在使用 Queue 时,有时需要等待队列中所有元素都被消费完毕后再进行下一步操作,这时就可以使用 join() 方法。

在上述代码中,如果没有 q.join() 方法的调用,主线程会直接结束,导致无法保证队列中的所有元素都被处理完毕。而通过调用 q.join() 方法,可以阻塞主线程,直到队列中的所有元素都被处理完毕为止。

[block[, timeout]] 案例

Queue 类中的 get() 方法用于从队列中获取元素,并将其移除。该方法有两个可选参数:blocktimeout,它们分别表示是否阻塞和阻塞的超时时间。

block 参数为 True 时,如果队列中有元素,则立即获取并返回该元素;否则,该方法将阻塞当前线程,直到队列中有元素可以获取或者超时时间到达(如果指定了 timeout 参数)。

block 参数为 False 时,如果队列中有元素,则立即获取并返回该元素;否则,该方法将立即返回一个 queue.Empty 异常。这种情况下,我们通常需要在代码中使用异常处理机制来处理此异常。

下面是一个示例:

import queue
import time

if __name__ == '__main__':
    q = queue.Queue()

    # 向队列中添加元素
    for i in range(5):
        q.put(i)

    # 从队列中获取元素
    for i in range(5):
        try:
            data = q.get(block=True, timeout=1)
            print(f'Got data: {data}')
        except queue.Empty:
            print('Queue is empty.')
        
        time.sleep(0.5)

在这个示例中,我们首先向队列 q 中添加了 5 个元素。接着,我们从队列中获取元素并打印出来。由于使用了 block=True, timeout=1 参数,因此该方法会等待 1 秒钟来获取队列中的元素,如果超时则会抛出 queue.Empty 异常。在每次获取元素后,我们使用 time.sleep(0.5) 来模拟一个耗时的操作。

运行这个示例,可以看到以下输出:

Got data: 0
Got data: 1
Got data: 2
Got data: 3
Got data: 4

由于队列中有 5 个元素,因此我们成功地从队列中获取了所有元素,并打印出它们的值。

for i in range(6):

Got data: 0
Got data: 1
Got data: 2
Got data: 3
Got data: 4
Queue is empty.

join()案例:

假设现在有一个需求:在一个多线程程序中,需要读取多个文件,并将文件中的数据写入到一个大文件中。每个文件需要开启一个线程来读取数据,并将读取到的数据写入到队列中。而主线程则负责从队列中获取数据,并将其写入到大文件中。

为了保证数据写入的正确性,我们需要等待所有的文件都被读取并且队列中的所有数据都被处理完毕后再进行下一步操作。这时,就可以使用 join() 方法来实现。

import threading
from queue import Queue


def read_file(file_name, queue):
    with open(file_name) as f:
        for line in f:
            queue.put(line)


def write_file(queue, out_file):
    with open(out_file, 'w') as f:
        while True:
            data = queue.get()
            if data is None:
                queue.task_done()
                break
            f.write(data)
            queue.task_done()


if __name__ == '__main__':
    # 创建一个队列
    q = Queue()

    # 创建多个读取文件的线程
    for i in range(3):
        t = threading.Thread(target=read_file, args=(f'file_{i}.txt', q))
        t.start()

    # 创建一个写入文件的线程
    t = threading.Thread(target=write_file, args=(q, 'out.txt'))
    t.start()

    # 等待队列中所有元素都被处理完毕
    q.join()

    # 将一个 None 元素添加到队列中,用于通知写入文件的线程退出
    q.put(None)

    # 等待写入文件的线程退出
    t.join()

queue.task_done() 方法

import queue
import time
import threading

def worker(q):
    while True:
        item = q.get()
        print(f'Processing: {item}')
        time.sleep(1)
        q.task_done()

if __name__ == '__main__':
    q = queue.Queue()

    # 向队列中添加元素
    for i in range(5):
        q.put(i)

    # 使用两个线程处理队列中的任务
    for i in range(2):
        t = threading.Thread(target=worker, args=(q,))
        t.daemon = True
        t.start()

    # 等待所有任务执行完成
    q.join()
    print('All tasks done.')

# 输出
Processing: 0
Processing: 1
Processing: 2
Processing: 3
Processing: 4
All tasks done.

queue.task_done()Queue 类的一个方法,它用于通知队列已完成指定任务。

在使用 Queue 类时,我们可以使用 put() 方法将任务添加到队列中,并使用 get() 方法从队列中获取任务。在处理任务时,如果我们希望在任务执行完成后得到通知,那么可以使用 queue.task_done() 方法来实现这一功能。

LifoQueue 类

LifoQueue 是一个线程安全的后进先出队列,在实现方式上与 Queue 的实现方式对称。它具有和 Queue 完全一样的方法。因此,使用 LifoQueue 时,只需要替换 Queue 类即可,不需要修改任何代码。下面是 LifoQueue 的举例代码:

from queue import LifoQueue

# 创建一个最大容量为 3 的后进先出队列
q = LifoQueue(maxsize=3)

# 向队列中添加元素
q.put(1)
q.put(2)
q.put(3)

# 获取队列中的元素
print(q.get())  # 输出:3
print(q.get())  # 输出:2

# 判断队列是否为空和是否已满
print(q.empty())  # 输出:False
print(q.full())   # 输出:False

# 等待队列中所有元素都被处理完毕
q.join()

PriorityQueue 类

PriorityQueue 是一个线程安全的具有优先级的队列,其中元素会按照优先级从低到高排序。下面是 PriorityQueue 常用方法:

  • PriorityQueue.put(item[, block[, timeout]]): 向队列中添加元素。

  • PriorityQueue.get([block[, timeout]]): 从队列中获取元素,并将其从队列中移除。

  • PriorityQueue.qsize(): 返回队列中当前元素的个数,该值并不一定准确,因为多线程环境下其可能正在进行操作。

  • PriorityQueue.empty(): 判断队列是否为空。

  • PriorityQueue.full(): 判断队列是否已满。

  • PriorityQueue.task_done(): 通知队列一个任务已经被处理完毕。

  • PriorityQueue.join(): 阻塞当前线程,直到队列中所有的元素都被处理完毕。

下面是 PriorityQueue 的举例代码:

from queue import PriorityQueue

# 创建一个具有优先级的队列
q = PriorityQueue()

# 向队列中添加元素,元素为 (优先级, 元素) 的形式
q.put((2, 'a'))
q.put((1, 'b'))
q.put((3, 'c'))

# 获取队列中的元素,注意元素是按照优先级从低到高排序的
print(q.get())  # 输出:(1, 'b')
print(q.get())  # 输出:(2, 'a')

# 判断队列是否为空和是否已满
print(q.empty())  # 输出:False
print(q.full())   # 输出:False

# 等待队列中所有元素都被处理完毕
q.join()