一、配置化补充

用例的url是写死的,如果出现变动修改起来就不够优雅,解决方案,配置化

setting.py 中添加接口地址
# 项目主机地址
PROJECT_HOST = 'http://api.xxxx.com/xxxxx'
# 接口地址
INTERFACES = {
    'register': PROJECT_HOST + '/member/register'
}

将excel 表中的url字段内容改为register

# 发送请求
res = send_http_request(url=setting.INTERFACES[item['url']], method=item['method'], **request_data)

二、数据库校验

1. python操作数据库

pymysql

PyMSQL是一个纯的python的MySQL客户端。

pip install PyMySQL

使用步骤

  1. 创建连接(修高速公路)
  2. 创建游标(运输车)
  3. 执行sql(提货单)
  4. 获取结果(卸货)
  5. 关闭游标(车退掉)
  6. 关闭连接(路也断掉)
import pymysql
from pymysql.cursors import DictCursor
# 1.创建连接(修高速公路)
conn = pymysql.connect(
    host='api.xxxx.com',
    user='fu***e',
    password='1*3***',
    port=3306,
    db='fu****',
    charset='utf8'
)
# 2. 创建游标(运输车)
# cursor = conn.cursor()  # 返回元组
cursor = conn.cursor(DictCursor)  # 返回字典
# 3. 执行sql(提货单)
sql = 'select id, reg_name, mobile_phone from member limit 10'
res = cursor.execute(sql)
print(res)
# 4. 获取结果(卸货)
res1 = cursor.fetchone() # 一次卸载一条
res2 = cursor.fetchmany(3) # 一次卸载指定的条数
res3 = cursor.fetchall() # 一次获取所有
# 5. 关闭游标(车退掉)
cursor.close()
# 6. 关闭连接(路也断掉)
conn.close()

10
res1:
{'id': 1, 'reg_name': '檬檬', 'mobile_phone': '13453457687'}
res2:
[{'id': 2, 'reg_name': '青柠檬', 'mobile_phone': '13622120322'},
 {'id': 3, 'reg_name': '小傻瓜', 'mobile_phone': '15500000000'},
 {'id': 4, 'reg_name': '测试开发小可爱', 'mobile_phone': '15088314689'}]
res3:
[{'id': 5, 'reg_name': '小傻瓜', 'mobile_phone': '13574660252'},
 {'id': 6, 'reg_name': '小傻瓜', 'mobile_phone': '18717906533'},
 {'id': 7, 'reg_name': '圆圆', 'mobile_phone': '18235075293'},
 {'id': 8, 'reg_name': '小傻瓜', 'mobile_phone': '15209028515'},
 {'id': 9, 'reg_name': '小傻瓜', 'mobile_phone': '13800001001'},
 {'id': 10, 'reg_name': '小傻瓜', 'mobile_phone': '15100002222'}]

更新数据

import pymysql
db_config = {
    'host': 'api.xxxx.com',
    'user': 'fu***e',
    'password': '1*3**',
    'port': 3306,
    'db': 'fu****',
    'charset': 'utf8'
}
conn = pymysql.connect(**db_config)
try:
    with conn.cursor() as cursor: # 上下文管理--自动关闭游标
        sql1 = 'UPDATE account set amount=amount - 100 where username="A"'
        sql2 = 'UPDATE account set amount=amount + 100 where username="B"'
        cursor.execute(sql1)
        cursor.execute(sql2)
        # pymysql默认开启事务
        conn.commit()
except Exception as e:
    # 回滚
    conn.rollback()
finally:
    conn.close()

2. 数据查询类封装

1. 功能分析

  1. 可以连接不同sql数据库
  2. 查一条数据,多条数据
  3. 可以获取不同格式的数据

2. 封装成数据库查询类

封装思路:

  1. 数据库查询模块有多个功能,且需要复用,所以封装成类

  2. 在构造方法中创建连接

  3. 创建对象方法实现各种查询

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2021/11/28 16:33
    # @Author  : shisuiyi
    # @File    : db_handler.py
    # @Software: win10 Tensorflow1.13.1 python3.9
    import pymysql
    from pymysql.cursors import DictCursor
    
    
    class SQLdbHandler:
        """
        sql数据库查询类
        """
    
        def __init__(self, db_config: dict):
            # 创建连接
            # 根据不同的数据库,创建不同的连接
            engine = db_config.pop('engine', 'mysql')
            # Python 字典 pop() 方法删除字典给定键 key 及对应的值,返回值为被删除的值。key 值必须给出。 否则,返回 default 值。
            if engine.lower() == 'mysql':
                self.conn = pymysql.connect(**db_config)
            elif engine.lower() == 'oracle':
                # 扩展
                pass
    
        def __execute(self, sql, action, res_type='t', *args):
            """
            执行sql
            :param sql:
            :param action: 字符串,指定执行cursor对应的方法
            :param res_type:
            :param args: 其他参数,比如当执行fetchmany传入的size
            :return:
            """
            if res_type == 't':
                cursor = self.conn.cursor()
            else:
                cursor = self.conn.cursor(DictCursor)
            try:
                cursor.execute(sql)
                return getattr(cursor, action)(*args)
            except Exception as e:
                raise e
            finally:
                cursor.close()
    
        def get_one(self, sql, res_type='t'):
            """
            获取一条数据
            :param sql:
            :param res_type: 返回数据的类型,默认为t表示以元组返回,'d'表示以字典的形式返回
            :return: 元组/字典
            """
            # 数据库若断开即重连
            self.reConnect()
            return self.__execute(sql, 'fetchone', res_type)
    
        def get_many(self, sql, size, res_type='t'):
            # 数据库若断开即重连
            self.reConnect()
            return self.__execute(sql, 'fetchmany', res_type, size)
    
        def get_all(self, sql, res_type='t'):
            # 数据库若断开即重连
            self.reConnect()
            return self.__execute(sql, 'fetchall', res_type)
    
        def exist(self, sql):
            if self.get_one(sql):
                return True
            else:
                return False
    
        def reConnect(self):
            """
            重连机制
            :@return
            """
            try:
                self.conn.ping()
            except:
                self.conn()
    
        def __del__(self):
            """
            对象销毁的时候自动会被调用
            :return:
            """
            self.conn.close()
    
    
    if __name__ == '__main__':
        db_config = {
            'engine': 'mysql',  # 指定mysql数据
            'host': 'api.xxxx.com',
            'user': 'fu**e',
            'password': '1*3***',
            'port': 3306,
            'db': 'fu****',
            'charset': 'utf8'
        }
        db = SQLdbHandler(db_config)
        sql = 'select id, reg_name, mobile_phone from member limit 10'
        # res = db.get_one(sql)
        res = db.get_many(sql, size=5)
        print(res)
    
    

3. 应用到项目中

3.1 数据库配置

setting.py中添加# 数据库配置:DB_CONFIG

DB_CONFIG = {
        'engine': 'mysql',  # 指定mysql数据
        'host': 'api.xxxx.com',
        'user': 'fu***e',
        'password': '1*3***',
        'port': 3306,
        'db': 'fu****',
        'charset': 'utf8'
    }

3.2 导入

在一个项目中会查询多次数据库,如果每次都要实例化一个数据库查询对象,也即是,每次都要重新连接,显然是比较浪费资源。当前这种场景下需要用到单例模式,也即是在一个脚本的生命周期内,某个类的实例始终只有一个。

因为python的模块就是天然的单例模式,所有的模块有且只会被导入一次。

在common 目录下的__init__模块创建数据库查询对象实现单例模式

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2021/11/13 10:16
# @Author  : shisuiyi
# @File    : __init__.py.py
# @Software: win10 Tensorflow1.13.1 python3.9
from common.log_handler import get_logger
from common.read_excel_tool import get_data_from_excel
import setting
from common.make_requests import send_http_request
from common.db_handler import SQLdbHandler

logger = get_logger(**setting.LOG_CONFIG)  # 在这里创建日志器
db = SQLdbHandler(db_config=setting.DB_CONFIG)  # 这里创建数据库查询对象实现单例模式

"""
在一个项目中会查询多次数据库,如果每次都要实例化一个数据库查询对象,
也即是,每次都要重新连接,显然是比较浪费资源。当前这种场景下需要用到单例模式,也即是在一个脚本的生命周期内,
某个类的实例始终只有一个。
因为python的模块就是天然的单例模式,所有的模块有且只会被导入一次。
"""

3.3 修改用例数据

在用例中添加一个sql字段,里面编写需要校验的sql语句

3.4 用例代码编写

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2021/11/28 13:02
# @Author  : shisuiyi
# @File    : test_register.py
# @Software: win10 Tensorflow1.13.1 python3.9
import json
import unittest
from unittestreport import ddt, list_data

import setting
from common import get_data_from_excel
from common import logger
from common import send_http_request
from common import db

# 读取测试用例数据
cases = get_data_from_excel(setting.TEST_DATA_FILE)


@ddt
class RegisterTestCase(unittest.TestCase):

    @classmethod
    def setUpClass(cls) -> None:
        logger.info('===========注册接口开始测试===========')

    @classmethod
    def tearDownClass(cls) -> None:
        logger.info('===========注册接口结束测试===========')

    @list_data(cases)
    def test_register(self, item):
        logger.info('>>>>>>>用例{}开始执行>>>>>>>>'.format(item['title']))
        # 1. 处理测试数据
        # 把请求参数处理好,request_data是一个json字符串,怎么通过封装好的发送请求的函数发送
        # expect_data也是一个json字符串,也需要loads成python对象-字典
        request_data = json.loads(item['request'])
        expect_data = json.loads(item['expect_data'])
        # 2. 测试步骤
        # 发送请求
        res = send_http_request(url=setting.INTERFACES[item['url']], method=item['method'], **request_data)
        print(res.json())
        # 3. 断言
        # 3.1 断言响应状态码
        try:
            self.assertEqual(item['status_code'], res.status_code)

        except AssertionError as e:  # AssertionError断言异常
            logger.warning('<<<<<<<<<用例{}响应异常<<<<<<<'.format(item['title']))
            raise e
        else:
            logger.info('<<<<<<<<<用例{}响应状态码断言成功<<<<<<<'.format(item['title']))

        # 3.2 断言响应数据
        if item['res_type'].lower() == 'json':
            res = res.json()
        elif item['res_type'].lower() == 'html':
            # 扩展思路
            res = res.text
        try:
            self.assertEqual(expect_data, {'code': res['code'], 'msg': res['msg']})
        except AssertionError as e:
            logger.warning('用例【{}】响应数据断言异常'.format(item['title']))
            logger.warning('用例【{}】期望结果为:{}'.format(item['title'], expect_data))
            logger.warning('用例【{}】实际结果为:{}'.format(item['title'], {'code': res['code'], 'msg': res['msg']}))
            raise e
        else:
            logger.info('<<<<<<<<<用例{}响应数据断言成功<<<<<<<'.format(item['title']))

        # 3.3 数据库断言后面的任务
        if item.get('sql'):  # 返回指定键的值,如果键不在字典中返回默认值 None 或者设置的默认值。
            # 只有sql字段有sql的才需要校验数据库
            try:
                self.assertTrue(db.exist(item['sql']))
            except AssertionError as e:
                logger.warning('用例【{}】数据库断言异常,执行的sql为:{}'.format(item['title'], item['sql']))
                logger.info('<<<<<<<<<用例{}测试结束<<<<<<<'.format(item['title']))
                raise e
        logger.info('---------------用例{}测试成功---------------'.format(item['title']))


if __name__ == '__main__':
    unittest.main()