同样的代码,多线程比多进程快了900倍。

3
您需要下载fonts.zip 并将其解压缩到示例代码所在的同一文件夹中,以使示例运行。
此代码的目的是生成随机文本,将其呈现并保存为图像。代码接受lettersnumbers,它们分别是要从中生成文本的字母和数字的种群。还接受character_frequency,它确定将生成每个字符的实例数。然后生成一个长字符串,并将其拆分为随机大小子字符串存储在TextGenerator.dataset属性中,该属性结果来自TextGenerator.initialize_dataset

例如:对于字母='abc',数字='123',字符频率=3,将生成'aaabbbccc111222333',之后打乱并拆分为随机大小的子字符串,例如:['a312c', '1b1', 'bba32c3a2c']。

然后将渲染并将每个单词保存为图像,这是由TextGenerator.save_images产生的,这也是本问题的主题。
这里有一个executor参数,它将传递给TextGenerator用于示例演示,其中包括concurrent.futures.ThreadPoolExecutorconcurrent.futures.ProcessPoolExecutor什么是问题? 随着character_frequency的增加,存储在TextGenerator.dataset中的数据集将变得更长,但不应影响性能。实际发生的情况是:使用concurrent.futures.ProcessPoolExecutor时,TextGenerator.save_images需要的时间越长,而使用concurrent.futures.ThreadPoolExecutor并保持其他所有内容不变时,所需时间是恒定的,并且不受character_frequency的影响。
import random
import string
import tempfile
import textwrap
from concurrent.futures import (ProcessPoolExecutor, ThreadPoolExecutor,
                                as_completed)
from pathlib import Path
from time import perf_counter

import numpy as np
import pandas as pd
from cv2 import cv2
from PIL import Image, ImageDraw, ImageFont


class TextGenerator:
    def __init__(
        self,
        fonts,
        character_frequency,
        executor,
        max_images=None,
        background_colors=((255, 255, 255),),
        font_colors=((0, 0, 0),),
        font_sizes=(25,),
        max_example_size=25,
        min_example_size=1,
        max_chars_per_line=80,
        output_dir='data',
        workers=1,
        split_letters=False,
    ):
        assert (
            min_example_size > 0
        ), f'`min_example_size` should be > 0`, got {min_example_size}'
        assert (
            max_example_size > 0
        ), f'`max_example_size` should be > 0`, got {max_example_size}'
        self.fonts = fonts
        self.character_frequency = character_frequency
        self.executor = executor
        self.max_images = max_images
        self.background_colors = background_colors
        self.font_colors = font_colors
        self.font_sizes = font_sizes
        self.max_example_size = max_example_size
        self.min_example_size = min_example_size
        self.max_chars_per_line = max_chars_per_line
        self.output_dir = Path(output_dir)
        self.workers = workers
        self.split_letters = split_letters
        self.digits = len(f'{character_frequency}')
        self.max_font = max(font_sizes)
        self.generated_labels = []
        self.dataset = []
        self.dataset_size = 0

    def render_text(self, text_lines):
        font = random.choice(self.fonts)
        font_size = random.choice(self.font_sizes)
        background_color = random.choice(self.background_colors)
        font_color = random.choice(self.font_colors)
        max_width, total_height = 0, 0
        font = ImageFont.truetype(font, font_size)
        line_sizes = {}
        for line in text_lines:
            width, height = font.getsize(line)
            line_sizes[line] = width, height
            max_width = max(width, max_width)
            total_height += height
        image = Image.new('RGB', (max_width, total_height), background_color)
        draw = ImageDraw.Draw(image)
        current_height = 0
        for line_text, dimensions in line_sizes.items():
            draw.text((0, current_height), line_text, font_color, font=font)
            current_height += dimensions[1]
        return np.array(image)

    def display_progress(self, example_idx):
        print(
            f'\rGenerating example {example_idx + 1}/{self.dataset_size}',
            end='',
        )

    def generate_example(self, text_lines, example_idx):
        text_box = self.render_text(text_lines)
        filename = (self.output_dir / f'{example_idx:0{self.digits}d}.jpg').as_posix()
        cv2.imwrite(filename, text_box)
        return filename, text_lines

    def create_dataset_pool(self, executor, example_idx):
        future_items = []
        for j in range(self.workers):
            if not self.dataset:
                break
            text = self.dataset.pop()
            if text.strip():
                text_lines = textwrap.wrap(text, self.max_chars_per_line)
                future_items.append(
                    executor.submit(
                        self.generate_example,
                        text_lines,
                        j + example_idx,
                    )
                )
        return future_items

    def write_images(self):
        i = 0
        with self.executor(self.workers) as executor:
            while i < self.dataset_size:
                future_items = self.create_dataset_pool(executor, i)
                for future_item in as_completed(future_items):
                    filename, text_lines = future_item.result()
                    if filename:
                        self.generated_labels.append(
                            {'filename': filename, 'label': '\n'.join(text_lines)}
                        )
                    self.display_progress(i)
                i += min(self.workers, self.dataset_size - i)
                if self.max_images and i >= self.max_images:
                    break

    def initialize_dataset(self, letters, numbers, space_freq):
        for characters in letters, numbers:
            dataset = list(
                ''.join(
                    letter * self.character_frequency
                    for letter in characters + ' ' * space_freq
                )
            )
            random.shuffle(dataset)
            self.dataset.extend(dataset)
        i = 0
        temp_dataset = []
        min_split_example_size = min(self.max_example_size, self.max_chars_per_line)
        total_letters = len(self.dataset)
        while i < total_letters - self.min_example_size:
            example_size = random.randint(self.min_example_size, self.max_example_size)
            example = ''.join(self.dataset[i : i + example_size])
            temp_dataset.append(example)
            i += example_size
            if self.split_letters:
                split_example = ' '.join(list(example))
                for sub_example in textwrap.wrap(split_example, min_split_example_size):
                    if (sub_example_size := len(sub_example)) >= self.min_example_size:
                        temp_dataset.append(sub_example)
                        i += sub_example_size
        self.dataset = temp_dataset
        self.dataset_size = len(self.dataset)

    def generate(self, letters, numbers, space_freq, fp='labels.csv'):
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.initialize_dataset(letters, numbers, space_freq)
        t1 = perf_counter()
        self.write_images()
        t2 = perf_counter()
        print(
            f'\ntotal time: {t2 - t1} seconds, character frequency '
            f'specified: {self.character_frequency}, type: {self.executor.__name__}'
        )
        pd.DataFrame(self.generated_labels).to_csv(self.output_dir / fp, index=False)


if __name__ == '__main__':
    out = Path(tempfile.mkdtemp())
    total_images = 15
    for char_freq in [100, 1000, 1000000]:
        for ex in [ThreadPoolExecutor, ProcessPoolExecutor]:
            g = TextGenerator(
                [
                    p.as_posix()
                    for p in Path('fonts').glob('*.ttf')
                ],
                char_freq,
                ex,
                max_images=total_images,
                output_dir=out,
                max_example_size=15,
                min_example_size=5,
            )
            g.generate(string.ascii_letters, '0123456789', 1)

在我的i5 MBP上运行该代码会产生以下结果:

Generating example 15/649
total time: 0.0652076720000001 seconds, character frequency specified: 100, type: ThreadPoolExecutor
Generating example 15/656
total time: 1.1637316500000001 seconds, character frequency specified: 100, type: ProcessPoolExecutor
Generating example 15/6442
total time: 0.06430166800000015 seconds, character frequency specified: 1000, type: ThreadPoolExecutor
Generating example 15/6395
total time: 1.2626316840000005 seconds, character frequency specified: 1000, type: ProcessPoolExecutor
Generating example 15/6399805
total time: 0.05754961300000616 seconds, character frequency specified: 1000000, type: ThreadPoolExecutor
Generating example 15/6399726
total time: 45.18768219699999 seconds, character frequency specified: 1000000, type: ProcessPoolExecutor

使用线程处理15张图片,character_frequency=1000000时,用时0.05秒。使用进程处理15张图片,则需要45秒时间。为什么要花费如此长的时间?character_frequency对速度的影响是什么?character_frequency应该只影响初始化时间(线程的情况),而不影响处理时间。


1
如果这是一条规则,那么它应该适用于character_frequency=100和1000的情况,并且需要相同相对的时间量,但事实并非如此。如果character_frequency较低,则进程所需的时间非常接近于线程等效时间,这相当奇怪。 - user12690225
将您想进行序列化的代码移动到单独的函数中,并尽可能少地给出参数。 - dankal444
1个回答

1
假设我正确解释了您的代码,您正在生成由character_frequency值控制大小的示例文本。该值越大,文本越长。
文本是在程序的主循环中生成的。然后,您安排一组任务,这些任务接收该文本并基于其生成图像。
由于进程处于单独的内存地址空间中,因此需要通过pipe将文本发送给它们。这个管道是影响性能的瓶颈。您看到character_frequency增长时性能恶化的原因是因为需要串行地序列化更多的文本并通过该管道发送。您的工作进程正在等待数据到达。
这个问题不会影响您的线程池,因为线程位于主进程的相同内存地址空间中。因此,数据不需要被序列化并跨操作系统发送。
为了在使用进程时加速程序,您可以将文本生成逻辑移动到工作进程本身,或者将该文本写入一个或多个文件中。然后,您让工作进程自己打开这些文件,以便利用I/O并行化。您的主进程所做的就是将工作进程指向正确的文件位置或文件名。

那么假设我不是将生成的文本保存在列表中,而是将生成的样例保存到磁盘上的单独文件中,这些文件会被工作进程消耗和丢弃,对吧?我不理解的是为什么每次分配给工作者写作的文本时,整个字符串列表“TextGenerator.dataset”都必须重新序列化,而不是只序列化他们所分配的部分/仅一次序列化大列表并通过某种缓存、服务器进程或其他方式后续重用? - user12690225
列表只会在您自己发送text的位 (self.dataset.pop()) 上序列化一次。尽管如此,您仍需要通过单一通信渠道传递该文本。在设计基于多处理的应用程序时,必须考虑进程间通信 (IPC) 的开销。在分布式体系结构中同样适用: 如果设计不良,则网络 I/O 将成为主要瓶颈。 - noxdafox
根据最快的解决方案:如果您不需要在主进程中使用文本,则可以将文本生成参数发送给工作进程本身,让它们生成随机字符串。否则,您可以将它们写入一个文件,并向工作进程发送每个文本的文件位置,或者将每个文本写入专用文件并发送其名称。 - noxdafox
如果列表只被序列化一次,为什么增加其大小会影响每个工作进程的调用?character_frequency 不会影响单词大小,它影响存储在 TextGenerator.dataset 中等待写入的总单词数。单词大小由 max_example_sizemin_example_size 控制。如果每个发送到工作进程的字符串都增加两倍或三倍大小,并且速度下降了2-3倍,那么这将更有意义。如果总单词数较低,则写入5-15个字母的字符串的工作进程调用需要0.00001秒,而相同的字符串在总单词数较高时可能需要10秒或更长时间。 - user12690225

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接