在Python 3中每隔x秒发起一次API请求

4
我正在尝试使用Python 3对服务器进行压力测试。想法是每隔1秒向API服务器发送一个HTTP请求,共持续30分钟。我尝试使用requestsapscheduler来实现这一点,但总是出现以下错误:

执行作业 "send_request (trigger: interval[0:00:01], next run at: 2017-05-23 11:05:46 EDT)" 被跳过:已达到运行实例的最大数量 (1)

如何使这个程序正常工作?以下是我的代码:

import requests, json, time, ipdb
from apscheduler.schedulers.blocking import BlockingScheduler as scheduler

def send_request():
    url = 'http://api/url/'

    # Username and password
    credentials = { 'username': 'username', 'password': 'password'}

    # Header
    headers = { 'Content-Type': 'application/json', 'Client-Id': 'some string'}

    # Defining payloads
    payload = dict()

    payload['item1']    = 1234
    payload['item2'] = 'some string'
    data_array = [{"id": "id1", "data": "some value"}]
    payload['json_data_array'] = [{ "time": int(time.time()), "data": data_array]

    # Posting data
    try:
        request = requests.post(url, headers = headers, data =  json.dumps(payload))
    except (requests.Timeout, requests.ConnectionError, requests.HTTPError) as err:
        print("Error while trying to POST pid data")
        print(err)
    finally:
        request.close()

    print(request.content)

    return request.content

if __name__ == '__main__':
    sched = scheduler()
    print(time.time())
    sched.add_job(send_request, 'interval', seconds=1)
    sched.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while true:
            pass
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()

我在Stack Overflow上进行了搜索,但是迄今为止没有任何其他问题能够满足我的要求,或者可能我错过了什么。如果是这样,请有人指引我到正确的线程。非常感谢!

@calico_ 谢谢,我会尽快查看。 - user8054069
@calico_ 是的,问题在于请求时间超过了1秒。但由于这是一项压力测试,如果已经有请求在进行中,我不能跳过该请求。我希望代码能够即使前一个请求尚未完成/返回,也能发出API请求。 - user8054069
是的,抱歉之前的回答不够详细。我编辑了我的回答并加入了一个解决方案。 - calico_
2个回答

4
我认为您的错误与我标记的重复内容以及@jeff的答案描述得很好。
编辑:显然不是这样。因此,我将在此处描述如何解决最大实例问题:
最大实例问题
当您向调度程序添加作业时,有一个参数可以设置作业的最大允许并发实例数。您应该在此处阅读此内容: BaseScheduler.add_job() 因此,修复您的问题只是将其设置为更高的值:
sch.add_job(myfn, 'interval', seconds=1, max_instances=10)

但是,你需要多少并发请求呢?如果它们需要超过一秒钟来响应,并且你每秒请求一个,如果让它运行足够长的时间,你最终将总是会出现错误...

调度器

有几个调度器选项可用,这里介绍两个:

BackgroundScheduler

你正在导入阻塞调度器-当启动时会阻塞。因此,在调度器停止之后,不会执行其他代码。如果需要在启动调度器后执行其他代码,我会使用后台调度器,如下所示:

from apscheduler.schedulers.background import BackgroundScheduler as scheduler

def myfn():
    # Insert your requests code here
    print('Hello')

sch = scheduler()
sch.add_job(myfn, 'interval', seconds=5)
sch.start()

# This code will be executed after the sceduler has started
try:
    print('Scheduler started, ctrl-c to exit!')
    while 1:
        # Notice here that if you use "pass" you create an unthrottled loop
        # try uncommenting "pass" vs "input()" and watching your cpu usage.
        # Another alternative would be to use a short sleep: time.sleep(.1)

        #pass
        #input()
except KeyboardInterrupt:
    if sch.state:
        sch.shutdown()

BlockingScheduler

如果你在启动调度器后不需要执行其他代码,你可以使用阻塞调度器,这样更加简单:

apscheduler.schedulers.blocking import BlockingScheduler as scheduler

def myfn():
    # Insert your requests code here
    print('Hello')

# Execute your code before starting the scheduler
print('Starting scheduler, ctrl-c to exit!')

sch = scheduler()
sch.add_job(myfn, 'interval', seconds=5)
sch.start()

我尝试了两种解决方案,问题仍然存在。我认为原因是请求本身执行的时间超过了1秒(我尝试每5秒运行一次,这样可以正常工作)。 - user8054069
如何在600秒后停止? - Developer Vicky

0

我以前从未使用过Python中的调度器,但是这个其他StackOverflow问题似乎处理了这个问题。

这意味着任务需要超过一秒钟的时间,默认情况下,给定作业只允许一个并发执行... - Alex Grönholm

在您的情况下,我想使用线程可能会满足您的需求。如果您创建一个继承自Python线程的类,像这样:

class Requester(threading.Thread):
  def __init__(self, url, credentials, payload):
    threading.Thread._init__(self)
    self.url = url
    self.credentials = credentials
    self.payload = payload        
  def run(self):
    # do the post request here
    # you may want to write output (errors and content) to a file
    # rather then just printing it out sometimes when using threads 
    # it gets really messing if you just print everything out

然后就像你处理轻微变化一样。

if __name__ == '__main__':
  url = 'http://api/url/'
# Username and password
  credentials = { 'username': 'username', 'password': 'password'}
# Defining payloads
  payload = dict()
  payload['item1']    = 1234
  payload['item2'] = 'some string'
  data_array = [{"id": "id1", "data": "some value"}]
  payload['json_data_array'] = [{ "time": int(time.time()), "data": data_array]
  counter = 0
  while counter < 1800:
    req = Requester(url, credentials, payload)
    req.start()
    counter++
    time.sleep(1)

当然,您可以按照自己的喜好完成其余部分,如果您希望这样做,您可以使KeyboardInterrupt成为实际完成脚本的方式。

当然,如果问题是调度程序,那么这是绕过调度程序的一种方法。


1
嗯,所以请求者不会等到下一个请求者开始吗?这是一个有趣的想法,我会尝试一下看看它的工作原理。谢谢! - user8054069
不过有一个问题,如果我们定义 def run(self),而不是使用 req.start(),那么应该改成 req.run() 对吧?我还更新了头部分,因为犯了一个错误把它和一些不相关的代码一起删掉了。 - user8054069
因此,在使用线程时,通常不会直接调用run方法,而是从start方法中调用run。 - Jeff
一个小提示。我更多地来自于Java背景,但我看到了Jerubs的答案[https://dev59.com/EnRB5IYBdhLWcg3wUVxd],这可能是一种更Python的解决方案。所以如果你不想创建一个线程子类,你可以像这样做:def makeRequest(url, headers, payload):#实际处理请求(抱歉,我似乎无法弄清楚如何将代码放入代码块中) - Jeff
apscheduler包以清晰的面向对象方式实现了这个目标,但需要理解一些中级Python原则。有关详细信息,请参见下面的答案。 - calico_

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接