关于在调用gather()时同时发送n个请求的问题,关键是在每次调用之前使用create_task()和await asyncio.sleep(1.1)
。使用create_task创建的任何任务都会立即运行:
for i in range(THREADS):
await asyncio.sleep(1.1)
tasks.append(
asyncio.create_task(getData(session, q, ''.join(random.choice(string.ascii_lowercase) for i in range(10))))
)
await asyncio.gather(*tasks)
另一个限制同时连接数量的问题在下面的例子中也通过在async_payload_wrapper中使用ClientSession()上下文并设置具有限制的连接器来解决。
通过这种设置,我可以运行25个协程(THREADS=25),每个循环遍历一个URL队列,而不违反25个并发连接规则。
async def send_request(session, url, routine):
start_time = time.time()
print(f"{routine}, sending request: {datetime.now()}")
params = {
'api_key': 'nunya',
'url': '%s' % url,
'render_js': 'false',
'premium_proxy': 'false',
'country_code':'us'
}
try:
async with session.get(url='http://yourAPI.com',params=params,) as response:
data = await response.content.read()
print(f"{routine}, done request: {time.time() - start_time} seconds")
return data
except asyncio.TimeoutError as e:
print('timeout---------------------')
errors.append(url)
except aiohttp.ClientResponseError as e:
print('request failed - Server Error')
errors.append(url)
except Exception as e:
errors.append(url)
async def getData(session, q, test):
while True:
if not q.empty():
url = q.get_nowait()
resp = await send_request(session, url ,test)
if resp is not None:
processData(resp, test, url)
else:
print(f'{test} queue empty')
break
async def async_payload_wrapper():
tasks = []
q = asyncio.Queue()
for url in urls:
await q.put(url)
async with ClientSession(connector=aiohttp.TCPConnector(limit=THREADS), timeout=ClientTimeout(total=61), raise_for_status=True) as session:
for i in range(THREADS):
await asyncio.sleep(1.1)
tasks.append(
asyncio.create_task(getData(session, q, ''.join(random.choice(string.ascii_lowercase) for i in range(10))))
)
await asyncio.gather(*tasks)
if __name__ == '__main__':
start_time = time.time()
asyncio.run(async_payload_wrapper())