使用Python循环进行多线程

4

我正在尝试在我的处理器的多个线程上运行此Python代码,但我找不到如何分配多个线程。 我在Jupyter(以前是IPython)中使用python 2.7

以下是初始代码(所有这部分都完美地工作)。 它是一个Web解析器,它获取x,即我的列表中的url之一,即url列表,然后编写CSV(其中out_string是一行)。

没有多线程的代码

my_list = ['http://stackoverflow.com/', 'http://google.com']

def main():
    with open('Extract.csv'), 'w') as out_file:
        count_loop = 0
        for x in my_list:
            #================  Get title ==================#
            out_string = ""
            campaign = parseCampaign(x)
            out_string += ';' + str(campaign.getTitle())

            #================ Get Profile ==================#
            if campaign.getTitle() != 'NA':
                creator = parseCreator(campaign.getCreatorUrl())
                out_string += ';' + str(creator.getCreatorProfileLinkUrl())
            else:
                pass
            #================ Write ==================#
            out_string += '\n'
            out_file.write(out_string) 
            count_loop +=1
            print '---- %s on %s ------- ' %(count_loop, len(my_list))

使用多线程编码但不起作用

from threading import Thread
my_list = ['http://stackoverflow.com/', 'http://google.com']

def main(x):
    with open('Extract.csv'), 'w') as out_file:
        count_loop = 0
        for x in my_list:
            #================  Get title ==================#
            out_string = ""
            campaign = parseCampaign(x)
            out_string += ';' + str(campaign.getTitle())

            #================ Get Profile ==================#
            if campaign.getTitle() != 'NA':
                creator = parseCreator(campaign.getCreatorUrl())
                out_string += ';' + str(creator.getCreatorProfileLinkUrl())
            else:
                pass
            #================ Write ==================#
            out_string += '\n'
            out_file.write(out_string) 
            count_loop +=1
            print '---- %s on %s ------- ' %(count_loop, len(my_list))

for x in my_list:
    t = Thread(target=main, args=(x,))
    t.start()
    t2 = Thread(target=main, args=(x,))
    t2.start()

我找不到一个好方法来实现多线程运行这段代码,而且由于文档不太容易理解,所以我有点困惑。在一个内核的情况下,这个代码需要花费2个小时,使用多线程将节省我很多时间!


为了使循环运行更快, - SciPy
这是我代码的简化版本,真实版本每个循环需要7到10秒,因为有很多请求(外部API),如果利用我的处理器的每个核心,即core1 = 10s * 3000urls + core2 = 10s * 3000urls + core3 = 10s * 3000urls + core4 = 10s * 3000urls同时运行,那么10s * 12,000 urls的时间将显著缩短。 - SciPy
当你说多线程版本不起作用时,你是指什么?有错误吗,还是只是没有更快地运行?请检查一下这个问题,我认为即使使用线程模块,ipython也不会使用多个核心。https://dev59.com/NnVC5IYBdhLWcg3wtzoQ#204150 - steven
Python 由于很多原因并不适合进行多线程调优。建议使用 multiprocessingasyncio 进行尝试。 - B. M.
请告诉我们"not working"是什么意思。 - tdelaney
2个回答

3

嗯...对于以下问题的答案:

为什么要为完全相同的任务分配两个线程?

是:

为了更快地运行循环

(请查看原帖的评论)

如果两个线程将执行完全相同的操作,那么显然存在问题。

亲爱的发帖者,请尝试像下面这样做:

import multiprocessing

nb_cores = 2  # Put the correct amount

def do_my_process_for(this_argument):
  # Add the actual code
  pass

def main():

  pool = multiprocessing.Pool(processes=nb_cores)

  results_of_processes = [pool.apply_async(
      do_my_process, 
      args=(an_argument, ),
      callback=None
  ) for an_argument in arguments_list]

  pool.close()
  pool.join()

基本上,您可以认为每个进程/线程都有自己的“思想”。这意味着在您的代码中,第一个线程将针对您在列表上迭代的参数x执行main()中定义的进程,第二个线程将再次为参数x执行相同的任务(main()中的任务)。
您需要的是将您的进程制定为具有一组输入参数和一组输出的过程。然后,您可以创建多个进程,给它们中的每一个提供所需的一个输入参数,然后该进程将使用正确的参数执行主例程。
希望它有所帮助。还可以查看代码,我想您会理解它。
另请参阅:
多进程映射和异步映射(我现在记不清确切名称)
以及
functools partial

如果我只有一个核心,我还能运行这个程序并生成线程吗? - ATOzTOA

3

好的,让我们分析一下你的问题。

首先,你的main()方法处理所有的输入和将输出写入文件。当你使用两个线程的main方法时,两个线程都会完成相同的工作。你需要一个方法来处理单个输入并返回该输入的输出。

def process_x(x):
    #================  Get title ==================#
    out_string = ""
    campaign = parseCampaign(x)
    out_string += ';' + str(campaign.getTitle())

    #================ Get Profile ==================#
    if campaign.getTitle() != 'NA':
        creator = parseCreator(campaign.getCreatorUrl())
        out_string += ';' + str(creator.getCreatorProfileLinkUrl())
    else:
        pass
    #================ Write ==================#
    out_string += '\n'
    return out_string

现在您可以在多个线程中调用此方法,并单独获取每个x的输出。
from threading import Thread
my_list = ['http://stackoverflow.com/', 'http://google.com']
threads = list()
for x in my_list:
    t = Thread(target=process_x, args=(x,))
    t.start()

但问题在于,这将启动n个线程,其中n是my_list中元素的数量。因此,在这里使用multiprocessing.Pool会更好。因此,请改为使用:

from multiprocessing import Pool
pool = Pool(processes=4)              # start 4 worker processes
result_list = pool.map(process_x, my_list)

在这里result_list会拥有所有列表的结果。现在你可以将其保存在文件中。
with open('Extract.csv'), 'w') as out_file:
    out_file.writelines(result_list)

映射块执行。异步映射是更好的解决方案,而applyasync比这两个更好(在我看来)。 - Xxxo
@Kostas,你说得对,Pool.map会阻塞直到处理完所有输入列表。但是在这里,我不认为OP会在applyasync和将数据保存到文件之间做任何事情,因此我们需要关注map方法的阻塞情况。此外,[Pool.applyasync(f, x) for x in my_list]; Pool.close(); Pool.join();在逻辑上基本上等同于Pool.map - Muhammad Tahir

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接