unittest二次开发-重写TestRunner方法

unittest用例执行的流程

这里我们复习一下unittest的使用,以及结合多线程重写一下TestRunner方法

unittest 中,测试用例的执行流程可以分为以下主要步骤:

  1. 通过 TestLoader 对象加载测试用例模块,并将其转化为可执行的测试用例对象。

  2. 将多个测试用例对象添加到 TestSuite 测试套件中。TestSuiteunittest 中的一个容器,它可以存储多个测试用例对象,并且支持嵌套使用。可以通过 addTest()addTests() 方法将测试用例对象添加到 TestSuite 中。

  3. 创建 TestResult 对象来存储测试结果。TestResultunittest 中的一个类,它可以记录测试过程中的各种信息,包括测试用例的执行情况、总体统计信息等。

  4. 执行测试套件中的所有测试用例对象,并将测试结果存储到 TestResult 对象中。测试用例的执行顺序可以按照 TestSuite 中添加的顺序或按照测试方法名称的字母表顺序执行。

  5. 使用 TestRunner 对测试结果进行处理。TestRunner 是一个抽象类,它定义了处理测试结果的基本接口。可以通过继承 TextTestRunner 类或 HTMLTestRunner 类等具体子类来实现测试结果的输出和报告生成等功能。

from testcases.test_demo1 import TestDemo11

# 1.创建测试对象
case1 = TestDemo11('test_01_login')

# 2.创建测试结果对象(收集测试结果)
result_obj = unittest.TestResult()

# 3.使用测试用例对象的run方法执行用例
# result_obj1 = case1.run(result_obj)
result_obj1 = case1(result_obj)

# 4.获取测试结果
print(result_obj)

这里是执行单个测试方法的案例,直接调用测试用例对象的 run 方法。

result_obj1 = case1.run(result_obj)result_obj1 = case1(result_obj) 这两行代码的作用是一样的,都是执行测试用例对象 case1 的测试方法并将测试结果传递给测试结果对象 result_obj,最后将更新后的测试结果存储在 result_obj1 变量中。

unittest 模块中,测试用例对象 TestCase 提供了 run 方法用于执行测试方法,并返回一个 TestResult 对象,表示该测试方法的测试结果。因此,两种写法都是调用 case1 对象的 run 方法执行测试方法,将测试结果传递给 result_obj 对象,最后将更新后的测试结果存储在 result_obj1 变量中。

TestRunner的工作流程

  1. 遍历测试类中的用例,调用用例对象的run方法

  2. 将用例执行的结构保存到TestResult中

测试套件的结构

重写运行器

import time
import unittest
from concurrent.futures.thread import ThreadPoolExecutor

start_time = time.perf_counter()

# 测试套件对象:实际上就是存放用例的容器
suite: unittest.TestSuite = unittest.defaultTestLoader.discover('testcases')
suite.countTestCases()

# 创建测试结果对象,用于记录测试结果
result_obj = unittest.TestResult()

# 创建测试用例对象列表
testcase_obj_lst = []

for test_module_obj in suite:
    test_module_obj: unittest.TestSuite
    print(f'测试模块:{test_module_obj}')
    for test_class_obj in test_module_obj:
        test_class_obj: unittest.TestSuite
        print(f'测试类:{test_class_obj}')
        for testcase_obj in test_class_obj:
            testcase_obj: unittest.TestCase
            print(f'测试用例:{testcase_obj}')
            # testcase_obj.run(result_obj)
            # testcase_obj(result_obj)
            testcase_obj_lst.append(testcase_obj)

        print('\n')

    print('\n')

# print(result_obj)
print(testcase_obj_lst)


def run_testcase(testcase_obj: unittest.TestCase):
    testcase_obj.run(result_obj)


with ThreadPoolExecutor() as tp:
    tp.map(run_testcase, testcase_obj_lst)

end_time = time.perf_counter()
print(result_obj)
print(f'TestSuite 中包含 {suite.countTestCases()}个测试用例耗时:{end_time-start_time}')

这段代码的主要作用是将测试套件对象中的所有测试用例对象存储到一个列表中,并且可以通过多线程的方式执行这些测试用例对象。

首先,测试套件对象 suite 是一个容器,里面存放了所有要执行的测试用例对象,但是它只是一个容器,不能直接执行其中的测试用例对象。因此,需要遍历测试套件对象,获取其中的所有测试模块、测试类和测试用例,并将这些测试用例对象存储到一个列表 testcase_obj_lst 中。这样,就可以通过遍历这个列表来执行这些测试用例对象。

具体而言,三层循环分别是:

  1. 遍历测试套件对象 suite 中的所有测试模块 test_module_obj,并打印出每个测试模块的名称。

  2. 遍历每个测试模块 test_module_obj 中的所有测试类 test_class_obj,并打印出每个测试类的名称。

  3. 遍历每个测试类 test_class_obj 中的所有测试用例对象 testcase_obj,并打印出每个测试用例对象的名称,并将其存储到 testcase_obj_lst 列表中。

最后,使用多线程的方式并行地执行 testcase_obj_lst 中的所有测试用例对象。由于 result_obj 是一个共享对象,因此需要在多线程中传递同一个 result_obj 对象。

#输出:

测试模块:<unittest.suite.TestSuite tests=[<unittest.suite.TestSuite tests=[<test_demo1.TestDemo11 testMethod=test_01_login>, <test_demo1.TestDemo11 testMethod=test_02_register>, <test_demo1.TestDemo11 testMethod=test_03_main>, <test_demo1.TestDemo11 testMethod=test_04_user>]>, <unittest.suite.TestSuite tests=[<test_demo1.TestDemo22 testMethod=test_11_login>, <test_demo1.TestDemo22 testMethod=test_12_register>, <test_demo1.TestDemo22 testMethod=test_13_main>]>]>
测试类:<unittest.suite.TestSuite tests=[<test_demo1.TestDemo11 testMethod=test_01_login>, <test_demo1.TestDemo11 testMethod=test_02_register>, <test_demo1.TestDemo11 testMethod=test_03_main>, <test_demo1.TestDemo11 testMethod=test_04_user>]>
测试用例:test_01_login (test_demo1.TestDemo11)
测试用例:test_02_register (test_demo1.TestDemo11)
测试用例:test_03_main (test_demo1.TestDemo11)
测试用例:test_04_user (test_demo1.TestDemo11)


测试类:<unittest.suite.TestSuite tests=[<test_demo1.TestDemo22 testMethod=test_11_login>, <test_demo1.TestDemo22 testMethod=test_12_register>, <test_demo1.TestDemo22 testMethod=test_13_main>]>
测试用例:test_11_login (test_demo1.TestDemo22)
测试用例:test_12_register (test_demo1.TestDemo22)
测试用例:test_13_main (test_demo1.TestDemo22)




测试模块:<unittest.suite.TestSuite tests=[<unittest.suite.TestSuite tests=[<test_demo2.TestDemo33 testMethod=test_31_login>, <test_demo2.TestDemo33 testMethod=test_32_register>, <test_demo2.TestDemo33 testMethod=test_33_main>]>]>
测试类:<unittest.suite.TestSuite tests=[<test_demo2.TestDemo33 testMethod=test_31_login>, <test_demo2.TestDemo33 testMethod=test_32_register>, <test_demo2.TestDemo33 testMethod=test_33_main>]>
测试用例:test_31_login (test_demo2.TestDemo33)
测试用例:test_32_register (test_demo2.TestDemo33)
测试用例:test_33_main (test_demo2.TestDemo33)




[<test_demo1.TestDemo11 testMethod=test_01_login>, <test_demo1.TestDemo11 testMethod=test_02_register>, <test_demo1.TestDemo11 testMethod=test_03_main>, <test_demo1.TestDemo11 testMethod=test_04_user>, <test_demo1.TestDemo22 testMethod=test_11_login>, <test_demo1.TestDemo22 testMethod=test_12_register>, <test_demo1.TestDemo22 testMethod=test_13_main>, <test_demo2.TestDemo33 testMethod=test_31_login>, <test_demo2.TestDemo33 testMethod=test_32_register>, <test_demo2.TestDemo33 testMethod=test_33_main>]
-----test_01_login----------test_03_main-----
-----test_02_register-----

-----test_11_login-----
-----test_13_main-----
-----test_04_user-----
-----test_33_main-----
-----test_32_register----------test_31_login-----

<unittest.result.TestResult run=10 errors=1 failures=1>
TestSuite 中包含 10个测试用例耗时:1.011513075

Process finished with exit code 0

优化封装运行器

import time
import unittest
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Union, AnyStr, List


class ManualTestRunner(object):
    resultclass = unittest.TestResult

    def __init__(self, path: AnyStr = '.', concurrent=1, task_level: AnyStr = 'case', resultclass=None):
        self.concurrent = concurrent
        if resultclass is not None:
            self.resultclass = resultclass

        self.task_level = task_level
        self.suite: unittest.TestSuite = unittest.defaultTestLoader.discover(path)

    def parse_suite_task_list(self) -> List:
        task_lst = []
        if self.task_level == 'case':   # 存放的是测试对象 TestCase子类
            for test_module_obj in self.suite:
                for test_class_obj in test_module_obj:
                    for testcase_obj in test_class_obj:
                        task_lst.append(testcase_obj)

        elif self.task_level == 'cls':  # 存放的是测试类所在的套件对象
            for test_module_obj in self.suite:
                for test_class_obj in test_module_obj:
                    task_lst.append(test_class_obj)

        elif self.task_level == 'module':  # 存放的是测试模块所在的套件对象
            for test_module_obj in self.suite:
                task_lst.append(test_module_obj)

        return task_lst

    def run(self) -> unittest.TestResult:
        result_obj = self.resultclass()
        tasks_lst = self.parse_suite_task_list()
        if len(tasks_lst) == 0:
            raise ValueError('当前没有需要执行的用例')

        with ThreadPoolExecutor(max_workers=self.concurrent) as tp:
            for tsk in tasks_lst:
                tsk: Union[unittest.TestCase, unittest.TestSuite]
                tp.submit(tsk.run, result_obj)
        return result_obj


if __name__ == '__main__':
    start_time = time.perf_counter()
    runner = ManualTestRunner('testcases', concurrent=10)
    result = runner.run()
    end_time = time.perf_counter()
    print(f'耗时:{end_time - start_time}')
    print(result)

这段代码中的 tsk: Union[unittest.TestCase, unittest.TestSuite] 是类型注解,表示 tsk 变量可以是 unittest.TestCaseunittest.TestSuite 类型的实例。

这里ManualTestRunner 类有以下几个方法:

  • __init__(self, path: AnyStr = '.', concurrent=1, task_level: AnyStr = 'case', resultclass=None):初始化函数,接受四个参数,分别是测试用例所在的路径、并发数、测试任务的粒度和测试结果类。默认情况下,测试用例路径设置为当前目录,并发数设置为 1,测试任务的粒度设置为 'case',即以测试用例为单位进行测试。测试结果类默认为 unittest.TestResult

  • parse_suite_task_list(self) -> List:解析测试套件对象,并将测试任务存放到列表中。具体实现方式根据测试任务的粒度不同而有所不同。对于测试用例粒度的,遍历每个测试模块、测试类和测试用例,将测试用例对象存放到列表中;对于测试类粒度的,遍历每个测试模块和测试类,将测试类对象存放到列表中;对于测试模块粒度的,遍历每个测试模块,将测试模块对象存放到列表中。最后返回存放测试任务的列表。

  • run(self) -> unittest.TestResult:执行单元测试并返回测试结果。首先创建一个测试结果类的实例,然后解析测试套件对象并将测试任务存放到列表中。如果没有测试任务需要执行则抛出异常。通过 ThreadPoolExecutor 对象并发执行测试任务,每个测试任务调用 run() 方法进行运行。最后返回测试结果。

这段代码对之前的代码进行了优化,主要包括:

  1. 将测试运行相关的代码封装到了一个 ManualTestRunner 类中,让代码更具有可读性和可维护性。

  2. 新增了一个 parse_suite_task_list() 方法,用于解析测试套件对象中包含的测试用例、测试类或测试模块,并将其存放到一个列表中。在并发执行测试用例时,可以直接从列表中取出相应的测试任务进行执行,避免了多次遍历测试套件对象的开销。

  3. 可以通过 concurrent 参数设置并发执行的线程数,提高测试运行效率。同时,在执行测试用例时,也使用了 ThreadPoolExecutor 对象,简化了多线程编程的复杂度。

  4. 可以通过 task_level 参数设置测试任务的粒度,支持对测试用例、测试类和测试模块进行灵活的控制。可以根据实际情况灵活选择测试任务的粒度,提高测试运行的灵活性和可扩展性。

  5. 将代码模块化,将测试运行代码和程序入口分开,方便单元测试和模块重用。

重写测试结果收集器对象

import time
import unittest
import platform
import traceback
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Union, AnyStr, List


class ManualTestResult(unittest.TestResult):
    def __init__(self, case_sum):
        super().__init__()
        # 定义测试用例总数属性
        self.cast_total = case_sum
        # 定义用于存放执行成功的用例列表
        self.success = []
        self.test_start_time = None
        self.test_end_time = None

    def startTestRun(self) -> None:
        """
        是在所有测试用例执行之前会自动调用
        :return:
        """
        print("*******************************开始执行测试*******************************")
        print(f"当前操作系统版本:{platform.platform()}")
        print(f"python版本:{platform.python_version()}")
        print(f"用例总数为:{self.cast_total}")
        self.test_start_time = time.perf_counter()

    def stopTestRun(self) -> None:
        """
        所有测试用例执行完成之后会自动调用
        :return:
        """
        self.test_end_time = time.perf_counter()
        print(f'执行的用例数:{self.testsRun}')
        print(f'执行通过的用例数:{len(self.success)}')
        print(f'执行失败的用例数:{len(self.failures)}')
        print(f'执行错误的用例数:{len(self.errors)}')
        print(f'执行总耗时:{self.test_end_time-self.test_start_time}')

    def addSuccess(self, test: unittest.case.TestCase) -> None:
        """
        用例执行通过会自动调用次方法
        :param test:
        :return:
        """
        self.success.append(test)
        print(f'{test}---------->【执行通过】')

    def addError(self, test: unittest.case.TestCase, err) -> None:
        super().addError(test, err)
        print(f'{test}---------->【执行代码错误】')
        tb_lst = traceback.format_exception(*err)
        print(''.join(tb_lst))

    def addFailure(self, test: unittest.case.TestCase, err) -> None:
        super().addFailure(test, err)
        print(f'{test}---------->【断言失败】')
        tb_lst = traceback.format_exception(*err)
        print(''.join(tb_lst))

    def addSkip(self, test: unittest.case.TestCase, reason: str) -> None:
        super().addSkip(test, reason)
        print(f'{test}---------->【跳过执行】')
        print(f'跳过执行的原因:{reason}')


class ManualTestRunner(object):

    def __init__(self, path: AnyStr = '.', concurrent=1, task_level: AnyStr = 'case', resultclass=unittest.TestResult):
        self.concurrent = concurrent
        if resultclass is not None:
            self.resultclass = resultclass
        self.task_level = task_level
        self.suite: unittest.TestSuite = unittest.defaultTestLoader.discover(path)
        if issubclass(resultclass, ManualTestResult):
            self.result_obj = self.resultclass(self.suite.countTestCases())
        else:
            self.result_obj = self.resultclass()

    def parse_suite_task_list(self) -> List:
        task_lst = []
        if self.task_level == 'case':  # 存放的是测试对象 TestCase子类
            for test_module_obj in self.suite:
                for test_class_obj in test_module_obj:
                    for testcase_obj in test_class_obj:
                        task_lst.append(testcase_obj)

        elif self.task_level == 'cls':  # 存放的是测试类所在的套件对象
            for test_module_obj in self.suite:
                for test_class_obj in test_module_obj:
                    task_lst.append(test_class_obj)

        elif self.task_level == 'module':  # 存放的是测试模块所在的套件对象
            for test_module_obj in self.suite:
                task_lst.append(test_module_obj)

        return task_lst

    def run(self) -> unittest.TestResult:
        tasks_lst = self.parse_suite_task_list()
        if len(tasks_lst) == 0:
            raise ValueError('当前没有需要执行的用例')

        with ThreadPoolExecutor(max_workers=self.concurrent) as tp:
            for tsk in tasks_lst:
                tsk: Union[unittest.TestCase, unittest.TestSuite]
                tp.submit(tsk.run, self.result_obj)
        return self.result_obj


if __name__ == '__main__':
    start_time = time.perf_counter()
    # runner = ManualTestRunner('testcases', concurrent=10, task_level='cls')
    runner = ManualTestRunner('testcases', concurrent=10, resultclass=ManualTestResult, task_level='cls')
    result = runner.run()
    end_time = time.perf_counter()
    print(f'耗时:{end_time - start_time}')
    print(result)

重点ManualTestResult 类自定义了一个测试结果类,继承了 unittest.TestResult 类,并对其进行了定制化。主要作用是用于记录测试用例的执行状态,并输出测试结果的详细信息。

具体来讲,ManualTestResult 类定义的属性和方法如下:

  1. cast_total:测试用例总数;

  2. success:用于存放执行成功的测试用例列表;

  3. test_start_time:测试开始时间;

  4. test_end_time:测试结束时间;

  5. startTestRun() 方法:在所有测试用例执行之前会自动调用,用于输出测试信息;

  6. stopTestRun() 方法:所有测试用例执行完成之后会自动调用,用于输出执行结果和耗时统计;

  7. addSuccess() 方法:用例执行通过会自动调用此方法,用于记录执行成功的用例;

  8. addError() 方法:用例执行发生代码错误(即执行过程中出现了异常)会自动调用此方法,用于记录执行错误的用例并输出异常信息;

  9. addFailure() 方法:用例执行断言失败会自动调用此方法,用于记录执行失败的用例并输出错误信息;

  10. addSkip() 方法:用例执行跳过会自动调用此方法,用于记录跳过的用例并输出跳过原因;

扩展一下:

写代码的过程中经常需要去捕获异常,当使用传统的try...except...时候,打印出来的信息过于简单,没有指出相应代码的具体位置,不利于定位问题,那么使用traceback可能会更好点。

Python程序的traceback信息均来源于一个叫做traceback object的对象,而这个traceback object通常是通过函数sys.exc_info()来获取的

traceback 常用的几个函数

1、traceback.print_exc():是对异常栈输出 

2、traceback.format_exc():是把异常栈以字符串的形式返回,print(traceback.format_exc()) 就相当于traceback.print_exc() 

3、print_exception():traceback.print_exc()实现方式就是traceback.print_exception(sys.exc_info())

如果不知道用哪个,就用traceback.print_exc()好了

addFailure源码中表示当发生错误时调用此函数。'err'是一个由sys.exc_info()返回的值的元组

    def addFailure(self, test, err):
        """Called when an error has occurred. 'err' is a tuple of values as
        returned by sys.exc_info()."""
        self.failures.append((test, self._exc_info_to_string(err, test)))
        self._mirrorOutput = True

Traceback 示例演示

import sys
import traceback


def func2():
    return 1 / 0


def func1():
    return func2()


try:
    func1()
except Exception as e:
    exc_type, exc_value, exc_traceback = sys.exc_info()
    traceback.print_tb(exc_traceback)

# 输出
""" 
  File "test.py", line 14, in <module>
    func1()
  File "test.py", line 10, in func1
    return func2()
  File "test.py", line 6, in func2
    return 1 / 0
"""
# 若:
# try:
#     func1()
# except Exception as e:
#     print(e) # 只输出division by zero

最后注意这里:

  if issubclass(resultclass, ManualTestResult):
      self.result_obj = self.resultclass(self.suite.countTestCases())
  else:
      self.result_obj = self.resultclass()

这部分代码的主要作用是创建测试结果对象 result_obj。其中,resultclass 参数指定了测试结果的类,如果该类是 ManualTestResult 类或 ManualTestResult 类的子类,则程序会根据测试用例总数创建一个 ManualTestResult 对象;否则,会根据 resultclass 创建一个默认的测试结果对象。

需要注意的是,ManualTestRunner 类中的 run() 方法是通过调用 ThreadPoolExecutor 实现多线程执行测试用例的,而每个执行线程都会访问同一个测试结果对象。因此,为了防止不同测试线程之间的数据冲突,需要创建多个独立的测试结果对象。在这里,我们通过判断 resultclass 是否为 ManualTestResult 类或其子类来确定是否需要按照测试用例总数创建 ManualTestResult 对象。