异步压力测试

我们平时使用到的脚本思想,更多的还是同步的做法。例如`requests` 。我们请求一个接口的时候,需要发送请求,并且等待结果。在发送请求到接收请求的这段时间内,程序是被阻塞的。而且阻塞的成本根据情况而言,有可能会很大。

在做压力测试的时候,过多的线程数,会极度的消耗客户端的资源。使我们没有办法给服务器足够的压力。

为了解决这个问题,我们使用了异步的方式来进行了压力测试。
  • asyncio 一个容易让人不想看下去的库。
  • aiohttp 由于上手难度最低的`requests`并不支持异步的方式。所以用到这个库。

Demo

首先让我们看下Demo

 class Test(BlueTest.SoloPressAsync):
    async def setup(self, **kwargs):
        kwargs["data"]["code"] = await self.queue.get()
        return kwargs

    def newQueue(self):
        temp_list = []
        for i in range(0, 1000000):
            if len(str(i)) < 6:
                i = "0" * (6 - len(str(i))) + str(i)
            temp_list.append(str(i))
        queue = asyncio.Queue()
        [queue.put_nowait(temp) for temp in temp_list]
        self.queue = queue

if __name__ == '__main__':
    new = Test("http://hq.sinajs.cn/list=sh600001", "Get", headers={"requestTime": "test",
                                                                    "User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50 IE 9.0"},
               vuser=500, total_num=2000,
               data={"code": "", "phone": "13111111111"})
    new.mainrun()
    new.dataReduction()

if __name__ 之上是我们的测试类。可以看到我们继承了 BlueTest.SoloPressAsync 。然后重写了两个函数 setup , newQueue

  • setup 数据初始化
  • newQueue 建立数据队列

大家可以看到,以上的两个函数,都是为了压力时的数据定制化而存在。基本思想是,在 newQueue 里进行数据组装和入队。在 setup 里进行数据赋值与出队。如果不需要定制化数据,以上操作均不需要。

详细说明

Class SoloPressAsync

构造函数

def __init__(self,url,method,path=”./result/Press_press.log”,vuser=5,total_num=10,**kwargs):

由此可以看出,初始化它或者它的子类的时候。我们需要传入不少参数。

  • url 请求地址
  • method “GET” or “POST” or “OPTION” ….
  • path 结果文件,一般不需要更改
  • vuser 虚拟用户数,在不更改系统配置的情况下最大值为500
  • total_num 总请求数
  • **kwargs 用来传入http请求的其他参数,比如,data,params,headers… 与 requests 规则一致

所以一般的实例化方式如下:

new = Test(“请求地址”, “Get”,vuser=500, total_num=2000,
data={“code”: “”, “phone”: “”}, headers={“User-Agent”: “Test”})

暂时只做了测试执行前的,自定义数据。暂时没做多接口的场景模拟demo。 以下说明两个可能需要用户重写的函数。

newQueue

1    def newQueue(self):
2        temp_list = []
3        for i in range(0, 1000000):
4            if len(str(i)) < 6:
5                i = "0" * (6 - len(str(i))) + str(i)
6            temp_list.append(str(i))
7        queue = asyncio.Queue()
8        [queue.put_nowait(temp) for temp in temp_list]
9        self.queue = queue

我们来简单解释一下以上代码。2-6行,我们生成了一个自定义的数组。内容并没有特定的含义,大家可以根据实际的业务需要来新建自己的数据。 第7行,新建了一个asyncio的队列。 第8行,数组的值写入队列 第9行,赋值 self.queue

至此,我们再测试中需要使用到的数据已经搞定了。

setup

async def setup(self, **kwargs):
    kwargs["data"]["code"] = await self.queue.get()
    return kwargs

通过之前的 newQueue 组装好数据之后。我们需要在测试过程中使用到它。我们需要重写 setup 函数。首先这个函数需要添加异步的标签。并且入参是 **kwargs 是为了方便后面的基础函数直接使用数据进行异步请求。 kwargs["data"]["code"] = await self.queue.get() 相当于 new = Test(“请求地址”, “Get”,vuser=500, total_num=2000,

data={“code”: await self.queue.get(), “phone”: “”}, headers={“User-Agent”: “Test”})

当然,实际使用肯定不是这样。这行代码只是为了便于读者理解数据是传入了哪里。

执行 在大概理解以上内容后。我们就可以使用自己的自定义函数来执行异步的压力测试了。

new.mainrun()  #执行
new.dataReduction() #数据整理

这里的执行原始数据与press类的结果格式完全相同。数据整理结果也完全相同。

更多的场景DEMO,还没写