使用多个并行线程分段下载大文件

3
我有一个使用案例,需要通过多个线程以部分方式下载大型远程文件。每个线程必须同时(并行)运行,抓取文件的特定部分。期望在成功下载所有部分后将它们组合成一个单一(原始)文件。
也许使用requests库可以完成这项工作,但我不确定如何将其多线程化为结合块的解决方案。
url = 'https://url.com/file.iso'
headers = {"Range": "bytes=0-1000000"}  # first megabyte
r = get(url, headers=headers)

我也在考虑使用curl,让Python编排下载过程,但我不确定这是否是正确的方法。这似乎太复杂了,远离了原始的Python解决方案。类似这样:

curl --range 200000000-399999999 -o file.iso.part2

有人能解释一下如何做到这样吗?或者提供一个在Python 3中有效的代码示例?我通常很容易找到与Python相关的答案,但是这个问题的解决方案似乎一直在逃避我。


这个答案怎么样?(https://dev59.com/0WYr5IYBdhLWcg3wE2Sz) - bug
那似乎与Python 2有关,不适用于Python 3。 - jjj
这是毫无意义的。网络不是多线程的。使用单个线程。 - user207421
@user207421 Aria2的用户和开发人员可能会持有不同的意见。此外,许多下载工具都支持恢复下载。你认为这是如何实现的呢?网络绝对是多线程的。诸如FastAPI之类的API如何向多个并发客户端发送并发响应?你的Web服务器只服务于一个客户端吗(先来先服务)?例如:Nginx有worker_connections(和速率限制)是有原因的。顺便说一下,在网络中,“线程”称为“套接字”。而且,如果您的声望超过300k,我也无所谓。 - DataMinion
4个回答

11

以下是使用Python 3和Asyncio的示例版本,它仅供参考,可以改进,但您应该能够获得所需的一切。

  • get_size:发送HEAD请求以获取文件大小
  • download_range:下载单个块
  • download:下载所有块并合并它们
import asyncio
import concurrent.futures
import functools
import requests
import os


# WARNING:
# Here I'm pointing to a publicly available sample video.
# If you are planning on running this code, make sure the
# video is still available as it might change location or get deleted.
# If necessary, replace it with a URL you know is working.
URL = 'https://download.samplelib.com/mp4/sample-30s.mp4'
OUTPUT = 'video.mp4'


async def get_size(url):
    response = requests.head(url)
    size = int(response.headers['Content-Length'])
    return size


def download_range(url, start, end, output):
    headers = {'Range': f'bytes={start}-{end}'}
    response = requests.get(url, headers=headers)

    with open(output, 'wb') as f:
        for part in response.iter_content(1024):
            f.write(part)


async def download(run, loop, url, output, chunk_size=1000000):
    file_size = await get_size(url)
    chunks = range(0, file_size, chunk_size)

    tasks = [
        run(
            download_range,
            url,
            start,
            start + chunk_size - 1,
            f'{output}.part{i}',
        )
        for i, start in enumerate(chunks)
    ]

    await asyncio.wait(tasks)

    with open(output, 'wb') as o:
        for i in range(len(chunks)):
            chunk_path = f'{output}.part{i}'

            with open(chunk_path, 'rb') as s:
                o.write(s.read())

            os.remove(chunk_path)


if __name__ == '__main__':
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    loop = asyncio.new_event_loop()
    run = functools.partial(loop.run_in_executor, executor)

    asyncio.set_event_loop(loop)

    try:
        loop.run_until_complete(
            download(run, loop, URL, OUTPUT)
        )
    finally:
        loop.close()

这给了我一个错误 ValueError: Set of coroutines/Futures is empty - West
这种情况发生在chunks为空时,可能由于头部Content-Length中的file_size为0所致。 - bug
1
可能是因为链接 https://file-examples.com/storage/fe2ef7477862f581f9ce264/2017/04/file_example_MP4_1920_18MG.mp4 已经失效了。请使用其他你知道有效的链接。 - Inspired_Blue
谢谢@Inspired_Blue, 这绝对是需要注意的事情。我已经更换了失效的URL,但这些公共示例视频会不断变化,所以可能会再次失效。 - bug
@bug: 我已修改了您的答案,使用ThreadPoolExercutor而不是asyncio,只是为了说明这也是可能的。没有必要使用ThreadPoolExercutor。只是因为我觉得ThreadPoolExercutor更适合初学者,接口更简单。但我也认为asyncio更强大。这是我的答案。我还展示了如何使用tqdm显示进度条。 - Inspired_Blue

2

我发现最好的方法是使用一个名为pySmartDL的模块。

编辑:这个模块有一些问题,比如没有暂停下载并以后恢复的方法,而且项目已经不再得到维护。

如果你正在寻找这样的特性,我建议你尝试使用pypdl,但请注意它没有pySmartDL提供的一些高级特性,虽然对于大多数人来说,pypdl可能更好。

  • pypdl可以暂停/恢复下载

  • pypdl可以在下载失败的情况下重试下载,并提供一个选项以便在需要时使用其他URL继续下载

还有许多其他功能...

如何安装pypdl

步骤1:pip install pypdl

步骤2:为了下载文件,您可以使用

from pypdl import Downloader

dl = Downloader()
dl.start('http://example.com/file.txt', 'file.txt')

注意:默认情况下,这将为您提供一个下载计量器。
如果您需要将下载进度挂钩到GUI上,可以使用以下方法。
dl = Downloader()
dl.start('http://example.com/file.txt', 'file.txt', block=False, display=False)
while d.progress != 100:
    print(d.progress)

如果你想使用更多的线程,你可以使用

dl = Downloader()
dl.start('http://example.com/file.txt', 'file.txt', num_connections=8)

您可以在项目页面上找到更多功能:https://pypi.org/project/pypdl/


1
非常整洁,完美运行,这是最佳解决方案! - West

1
你还可以使用 concurrent.futures 中的 ThreadPoolExecutor(或 ProcessPoolExecutor)而不是使用 asyncio。以下展示了如何通过使用 ThreadPoolExecutor 来修改 bug 的答案

额外福利:下面的代码片段使用了 tqdm 来显示下载进度条。如果你不想使用 tqdm,只需在 with tqdm(total=file_size . . . 下面的代码块中注释掉即可。更多有关 tqdm 的信息,请参见此处,可以使用 pip install tqdm 安装。顺便说一句,tqdm 也可以与 asyncio 一起使用。

import requests
import concurrent.futures
from concurrent.futures import as_completed
from tqdm import tqdm
import os

def download_part(url_and_headers_and_partfile):
    url, headers, partfile = url_and_headers_and_partfile
    response = requests.get(url, headers=headers)
    # setting same as below in the main block, but not necessary:
    chunk_size = 1024*1024 

    # Need size to make tqdm work.
    size=0 
    with open(partfile, 'wb') as f:
        for chunk in response.iter_content(chunk_size):
            if chunk:
                size+=f.write(chunk)
    return size

def make_headers(start, chunk_size):
    end = start + chunk_size - 1
    return {'Range': f'bytes={start}-{end}'}

url = 'https://download.samplelib.com/mp4/sample-30s.mp4'
file_name = 'video.mp4'
response = requests.get(url, stream=True)
file_size = int(response.headers.get('content-length', 0))
chunk_size = 1024*1024

chunks = range(0, file_size, chunk_size)
my_iter = [[url, make_headers(chunk, chunk_size), f'{file_name}.part{i}'] for i, chunk in enumerate(chunks)] 

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    jobs = [executor.submit(download_part, i) for i in my_iter]

    with tqdm(total=file_size, unit='iB', unit_scale=True, unit_divisor=chunk_size, leave=True, colour='cyan') as bar:
        for job in as_completed(jobs):
            size = job.result()
            bar.update(size)

with open(file_name, 'wb') as outfile:
    for i in range(len(chunks)):
        chunk_path = f'{file_name}.part{i}'
        with open(chunk_path, 'rb') as s:
            outfile.write(s.read())
        os.remove(chunk_path)

1
你可以使用 grequests 进行并行下载。
import grequests

URL = 'https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.1.0-amd64-netinst.iso'
CHUNK_SIZE = 104857600  # 100 MB
HEADERS = []

_start, _stop = 0, 0
for x in range(4):  # file size is > 300MB, so we download in 4 parts. 
    _start = _stop
    _stop = 104857600 * (x + 1)
    HEADERS.append({"Range": "bytes=%s-%s" % (_start, _stop)})


rs = (grequests.get(URL, headers=h) for h in HEADERS)
downloads = grequests.map(rs)

with open('/tmp/debian-10.1.0-amd64-netinst.iso', 'ab') as f:
    for download in downloads:
        print(download.status_code)
        f.write(download.content)

PS:我没有检查范围是否正确确定,以及下载的md5sum是否匹配!这只是一般显示它如何工作。


这正是我所需要的。顺便说一句,这很棒,但如果您有时间修改代码以显示每个下载部分的进度,那就太好了。 - jjj
你可以尝试这个链接:https://stackoverflow.com/questions/33703730/adding-progress-feedback-in-grequests-task - Maurice Meyer
我发现这个脚本的问题是,合并下载的文件的字节数与原始文件不匹配。对于该文件(iso),您显示的总大小为351272960字节,但下载的文件长了3个字节:351272963字节。 - jjj

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接