使用多进程池在Python中加速TFLite推理

9

我在使用tflite时发现,在推理期间我的多核CPU未被充分利用。我通过使用numpy创建随机输入数据(类似于图像的随机矩阵)消除了IO瓶颈,但是tflite仍然未能充分利用CPU的全部潜力。

文档提到了调整使用线程数的可能性。然而,我无法找到如何在Python API中进行此操作的方法。但是,既然我看到有人为不同的模型使用多个解释器实例,我想一个人也可以使用多个相同模型的实例并在不同的线程/进程上运行它们。我编写了以下简短的脚本:

import numpy as np
import os, time
import tflite_runtime.interpreter as tflite
from multiprocessing import Pool


# global, but for each process the module is loaded, so only one global var per process
interpreter = None
input_details = None
output_details = None
def init_interpreter(model_path):
    global interpreter
    global input_details
    global output_details
    interpreter = tflite.Interpreter(model_path=model_path)
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    interpreter.allocate_tensors()
    print('done init')

def do_inference(img_idx, img):
    print('Processing image %d'%img_idx)
    print('interpreter: %r' % (hex(id(interpreter)),))
    print('input_details: %r' % (hex(id(input_details)),))
    print('output_details: %r' % (hex(id(output_details)),))

    tstart = time.time()

    img = np.stack([img]*3, axis=2) # replicates layer three time for RGB
    img = np.array([img]) # create batch dimension
    interpreter.set_tensor(input_details[0]['index'], img )
    interpreter.invoke()

    logit= interpreter.get_tensor(output_details[0]['index'])
    pred = np.argmax(logit, axis=1)[0]
    logit = list(logit[0])
    duration = time.time() - tstart 

    return logit, pred, duration

def main_par():
    optimized_graph_def_file = r'./optimized_graph.lite'

    # init model once to find out input dimensions
    interpreter_main = tflite.Interpreter(model_path=optimized_graph_def_file)
    input_details = interpreter_main.get_input_details()
    input_w, intput_h = tuple(input_details[0]['shape'][1:3])

    num_test_imgs=1000
    # pregenerate random images with values in [0,1]
    test_imgs = np.random.rand(num_test_imgs, input_w,intput_h).astype(input_details[0]['dtype'])

    scores = []
    predictions = []
    it_times = []

    tstart = time.time()
    with Pool(processes=4, initializer=init_interpreter, initargs=(optimized_graph_def_file,)) as pool:         # start 4 worker processes

        results = pool.starmap(do_inference, enumerate(test_imgs))
        scores, predictions, it_times = list(zip(*results))
    duration =time.time() - tstart

    print('Parent process time for %d images: %.2fs'%(num_test_imgs, duration))
    print('Inference time for %d images: %.2fs'%(num_test_imgs, sum(it_times)))
    print('mean time per image: %.3fs +- %.3f' % (np.mean(it_times), np.std(it_times)) )



if __name__ == '__main__':
    # main_seq()
    main_par()

然而,通过hex(id(interpreter))打印的解释器实例的内存地址对于每个进程都是相同的。输入/输出细节的内存地址却是不同的。因此,即使我能够体验到加速,我想知道这种做法是否有潜在问题?如果有,那么如何使用TFLite和Python实现并行推理?
tflite_runtime版本:1.14.0,来自此处(x86-64 Python 3.5版本)。
Python版本:3.5。

我认为你正在尝试解决我需要解决的相同问题。顺便说一下,我在 https://stackoverflow.com/questions/61263640/does-tf-config-experimental-set-synchronous-execution-make-the-python-tensorfl 上提出了这个问题。 - mherzog
1
@mherzog 我成功地使用了上述方法,并且通过一些测试推理结果发现,我得到了具有单独tflite解释器实例的独立进程,并且它们可以正确地工作且互不影响。我认为内存地址是相同的,因为这些进程是以相同的方式启动的,因此变量具有相同的内存布局。然而,这只是一个猜测,我没有深入研究这个问题。 - v.tralala
我尝试运行了类似的东西,但只是为了比较,也在一个简单的循环中运行它,并且使用5个工作进程处理50个数据点时获得的加速比(相对于在for循环中运行这50个图像)小于5%,所以我猜想如果没有使用不同的物理设备,那么不可能真正并行地处理任何TF代码。 - Vikram Murthy
@VikramMurthy 在我的情况下,从单核到四核的加速并不完全是4倍,但速度快了大约3.5倍。因此上面的代码在撰写时是有效的。然而,我不知道随后的tf版本是否有所改变(尽管我非常怀疑)。也许您应该确保速度瓶颈是模型推理而不是某些IO过程?此外,启动比可用CPU核心更多的工作程序可能会导致一些减速。 - v.tralala
2个回答

6

我知道这个帖子是两年半前创建的。

对于我来说,

import multiprocessing
tf.lite.Interpreter(modelfile, num_threads=multiprocessing.cpu_count())

工作得非常好。


0

我没有设置初始化器,使用以下代码加载模型,并在同一个函数中执行推理以解决此问题。

with Pool(processes=multiprocessing.cpu_count()) as pool:
   results = pool.starmap(inference, enumerate(test_imgs))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接