我正在尝试使用Python 3对服务器进行压力测试。想法是每隔1秒向API服务器发送一个HTTP请求,共持续30分钟。我尝试使用
我在Stack Overflow上进行了搜索,但是迄今为止没有任何其他问题能够满足我的要求,或者可能我错过了什么。如果是这样,请有人指引我到正确的线程。非常感谢!
requests
和apscheduler
来实现这一点,但总是出现以下错误:
执行作业 "send_request (trigger: interval[0:00:01], next run at: 2017-05-23 11:05:46 EDT)" 被跳过:已达到运行实例的最大数量 (1)
如何使这个程序正常工作?以下是我的代码:
import requests, json, time, ipdb
from apscheduler.schedulers.blocking import BlockingScheduler as scheduler
def send_request():
url = 'http://api/url/'
# Username and password
credentials = { 'username': 'username', 'password': 'password'}
# Header
headers = { 'Content-Type': 'application/json', 'Client-Id': 'some string'}
# Defining payloads
payload = dict()
payload['item1'] = 1234
payload['item2'] = 'some string'
data_array = [{"id": "id1", "data": "some value"}]
payload['json_data_array'] = [{ "time": int(time.time()), "data": data_array]
# Posting data
try:
request = requests.post(url, headers = headers, data = json.dumps(payload))
except (requests.Timeout, requests.ConnectionError, requests.HTTPError) as err:
print("Error while trying to POST pid data")
print(err)
finally:
request.close()
print(request.content)
return request.content
if __name__ == '__main__':
sched = scheduler()
print(time.time())
sched.add_job(send_request, 'interval', seconds=1)
sched.start()
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
# This is here to simulate application activity (which keeps the main thread alive).
while true:
pass
except (KeyboardInterrupt, SystemExit):
# Not strictly necessary if daemonic mode is enabled but should be done if possible
scheduler.shutdown()
我在Stack Overflow上进行了搜索,但是迄今为止没有任何其他问题能够满足我的要求,或者可能我错过了什么。如果是这样,请有人指引我到正确的线程。非常感谢!