概念

线程是处理器调度和分配的基本单位,进程则作为资源拥有的基本单位。每个进程是由私有的虚拟地址空间、代码、数据和其它各种系统资源组成。线程是进程内部的一个执行单元。每一个进程至少有一个主执行线程,它无需由用户去主动创建,是由系统自动创建的。 用户根据需要在应用程序中创建其它线程,多个线程并发地运行于同一个进程中。

创建线程的方式-threading

方法1

在实例化一个线程对象时,将要执行的任务函数以参数的形式传入threading

# -*- coding: utf-8 -*-
import time
import threading
import datetime


def printNumber(n: int) -> None:
    while True:
        times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(f'{times}-{n}')
        time.sleep(n)


for i in range(1, 3):
    t = threading.Thread(target=printNumber, args=(i,))
    t.start()

# 输出
2022-12-16 11:04:40-1
2022-12-16 11:04:40-2
2022-12-16 11:04:41-1
2022-12-16 11:04:42-2
2022-12-16 11:04:42-1
2022-12-16 11:04:43-1
2022-12-16 11:04:44-2
2022-12-16 11:04:44-1
2022-12-16 11:04:45-1
2022-12-16 11:04:46-2
2022-12-16 11:04:46-1
2022-12-16 11:04:47-1
....
Process finished with exit code -1

创建两个线程,一个线程每隔一秒打印一个“1”,另一个线程每隔2秒打印一个“2”
Thread 类提供了如下的 init() 构造器,可以用来创建线程:

__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)

此构造方法中,以上所有参数都是可选参数,即可以使用,也可以忽略。其中各个参数的含义如下:

  • group:如果线程被放置在 ThreadGroup 中,则为该组。现在已弃用,并在 Python 3.9 中将删除。

  • target:要调用的可调用对象(函数或方法)。

  • name:线程名称,通常用于调试、日志记录和异常处理。

  • args:以元组形式传递给 target 的位置参数。

  • kwargs:以字典形式传递给 target 的关键字参数。

  • daemon:线程是否是一个守护程序线程。默认值是 None, 表示继承主线程的 daemon 值。开启则设置daemon=True。

这些参数,初学者只需记住 target、args、kwargs 这 3 个参数的功能即可。
但是线程需要手动启动才能运行,threading 模块提供了 start() 方法用来启动线程。因此在上面程序的基础上,添加如下语句:
t.start()

方法2

通过继承 Thread 类,我们可以自定义一个线程类,从而实例化该类对象,获得子线程。

需要注意的是,在创建 Thread 类的子类时,必须重写从父类继承得到的 run() 方法。因为该方法即为要创建的子线程执行的方法,其功能如同第一种创建方法中的 printNumber() 自定义函数。

传参数则需要重写 init 方法(注意重写之后要调用父类的init方法)

# -*- coding: utf-8 -*-
import datetime
import time
import threading


class MyThread(threading.Thread):

    def __init__(self, n):
        self.n = n
        # 注意:一定要调用父类的初始化函数
        super().__init__()

    def run(self) -> None:
        while True:
            times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print(f'{times}-{self.n}')
            time.sleep(self.n)


for i in range(1, 3):
    t = MyThread(i)
    t.start()

# 输出
2022-12-16 12:43:24-1
2022-12-16 12:43:24-2
2022-12-16 12:43:25-1
2022-12-16 12:43:26-2
2022-12-16 12:43:26-1
2022-12-16 12:43:27-1
2022-12-16 12:43:28-2
...

主线程和子线程

# -*- coding: utf-8 -*-
import datetime
import time
import threading


class MyThread(threading.Thread):

    def __init__(self, n):
        self.n = n
        # 注意:一定要调用父类的初始化函数,否则无法创建线程
        super().__init__()

    def run(self) -> None:
        while True:
            _count = threading.active_count()
            threading_name = threading.current_thread().getName()
            times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print(f'{times}-{self.n}-"当前活跃的线程个数:{_count}"-"当前线程的名称是":{threading_name}')
            time.sleep(self.n)


for i in range(1, 3):
    t = MyThread(i)
    t.start()
    print(threading.current_thread().getName())

# 输出
2022-12-16 13:18:00-1-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-1
MainThread
2022-12-16 13:18:00-2-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
MainThread

2022-12-16 13:18:01-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
2022-12-16 13:18:02-2-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
2022-12-16 13:18:02-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
2022-12-16 13:18:03-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
2022-12-16 13:18:04-2-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
2022-12-16 13:18:04-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
...

注意: 第一次t.start()后,当前存在两个线程(主线程+子线程),第二次t.start()的时候又创建了一个子线程所以当前存在三个线程

如果程序中不显式创建任何线程,则所有程序的执行,都将由主线程 MainThread 完成,程序就只能按照顺序依次执行。

此程序中,子线程 Thread-1和Thread-2 执行的是 run() 方法中的代码,而 MainThread 执行的是主程序中的代码,它们以快速轮换 CPU 的方式在执行。

守护线程(Daemon Thread)

守护线程(Daemon Thread)也叫后台进程,它的目的是为其他线程提供服务。如果其他线程被杀死了,那么守护线程也就没有了存在的必要。因此守护线程会随着非守护线程的消亡而消亡。Thread类中,子线程被创建时默认是非守护线程,我们可以通过setDaemon(True)将一个子线程设置为守护线程。

# -*- coding: utf-8 -*-
import datetime
import time
import threading


class MyThread(threading.Thread):

    def __init__(self, n):
        self.n = n
        # 注意:一定要调用父类的初始化函数,否则无法创建线程
        super().__init__()

    def run(self) -> None:
        while True:
            _count = threading.active_count()
            threading_name = threading.current_thread().getName()
            times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print(f'{times}-{self.n}-"当前活跃的线程个数:{_count}"-"当前线程的名称是":{threading_name}')
            time.sleep(self.n)


for i in range(1, 3):
    t = MyThread(i)
    t.setDaemon(True)
    t.start()

    print(threading.current_thread().getName())
    
    
# 输出
2022-12-16 13:27:46-1-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-1
MainThread
2022-12-16 13:27:46-2-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
MainThread

将两个子线程改写为守护线程,因为当主程序中的代码执行完后,主线程就可以结束了,这时候被设定为守护线程的两个子线程会被杀死,然后主线程结束。

在主程序中,我们使用循环创建了两个 MyThread 的对象,并调用它们的 start() 方法启动线程。我们还使用 t.setDaemon(True) 将这两个线程设置为后台线程(即 Daemon 线程),使得在主线程结束时它们能够自动退出。

需要注意的是,Daemon 线程通常用于支持其他线程或程序的运行,例如垃圾回收线程。它们一般不需要进行清理工作,因此可以在主程序结束时自动退出,而不必等待其他线程结束。

注意,当前台线程死亡后,Python 解释器会通知后台线程死亡,但是从它接收指令到做出响应需要一定的时间。如果要将某个线程设置为后台线程,则必须在该线程启动之前进行设置。也就是说,将 daemon 属性设为 True,必须在 start() 方法调用之前进行,否则会引发 RuntimeError 异常。

若将两个子线程的其中一个设置为守护线程,另一个设置为非守护线程

# -*- coding: utf-8 -*-
import datetime
import time
import threading


class MyThread(threading.Thread):

    def __init__(self, n):
        self.n = n
        # 注意:一定要调用父类的初始化函数,否则无法创建线程
        super().__init__()

    def run(self) -> None:
        while True:
            _count = threading.active_count()
            threading_name = threading.current_thread().getName()
            times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print(f'{times}-{self.n}-"当前活跃的线程个数:{_count}"-"当前线程的名称是":{threading_name}')
            time.sleep(self.n)


for i in range(1, 3):
    t = MyThread(i)
    if i == 1:
        t.setDaemon(True)
    t.start()

    print(threading.current_thread().getName())
 
# 输出
2022-12-16 13:30:17-1-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-1
MainThread

2022-12-16 13:30:17-2-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
MainThread

2022-12-16 13:30:18-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
2022-12-16 13:30:19-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
2022-12-16 13:30:19-2-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
2022-12-16 13:30:20-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
...

此时非守护线程作为前台程序还在继续执行,守护线程就还有“守护”的意义,就会继续执行。

join()方法

不使用join方法:

当设置多个线程时,在一般情况下(无守护线程,setDeamon=False),多个线程同时启动,主线程执行完,会等待其他子线程执行完,程序才会退出。

# -*- coding: utf-8 -*-
import datetime
import time
import threading


class MyThread(threading.Thread):

    def __init__(self, n):
        self.n = n
        # 注意:一定要调用父类的初始化函数,否则无法创建线程
        super().__init__()

    def run(self) -> None:
        _count = threading.active_count()
        threading_name = threading.current_thread().getName()
        times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        time.sleep(1)
        print(f'{times}-{self.n}-"当前活跃的线程个数:{_count}"-"当前线程的名称是":{threading_name}')


start_time = time.time()
print(f'{start_time},这是主线程:', threading.current_thread().name)
for i in range(5):
    t = MyThread(i)
    # t.setDaemon(True)
    t.start()
    # t.join()
end_time = time.time()
print(f'{end_time},主线程结束了!', threading.current_thread().name)
print('一共用时:', end_time - start_time)

# 输出
1671174404.6552384,这是主线程: MainThread
1671174404.656239,主线程结束了! MainThread
一共用时: 0.0010006427764892578
2022-12-16 15:06:44-0-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-1
2022-12-16 15:06:44-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
2022-12-16 15:06:44-2-"当前活跃的线程个数:4"-"当前线程的名称是":Thread-3
2022-12-16 15:06:44-3-"当前活跃的线程个数:5"-"当前线程的名称是":Thread-4
2022-12-16 15:06:44-4-"当前活跃的线程个数:6"-"当前线程的名称是":Thread-5

我们的计时是对主线程计时,主线程结束,计时随之结束,打印出主线程的用时。
主线程的任务完成之后,主线程随之结束,子线程继续执行自己的任务,直到全部的子线程的任务全部结束,程序结束。

使用join()方法:

当调用一个线程的 join() 方法时,当前线程(通常是主线程)会阻塞等待该线程执行完毕。如果该线程已经执行完毕,或者已经被取消或者出现了异常等情况,则 join() 方法会立即返回;否则,它会一直等待直到该线程执行完毕。

使用 join() 方法可以确保线程顺序执行,避免出现竞争和死锁等问题。例如,在主线程中创建了多个子线程并启动后,可以使用 join() 方法等待它们全部执行完毕,然后再继续执行主线程的下一步任务。

# -*- coding: utf-8 -*-
import datetime
import time
import threading


class MyThread(threading.Thread):

    def __init__(self, n):
        self.n = n
        # 注意:一定要调用父类的初始化函数,否则无法创建线程
        super().__init__()

    def run(self) -> None:
        _count = threading.active_count()
        threading_name = threading.current_thread().getName()
        times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        time.sleep(1)
        print(f'{times}-{self.n}-"当前活跃的线程个数:{_count}"-"当前线程的名称是":{threading_name}')


start_time = time.time()
print(f'{start_time},这是主线程:', threading.current_thread().name)
for i in range(5):
    t = MyThread(i)
    # t.setDaemon(True)
    t.start()
    t.join()
end_time = time.time()
print(f'{end_time},主线程结束了!', threading.current_thread().name)
print('一共用时:', end_time - start_time)


# 输出
1671174502.0245655,这是主线程: MainThread
2022-12-16 15:08:22-0-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-1
2022-12-16 15:08:23-1-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-2
2022-12-16 15:08:24-2-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-3
2022-12-16 15:08:25-3-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-4
2022-12-16 15:08:26-4-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-5
1671174507.0313594,主线程结束了! MainThread
一共用时: 5.006793975830078

Process finished with exit code 0

join()方法的timeout参数

join的语法结构为join(timeout=None),可以看到join()方法有一个timeout参数,其默认值为None,而参数timeout可以进行赋值,其含义是指定等待被join的线程的时间最长为timeout秒,也就是说当在timeout秒内被join的线程还没有执行结束的话,就不再进行等待了。

# -*- coding: utf-8 -*-
import datetime
import time
import threading


class MyThread(threading.Thread):

    def __init__(self, n):
        self.n = n
        # 注意:一定要调用父类的初始化函数,否则无法创建线程
        super().__init__()

    def run(self) -> None:
        _count = threading.active_count()
        threading_name = threading.current_thread().getName()
        times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        time.sleep(5)
        print(f'{times}-{self.n}-"当前活跃的线程个数:{_count}"-"当前线程的名称是":{threading_name}')


start_time = time.time()
print(f'{start_time},这是主线程:', threading.current_thread().name)
for i in range(5):
    t = MyThread(i)
    # t.setDaemon(True)
    t.start()
    t.join(2)
end_time = time.time()
print(f'{end_time},主线程结束了!', threading.current_thread().name)
print('一共用时:', end_time - start_time)

# 输出
1671175114.663927,这是主线程: MainThread
2022-12-16 15:18:34-0-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-1
2022-12-16 15:18:36-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
2022-12-16 15:18:38-2-"当前活跃的线程个数:4"-"当前线程的名称是":Thread-3
1671175124.6681008,主线程结束了! MainThread
一共用时: 10.004173755645752
2022-12-16 15:18:40-3-"当前活跃的线程个数:4"-"当前线程的名称是":Thread-4
2022-12-16 15:18:42-4-"当前活跃的线程个数:4"-"当前线程的名称是":Thread-5

Process finished with exit code 0

线程锁

线程之间是共用同一块内存的,那么线程可以共享全局变量,如果多个线程同时对同一个全局变量操作,会出现资源竞争问题,从而数据结果会不正确

# -*- coding: utf-8 -*-
import datetime
import threading
import time

number = 0


def add():
    global number  # global声明此处的number是外面的全局变量number
    for _ in range(10000000):  # 进行一个大数级别的循环加一运算
        number += 1
    times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f'{times}-"当前活跃的线程个数:{threading.active_count()}"')
    print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
    print('------------------------------')


for i in range(2):  # 用2个子线程,就可以观察到脏数据
    t = threading.Thread(target=add)
    t.start()

time.sleep(2)  # 等待2秒,确保2个子线程都已经结束运算。

print("主线程执行完毕后,number = ", number)


# 输出
2022-12-20 13:13:05-"当前活跃的线程个数:3"
子线程Thread-1运算结束后,number = 11966305
------------------------------
2022-12-20 13:13:05-"当前活跃的线程个数:2"
子线程Thread-2运算结束后,number = 12272268
------------------------------
主线程执行完毕后,number =  12272268

这里创建两个子线程操作同一个全局变量number,number被初始化为0,两个子线程通过for循环对这个number进行+1,每个子线程循环10000000次,两个子线程同时进行。如果一切正常的话,最终这个number会变成20000000,然而现实并非如此。

可以很明显地看出脏数据的情况。这是因为两个线程在运行过程中,CPU随机调度,你算一会我算一会,在没有对number进行保护的情况下,就发生了数据错误
注意此时两个线程是同时开启的。

若是使用了join()的方法

# -*- coding: utf-8 -*-
# ...
for i in range(2):  # 用2个子线程,就可以观察到脏数据
    t = threading.Thread(target=add)
    t.start()
    t.join()  # 添加这一行就让两个子线程变成了顺序执行

time.sleep(2)  # 等待2秒,确保2个子线程都已经结束运算。

print("主线程执行完毕后,number = ", number)

# 输出
2022-12-20 13:16:02-"当前活跃的线程个数:2"
子线程Thread-1运算结束后,number = 10000000
------------------------------
2022-12-20 13:16:03-"当前活跃的线程个数:2"
子线程Thread-2运算结束后,number = 20000000
------------------------------
主线程执行完毕后,number =  20000000

虽然结果是对的,但是这样的本质是把多线程变成了单线程,失去了多线程的意义。

互斥锁Lock

互斥锁是一种独占锁,同一时刻只有一个线程可以访问共享的数据。使用很简单,初始化锁对象,然后将锁当做参数传递给任务函数,在任务中加锁,使用后释放锁。

  1. 导入 threading 模块:在 Python 中使用 Lock 需要导入 threading 模块。

  2. 创建 Lock 对象:使用 threading.Lock() 方法创建一个 Lock 对象。

  3. 获取锁:在需要访问共享资源的代码块前调用 Lock 对象的 acquire() 方法,以获取锁。如果当前锁没有被其他线程占用,则该方法会立即返回,并将锁状态设置为“已被占用”,其他线程不能获取该锁。

  4. 访问共享资源:在获取到锁后,可以访问共享资源。

  5. 释放锁:当访问共享资源完成后,需要调用 Lock 对象的 release() 方法来释放锁,以便其他线程可以获取该锁。

# -*- coding: utf-8 -*-
import datetime
import threading
import time

number = 0
lock = threading.Lock()


def add(lk):
    global number  # global声明此处的number是外面的全局变量number
    lk.acquire()   # 开始加锁
    for _ in range(10000000):  # 进行一个大数级别的循环加一运算
        number += 1
    times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f'{times}-"当前活跃的线程个数:{threading.active_count()}"')
    print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
    print('------------------------------')
    lk.release()  # 释放锁,让别的线程也可以访问number


for i in range(2):
    t = threading.Thread(target=add, args=(lock,))
    t.start()

time.sleep(2)  # 等待2秒,确保2个子线程都已经结束运算。

print("主线程执行完毕后,number = ", number)


# 输出
2022-12-20 13:34:52-"当前活跃的线程个数:3"
子线程Thread-1运算结束后,number = 10000000
------------------------------
2022-12-20 13:34:53-"当前活跃的线程个数:2"
子线程Thread-2运算结束后,number = 20000000
------------------------------
主线程执行完毕后,number =  20000000

以上代码我重新使用重新run方法的形式写一遍

import datetime
import threading
import time

class AddThread(threading.Thread):
    def __init__(self, lock):
        super().__init__()
        self.lock = lock
        self.number = 0

    def run(self):
        self.lock.acquire()
        for _ in range(10000000):
            self.number += 1
        times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(f'{times}-"当前活跃的线程个数:{threading.active_count()}"')
        print("子线程%s运算结束后,number = %s" % (self.getName(), self.number))
        print('------------------------------')
        self.lock.release()


lock = threading.Lock()
threads = []
for i in range(2):
    t = AddThread(lock)
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print("主线程执行完毕后,number = ", threads[0].number + threads[1].number)

# 输出
2023-05-21 10:39:28-"当前活跃的线程个数:3"
子线程Thread-1运算结束后,number = 10000000
------------------------------
2023-05-21 10:39:29-"当前活跃的线程个数:2"
子线程Thread-2运算结束后,number = 10000000
------------------------------
主线程执行完毕后,number =  20000000

Process finished with exit code 0

这里之所以使用单独的 for 循环来执行 t.join(),是因为 join() 方法会阻塞当前线程,等待被调用线程执行结束后才会继续执行当前线程,即如果使用 for 循环遍历执行 t.join(),那么第一个线程执行 join() 方法时会阻塞主线程,直到该线程执行结束后才会执行下一个线程的 join() 方法,如此往复,直到所有线程执行结束。

如果将 t.join() 写在 threads 创建的循环中,那么每个线程都会依次执行完毕后才会执行下一个线程的 join() 方法,这样的话就没有利用多线程的优势了。因此,需要单独使用 for 循环来执行 t.join() 方法,确保多个线程能够并发执行,以达到更好的性能。

RLock可重入锁

用于防止访问共享资源时出现不必要的阻塞。如果共享资源在RLock中,那么可以安全地再次调用它。 RLocked资源可以被不同的线程重复访问,即使它在被不同的线程调用时仍然可以正常工作。

在同一个线程中,RLock.acquire()可以被多次调用,利用该特性,可以解决部分死锁问题。

# -*- coding: utf-8 -*-
import threading

number = 0
# lock = threading.RLock()
lock = threading.Lock()


def add(lk):
    global number  # global声明此处的number是外面的全局变量number
    lk.acquire()
    number += 1
    lk.acquire()
    number += 2
    print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
    lk.release()
    lk.release()


for i in range(2):
    t = threading.Thread(target=add, args=(lock,))
    t.start()

在上面的程序中,两个线程同时尝试访问共享资源number,这里当一个线程当前正在访问共享资源number时,另一个线程将被阻止访问它。 当两个或多个线程试图访问相同的资源时,有效地阻止了彼此访问该资源,这就是所谓的死锁,因此上述程序没有生成任何输出。

# -*- coding: utf-8 -*-
import threading

number = 0
lock = threading.RLock()
# lock = threading.Lock()


def add(lk):
    global number  # global声明此处的number是外面的全局变量number
    lk.acquire()
    number += 1
    lk.acquire()
    number += 2
    print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
    lk.release()
    lk.release()


for i in range(2):
    t = threading.Thread(target=add, args=(lock,))
    t.start()

# 输出
子线程Thread-1运算结束后,number = 3
子线程Thread-2运算结束后,number = 6

这两种锁的主要区别是:RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁

RLockLock 都是 Python 中的线程锁,它们的主要区别在于是否支持重入。

Lock 是普通的线程锁,它只能被同一个线程获取一次。如果一个线程已经获取了这个锁,那么再次尝试获取锁时就会被阻塞。只有先释放锁后,其他线程才能获取到这个锁。

RLock 是可重入的线程锁,也称为递归锁或者重入锁。和普通锁不同的是,同一个线程可以多次获取这个锁。当一个线程多次获取该锁时,在释放锁之前,需要对锁进行相应数量的释放操作才能真正释放锁。这种机制可以避免在使用锁的情况下出现死锁。

总体来说,RLock 具备 Lock 的所有特性,同时还支持同一个线程的重入访问。但是,由于 RLock 需要维护一个计数器来记录锁的嵌套层数,因此,它可能会比 Lock 消耗更多的系统资源,在某些情况下甚至会影响程序的性能。因此,通常情况下,我们只有在确实需要支持嵌套访问的情况下才会使用 RLock

需要注意的是,在使用锁的时候一定要遵循规范,以避免出现死锁等问题。在使用 RLock 的时候,尤其需要注意锁的嵌套层数,避免出现不必要的复杂度。

Semaphore信号

# -*- coding: utf-8 -*-
import datetime
import threading
import time

semaphore = threading.BoundedSemaphore(2)


def add(n):
    semaphore.acquire()
    times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    _count = threading.active_count()
    print(f'{times}', f"线程-{n}", f"当前活跃的子线程个数:{_count}")
    time.sleep(3)
    semaphore.release()


for i in range(1, 10):
    t = threading.Thread(target=add, args=(i,))
    t.start()

# 输出
2022-12-20 14:39:44 线程-1 当前活跃的子线程个数:2
2022-12-20 14:39:44 线程-2 当前活跃的子线程个数:3
2022-12-20 14:39:47 线程-3 当前活跃的子线程个数:8
2022-12-20 14:39:47 线程-4 当前活跃的子线程个数:8
2022-12-20 14:39:50 线程-5 当前活跃的子线程个数:6
2022-12-20 14:39:50 线程-6 当前活跃的子线程个数:6
2022-12-20 14:39:53 线程-7 当前活跃的子线程个数:4
2022-12-20 14:39:53 线程-8 当前活跃的子线程个数:4
2022-12-20 14:39:56 线程-9 当前活跃的子线程个数:2

可以看出用Semaphore来控制后,使得同一个时刻只有两个线程在请求页面
虽然当前活跃的子线程个数很多,但真正运行的子线程个数只有两个。

事件Event

Event类会在全局定义一个Flag,当Flag=False时,调用wait()方法会阻塞所有线程;而当Flag=True时,调用wait()方法不再阻塞。形象的比喻就是“红绿灯”:在红灯时阻塞所有线程,而在在绿灯的时候,一次性放行所有排队中的线程。Event类有四个方法:

  • set():将Flag设置为True

  • wait():等待

  • clear():将Flag设置为False

  • is_set():返回bool值,判断Flag是否为True

# -*- coding: utf-8 -*-

import threading
import time
import datetime


class Boss(threading.Thread):
    def run(self):
        print("BOSS:伙计们今晚上加班到22:00")
        event.set()
        time.sleep(5)  # 模拟一个小时这段时间
        print("BOSS:22:00了可以下班了")
        event.set()


class Worker(threading.Thread):
    def run(self):
        print(f'boss发话了吗:{event.is_set()}')
        event.wait()  # 等待event为真 此列是等待老板发话
        times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(f'{times}---woker:命苦啊')
        time.sleep(1)  # 模拟工作中
        event.clear()  # 清除Event对象内部的信号标志,即将其设为假,此处等待领导发话
        event.wait()  # Event对象wait的方法只有在内部信号为真的时候才会很快的执行并完成返回。
        times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(f'{times}--Woker:OhYeah')


if __name__ == "__main__":
    event = threading.Event()
    threads = []
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print("公司下班了")

# 输出
boss发话了吗:False
boss发话了吗:False
boss发话了吗:False
boss发话了吗:False
boss发话了吗:False
BOSS:伙计们今晚上加班到22:00
2022-12-21 14:12:52---woker:命苦啊
2022-12-21 14:12:52---woker:命苦啊
2022-12-21 14:12:52---woker:命苦啊
2022-12-21 14:12:52---woker:命苦啊
2022-12-21 14:12:52---woker:命苦啊
BOSS:22:00了可以下班了
2022-12-21 14:12:57--Woker:OhYeah
2022-12-21 14:12:57--Woker:OhYeah
2022-12-21 14:12:57--Woker:OhYeah
2022-12-21 14:12:57--Woker:OhYeah
2022-12-21 14:12:57--Woker:OhYeah
公司下班了

Event的一个好处是:可以实现线程间通信,通过一个线程去控制另一个线程。

condition条件变量

Condition称作条件锁,依然是通过acquire()/release()加锁解锁。

wait([timeout])方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。

notify()方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池),其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

notifyAll()方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的 acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则 wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复 这一过程,从而解决复杂的同步问题。

可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对 象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify 方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。

Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。

# -*- coding: utf-8 -*-

import threading
import time

con = threading.Condition()
num = 0


# 生产者
class Producer(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        # 锁定线程
        global num
        while True:
            con.acquire()
            if num >= 5:
                print("火锅里面里面鱼丸数量已经到达5个,无法添加了!")
                # 唤醒等待的线程
                con.notify()  # 唤醒小伙伴开吃啦
                con.wait()
            print("开始添加!!!")
            num += 1
            print("火锅里面鱼丸个数:%s" % str(num))
            time.sleep(1)
            # 释放锁
            con.release()


# 消费者
class Consumers(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        global num
        while True:
            con.acquire()
            if num <= 0:
                print("锅底没货了,赶紧加鱼丸吧!")
                con.notify()  # 通知生产鱼丸
                con.wait()
            print("开始吃啦!!!")
            num -= 1
            print("火锅里面剩余鱼丸数量:%s" % str(num))
            time.sleep(2)
            con.release()


a = Consumers()
b = Producer()
a.start()
b.start()

# 输出
锅底没货了,赶紧加鱼丸吧!
开始添加!!!
火锅里面鱼丸个数:1
开始添加!!!
火锅里面鱼丸个数:2
开始添加!!!
火锅里面鱼丸个数:3
开始添加!!!
火锅里面鱼丸个数:4
开始添加!!!
火锅里面鱼丸个数:5
火锅里面里面鱼丸数量已经到达5个,无法添加了!
开始吃啦!!!
火锅里面剩余鱼丸数量:4
开始吃啦!!!
火锅里面剩余鱼丸数量:3
开始吃啦!!!
火锅里面剩余鱼丸数量:2
开始吃啦!!!
火锅里面剩余鱼丸数量:1
开始吃啦!!!
火锅里面剩余鱼丸数量:0
锅底没货了,赶紧加鱼丸吧!
开始添加!!!
火锅里面鱼丸个数:1
开始添加!!!
火锅里面鱼丸个数:2
开始添加!!!
火锅里面鱼丸个数:3
开始添加!!!
火锅里面鱼丸个数:4
开始添加!!!
火锅里面鱼丸个数:5
火锅里面里面鱼丸数量已经到达5个,无法添加了!
开始吃啦!!!
火锅里面剩余鱼丸数量:4
开始吃啦!!!
火锅里面剩余鱼丸数量:3
开始吃啦!!!
火锅里面剩余鱼丸数量:2
开始吃啦!!!
火锅里面剩余鱼丸数量:1
开始吃啦!!!
火锅里面剩余鱼丸数量:0
锅底没货了,赶紧加鱼丸吧!
开始添加!!!
火锅里面鱼丸个数:1
开始添加!!!
火锅里面鱼丸个数:2
...

我们可以根据所谓的wait池构建一个带有缓冲区的生产者-消费者模型,即缓冲区好比一个火锅,生产者可以不断生产鱼丸知道火锅装满,然后告知消费者消费,而消费者也可以判断火锅是否空了从而告知生产者继续生产鱼丸:

定时器Timer

通过threading.Timer类可以实现n秒后执行某操作。注意一个timer对象相当于一个新的子线程。

import threading

num = 0


def createTimer():
    t = threading.Timer(2, repeat)
    t.start()
    if num == 5:
        t.cancel()


def repeat():
    global num
    num += 1
    print(num)
    createTimer()


createTimer()

# 输出
1
2
3
4
5

cancel()函数,可以在定时器被触发前,取消这个Timer。

通过读写锁(扩展)

读写锁是一种线程同步机制,它可以提高多个线程读取共享资源的并发性能。读写锁分为读锁和写锁,每个锁都具有独占和共享两种模式。

在读写锁中,多个线程可以同时获取共享读锁,并发地读取共享资源;而在获取独占写锁时,所有其他读锁和写锁都必须等待写锁释放才能继续执行。这样就可以在保证数据一致性的前提下,提高程序的并发性能。

读写锁的使用场景是,当有多个线程需要经常读取共享数据,但只有少数几个线程需要修改共享数据时,可以采用读写锁来实现同步。这样可以有效地降低线程之间的竞争,提高程序的并发性能。

rwlock 是读写锁(Read-Write Lock)的一种实现,它是一个 Python 模块,提供了多线程中的读写锁同步机制。

与普通的互斥锁不同,读写锁(RWLock)允许多个线程同时获取“读锁”,而只能有一个线程获取“写锁”。

gen_wlock()gen_rlock()rwlock 模块中提供的两个方法,用于获取写锁和读锁。

  • gen_wlock() 方法返回一个上下文管理器,可以用于获取独占的写锁。如果当前有其他线程已经持有了写锁或读锁,则当前线程将被阻塞,直到获取到写锁为止。

  • gen_rlock() 方法返回一个上下文管理器,可以用于获取共享的读锁。读锁允许多个线程同时读取共享资源,因此可以提高程序的并发性能。

下面是一个示例代码,展示了如何使用 rwlock 模块实现读写锁:

"""
多个线程处理同一个队列的不同的数据,如果有一个需求,是
实现一个多线程的程序:
从队列里每取出来一个数据,就有三个线程需要用到这个数据,一个线程把这写个数据到文件,一个线程在分析这个数据,还有一个线程对这个数进行计算。
应该怎么设计这个数据处理逻辑呢?难道要copy这个数据三份么?如果这个数据有4MB大小 三份就要12MB呢,
如果多了几个功能,需要处理这个数据,就会随着线程的增加 内存也增大,有没有好的设计方案?

pip install readerwriterlock
多个线程去同时读相同的数据,会不会有数据竞争问题? 不会存在资源竞争问题
当没有线程获取到写锁(任何线程都没有修改数据)时,所有读锁(所有读取数据的线程)不会阻塞,其他的写线程也会阻塞
当有一个线程获取到写锁(某个线程修改数据)时,所有读锁(所有读取数据的线程)和其他写数据的线程会阻塞

线程1:写数据
线程2:读数据
线程3:读数据
线程4:读数据
线程5:写数据
"""
import demo01_进程池引入
import threading
import time
from readerwriterlock import rwlock

# 定义待处理的数据
global_data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
# 定义读写锁
lock = rwlock.RWLockFair()


# 定义写入文件的线程任务
def write_to_file():
    # 获取读锁
    with lock.gen_rlock():
        with open("output.txt", "a") as f:  # 将数据写入文件
            f.write(str(global_data) + "\n")
            time.sleep(2)


# 定义数据分析的线程任务
def analyze_data():
    with lock.gen_rlock():
        # 在这里分析数据,这是一个示例,实际的分析会更复杂
        print("Analyzing data: ", global_data)
        time.sleep(3)


# 定义计算数据数量的线程任务
def count_data():
    global global_data
    with lock.gen_wlock():
        global_data.append(10)  # 在数据后面追加一个数据
        print("Data count: ", len(global_data))
        print("Data: ", global_data)
        time.sleep(1)


def main():
    print("Start processing data...")
    start_time = time.time()
    
    # 创建线程
    t1 = threading.Thread(target=write_to_file)
    t2 = threading.Thread(target=analyze_data)
    t3 = threading.Thread(target=count_data)

    # 启动线程
    t1.start()
    t2.start()
    t3.start()

    # 等待线程结束
    t1.join()
    t2.join()
    t3.join()
    
    end_time = time.time()
    print("End processing data...")
    print("Total time taken: ", end_time - start_time)


if __name__ == "__main__":
    main()

读写锁和互斥锁的区别

读写锁和互斥锁都是用于同步线程的机制,但它们的工作方式和应用场景不同。

互斥锁(Mutex)是一种排他锁,它只允许一个线程独占共享资源,其他线程必须等待锁被释放后才能获取锁进行访问。互斥锁可以解决多个线程同时访问共享资源时发生竞争的问题,保证了程序数据的一致性和正确性。

读写锁(Read-Write Lock)则是一种更加灵活的锁,它将共享资源分为两类:读资源和写资源。多个线程可以同时获取读锁并进行访问,而只有一个线程可以获取写锁。这样可以提高程序的并发性能,尤其是在读操作远远多于写操作时。与互斥锁不同的是,读写锁允许多个线程同时访问共享资源,从而避免了因锁竞争导致的性能瓶颈。

需要注意的是,使用读写锁并不是一定比使用互斥锁更好。读写锁的实现比较复杂,容易出现死锁、优先级反转等问题。因此在实际应用中需要根据具体情况进行选择,合理地运用锁机制以达到最佳的性能和可靠性。

with语句使用线程锁

Python Threading中的Lock模块有acquire()和release()两种方法,这两种方法与with语句的搭配相当于,进入with语句块时候会先执行acquire()方法,语句块结束后会执行release方法。

# -*- coding: utf-8 -*-
from threading import Lock

temp_lock = Lock()

with temp_lock:
    print(temp_lock)

print(temp_lock)


# 输出
<locked _thread.lock object at 0x00000231110F01E0>
<unlocked _thread.lock object at 0x00000231110F01E0>

与之对应的有

# -*- coding: utf-8 -*-
from threading import Lock

temp_lock = Lock()

temp_lock.acquire()
try:
    print(temp_lock)
finally:
    temp_lock.release()

print(temp_lock)

# 输出
<locked _thread.lock object at 0x0000024B0DF301E0>
<unlocked _thread.lock object at 0x0000024B0DF301E0>

线程池ThreadPoolExecutor

submit 方法

Python 中的线程池可以通过 concurrent.futures 模块的 ThreadPoolExecutor 类实现。这个类提供了 submitmap 两个方法来异步执行任务。

submit 方法用于将一个可调用对象(如函数)提交给线程池进行处理,返回一个 Future 对象,可以通过该对象查询任务的执行状态及结果

submit 方法接受两个参数:第一个参数是要执行的可调用对象,可以是函数、方法或 lambda 表达式等;第二个参数是传递给可调用对象的参数,可以是位置参数、关键字参数或元组/字典。

如果可调用对象需要多个参数,可以将它们封装为元组或字典,并将其作为第二个参数传递给 submit 方法。例如:

import concurrent.futures


def add(x, y):
    return x + y


if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(add, 2, 3)
        result = future.result()
        print(result)

# 输出
5
  • 在这个例子中,我们定义了一个 add 函数,用于计算两个数的和。然后创建一个线程池对象 executor,并使用 submit 方法提交一个任务,该任务对应 add(2, 3) 的执行,即计算 2 和 3 的和。submit 方法返回一个 Future 对象,我们可以调用其 result 方法来获取任务的执行结果并输出到控制台。

  • 需要注意的是,submit 方法不会立即启动新的线程或处理器执行任务,而是将该任务放入队列中,等待线程池中的某个线程空闲时再执行。因此,submit 方法不会阻塞主线程的执行,而是立即返回一个 Future 对象。

  • 另外,如果需要一次性提交多个任务,并获取它们的执行结果,可以使用 executor.map 方法,该方法接受一个可迭代对象作为参数,并对迭代对象中的每个元素调用指定的可调用对象。

案例1

# -*- coding: utf-8 -*-

import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def sayhello(name):
    print("%s say Hello to %s" % (threading.current_thread().getName(), name))
    time.sleep(1)
    return name


name_list = ['张三', '李四', '王二', '麻子']
start_time = time.time()
with ThreadPoolExecutor(max_workers=2) as executor:  # 创建 ThreadPoolExecutor
    future_list = [executor.submit(sayhello, name) for name in name_list]  # 提交任务


for future in as_completed(future_list):
    result = future.result()  # 获取任务结果
    print("%s get result : %s" % (threading.current_thread().getName(), result))

print('%s cost %d second' % (threading.current_thread().getName(), time.time() - start_time))


# 输出
ThreadPoolExecutor-0_0 say Hello to 张三
ThreadPoolExecutor-0_1 say Hello to 李四
ThreadPoolExecutor-0_1 say Hello to 王二
ThreadPoolExecutor-0_0 say Hello to 麻子
MainThread get result : 王二
MainThread get result : 麻子
MainThread get result : 李四
MainThread get result : 张三
MainThread cost 2 second
  • max_workers参数用来控制线程池中运行的最大线程数

  • 通过submit方法将任务提交到线程池中,一次只能提交一个.submit会立即返回结果,第一个参数是函数名,第二个参数是函数的参数,他是一个元组

  • submit方法返回是一个future对象

  • as_completed函数会将运行完的任务一个一个yield出来,它返回任务的结果与提交任务的顺序无关,谁先执行完返回谁

map方法

map 方法与 submit 方法类似,但是它可以一次性提交多个任务,并按照任务列表顺序返回结果。需要注意的是,map 方法只能接受一个可迭代对象作为参数,因此需要将多个任务封装为列表或元组。

hreadPoolExecutor.map 方法用于一次性提交多个任务,并按照任务列表顺序返回结果。与 submit 方法不同,map 方法只能接受一个可迭代对象作为参数,而不能传递任意数量和类型的额外参数。

map 方法接受两个参数:第一个参数是要执行的可调用对象,可以是函数、方法或 lambda 表达式等;第二个参数是包含任务参数的可迭代对象,用于对每个元素调用指定的可调用对象。

例如,我们可以使用 map 方法来计算多个数的平方,如下所示:

import concurrent.futures

def square(x):
    return x ** 2

if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5]
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = executor.map(square, numbers)
        for result in results:
            print(result)

# 输出
1
4
9
16
25

在这个例子中,我们定义了一个 square 函数,用于计算一个数的平方。然后创建一个包含多个数的列表 numbers。接着,使用 ThreadPoolExecutor 创建一个线程池对象 executor,并使用 map 方法提交多个任务,每个任务对应一个数的平方计算。map 方法返回一个迭代器对象 results,我们可以使用 for 循环来遍历迭代器获取所有任务的计算结果,并输出到控制台。

需要注意的是,map 方法的返回值是一个迭代器对象,该对象按照任务列表的顺序返回各个任务的结果,因此在输出结果时不需要像使用 submit 方法一样先等待所有任务完成再获取结果。如果任务执行过程中发生异常,map 方法会自动将该异常捕获并传递给迭代器对象中未处理的下一个 next 调用。如果不想在 map 方法中捕获异常,可以选择使用 submit 方法提交多个任务,并单独处理每个任务的异常。

案例2

# -*- coding: utf-8 -*-
"""线程池的回调"""
# -*- coding: utf-8 -*-

import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def sayhello(name):
    print("%s say Hello to %s" % (threading.current_thread().getName(), name))
    time.sleep(1)
    return name


name_list = ['张三', '李四', '王二', '麻子']
start_time = time.time()
with ThreadPoolExecutor(max_workers=2) as executor:  # 创建 ThreadPoolExecutor
    for data in executor.map(sayhello, name_list):
        print("%s get data : %s" % (threading.current_thread().getName(), data))  # 内部迭代中, 每个driver_id开启一个线程


print('%s cost %d second' % (threading.current_thread().getName(), time.time() - start_time))


# 输出
ThreadPoolExecutor-0_0 say Hello to 张三
ThreadPoolExecutor-0_1 say Hello to 李四
ThreadPoolExecutor-0_1 say Hello to 王二
ThreadPoolExecutor-0_0 say Hello to 麻子
MainThread get data : 张三
MainThread get data : 李四
MainThread get data : 王二
MainThread get data : 麻子
MainThread cost 2 second

  • 使用map方法,无需提前使用submit方法,map方法与python标准库中的map含义相同,都是将序列中的每个元素都执行同一个函数

  • map方法会将任务运行的结果yield出来

  • map方法返回任务结果的顺序与提交任务的顺序一致

总结

  • ThreadPoolExecutor.submit 方法和 ThreadPoolExecutor.map 方法都可以用于提交任务给线程池进行处理,但两者之间存在一些区别。

  • 首先,submit 方法是逐个提交任务,每次调用都会返回一个 Future 对象,表示一个待完成的任务。而 map 方法是一次性提交多个任务,返回一个迭代器对象,按照任务列表的顺序返回各个任务的结果。因此,在一些情况下,使用 map 方法可以更加方便和高效地处理多个任务。

  • 其次,submit 方法可以传递任意数量和类型的额外参数,用于指定任务的具体参数;而 map 方法只能接受一个可迭代对象作为参数,不能传递额外参数。也就是说,如果任务所需的参数是固定的,而且任务量比较大,那么使用 map 方法可以更加优雅和简洁;如果需要传递不同的参数,并且任务量不大,那么使用 submit 方法更加灵活。

  • 另外,使用 submit 方法时,可以通过 Future 对象的方法获取任务的执行状态和结果,还可以对任务进行取消、重新安排等操作;而使用 map 方法时,由于任务已经全部提交给线程池处理,无法对单个任务进行操作或查询任务的执行状态,只能等待所有任务执行完毕后才能获取结果。因此,在一些需要对任务进行动态管理或轮询查询的场景中,使用 submit 方法更加合适。

  • 综上所述,ThreadPoolExecutor.submit 方法和 ThreadPoolExecutor.map 方法各有优劣,应根据具体的业务需求和场景选择合适的方法进行任务管理和处理。

队列在线程中的应用

"""
完成编程题
a、用一个队列来存储数据
b、创建一个专门生产数据的任务函数,循环生产5次数据,每轮循环,往队列中添加20条数据,每循环一轮暂停1秒
c、创建一个专门处理数据的任务函数 循环获取队列中的数据处理,每秒处理4条数据。
d、创建一个线程(或进程)生产数据 ,3个线程(或进程)处理数据
e、统计数据生产并获取完  程序运行的总时长

"""
import queue
import threading
# 创建队列
data_queue = queue.Queue()


# 定义生产数据的任务函数
def produce_data():
    for round in range(5):
        for i in range(20):
            data = round + 1
            data_queue.put(data)
        time.sleep(1)  # 暂停1秒


# 定义处理数据的任务函数
def consume_data():
    while True:
        try:
            # 从队列中获取数据
            data = data_queue.get(timeout=1)
            # 处理数据,这里仅打印出来
            print("Processing data: ", data)
            time.sleep(0.25)  # 处理4条数据需要1秒钟
            data_queue.task_done()  # 标记当前数据已经处理完毕
        except queue.Empty:
            # 如果队列为空,则退出循环
            break


def main():
    start_time = time.perf_counter()
    print("Start processing data...")

    # 创建生产数据的线程
    producer_thread = threading.Thread(target=produce_data)

    # 创建处理数据的线程池
    num_workers = 3  # 使用3个线程来处理数据
    worker_threads = []
    for i in range(num_workers):
        t = threading.Thread(target=consume_data)
        worker_threads.append(t)

    # 启动线程
    producer_thread.start()
    for t in worker_threads:
        t.start()

    # 等待所有线程结束
    producer_thread.join()
    for t in worker_threads:
        t.join()

    # 统计运行时间
    end_time = time.perf_counter()
    print("End processing data...")
    print("Total time taken: ", end_time - start_time)


if __name__ == "__main__":
    main()