作者:脸皮乃身外之物 | 来源:互联网 | 2023-07-25 15:01
需求
- 需要对接口进行压力测试,测试其最大并发是多少
python代码
import json
import requests
import time
import concurrent.futures
import threadingSUCCESS = 0
FAIL = 0;def Api(name,pwd):registerUrl = 'http://localhost:8001/api/UserInfo/Login'data = {'name':name,'password':pwd}paramData = json.dumps(data)headers = {'Content-Type':'application/json;charset=utf-8'}try:response = requests.post(registerUrl,data=paramData,headers=headers,timeout=3)resObj = json.loads(response.text)except Exception as ex:global FAILFAIL += 1print('失败次数={}'.format(FAIL))print(str(ex))finally:global SUCCESSSUCCESS += 1class MyThread(threading.Thread):'''线程类'''def __init__(self,name,pwd):threading.Thread.__init__(self)self.name = nameself.pwd = pwddef run(self):'''线程创建后会直接运行run函数'''Api(self.name,self.pwd)class PressuerTest:def simple(self,num):'''单线程,不做任何处理'''for i in range(num):name = "gagag" + str(i)pwd = 'passwrodtest'Api(name,pwd)def _BatchRegister(self,num):name = "gagag" + str(num)pwd = 'passwrodtest'Api(name,pwd)def _BatchRegisterAll(self,numList):'''并发这里创建一个线程池,总共有5个线程可以分配使用executor.map()与map()函数类似,表示对sites中的每一个元素,并发地调用函数'''with concurrent.futures.ThreadPoolExecutor(max_workers=80) as executor:executor.map(self._BatchRegister, numList)def mult(self,num):'''线程池'''numList = []for i in range(num):numList.append(i)self._BatchRegisterAll(numList)def newMultThread(self,num):'''新建多个线程'''threadArr = []for i in range(num):name = "newMultThread_" + str(i)pwd = 'pwd'thread = MyThread(name,pwd)thread.start()threadArr.append(thread)if __name__ == '__main__':startTime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))pre = PressuerTest()pre.newMultThread(2000)endTime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))print('开始时间:{}'.format(startTime)) print('结束时间:{}'.format(endTime))
总结
代码中一共使用了3种方式取执行压力测试,
- 单线程明显的会慢很多
- 多线程的采用线程池的话也达不到1秒1000个请求,会在一个线程执行完才会走到下一个线程
- 每个请求一个线程可以达到效果,线程会很快启动完,可以达到1秒1000个的高并发量(也可以更高)
额外因素
- 客户端的设置的超时时间会对并发数量有影响。
- 服务器的MinRequestBodyDataRate也会有影响(针对.netCore)