python 协程简述

什么是协程

协程,又称微线程,英文名Coroutine,协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)。为啥说它是一个执行单元,因为它自带CPU上下文。这样只要在合适gr的时机,我们可以把一个协程切换到另一个协程。只要这个过程中保存或恢复 CPU上下文那么程序还是可以运行的。

直白的讲:

协程是线程中的一个特殊的函数,这个函数执行的时候,可以在某个地方暂停,并且可以重新在暂停处,继续运行,协程在进行切换的时候,只需要保存当前协程函数中的一些临时变量等信息,然后切换到另外一个函数中执行,并且切换的次数以及什么时候再切换到原来的函数,都可以由开发者自己决定。协程切换的时候,既不涉及到资源切换,也不涉及到操作系统的调度,而是在在同一个程序中切换不同的函数执行,所以协成占用的资源非常少,切换得时候几乎不耗费什么资源,一秒钟切换个上百万次系统都抗的住

所以说,协程与进程、线程相比并不是一个维度的概念.

gevent模块

gevent其原理是当一个协程遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO

gevent是一个基于协程的Python网络编程库,它使用greenlet协程库实现了类似线程池的机制,使得在一个线程中可以并发执行多个协程任务,从而达到高效并发处理的目的,消除了线程启动和切换的开销,降低了网络编程难度。

gevent可以通过monkey patching(猴子补丁)机制实现协程模型的替换,即将Python标准库中的阻塞式I/O相关函数(如socket、select等)替换成gevent提供的对应协程版本的函数,这样就可以让原本的同步阻塞程序变为异步非阻塞程序,提高程序的性能和可维护性。

import time


def worker1(n):
    print('start1', n)
    time.sleep(1)  # 模拟耗时操作
    print('end1', n)


if __name__ == '__main__':
    start_time = time.perf_counter()
    tasks2 = [worker1(i) for i in range(10)]
    end_time = time.perf_counter()
    print(f'耗时:{end_time - start_time}')
    print('------------------结束执行程序------------------')

# 输出
start1 0
end1 0
start1 1
end1 1
start1 2
end1 2
start1 3
end1 3
start1 4
end1 4
start1 5
end1 5
start1 6
end1 6
start1 7
end1 7
start1 8
end1 8
start1 9
end1 9
耗时:10.005151757
------------------结束执行程序------------------

在这段代码中,worker1函数模拟了一个耗时操作,并将操作结果输出。在主程序中,通过列表推导式将10个任务传递给worker1函数执行,然后使用time.perf_counter方法计算整个程序的耗时。由于在列表推导式中直接调用了worker1函数,并未创建任务,所以输出结果实际上是在主程序执行期间输出的,并没有体现并发执行的效果。

gevent.spawn() 方法

spawn() 方法有以下参数:

  • func:要在协程中运行的函数或方法

  • *args:传递给函数或方法的位置参数,以元组的形式传入

  • **kwargs:传递给函数或方法的关键字参数,以字典的形式传入

import time
import gevent


def worker(n):
    print('start1', n)
    gevent.sleep(1)  # 模拟耗时操作
    print('end1', n)


if __name__ == '__main__':
    start_time = time.perf_counter()
    tasks = [gevent.spawn(worker, i) for i in range(10)]
    end_time = time.perf_counter()
    gevent.joinall(tasks)  # 等待所有协程执行完成
    print(f'耗时:{end_time - start_time}')
    print('------------------结束执行程序------------------')

start1 0
start1 1
start1 2
start1 3
start1 4
start1 5
start1 6
start1 7
start1 8
start1 9
end1 0
end1 1
end1 2
end1 3
end1 4
end1 5
end1 6
end1 7
end1 8
end1 9
耗时:0.030676652
------------------结束执行程序------------------

Process finished with exit code 0

在这段代码中,使用gevent模块实现了并发执行10个协程任务。在主程序中,使用gevent.spawn方法创建10个协程任务,并将任务函数和参数传递给它。返回的结果是一个Greenlet对象的集合,表示协程任务的执行状态。

在任务函数worker中,使用gevent.sleep方法模拟了一个耗时操作。gevent.sleep方法是gevent模块提供的辅助函数,用于模拟异步非阻塞的等待。在等待期间,协程任务会被挂起,让出CPU资源,让其他协程任务或线程可以继续执行。当等待时间到达后,协程任务会再次运行,完成需要的操作。

在主程序中,调用gevent.joinall方法等待所有协程任务执行完成,然后使用time.perf_counter方法计算整个程序的耗时。输出结果中可以看到协程执行的效果和整个程序的耗时。

joinall() 方法:

  • 在 Gevent 中,gevent.joinall() 方法有多个可选参数,用于控制等待的行为。下面是 gevent.joinall() 的常用参数:

    • args=None:一个包含 Greenlet 对象的列表,表示要等待执行的 Greenlet 对象,默认值为 None。

    • timeout=None:等待所有 Greenlet 对象完成的超时时间,单位为秒,默认为 None,表示无限等待。

    • raise_error=False:如果设置为 True,在等待过程中如果有任何一个 Greenlet 抛出异常,就会立即停止等待,并在退出时重新抛出异常,默认为 False。

    • count=None:在等待所有 Greenlet 对象完成之前,最多允许同时执行的 Greenlet 数量,当调用 gevent.joinall() 时正在执行的 Greenlet 数量超过此值时,将会挂起新的 Greenlet 等待其余绿色线程空闲,默认值为 None,表示没有限制。

    • return_exceptions=False:如果设置为 True,不会抛出任何异常,从而保留所有 Greenlet 对象抛出的异常信息;如果设置为 False,任何 Greenlet 对象抛出的异常都会被捕获并作为异常重新抛出(缺省值为 False)。

    除此之外,还有一些其他可选的参数可以用于控制等待过程的细节和行为,例如 raise_error 参数可以用来控制是否在等待过程中立即停止并重新抛出异常,count 参数可以用来控制同时执行的 Greenlet 数量,等等。

greenlet_object.join() 方法

如果在调用 gevent.spawn() 方法时,将返回的 Greenlet 对象存储下来,那么可以使用 greenlet_object.join() 方法等待单个协程任务执行结束。该方法与 joinall() 方法的作用相同,只不过只能等待单个协程任务。

import gevent

def worker():
    print('start working...')
    gevent.sleep(1)
    print('work done.')

if __name__ == '__main__':
    task = gevent.spawn(worker)
    task.join()
    print('task done.')

需要注意的是,在使用 join() 方法时,要注意协程任务的返回值。如果协程任务抛出异常或被杀死,那么 join() 方法也会立即返回。因此,建议在使用 join() 方法时,先使用 ready() 方法判断协程任务是否已经完成。ready() 方法返回 True 表示协程任务已经结束,False 表示协程任务仍在运行。

import gevent


def worker():
    print('start working...')
    gevent.sleep(1)
    print('work done.')
    return 'result'


if __name__ == '__main__':
    task = gevent.spawn(worker)
    task.join()
    if task.ready():
        print('task done.')
        result = task.value
        print('result:', result)
    else:
        print('task not completed.')

# 输出
start working...
work done.
task done.
result: result

程序补丁:monkey.patch_all()

打补丁作⽤,只要有耗时的地⽅(⼤部分),⾃动切换任务,不局限于gevent.sleep()

注意点:只能在单线程中使用,不能在多线程中使用

import gevent.monkey
import requests

gevent.monkey.patch_all()


def fetch(url):
    response = requests.get(url)
    print(f'{url}: {response.status_code}')


if __name__ == '__main__':
    urls = ['https://www.baidu.com', 'https://www.bing.com']
    tasks = [gevent.spawn(fetch, url) for url in urls]
    gevent.joinall(tasks)

gevent.monkey.patch_all() 对所有的IO和耗时操作打补丁(将普通的耗时IO操作--->转换为异步耗时IO操作)。必须在全局作用域下添加

在这段代码中,使用了 requests 库发送 HTTP 请求,由于 requests 库中的请求方法是阻塞式的,因此在进行并发编程时会受到阻塞的影响。但是,在调用 monkey.patch_all() 函数之后,requests 库中的阻塞式 I/O 操作就会被自动替换为 gevent 的协程实现,从而变成了非阻塞的协程操作。

需要注意的是,monkey.patch_all() 函数只能在程序的入口处调用一次,如果在多个地方调用,可能会导致程序出现一些奇怪的问题。另外,由于该函数会替换标准库中的阻塞式 I/O 操作,因此在使用其他第三方库时,需要先查看它们是否与 gevent 兼容,避免出现不可预期的错误。

原生协程函数(异步编程)

协程函数的定义、创建和执行过程

import asyncio


# 1.协程函数定义
# 在定义函数时,使用async关键字之后,当前函数就不再是一个普通函数,不能像普通函数那样来调用
async def task1():
    for i in range(4):
        print(f'------------------正在执行任务1: {i}------------------')
    return 'hello'


# 2.创建协程对象
# 调用协程函数
c1 = task1()

# 3.执行单个协程
# python 3.7版本之前
# 获取事件循环
# get_event_loop方法的作用:获取一个事件循环对象,如果当前线程已经存在一个事件循环对象,则直接返回,如果不存在则创建一个新的事件循环对象
loop = asyncio.get_event_loop()

# 等待协程执行结束
#   run_until_complete方法的作用:执行协程函数,直到协程函数执行结束,仅接收一个参数,参数类型为协程对象
#   run_until_complete方法的返回值为协程函数的返回值
result1 = loop.run_until_complete(c1)
print(result1)

这段代码演示了一个基本的协程函数的定义、创建和执行过程。

  1. 在代码中定义了一个协程函数 task1(),由于使用了 async 关键字,task1() 变成了一个协程函数。

  2. 通过调用 task1() 函数来创建一个协程对象 c1。

  3. 获取事件循环对象 loop,并使用 loop.run_until_complete() 方法来执行协程对象 c1。这个方法会等待协程对象 c1 执行结束,并返回协程函数的返回值。

  4. 最后,输出协程函数的返回值。

什么是事件循环?简单来理解,就死一个检查任务是否执行完毕的死循环

需要注意的是,在 Python 3.7 版本之前,需要显式地获取事件循环对象,并使用 run_until_complete() 方法来执行协程函数。在 Python 3.7 版本及之后,可以直接使用 asyncio.run() 函数来执行协程函数,简化了协程的执行过程。另外,协程函数也可以通过 await 表达式来等待其他协程或异步操作的完成,进一步提高了程序的并发性能。

import asyncio


# 1.协程函数定义
# 在定义函数时,使用async关键字之后,当前函数就不再是一个普通函数,不能像普通函数那样来调用
async def task1():
    for i in range(4):
        print(f'------------------正在执行任务1: {i}------------------')
    return 'hello'


# 2.创建协程对象
# 调用协程函数
c1 = task1()

# 3.执行单个协程
# python 3.7版本之后
# 直接运行协程
result1 = asyncio.run(c1)
print(result1)
pass

原生协程函数实现多任务

下面这段代码使用 asyncio 模块实现了协程任务的执行和结果获取。具体流程如下:

  1. 定义两个协程函数 task1 和 task2,分别模拟耗时操作1和耗时操作2。

  2. 定义主协程函数 main,用于管理协程任务的执行和结果获取。

  3. 在主协程函数中,通过 asyncio.create_task() 函数将协程对象转换为任务对象,并将它们添加到事件循环中等待执行。

  4. 通过 asyncio.wait() 函数等待多个任务执行结束,并用遍历已完成任务集合的方式获取它们的执行结果。

  5. 通过 asyncio.run() 函数执行主协程函数,并进行耗时统计。

import asyncio
import time


# 定义协程函数 task1,用于模拟耗时操作1
async def task1():
    for i in range(4):
        print(f'------------------正在执行任务1: {i}------------------')
        # 异步耗时操作,使用 asyncio.sleep() 方法模拟
        await asyncio.sleep(1)
    return 'hello'


# 定义协程函数 task2,用于模拟耗时操作2
async def task2():
    for i in range(5):
        print(f'------------------正在执行任务2: {i}------------------')
        # 异步耗时操作,使用 asyncio.sleep() 方法模拟
        await asyncio.sleep(1)
    return 'world'


# 定义主协程函数 main,用于管理协程任务的执行和结果获取
async def main():
    """
    主协程入口函数
    :return:
    """
    # 1.创建协程对象
    c1 = task1()
    c2 = task2()

    # 2.将协程对象打包成任务
    t1 = asyncio.create_task(c1)
    t2 = asyncio.create_task(c2)

    # 3.等待任务执行结束
    # 方法一:依次等待单个任务执行结束
    # result1 = await t1
    # result2 = await t2
    # print(result1, result2)

    # 方法二:等待多个任务执行结束
    done, _ = await asyncio.wait([t1, t2])
    for task in done:
        print(task.result())  # 获取任务的执行结果

    # 方法三:同时等待多个任务执行结束,并获取它们的结果
    # result1, result2 = await asyncio.gather(t1, t2)
    # print(result1, result2)


if __name__ == '__main__':
    print('------------------开始执行程序------------------')
    start_time = time.perf_counter()
    asyncio.run(main())  # 执行主协程函数
    end_time = time.perf_counter()
    print(f'耗时:{end_time - start_time}')
    print('------------------结束执行程序------------------')

asyncio.create_task() 函数

asyncio.create_task() 函数是 asyncio 模块中的一个函数,用于将协程对象转为任务对象,并将其添加到事件循环中等待执行。它的使用语法如下:

task = asyncio.create_task(coroutine)

其中,coroutine 表示一个协程对象,task 表示生成的任务对象。当 coroutine 对象被转换为任务对象后,可以使用 await task 等待任务的完成,并获取其执行结果。

使用 asyncio.create_task() 函数创建任务对象时,该函数会将任务对象和事件循环相关联,即该任务对象会在事件循环中被调度执行。和 asyncio.ensure_future() 函数类似, asyncio.create_task() 函数也是一种常用的启动协程的方式。

asyncio.wait() 函数

在 Python 的 asyncio 模块中,有三种方法可以等待协程任务的执行,并获取它们的执行结果:

  1. 依次等待单个任务执行结束,并获取其结果

这种方法通过 await 关键字来等待任务执行结束,直到它完成并返回结果。在等待期间,事件循环可以处理其他协程的调度请求,从而提高程序的并发性能。

示例代码如下:

# 方法一:依次等待单个任务执行结束
result1 = await t1
result2 = await t2
print(result1, result2)
  1. 同时等待多个任务执行结束,并获取它们的结果

这种方法通过 asyncio.wait() 函数同时等待多个任务执行结束,返回一个包含完成的任务和未完成的任务的两个集合。通过遍历完成的任务集合获取它们的结果。

示例代码如下:

# 方法二:等待多个任务执行结束
done, _ = await asyncio.wait([t1, t2])
# 遍历已完成的任务
for task in done:
    print(task.result())  # 获取任务的执行结果

await asyncio.wait() 将会返回两个著名的集合:已完成(done)和已挂起(pending)。这个 await 表达式只有在所有的任务都完成后才会返回结果。同时,在这个例子中我们只关心已完成的任务集合,因此使用 _ 变量存储了已挂起任务的集合。

  1. 同时等待多个任务执行结束,并获取它们的结果

这种方法通过 asyncio.gather() 函数同时等待多个任务执行结束,并将它们的执行结果合并成一个列表或元组进行返回。

示例代码如下:

# 方法三:同时等待多个任务执行结束,并获取它们的结果
result1, result2 = await asyncio.gather(t1, t2)
print(result1, result2)

除了以上三种方法外,还可以使用 asyncio.ensure_future() 函数或 asyncio.create_task() 函数将协程对象转化成任务对象,并将它们添加到事件循环中等待执行。一旦任务被添加到事件循环中,就会由事件循环自动调度和执行,无需手动等待任务的执行。

总之,在使用 asyncio 模块编写异步程序时,选择合适的等待方式来管理协程任务的执行及结果获取是非常重要的。

await asyncio.sleep() 函数

await asyncio.sleep() 是 asyncio 模块中的一个异步耗时操作函数,用于模拟一个协程中的耗时操作。它可以让当前协程暂停指定的一段时间(单位是秒),等待其他任务的执行或等待 I/O 操作完成。在这个代码片段的例子中,await asyncio.sleep(1) 表示让协程休眠 1 秒钟。

当在协程中调用 await asyncio.sleep() 函数时,在该协程的执行过程中会发生以下两件事情:

  1. 会将当前的协程挂起,并放回事件循环中。

  2. 等待指定的时间(这里是 1 秒钟)后,事件循环会重新调度该协程并继续执行它。

因此,使用 await asyncio.sleep() 可以让我们在协程中模拟出一个耗时的操作,同时也不会阻塞事件循环。

原生协程实现高并发请求

# 1.安装
# pip install aiohttp
import asyncio
import time

import aiohttp  # requests


async def send_http_request(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(f"Status: {response.status}")
            print(f"Content-Type: {response.headers['content-type']}")
            return await response.text()


# 2.创建入口协程
async def main():
    # 定义需要请求的url
    urls = ["https://test.baidu.com/" for i in range(50)]
    tasks = [asyncio.create_task(send_http_request(url)) for url in urls]
    return await asyncio.gather(*tasks)


# 4.启动协程
if __name__ == '__main__':
    print('------------------开始执行程序------------------')
    start_time = time.perf_counter()
    res = asyncio.run(main())
    end_time = time.perf_counter()
    print(len(res))
    print(f'耗时:{end_time - start_time}')
    print('------------------结束执行程序------------------')

# 输出
------------------开始执行程序------------------
Status: 200
Content-Type: text/html; charset=UTF-8
Status: 200
Content-Type: text/html; charset=UTF-8
.
.
.
Status: 200
Content-Type: text/html; charset=UTF-8
50
耗时:1.678242478
------------------结束执行程序------------------

Process finished with exit code 0

在主程序中,我们使用 asyncio.run() 方法来启动入口协程 main,并在控制台输出所有请求的响应状态码和内容类型。由于是异步执行,因此程序的执行时间也会显著缩短。

补充说明:使用 aiohttp 库发送一个 HTTP GET 请求可以分为以下三步:

  1. 创建一个 session 对象。

  2. 使用 session 对象发送请求并获取响应。

  3. 处理响应数据。

import aiohttp
import asyncio

async def get_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            data = await resp.text()
            # 处理响应数据
            print(data)

asyncio.run(get_data('https://www.example.com'))

在这个例子中,我们首先定义了一个 get_data 函数,用于发送 HTTP GET 请求。在这个函数中,我们使用 aiohttp.ClientSession() 创建一个 session 对象,然后通过该对象发送 GET 请求,并使用 await resp.text() 异步方法获取响应内容。最后,我们可以在这个函数中处理响应数据,比如打印出来。

最后,我们使用 asyncio.run() 方法运行这个函数,并传入目标 URL 进行测试。

进程、线程、协程对比

  • 进程:是资源分配的单位,进程资源是独立的,每次切换的资源很大,耗时更长

  • 线程:是操作系统调度的单位,线程的切换涉及到用户空间和内核空间的切换,需要操作系统调度,还有线程私有的栈和寄存器等效率一般。

  • 协程存在于线程之中,本质上就是一个特定情况下可以切换的特殊的函数。协程切换任务,只设及到CUP上下文的切换,资源很小,耗时几乎可以忽略不计

  • 多进程、多线程根据cpu核数不一样可能是并行的,但是协程是在一个线程中所以是并发

  • 注意点:python中的线程由于GIL锁的的存在,并不能够实现并行,要充分利用多核CUP还是需要使用进程来做。

什么场景适合用进程

计算密集型的任务(比如:大规模的数据计算和处理)

什么场景适合用线程?

IO密集型的任务(比如:数据读取和写入多,网络请求多的任务)

什么场景适合用协程?

IO密集型项目切要求高并发( 比如:用locust 搞压测里面就是用的协程) ,实际上真实项目中对应高并发的业务并不会选择使用python语言。