概念
线程是处理器调度和分配的基本单位,进程则作为资源拥有的基本单位。每个进程是由私有的虚拟地址空间、代码、数据和其它各种系统资源组成。线程是进程内部的一个执行单元。每一个进程至少有一个主执行线程,它无需由用户去主动创建,是由系统自动创建的。 用户根据需要在应用程序中创建其它线程,多个线程并发地运行于同一个进程中。
创建线程的方式-threading
方法1
在实例化一个线程对象时,将要执行的任务函数以参数的形式传入threading
# -*- coding: utf-8 -*-
import time
import threading
import datetime
def printNumber(n: int) -> None:
while True:
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{times}-{n}')
time.sleep(n)
for i in range(1, 3):
t = threading.Thread(target=printNumber, args=(i,))
t.start()
# 输出
2022-12-16 11:04:40-1
2022-12-16 11:04:40-2
2022-12-16 11:04:41-1
2022-12-16 11:04:42-2
2022-12-16 11:04:42-1
2022-12-16 11:04:43-1
2022-12-16 11:04:44-2
2022-12-16 11:04:44-1
2022-12-16 11:04:45-1
2022-12-16 11:04:46-2
2022-12-16 11:04:46-1
2022-12-16 11:04:47-1
....
Process finished with exit code -1
创建两个线程,一个线程每隔一秒打印一个“1”,另一个线程每隔2秒打印一个“2”
Thread 类提供了如下的 init() 构造器,可以用来创建线程:
__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)
此构造方法中,以上所有参数都是可选参数,即可以使用,也可以忽略。其中各个参数的含义如下:
group
:如果线程被放置在 ThreadGroup 中,则为该组。现在已弃用,并在 Python 3.9 中将删除。target
:要调用的可调用对象(函数或方法)。name
:线程名称,通常用于调试、日志记录和异常处理。args
:以元组形式传递给target
的位置参数。kwargs
:以字典形式传递给target
的关键字参数。daemon
:线程是否是一个守护程序线程。默认值是 None, 表示继承主线程的 daemon 值。开启则设置daemon=True。
这些参数,初学者只需记住 target、args、kwargs 这 3 个参数的功能即可。
但是线程需要手动启动才能运行,threading 模块提供了 start() 方法用来启动线程。因此在上面程序的基础上,添加如下语句: t.start()
方法2
通过继承 Thread 类,我们可以自定义一个线程类,从而实例化该类对象,获得子线程。
需要注意的是,在创建 Thread 类的子类时,必须重写从父类继承得到的 run() 方法。因为该方法即为要创建的子线程执行的方法,其功能如同第一种创建方法中的 printNumber() 自定义函数。
传参数则需要重写 init 方法(注意重写之后要调用父类的init方法)
# -*- coding: utf-8 -*-
import datetime
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
# 注意:一定要调用父类的初始化函数
super().__init__()
def run(self) -> None:
while True:
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{times}-{self.n}')
time.sleep(self.n)
for i in range(1, 3):
t = MyThread(i)
t.start()
# 输出
2022-12-16 12:43:24-1
2022-12-16 12:43:24-2
2022-12-16 12:43:25-1
2022-12-16 12:43:26-2
2022-12-16 12:43:26-1
2022-12-16 12:43:27-1
2022-12-16 12:43:28-2
...
主线程和子线程
# -*- coding: utf-8 -*-
import datetime
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
# 注意:一定要调用父类的初始化函数,否则无法创建线程
super().__init__()
def run(self) -> None:
while True:
_count = threading.active_count()
threading_name = threading.current_thread().getName()
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{times}-{self.n}-"当前活跃的线程个数:{_count}"-"当前线程的名称是":{threading_name}')
time.sleep(self.n)
for i in range(1, 3):
t = MyThread(i)
t.start()
print(threading.current_thread().getName())
# 输出
2022-12-16 13:18:00-1-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-1
MainThread
2022-12-16 13:18:00-2-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
MainThread
2022-12-16 13:18:01-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
2022-12-16 13:18:02-2-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
2022-12-16 13:18:02-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
2022-12-16 13:18:03-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
2022-12-16 13:18:04-2-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
2022-12-16 13:18:04-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
...
注意: 第一次t.start()后,当前存在两个线程(主线程+子线程),第二次t.start()的时候又创建了一个子线程所以当前存在三个线程
如果程序中不显式创建任何线程,则所有程序的执行,都将由主线程 MainThread 完成,程序就只能按照顺序依次执行。
此程序中,子线程 Thread-1和Thread-2 执行的是 run() 方法中的代码,而 MainThread 执行的是主程序中的代码,它们以快速轮换 CPU 的方式在执行。
守护线程(Daemon Thread)
守护线程(Daemon Thread)也叫后台进程,它的目的是为其他线程提供服务。如果其他线程被杀死了,那么守护线程也就没有了存在的必要。因此守护线程会随着非守护线程的消亡而消亡。Thread类中,子线程被创建时默认是非守护线程,我们可以通过setDaemon(True)将一个子线程设置为守护线程。
# -*- coding: utf-8 -*-
import datetime
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
# 注意:一定要调用父类的初始化函数,否则无法创建线程
super().__init__()
def run(self) -> None:
while True:
_count = threading.active_count()
threading_name = threading.current_thread().getName()
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{times}-{self.n}-"当前活跃的线程个数:{_count}"-"当前线程的名称是":{threading_name}')
time.sleep(self.n)
for i in range(1, 3):
t = MyThread(i)
t.setDaemon(True)
t.start()
print(threading.current_thread().getName())
# 输出
2022-12-16 13:27:46-1-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-1
MainThread
2022-12-16 13:27:46-2-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
MainThread
将两个子线程改写为守护线程,因为当主程序中的代码执行完后,主线程就可以结束了,这时候被设定为守护线程的两个子线程会被杀死,然后主线程结束。
在主程序中,我们使用循环创建了两个 MyThread
的对象,并调用它们的 start()
方法启动线程。我们还使用 t.setDaemon(True)
将这两个线程设置为后台线程(即 Daemon 线程),使得在主线程结束时它们能够自动退出。
需要注意的是,Daemon 线程通常用于支持其他线程或程序的运行,例如垃圾回收线程。它们一般不需要进行清理工作,因此可以在主程序结束时自动退出,而不必等待其他线程结束。
注意,当前台线程死亡后,Python 解释器会通知后台线程死亡,但是从它接收指令到做出响应需要一定的时间。如果要将某个线程设置为后台线程,则必须在该线程启动之前进行设置。也就是说,将 daemon 属性设为 True,必须在 start() 方法调用之前进行,否则会引发 RuntimeError 异常。
若将两个子线程的其中一个设置为守护线程,另一个设置为非守护线程
# -*- coding: utf-8 -*-
import datetime
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
# 注意:一定要调用父类的初始化函数,否则无法创建线程
super().__init__()
def run(self) -> None:
while True:
_count = threading.active_count()
threading_name = threading.current_thread().getName()
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{times}-{self.n}-"当前活跃的线程个数:{_count}"-"当前线程的名称是":{threading_name}')
time.sleep(self.n)
for i in range(1, 3):
t = MyThread(i)
if i == 1:
t.setDaemon(True)
t.start()
print(threading.current_thread().getName())
# 输出
2022-12-16 13:30:17-1-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-1
MainThread
2022-12-16 13:30:17-2-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
MainThread
2022-12-16 13:30:18-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
2022-12-16 13:30:19-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
2022-12-16 13:30:19-2-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
2022-12-16 13:30:20-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-1
...
此时非守护线程作为前台程序还在继续执行,守护线程就还有“守护”的意义,就会继续执行。
join()方法
不使用join方法:
当设置多个线程时,在一般情况下(无守护线程,setDeamon=False),多个线程同时启动,主线程执行完,会等待其他子线程执行完,程序才会退出。
# -*- coding: utf-8 -*-
import datetime
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
# 注意:一定要调用父类的初始化函数,否则无法创建线程
super().__init__()
def run(self) -> None:
_count = threading.active_count()
threading_name = threading.current_thread().getName()
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
time.sleep(1)
print(f'{times}-{self.n}-"当前活跃的线程个数:{_count}"-"当前线程的名称是":{threading_name}')
start_time = time.time()
print(f'{start_time},这是主线程:', threading.current_thread().name)
for i in range(5):
t = MyThread(i)
# t.setDaemon(True)
t.start()
# t.join()
end_time = time.time()
print(f'{end_time},主线程结束了!', threading.current_thread().name)
print('一共用时:', end_time - start_time)
# 输出
1671174404.6552384,这是主线程: MainThread
1671174404.656239,主线程结束了! MainThread
一共用时: 0.0010006427764892578
2022-12-16 15:06:44-0-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-1
2022-12-16 15:06:44-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
2022-12-16 15:06:44-2-"当前活跃的线程个数:4"-"当前线程的名称是":Thread-3
2022-12-16 15:06:44-3-"当前活跃的线程个数:5"-"当前线程的名称是":Thread-4
2022-12-16 15:06:44-4-"当前活跃的线程个数:6"-"当前线程的名称是":Thread-5
我们的计时是对主线程计时,主线程结束,计时随之结束,打印出主线程的用时。
主线程的任务完成之后,主线程随之结束,子线程继续执行自己的任务,直到全部的子线程的任务全部结束,程序结束。
使用join()方法:
当调用一个线程的
join()
方法时,当前线程(通常是主线程)会阻塞等待该线程执行完毕。如果该线程已经执行完毕,或者已经被取消或者出现了异常等情况,则join()
方法会立即返回;否则,它会一直等待直到该线程执行完毕。使用
join()
方法可以确保线程顺序执行,避免出现竞争和死锁等问题。例如,在主线程中创建了多个子线程并启动后,可以使用join()
方法等待它们全部执行完毕,然后再继续执行主线程的下一步任务。
# -*- coding: utf-8 -*-
import datetime
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
# 注意:一定要调用父类的初始化函数,否则无法创建线程
super().__init__()
def run(self) -> None:
_count = threading.active_count()
threading_name = threading.current_thread().getName()
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
time.sleep(1)
print(f'{times}-{self.n}-"当前活跃的线程个数:{_count}"-"当前线程的名称是":{threading_name}')
start_time = time.time()
print(f'{start_time},这是主线程:', threading.current_thread().name)
for i in range(5):
t = MyThread(i)
# t.setDaemon(True)
t.start()
t.join()
end_time = time.time()
print(f'{end_time},主线程结束了!', threading.current_thread().name)
print('一共用时:', end_time - start_time)
# 输出
1671174502.0245655,这是主线程: MainThread
2022-12-16 15:08:22-0-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-1
2022-12-16 15:08:23-1-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-2
2022-12-16 15:08:24-2-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-3
2022-12-16 15:08:25-3-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-4
2022-12-16 15:08:26-4-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-5
1671174507.0313594,主线程结束了! MainThread
一共用时: 5.006793975830078
Process finished with exit code 0
join()方法的timeout参数
join的语法结构为join(timeout=None)
,可以看到join()方法有一个timeout参数,其默认值为None,而参数timeout可以进行赋值,其含义是指定等待被join的线程的时间最长为timeout秒,也就是说当在timeout秒内被join的线程还没有执行结束的话,就不再进行等待了。
# -*- coding: utf-8 -*-
import datetime
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
# 注意:一定要调用父类的初始化函数,否则无法创建线程
super().__init__()
def run(self) -> None:
_count = threading.active_count()
threading_name = threading.current_thread().getName()
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
time.sleep(5)
print(f'{times}-{self.n}-"当前活跃的线程个数:{_count}"-"当前线程的名称是":{threading_name}')
start_time = time.time()
print(f'{start_time},这是主线程:', threading.current_thread().name)
for i in range(5):
t = MyThread(i)
# t.setDaemon(True)
t.start()
t.join(2)
end_time = time.time()
print(f'{end_time},主线程结束了!', threading.current_thread().name)
print('一共用时:', end_time - start_time)
# 输出
1671175114.663927,这是主线程: MainThread
2022-12-16 15:18:34-0-"当前活跃的线程个数:2"-"当前线程的名称是":Thread-1
2022-12-16 15:18:36-1-"当前活跃的线程个数:3"-"当前线程的名称是":Thread-2
2022-12-16 15:18:38-2-"当前活跃的线程个数:4"-"当前线程的名称是":Thread-3
1671175124.6681008,主线程结束了! MainThread
一共用时: 10.004173755645752
2022-12-16 15:18:40-3-"当前活跃的线程个数:4"-"当前线程的名称是":Thread-4
2022-12-16 15:18:42-4-"当前活跃的线程个数:4"-"当前线程的名称是":Thread-5
Process finished with exit code 0
线程锁
线程之间是共用同一块内存的,那么线程可以共享全局变量,如果多个线程同时对同一个全局变量操作,会出现资源竞争问题,从而数据结果会不正确
# -*- coding: utf-8 -*-
import datetime
import threading
import time
number = 0
def add():
global number # global声明此处的number是外面的全局变量number
for _ in range(10000000): # 进行一个大数级别的循环加一运算
number += 1
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{times}-"当前活跃的线程个数:{threading.active_count()}"')
print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
print('------------------------------')
for i in range(2): # 用2个子线程,就可以观察到脏数据
t = threading.Thread(target=add)
t.start()
time.sleep(2) # 等待2秒,确保2个子线程都已经结束运算。
print("主线程执行完毕后,number = ", number)
# 输出
2022-12-20 13:13:05-"当前活跃的线程个数:3"
子线程Thread-1运算结束后,number = 11966305
------------------------------
2022-12-20 13:13:05-"当前活跃的线程个数:2"
子线程Thread-2运算结束后,number = 12272268
------------------------------
主线程执行完毕后,number = 12272268
这里创建两个子线程操作同一个全局变量number,number被初始化为0,两个子线程通过for循环对这个number进行+1,每个子线程循环10000000次,两个子线程同时进行。如果一切正常的话,最终这个number会变成20000000,然而现实并非如此。
可以很明显地看出脏数据的情况。这是因为两个线程在运行过程中,CPU随机调度,你算一会我算一会,在没有对number进行保护的情况下,就发生了数据错误
注意此时两个线程是同时开启的。
若是使用了join()的方法
# -*- coding: utf-8 -*-
# ...
for i in range(2): # 用2个子线程,就可以观察到脏数据
t = threading.Thread(target=add)
t.start()
t.join() # 添加这一行就让两个子线程变成了顺序执行
time.sleep(2) # 等待2秒,确保2个子线程都已经结束运算。
print("主线程执行完毕后,number = ", number)
# 输出
2022-12-20 13:16:02-"当前活跃的线程个数:2"
子线程Thread-1运算结束后,number = 10000000
------------------------------
2022-12-20 13:16:03-"当前活跃的线程个数:2"
子线程Thread-2运算结束后,number = 20000000
------------------------------
主线程执行完毕后,number = 20000000
虽然结果是对的,但是这样的本质是把多线程变成了单线程,失去了多线程的意义。
互斥锁Lock
互斥锁是一种独占锁,同一时刻只有一个线程可以访问共享的数据。使用很简单,初始化锁对象,然后将锁当做参数传递给任务函数,在任务中加锁,使用后释放锁。
导入
threading
模块:在 Python 中使用Lock
需要导入threading
模块。创建
Lock
对象:使用threading.Lock()
方法创建一个Lock
对象。获取锁:在需要访问共享资源的代码块前调用
Lock
对象的acquire()
方法,以获取锁。如果当前锁没有被其他线程占用,则该方法会立即返回,并将锁状态设置为“已被占用”,其他线程不能获取该锁。访问共享资源:在获取到锁后,可以访问共享资源。
释放锁:当访问共享资源完成后,需要调用
Lock
对象的release()
方法来释放锁,以便其他线程可以获取该锁。
# -*- coding: utf-8 -*-
import datetime
import threading
import time
number = 0
lock = threading.Lock()
def add(lk):
global number # global声明此处的number是外面的全局变量number
lk.acquire() # 开始加锁
for _ in range(10000000): # 进行一个大数级别的循环加一运算
number += 1
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{times}-"当前活跃的线程个数:{threading.active_count()}"')
print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
print('------------------------------')
lk.release() # 释放锁,让别的线程也可以访问number
for i in range(2):
t = threading.Thread(target=add, args=(lock,))
t.start()
time.sleep(2) # 等待2秒,确保2个子线程都已经结束运算。
print("主线程执行完毕后,number = ", number)
# 输出
2022-12-20 13:34:52-"当前活跃的线程个数:3"
子线程Thread-1运算结束后,number = 10000000
------------------------------
2022-12-20 13:34:53-"当前活跃的线程个数:2"
子线程Thread-2运算结束后,number = 20000000
------------------------------
主线程执行完毕后,number = 20000000
以上代码我重新使用重新run方法的形式写一遍
import datetime
import threading
import time
class AddThread(threading.Thread):
def __init__(self, lock):
super().__init__()
self.lock = lock
self.number = 0
def run(self):
self.lock.acquire()
for _ in range(10000000):
self.number += 1
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{times}-"当前活跃的线程个数:{threading.active_count()}"')
print("子线程%s运算结束后,number = %s" % (self.getName(), self.number))
print('------------------------------')
self.lock.release()
lock = threading.Lock()
threads = []
for i in range(2):
t = AddThread(lock)
t.start()
threads.append(t)
for t in threads:
t.join()
print("主线程执行完毕后,number = ", threads[0].number + threads[1].number)
# 输出
2023-05-21 10:39:28-"当前活跃的线程个数:3"
子线程Thread-1运算结束后,number = 10000000
------------------------------
2023-05-21 10:39:29-"当前活跃的线程个数:2"
子线程Thread-2运算结束后,number = 10000000
------------------------------
主线程执行完毕后,number = 20000000
Process finished with exit code 0
这里之所以使用单独的 for
循环来执行 t.join()
,是因为 join()
方法会阻塞当前线程,等待被调用线程执行结束后才会继续执行当前线程,即如果使用 for
循环遍历执行 t.join()
,那么第一个线程执行 join()
方法时会阻塞主线程,直到该线程执行结束后才会执行下一个线程的 join()
方法,如此往复,直到所有线程执行结束。
如果将 t.join()
写在 threads
创建的循环中,那么每个线程都会依次执行完毕后才会执行下一个线程的 join()
方法,这样的话就没有利用多线程的优势了。因此,需要单独使用 for
循环来执行 t.join()
方法,确保多个线程能够并发执行,以达到更好的性能。
RLock可重入锁
用于防止访问共享资源时出现不必要的阻塞。如果共享资源在RLock中,那么可以安全地再次调用它。 RLocked资源可以被不同的线程重复访问,即使它在被不同的线程调用时仍然可以正常工作。
在同一个线程中,RLock.acquire()可以被多次调用,利用该特性,可以解决部分死锁问题。
# -*- coding: utf-8 -*-
import threading
number = 0
# lock = threading.RLock()
lock = threading.Lock()
def add(lk):
global number # global声明此处的number是外面的全局变量number
lk.acquire()
number += 1
lk.acquire()
number += 2
print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
lk.release()
lk.release()
for i in range(2):
t = threading.Thread(target=add, args=(lock,))
t.start()
在上面的程序中,两个线程同时尝试访问共享资源number,这里当一个线程当前正在访问共享资源number时,另一个线程将被阻止访问它。 当两个或多个线程试图访问相同的资源时,有效地阻止了彼此访问该资源,这就是所谓的死锁,因此上述程序没有生成任何输出。
# -*- coding: utf-8 -*-
import threading
number = 0
lock = threading.RLock()
# lock = threading.Lock()
def add(lk):
global number # global声明此处的number是外面的全局变量number
lk.acquire()
number += 1
lk.acquire()
number += 2
print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
lk.release()
lk.release()
for i in range(2):
t = threading.Thread(target=add, args=(lock,))
t.start()
# 输出
子线程Thread-1运算结束后,number = 3
子线程Thread-2运算结束后,number = 6
这两种锁的主要区别是:RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁
RLock
与Lock
都是 Python 中的线程锁,它们的主要区别在于是否支持重入。
Lock
是普通的线程锁,它只能被同一个线程获取一次。如果一个线程已经获取了这个锁,那么再次尝试获取锁时就会被阻塞。只有先释放锁后,其他线程才能获取到这个锁。
RLock
是可重入的线程锁,也称为递归锁或者重入锁。和普通锁不同的是,同一个线程可以多次获取这个锁。当一个线程多次获取该锁时,在释放锁之前,需要对锁进行相应数量的释放操作才能真正释放锁。这种机制可以避免在使用锁的情况下出现死锁。总体来说,
RLock
具备Lock
的所有特性,同时还支持同一个线程的重入访问。但是,由于RLock
需要维护一个计数器来记录锁的嵌套层数,因此,它可能会比Lock
消耗更多的系统资源,在某些情况下甚至会影响程序的性能。因此,通常情况下,我们只有在确实需要支持嵌套访问的情况下才会使用RLock
。需要注意的是,在使用锁的时候一定要遵循规范,以避免出现死锁等问题。在使用
RLock
的时候,尤其需要注意锁的嵌套层数,避免出现不必要的复杂度。
Semaphore信号
# -*- coding: utf-8 -*-
import datetime
import threading
import time
semaphore = threading.BoundedSemaphore(2)
def add(n):
semaphore.acquire()
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
_count = threading.active_count()
print(f'{times}', f"线程-{n}", f"当前活跃的子线程个数:{_count}")
time.sleep(3)
semaphore.release()
for i in range(1, 10):
t = threading.Thread(target=add, args=(i,))
t.start()
# 输出
2022-12-20 14:39:44 线程-1 当前活跃的子线程个数:2
2022-12-20 14:39:44 线程-2 当前活跃的子线程个数:3
2022-12-20 14:39:47 线程-3 当前活跃的子线程个数:8
2022-12-20 14:39:47 线程-4 当前活跃的子线程个数:8
2022-12-20 14:39:50 线程-5 当前活跃的子线程个数:6
2022-12-20 14:39:50 线程-6 当前活跃的子线程个数:6
2022-12-20 14:39:53 线程-7 当前活跃的子线程个数:4
2022-12-20 14:39:53 线程-8 当前活跃的子线程个数:4
2022-12-20 14:39:56 线程-9 当前活跃的子线程个数:2
可以看出用Semaphore来控制后,使得同一个时刻只有两个线程在请求页面
虽然当前活跃的子线程个数很多,但真正运行的子线程个数只有两个。
事件Event
Event类会在全局定义一个Flag,当Flag=False时,调用wait()方法会阻塞所有线程;而当Flag=True时,调用wait()方法不再阻塞。形象的比喻就是“红绿灯”:在红灯时阻塞所有线程,而在在绿灯的时候,一次性放行所有排队中的线程。Event类有四个方法:
set():将Flag设置为True
wait():等待
clear():将Flag设置为False
is_set():返回bool值,判断Flag是否为True
# -*- coding: utf-8 -*-
import threading
import time
import datetime
class Boss(threading.Thread):
def run(self):
print("BOSS:伙计们今晚上加班到22:00")
event.set()
time.sleep(5) # 模拟一个小时这段时间
print("BOSS:22:00了可以下班了")
event.set()
class Worker(threading.Thread):
def run(self):
print(f'boss发话了吗:{event.is_set()}')
event.wait() # 等待event为真 此列是等待老板发话
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{times}---woker:命苦啊')
time.sleep(1) # 模拟工作中
event.clear() # 清除Event对象内部的信号标志,即将其设为假,此处等待领导发话
event.wait() # Event对象wait的方法只有在内部信号为真的时候才会很快的执行并完成返回。
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{times}--Woker:OhYeah')
if __name__ == "__main__":
event = threading.Event()
threads = []
for i in range(5):
threads.append(Worker())
threads.append(Boss())
for t in threads:
t.start()
for t in threads:
t.join()
print("公司下班了")
# 输出
boss发话了吗:False
boss发话了吗:False
boss发话了吗:False
boss发话了吗:False
boss发话了吗:False
BOSS:伙计们今晚上加班到22:00
2022-12-21 14:12:52---woker:命苦啊
2022-12-21 14:12:52---woker:命苦啊
2022-12-21 14:12:52---woker:命苦啊
2022-12-21 14:12:52---woker:命苦啊
2022-12-21 14:12:52---woker:命苦啊
BOSS:22:00了可以下班了
2022-12-21 14:12:57--Woker:OhYeah
2022-12-21 14:12:57--Woker:OhYeah
2022-12-21 14:12:57--Woker:OhYeah
2022-12-21 14:12:57--Woker:OhYeah
2022-12-21 14:12:57--Woker:OhYeah
公司下班了
Event的一个好处是:可以实现线程间通信,通过一个线程去控制另一个线程。
condition条件变量
Condition
称作条件锁,依然是通过acquire()/release()加锁解锁。
wait([timeout])
方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
notify()
方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()
尝试获得锁定(进入锁定池),其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
notifyAll()
方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的 acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则 wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复 这一过程,从而解决复杂的同步问题。
可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对 象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify 方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。
Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。
# -*- coding: utf-8 -*-
import threading
import time
con = threading.Condition()
num = 0
# 生产者
class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
# 锁定线程
global num
while True:
con.acquire()
if num >= 5:
print("火锅里面里面鱼丸数量已经到达5个,无法添加了!")
# 唤醒等待的线程
con.notify() # 唤醒小伙伴开吃啦
con.wait()
print("开始添加!!!")
num += 1
print("火锅里面鱼丸个数:%s" % str(num))
time.sleep(1)
# 释放锁
con.release()
# 消费者
class Consumers(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global num
while True:
con.acquire()
if num <= 0:
print("锅底没货了,赶紧加鱼丸吧!")
con.notify() # 通知生产鱼丸
con.wait()
print("开始吃啦!!!")
num -= 1
print("火锅里面剩余鱼丸数量:%s" % str(num))
time.sleep(2)
con.release()
a = Consumers()
b = Producer()
a.start()
b.start()
# 输出
锅底没货了,赶紧加鱼丸吧!
开始添加!!!
火锅里面鱼丸个数:1
开始添加!!!
火锅里面鱼丸个数:2
开始添加!!!
火锅里面鱼丸个数:3
开始添加!!!
火锅里面鱼丸个数:4
开始添加!!!
火锅里面鱼丸个数:5
火锅里面里面鱼丸数量已经到达5个,无法添加了!
开始吃啦!!!
火锅里面剩余鱼丸数量:4
开始吃啦!!!
火锅里面剩余鱼丸数量:3
开始吃啦!!!
火锅里面剩余鱼丸数量:2
开始吃啦!!!
火锅里面剩余鱼丸数量:1
开始吃啦!!!
火锅里面剩余鱼丸数量:0
锅底没货了,赶紧加鱼丸吧!
开始添加!!!
火锅里面鱼丸个数:1
开始添加!!!
火锅里面鱼丸个数:2
开始添加!!!
火锅里面鱼丸个数:3
开始添加!!!
火锅里面鱼丸个数:4
开始添加!!!
火锅里面鱼丸个数:5
火锅里面里面鱼丸数量已经到达5个,无法添加了!
开始吃啦!!!
火锅里面剩余鱼丸数量:4
开始吃啦!!!
火锅里面剩余鱼丸数量:3
开始吃啦!!!
火锅里面剩余鱼丸数量:2
开始吃啦!!!
火锅里面剩余鱼丸数量:1
开始吃啦!!!
火锅里面剩余鱼丸数量:0
锅底没货了,赶紧加鱼丸吧!
开始添加!!!
火锅里面鱼丸个数:1
开始添加!!!
火锅里面鱼丸个数:2
...
我们可以根据所谓的wait池构建一个带有缓冲区的生产者-消费者模型,即缓冲区好比一个火锅,生产者可以不断生产鱼丸知道火锅装满,然后告知消费者消费,而消费者也可以判断火锅是否空了从而告知生产者继续生产鱼丸:
定时器Timer
通过threading.Timer类可以实现n秒后执行某操作。注意一个timer对象相当于一个新的子线程。
import threading
num = 0
def createTimer():
t = threading.Timer(2, repeat)
t.start()
if num == 5:
t.cancel()
def repeat():
global num
num += 1
print(num)
createTimer()
createTimer()
# 输出
1
2
3
4
5
cancel()
函数,可以在定时器被触发前,取消这个Timer。
通过读写锁(扩展)
读写锁是一种线程同步机制,它可以提高多个线程读取共享资源的并发性能。读写锁分为读锁和写锁,每个锁都具有独占和共享两种模式。
在读写锁中,多个线程可以同时获取共享读锁,并发地读取共享资源;而在获取独占写锁时,所有其他读锁和写锁都必须等待写锁释放才能继续执行。这样就可以在保证数据一致性的前提下,提高程序的并发性能。
读写锁的使用场景是,当有多个线程需要经常读取共享数据,但只有少数几个线程需要修改共享数据时,可以采用读写锁来实现同步。这样可以有效地降低线程之间的竞争,提高程序的并发性能。
rwlock
是读写锁(Read-Write Lock)的一种实现,它是一个 Python 模块,提供了多线程中的读写锁同步机制。
与普通的互斥锁不同,读写锁(RWLock)允许多个线程同时获取“读锁”,而只能有一个线程获取“写锁”。
gen_wlock()
和gen_rlock()
是rwlock
模块中提供的两个方法,用于获取写锁和读锁。
gen_wlock()
方法返回一个上下文管理器,可以用于获取独占的写锁。如果当前有其他线程已经持有了写锁或读锁,则当前线程将被阻塞,直到获取到写锁为止。gen_rlock()
方法返回一个上下文管理器,可以用于获取共享的读锁。读锁允许多个线程同时读取共享资源,因此可以提高程序的并发性能。
下面是一个示例代码,展示了如何使用 rwlock
模块实现读写锁:
"""
多个线程处理同一个队列的不同的数据,如果有一个需求,是
实现一个多线程的程序:
从队列里每取出来一个数据,就有三个线程需要用到这个数据,一个线程把这写个数据到文件,一个线程在分析这个数据,还有一个线程对这个数进行计算。
应该怎么设计这个数据处理逻辑呢?难道要copy这个数据三份么?如果这个数据有4MB大小 三份就要12MB呢,
如果多了几个功能,需要处理这个数据,就会随着线程的增加 内存也增大,有没有好的设计方案?
pip install readerwriterlock
多个线程去同时读相同的数据,会不会有数据竞争问题? 不会存在资源竞争问题
当没有线程获取到写锁(任何线程都没有修改数据)时,所有读锁(所有读取数据的线程)不会阻塞,其他的写线程也会阻塞
当有一个线程获取到写锁(某个线程修改数据)时,所有读锁(所有读取数据的线程)和其他写数据的线程会阻塞
线程1:写数据
线程2:读数据
线程3:读数据
线程4:读数据
线程5:写数据
"""
import demo01_进程池引入
import threading
import time
from readerwriterlock import rwlock
# 定义待处理的数据
global_data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
# 定义读写锁
lock = rwlock.RWLockFair()
# 定义写入文件的线程任务
def write_to_file():
# 获取读锁
with lock.gen_rlock():
with open("output.txt", "a") as f: # 将数据写入文件
f.write(str(global_data) + "\n")
time.sleep(2)
# 定义数据分析的线程任务
def analyze_data():
with lock.gen_rlock():
# 在这里分析数据,这是一个示例,实际的分析会更复杂
print("Analyzing data: ", global_data)
time.sleep(3)
# 定义计算数据数量的线程任务
def count_data():
global global_data
with lock.gen_wlock():
global_data.append(10) # 在数据后面追加一个数据
print("Data count: ", len(global_data))
print("Data: ", global_data)
time.sleep(1)
def main():
print("Start processing data...")
start_time = time.time()
# 创建线程
t1 = threading.Thread(target=write_to_file)
t2 = threading.Thread(target=analyze_data)
t3 = threading.Thread(target=count_data)
# 启动线程
t1.start()
t2.start()
t3.start()
# 等待线程结束
t1.join()
t2.join()
t3.join()
end_time = time.time()
print("End processing data...")
print("Total time taken: ", end_time - start_time)
if __name__ == "__main__":
main()
读写锁和互斥锁的区别
读写锁和互斥锁都是用于同步线程的机制,但它们的工作方式和应用场景不同。
互斥锁(Mutex)是一种排他锁,它只允许一个线程独占共享资源,其他线程必须等待锁被释放后才能获取锁进行访问。互斥锁可以解决多个线程同时访问共享资源时发生竞争的问题,保证了程序数据的一致性和正确性。
读写锁(Read-Write Lock)则是一种更加灵活的锁,它将共享资源分为两类:读资源和写资源。多个线程可以同时获取读锁并进行访问,而只有一个线程可以获取写锁。这样可以提高程序的并发性能,尤其是在读操作远远多于写操作时。与互斥锁不同的是,读写锁允许多个线程同时访问共享资源,从而避免了因锁竞争导致的性能瓶颈。
需要注意的是,使用读写锁并不是一定比使用互斥锁更好。读写锁的实现比较复杂,容易出现死锁、优先级反转等问题。因此在实际应用中需要根据具体情况进行选择,合理地运用锁机制以达到最佳的性能和可靠性。
with语句使用线程锁
Python Threading中的Lock模块有acquire()和release()两种方法,这两种方法与with语句的搭配相当于,进入with语句块时候会先执行acquire()方法,语句块结束后会执行release方法。
# -*- coding: utf-8 -*-
from threading import Lock
temp_lock = Lock()
with temp_lock:
print(temp_lock)
print(temp_lock)
# 输出
<locked _thread.lock object at 0x00000231110F01E0>
<unlocked _thread.lock object at 0x00000231110F01E0>
与之对应的有
# -*- coding: utf-8 -*-
from threading import Lock
temp_lock = Lock()
temp_lock.acquire()
try:
print(temp_lock)
finally:
temp_lock.release()
print(temp_lock)
# 输出
<locked _thread.lock object at 0x0000024B0DF301E0>
<unlocked _thread.lock object at 0x0000024B0DF301E0>
线程池ThreadPoolExecutor
submit
方法
Python 中的线程池可以通过 concurrent.futures
模块的 ThreadPoolExecutor
类实现。这个类提供了 submit
和 map
两个方法来异步执行任务。
submit
方法用于将一个可调用对象(如函数)提交给线程池进行处理,返回一个 Future
对象,可以通过该对象查询任务的执行状态及结果
submit
方法接受两个参数:第一个参数是要执行的可调用对象,可以是函数、方法或 lambda 表达式等;第二个参数是传递给可调用对象的参数,可以是位置参数、关键字参数或元组/字典。
如果可调用对象需要多个参数,可以将它们封装为元组或字典,并将其作为第二个参数传递给 submit
方法。例如:
import concurrent.futures
def add(x, y):
return x + y
if __name__ == '__main__':
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(add, 2, 3)
result = future.result()
print(result)
# 输出
5
在这个例子中,我们定义了一个
add
函数,用于计算两个数的和。然后创建一个线程池对象executor
,并使用submit
方法提交一个任务,该任务对应add(2, 3)
的执行,即计算 2 和 3 的和。submit
方法返回一个Future
对象,我们可以调用其result
方法来获取任务的执行结果并输出到控制台。需要注意的是,
submit
方法不会立即启动新的线程或处理器执行任务,而是将该任务放入队列中,等待线程池中的某个线程空闲时再执行。因此,submit
方法不会阻塞主线程的执行,而是立即返回一个Future
对象。另外,如果需要一次性提交多个任务,并获取它们的执行结果,可以使用
executor.map
方法,该方法接受一个可迭代对象作为参数,并对迭代对象中的每个元素调用指定的可调用对象。
案例1
# -*- coding: utf-8 -*-
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
def sayhello(name):
print("%s say Hello to %s" % (threading.current_thread().getName(), name))
time.sleep(1)
return name
name_list = ['张三', '李四', '王二', '麻子']
start_time = time.time()
with ThreadPoolExecutor(max_workers=2) as executor: # 创建 ThreadPoolExecutor
future_list = [executor.submit(sayhello, name) for name in name_list] # 提交任务
for future in as_completed(future_list):
result = future.result() # 获取任务结果
print("%s get result : %s" % (threading.current_thread().getName(), result))
print('%s cost %d second' % (threading.current_thread().getName(), time.time() - start_time))
# 输出
ThreadPoolExecutor-0_0 say Hello to 张三
ThreadPoolExecutor-0_1 say Hello to 李四
ThreadPoolExecutor-0_1 say Hello to 王二
ThreadPoolExecutor-0_0 say Hello to 麻子
MainThread get result : 王二
MainThread get result : 麻子
MainThread get result : 李四
MainThread get result : 张三
MainThread cost 2 second
max_workers参数用来控制线程池中运行的最大线程数
通过submit方法将任务提交到线程池中,一次只能提交一个.submit会立即返回结果,第一个参数是函数名,第二个参数是函数的参数,他是一个元组
submit方法返回是一个future对象
as_completed函数会将运行完的任务一个一个yield出来,它返回任务的结果与提交任务的顺序无关,谁先执行完返回谁
map方法
map
方法与 submit
方法类似,但是它可以一次性提交多个任务,并按照任务列表顺序返回结果。需要注意的是,map
方法只能接受一个可迭代对象作为参数,因此需要将多个任务封装为列表或元组。
hreadPoolExecutor.map
方法用于一次性提交多个任务,并按照任务列表顺序返回结果。与 submit
方法不同,map
方法只能接受一个可迭代对象作为参数,而不能传递任意数量和类型的额外参数。
map
方法接受两个参数:第一个参数是要执行的可调用对象,可以是函数、方法或 lambda 表达式等;第二个参数是包含任务参数的可迭代对象,用于对每个元素调用指定的可调用对象。
例如,我们可以使用 map
方法来计算多个数的平方,如下所示:
import concurrent.futures
def square(x):
return x ** 2
if __name__ == '__main__':
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(square, numbers)
for result in results:
print(result)
# 输出
1
4
9
16
25
在这个例子中,我们定义了一个 square
函数,用于计算一个数的平方。然后创建一个包含多个数的列表 numbers
。接着,使用 ThreadPoolExecutor
创建一个线程池对象 executor
,并使用 map
方法提交多个任务,每个任务对应一个数的平方计算。map
方法返回一个迭代器对象 results
,我们可以使用 for 循环来遍历迭代器获取所有任务的计算结果,并输出到控制台。
需要注意的是,map
方法的返回值是一个迭代器对象,该对象按照任务列表的顺序返回各个任务的结果,因此在输出结果时不需要像使用 submit
方法一样先等待所有任务完成再获取结果。如果任务执行过程中发生异常,map
方法会自动将该异常捕获并传递给迭代器对象中未处理的下一个 next
调用。如果不想在 map
方法中捕获异常,可以选择使用 submit
方法提交多个任务,并单独处理每个任务的异常。
案例2
# -*- coding: utf-8 -*-
"""线程池的回调"""
# -*- coding: utf-8 -*-
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
def sayhello(name):
print("%s say Hello to %s" % (threading.current_thread().getName(), name))
time.sleep(1)
return name
name_list = ['张三', '李四', '王二', '麻子']
start_time = time.time()
with ThreadPoolExecutor(max_workers=2) as executor: # 创建 ThreadPoolExecutor
for data in executor.map(sayhello, name_list):
print("%s get data : %s" % (threading.current_thread().getName(), data)) # 内部迭代中, 每个driver_id开启一个线程
print('%s cost %d second' % (threading.current_thread().getName(), time.time() - start_time))
# 输出
ThreadPoolExecutor-0_0 say Hello to 张三
ThreadPoolExecutor-0_1 say Hello to 李四
ThreadPoolExecutor-0_1 say Hello to 王二
ThreadPoolExecutor-0_0 say Hello to 麻子
MainThread get data : 张三
MainThread get data : 李四
MainThread get data : 王二
MainThread get data : 麻子
MainThread cost 2 second
使用map方法,无需提前使用submit方法,map方法与python标准库中的map含义相同,都是将序列中的每个元素都执行同一个函数
map方法会将任务运行的结果yield出来
map方法返回任务结果的顺序与提交任务的顺序一致
总结
ThreadPoolExecutor.submit
方法和ThreadPoolExecutor.map
方法都可以用于提交任务给线程池进行处理,但两者之间存在一些区别。首先,
submit
方法是逐个提交任务,每次调用都会返回一个Future
对象,表示一个待完成的任务。而map
方法是一次性提交多个任务,返回一个迭代器对象,按照任务列表的顺序返回各个任务的结果。因此,在一些情况下,使用map
方法可以更加方便和高效地处理多个任务。其次,
submit
方法可以传递任意数量和类型的额外参数,用于指定任务的具体参数;而map
方法只能接受一个可迭代对象作为参数,不能传递额外参数。也就是说,如果任务所需的参数是固定的,而且任务量比较大,那么使用map
方法可以更加优雅和简洁;如果需要传递不同的参数,并且任务量不大,那么使用submit
方法更加灵活。另外,使用
submit
方法时,可以通过Future
对象的方法获取任务的执行状态和结果,还可以对任务进行取消、重新安排等操作;而使用map
方法时,由于任务已经全部提交给线程池处理,无法对单个任务进行操作或查询任务的执行状态,只能等待所有任务执行完毕后才能获取结果。因此,在一些需要对任务进行动态管理或轮询查询的场景中,使用submit
方法更加合适。综上所述,
ThreadPoolExecutor.submit
方法和ThreadPoolExecutor.map
方法各有优劣,应根据具体的业务需求和场景选择合适的方法进行任务管理和处理。
队列在线程中的应用
"""
完成编程题
a、用一个队列来存储数据
b、创建一个专门生产数据的任务函数,循环生产5次数据,每轮循环,往队列中添加20条数据,每循环一轮暂停1秒
c、创建一个专门处理数据的任务函数 循环获取队列中的数据处理,每秒处理4条数据。
d、创建一个线程(或进程)生产数据 ,3个线程(或进程)处理数据
e、统计数据生产并获取完 程序运行的总时长
"""
import queue
import threading
# 创建队列
data_queue = queue.Queue()
# 定义生产数据的任务函数
def produce_data():
for round in range(5):
for i in range(20):
data = round + 1
data_queue.put(data)
time.sleep(1) # 暂停1秒
# 定义处理数据的任务函数
def consume_data():
while True:
try:
# 从队列中获取数据
data = data_queue.get(timeout=1)
# 处理数据,这里仅打印出来
print("Processing data: ", data)
time.sleep(0.25) # 处理4条数据需要1秒钟
data_queue.task_done() # 标记当前数据已经处理完毕
except queue.Empty:
# 如果队列为空,则退出循环
break
def main():
start_time = time.perf_counter()
print("Start processing data...")
# 创建生产数据的线程
producer_thread = threading.Thread(target=produce_data)
# 创建处理数据的线程池
num_workers = 3 # 使用3个线程来处理数据
worker_threads = []
for i in range(num_workers):
t = threading.Thread(target=consume_data)
worker_threads.append(t)
# 启动线程
producer_thread.start()
for t in worker_threads:
t.start()
# 等待所有线程结束
producer_thread.join()
for t in worker_threads:
t.join()
# 统计运行时间
end_time = time.perf_counter()
print("End processing data...")
print("Total time taken: ", end_time - start_time)
if __name__ == "__main__":
main()
评论