WebSocket 接口测试浅谈

什么是 WebSocket

WebSocket 是一种基于在单个 TCP 连接上进行全双工通信的协议,解决了HTTP协议不适用于实时通信的缺点,相较于 HTTP 协议,WebSocket 协议实现了持久化网络通信,可以实现客户端和服务端的长连接,能够进行双向实时通信,协议名为"ws"。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
在 WebSocket API 中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。

HTTP与WebSocket

image-1662173780431

WebSocket的特点

  • 建立在 TCP 协议之上,服务器端的实现比较容易。
  • 与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
  • 数据格式比较轻量,性能开销小,通信高效。
  • 可以发送文本,也可以发送二进制数据。
  • 没有同源限制,客户端可以与任意服务器通信。
  • 协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。
ws://example.com:80/some/path

image-1662191779893

WebSocket测试方法

在线测试工具

http://www.jsons.cn/websocket/

使用python 编程作为客户端测试

安装

Install with pip

pip install websocket-server

使用接口进行通信

我们使用python写一个简单的websocket的服务端

#! -*- coding: utf-8 -*-
"""
Info: Websocket 的使用示例
"""
import asyncio
import websockets

websocket_users = set()


# 检测客户端权限,用户名密码通过才能退出循环
async def check_user_permit(websocket):
    print("new websocket_users:", websocket)
    websocket_users.add(websocket)
    print("websocket_users list:", websocket_users)
    while True:
        recv_str = await websocket.recv()
        cred_dict = recv_str.split(":")
        if cred_dict[0] == "admin" and cred_dict[1] == "123456":
            response_str = "Congratulation, you have connect with server..."
            await websocket.send(response_str)
            print(cred_dict)
            print("Password is ok...")
            return True
        else:
            print(cred_dict)
            response_str = "Sorry, please input the username or password..."
            print("Password is wrong...")
            await websocket.send(response_str)


# 接收客户端消息并处理,这里只是简单把客户端发来的返回回去
async def recv_user_msg(websocket):
    while True:
        recv_text = await websocket.recv()
        print("recv_text:", websocket.pong, recv_text)
        response_text = f"Server return: {recv_text}"
        print("response_text:", response_text)
        await websocket.send(response_text)


# 服务器端主逻辑
async def run(websocket, path):
    while True:
        try:
            await check_user_permit(websocket)
            await recv_user_msg(websocket)
        except websockets.ConnectionClosed:
            print("ConnectionClosed...", path)    # 链接断开
            print("websocket_users old:", websocket_users)
            websocket_users.remove(websocket)
            print("websocket_users new:", websocket_users)
            break
        except websockets.InvalidState:
            print("InvalidState...")    # 无效状态
            break
        except Exception as e:
            print("Exception:", e)


if __name__ == '__main__':
    print("127.0.0.1:8281 websocket...")
    asyncio.get_event_loop().run_until_complete(websockets.serve(run, "127.0.0.1", 8281))
    asyncio.get_event_loop().run_forever()

运行服务端后可使用在线工具调试
image-1662256803205

使用python封装一个WebsocketUtil

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2022/9/3 16:24
# @File    : web_socket_util.py.py
# @Software: win10 Tensorflow1.13.1 python3.9
import logging
import json
from json import JSONDecodeError

from websocket import create_connection, WebSocketTimeoutException

logger = logging.getLogger(__name__)


class WebsocketUtil():
    def conn(self, url, timeout=3):
        '''
        短连接方法(使用create_connection链接,此方法不建议使用,链接不稳定,容易断,并且连接很耗时):
        连接web服务器
        :param url: 服务的url
        :param timeout: 超时时间
        :return:
        '''
         '''一直链接,直到连接上就退出循环'''
        while True:
            try:
               self.wss = create_connection(url, timeout=timeout)
                print(self.wss)
                break
            except Exception as e:
                print('连接异常:', e)
                continue

    def send(self, message):
        '''
        发送请求数据体
        :param message: 待发送的数据信息
        :return:
        '''
        if not isinstance(message, str):
            message = json.dumps(message)
        return self.wss.send(message)

    def load_json(self, base_str):
        '''
        进行数据体处理
        :param base_str: 待处理的数据
        :return:
        '''
        if isinstance(base_str, str):
            try:
                res = json.loads(base_str)
                return base_str
            except JSONDecodeError:
                return base_str
        elif isinstance(base_str, list):
            res = []
            for i in base_str:
                res.append(self.load_json(i))
            return res
        elif isinstance(base_str, str):
            for key, value in base_str.items():
                base_str[key] = self.load_json(value)
            return base_str
        return base_str

    def recv(self, timeout=3):
        '''
        接收数据体信息,并调用数据体处理方法处理响应体
        :param timeout: 超时时间
        :return:
        '''
        if isinstance(timeout, dict):
            timeout = timeout["timeout"]
        try:
            self.settimeout(timeout)
            recv_json = self.wss.recv()
            all_json_recv = self.load_json(recv_json)
            self._set_response(all_json_recv)
            return all_json_recv
        except WebSocketTimeoutException:
            logger.error(f'已经超过{timeout}秒没有接收数据啦')

    def settimeout(self, timeout):
        '''
        设置超时时间
        :param timeout: 超时时间
        :return:
        '''
        self.wss.settimeout(timeout)

    def recv_all(self, timeout=3):
        '''
        接收多个数据体信息,并调用数据体处理方法处理响应体
        :param timeout: 超时时间
        :return:
        '''
        if isinstance(timeout, dict):
            timeout = timeout['timeout']
        recv_list = []
        while True:
            try:
                self.settimeout(timeout)
                recv_list = self.wss.recv()
                all_json_recv = self.load_json(recv_list)
                recv_list.append(all_json_recv)
                logger.info(f'all::::: {all_json_recv}')
            except WebSocketTimeoutException:
                logger.error(f'已经超过{timeout}秒没有接收数据啦')
                break
        self._set_response(recv_list)
        return recv_list

    def close(self):
        '''
        关闭连接
        :return:
        '''
        return self.wss.close()

    def _set_response(self, response):
        self.response = response

    def _get_response(self) -> list:
        return self.response

编写测试用例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2022/9/3 16:25
# @Author  : shisuiyi
# @File    : test_case.py.py
# @Software: win10 Tensorflow1.13.1 python3.9
from web_socket_util import WebsocketUtil


class TestWsDemo:
    def setup(self):
        url = 'ws://127.0.0.1:8281'
        self.wss = WebsocketUtil()
        self.wss.conn(url)
        data = 'admin:123456'
        self.wss.send(data)
        res = self.wss.recv()
        print(res)

    def teardown(self):
        self.wss.close()

    def test_send_01(self):
        data = 'shisuiyi'
        self.wss.send(data)
        res = self.wss.recv()
        print(res)
        assert data in res

image-1662256463599
image-1662256485405

补充WebSocket长连接

  1. 安装pip install websocket-client
  2. 长连接的调用方法
ws = websocket.WebSocketApp("ws://echo.websocket.org/",
							on_message = on_message,
							on_error = on_error,
							on_close = on_close)
ws.on_open = on_open
ws.run_forever()
  1. 长连接,参数介绍
  • url: websocket的地址。
  • header: 客户发送websocket握手请求的请求头,{‘head1:value1’,‘head2:value2’}。
  • on_open:在建立Websocket握手时调用的可调用对象,这个方法只有一个参数,就是该类本身。
  • on_message:这个对象在接收到服务器返回的消息时调用。有两个参数,一个是该类本身,一个是我们从服务器获取的字符串(utf-8格式)。
  • on_error:这个对象在遇到错误时调用,有两个参数,第一个是该类本身,第二个是异常对象。
  • on_close:在遇到连接关闭的情况时调用,参数只有一个,就是该类本身。
  • on_cont_message:这个对象在接收到连续帧数据时被调用,有三个参数,分别是:类本身,从服务器接受的字符串(utf-8),连续标志。
  • on_data:当从服务器接收到消息时被调用,有四个参数,分别是:该类本身,接收到的字符串(utf-8),数据类型,连续标志。
  • keep_running:一个二进制的标志位,如果为True,这个app的主循环将持续运行,默认值为True。
  • get_mask_key:用于产生一个掩码。
  • subprotocols:一组可用的子协议,默认为空。
  1. 长连接关键方法:ws.run_forever(ping_interval=60,ping_timeout=5)

如果不断开关闭websocket连接,会一直阻塞下去。另外这个函数带两个参数,如果传的话,启动心跳包发送。
ping_interval:自动发送“ping”命令,每个指定的时间(秒),如果设置为0,则不会自动发送。
ping_timeout:如果没有收到pong消息,则为超时(秒)。

ws.run_forever(ping_interval=60,ping_timeout=5)
#ping_interval心跳发送间隔时间#ping_timeout 设置,发送ping到收到pong的超时时间
  1. 长连接示例
import websocket
import json
import time
try:
    import thread
except ImportError:
    import _thread as thread


def on_message(ws, message):
    '''服务器有数据更新时,主动推送过来的数据'''
    print(message)


def on_error(ws, error):
    '''程序报错时,就会触发on_error事件'''
    print(error)


def on_close(ws):
    '''关闭websocket连接后,打印此消息'''
    print("### closed ###")


def on_open(ws):
    '''连接到服务器之后就会触发on_open事件,这里用于send数据'''
    def run(*args):
        Sign = {
            "name": 1,
            "nam2": 2
            }
        ws.send(json.dumps(Sign))  # 发送数据(必须为str类型)
        time.sleep(1)
        ws.close()  # 关闭连接
    thread.start_new_thread(run, ())


def run_long():
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://echo.websocket.org/",
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.on_open = on_open
    ws.run_forever(ping_timeout=5)
    # ws.run_forever()


if __name__ == "__main__":
    run_long()