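The code below comes from an article/blog post that you should definitely check out (no affiliation): Parallelism in One Line: A Better Model for Day to Day Threading Tasks. I'll summarize it below. It ends up being just a few lines of code.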
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)
Which is the multithreaded version of:
results = []
for item in my_array:
    results.append(my_function(item))
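Description
Map is a cool little function, and the key to easily injecting parallelism into your Python code. For those unfamiliar, map is something lifted from functional languages like Lisp. It is a function which maps another function over a sequence.
Map handles the iteration over the sequence for us, applies the function, and stores all of the results in a handy list at the end.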
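To make the description concrete, here is a minimal Python 3 sketch comparing the explicit loop, the built-in map, and the thread-pool map. The function square and the sample data are hypothetical stand-ins for my_function and my_array.

```python
from multiprocessing.dummy import Pool as ThreadPool


def square(x):
    # Hypothetical stand-in for my_function
    return x * x


my_array = [1, 2, 3, 4, 5]

# Explicit loop: iterate, apply, collect
loop_results = []
for item in my_array:
    loop_results.append(square(item))

# Built-in map does the same (it returns an iterator in Python 3, hence list())
map_results = list(map(square, my_array))

# Thread-pool map has the same call shape, but spreads the calls over 4 threads
with ThreadPool(4) as pool:
    pool_results = pool.map(square, my_array)

print(loop_results == map_results == pool_results)
```

All three produce the same list in the same order; only the last one hands the calls to worker threads.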
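Implementation
Parallel versions of the map function are provided by two libraries: multiprocessing, and its little-known but equally useful stepchild, multiprocessing.dummy.
multiprocessing.dummy is exactly the same as the multiprocessing module, but uses threads instead of processes (an important distinction: use multiple processes for CPU-intensive tasks, and threads for, and during, I/O):
multiprocessing.dummy replicates the API of multiprocessing, but is no more than a wrapper around the threading module.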
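One way to check the "threads, not processes" claim is to ask each worker where it runs. The sketch below assumes Python 3; the helper name where_am_i is made up for illustration.

```python
import os
import threading
from multiprocessing.dummy import Pool as ThreadPool


def where_am_i(_):
    # Report the worker's process ID and whether it is a threading.Thread
    worker = threading.current_thread()
    return os.getpid(), isinstance(worker, threading.Thread)


with ThreadPool(4) as pool:
    reports = pool.map(where_am_i, range(8))

# Every call ran inside the current process, on a thread
same_process = all(pid == os.getpid() for pid, _ in reports)
all_threads = all(is_thread for _, is_thread in reports)
print(same_process, all_threads)
```

With multiprocessing.Pool instead, the reported process IDs would differ from the parent's.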
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
]
# Make the Pool of workers
pool = ThreadPool(4)
# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
# Close the pool and wait for the work to finish
pool.close()
pool.join()
And the timing results:
Single thread: 14.4 seconds
4 Pool: 3.1 seconds
8 Pool: 1.4 seconds
13 Pool: 1.3 seconds
Passing multiple arguments (works like this only in Python 3.3 and later):
To pass multiple arrays:
results = pool.starmap(function, zip(list_a, list_b))
Or to pass a constant and an array:
results = pool.starmap(function, zip(itertools.repeat(constant), list_a))
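Here is a small runnable version of both starmap calls, assuming Python 3.3+. The function scale_and_add and the sample lists are hypothetical.

```python
import itertools
from multiprocessing.dummy import Pool as ThreadPool


def scale_and_add(a, b):
    # Hypothetical two-argument function
    return 10 * a + b


list_a = [1, 2, 3]
list_b = [4, 5, 6]
constant = 7

with ThreadPool(2) as pool:
    # Each tuple produced by zip is unpacked into (a, b)
    pairs = pool.starmap(scale_and_add, zip(list_a, list_b))
    # itertools.repeat is endless; zip stops when list_a runs out
    with_constant = pool.starmap(
        scale_and_add, zip(itertools.repeat(constant), list_a)
    )

print(pairs)
print(with_constant)
```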
If you are using an earlier version of Python, you can pass multiple arguments via this workaround. (Thanks to user136036 for the helpful comment.)
You can also write with Pool(8) as p: p.map( *whatever* ), which saves the bookkeeping lines. - user297171
import Queue
import threading
import urllib2
# Called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())
theurls = ["http://google.com", "http://yahoo.com"]
q = Queue.Queue()
for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()
s = q.get()
print s
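The code above is Python 2. As a rough Python 3 sketch of the same "first response wins" pattern, the version below replaces the network calls with a hypothetical fake_fetch that just sleeps, so it runs without internet access.

```python
import queue
import threading
import time


def fake_fetch(q, name, delay):
    # Hypothetical stand-in for urlopen(url).read(): wait, then report
    time.sleep(delay)
    q.put(name)


q = queue.Queue()
sources = [("slow", 0.5), ("fast", 0.05)]
for name, delay in sources:
    t = threading.Thread(target=fake_fetch, args=(q, name, delay))
    t.daemon = True  # do not keep the process alive for the slower threads
    t.start()

# get() blocks until the first put(); no join() or polling is needed
first = q.get()
print(first)
```

Because the remaining threads are daemon threads, they do not prevent the program from exiting once the first result is in.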
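The get call waits until one of the threads has done a put; the main thread then emits the result and terminates (which takes down any subthreads that might still be running, since they are daemon threads).
... the join() method, since that makes the main thread wait until they are done without burning processor time by constantly checking a value. Thanks, this is exactly what I needed to understand how to use threads. - krs013
In Python 3, replace Queue with queue. The method names are unchanged. - JSmyth
@krs013 You don't need join, because Queue.get() is blocking. - Tom Anderson
However, if you are merely looking for interleaving (or are doing I/O that can be parallelized despite the global interpreter lock), then the threading module is the place to start. As a really simple example, let's consider the problem of summing a large range by summing subranges in parallel: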
import threading
class SummingThread(threading.Thread):
    def __init__(self,low,high):
        super(SummingThread, self).__init__()
        self.low=low
        self.high=high
        self.total=0
    def run(self):
        for i in range(self.low,self.high):
            self.total+=i
thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join() # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result
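Note that the above is a very stupid example, as it does absolutely no I/O and will be executed serially, albeit interleaved (with the added overhead of context switching), in CPython because of the global interpreter lock.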
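By contrast, threads do overlap when they spend their time waiting. The sketch below uses time.sleep as a stand-in for blocking I/O (an assumption for illustration; sleep releases the GIL in a similar way) and measures the total wall time in Python 3.

```python
import threading
import time


def wait_for_io(seconds):
    # time.sleep releases the GIL while waiting, much like a blocking read
    time.sleep(seconds)


start = time.perf_counter()
threads = [threading.Thread(target=wait_for_io, args=(0.2,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# The four 0.2 s waits overlap, so the total stays well below 4 * 0.2 = 0.8 s
print("elapsed: %.2f s" % elapsed)
```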
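thread1 runs until it has completed while the main thread blocks, then the same thing happens with thread2, and then the main thread resumes and prints the values they accumulated. - martineau
Shouldn't that be super(SummingThread, self).__init__()? As in https://dev59.com/YHI95IYBdhLWcg3wsQFm#2197625 - James Andres
As others have mentioned, CPython can use threads only for I/O waits because of the global interpreter lock (GIL).
If you want to benefit from multiple cores for CPU-bound tasks, use multiprocessing: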
from multiprocessing import Process
def f(name):
    print 'hello', name
if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
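... the process started with f. Meanwhile, the main program just waits for the process to exit, joining up with it. If the main program simply exited, the subprocess might or might not run to completion, so calling join is always recommended. - johntellsall
An expanded answer that covers the map function: https://dev59.com/BXE85IYBdhLWcg3wSxkv#28463266 - philshem
Just a note: a queue is not required for threading.
This is the simplest example I could imagine that shows 10 threads running concurrently.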
import threading
from random import randint
from time import sleep
def print_number(number):
    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"
thread_list = []
# range(1, 11) yields 1..10, i.e. ten threads
for i in range(1, 11):
    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)
# Starts threads
for thread in thread_list:
    thread.start()
# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()
# Demonstrates that the main process waited for threads to complete
print "Done"
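In the example above, the start loop can be folded into the creation loop, while the join loop has to stay separate (joining inside the creation loop would run the threads one after another). A minimal Python 3 sketch of that variant, with shortened sleeps and a hypothetical worker function:

```python
import threading
import time

finished = []
lock = threading.Lock()


def worker(number):
    # Shortened sleep so the sketch finishes quickly
    time.sleep(0.01 * number)
    with lock:
        finished.append(number)


threads = []
for i in range(1, 11):
    t = threading.Thread(target=worker, args=(i,))
    t.start()  # started right away, in the same loop that creates it
    threads.append(t)

# Joining happens only after every thread has been started
for t in threads:
    t.join()

print(sorted(finished))
```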
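There is no need for a separate for loop; you can call thread.start() in the first loop. - Mark Mishyn
Alex Martelli's answer helped me. However, here is a modified version that I found more useful (at least for me).
Update: works in both Python 2 and Python 3.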
try:
    # For Python 3
    import queue
    from urllib.request import urlopen
except ImportError:
    # For Python 2
    import Queue as queue
    from urllib2 import urlopen
import threading
worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']
# Load up a queue with your data. This will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)
# Define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            # Get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))
        except queue.Empty:
            queue_full = False
# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()
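import Queue raises ModuleNotFoundError: No module named 'Queue'. I am running Python 3.6.5, and some posts mention that in Python 3.6.5 it should be queue, but it still does not work even after I change it. - user9371654
Given a function f, thread it like this: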
import threading
threading.Thread(target=f).start()
To pass arguments to f:
threading.Thread(target=f, args=(a,b,c)).start()
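One thing to watch with these one-liners: start() returns None, so chaining it onto the constructor throws away the Thread object. If you need to query the thread later, keep a reference first. A minimal Python 3 sketch (the Event is only there to hold the thread open long enough to inspect it):

```python
import threading

release = threading.Event()


def f():
    # Block until the main thread allows us to finish
    release.wait()


# Keep the Thread object, then start it in a separate statement
thread1 = threading.Thread(target=f)
thread1.start()
alive_while_running = thread1.is_alive()

release.set()   # let f return
thread1.join()  # wait for the thread to finish
alive_after_join = thread1.is_alive()

# Chaining start() stores its return value, which is None
chained = threading.Thread(target=lambda: None).start()

print(alive_while_running, alive_after_join, chained)
```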
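I saw the is_alive method, but I could not figure out how to apply it to the thread. I tried assigning thread1=threading.Thread(target=f).start() and checking it with thread1.is_alive(), but thread1 is populated with None, so that did not work. Do you know of any other way to access the thread? - cameronroytaylor
Assign the thread object to a variable first, thread1=threading.Thread(target=f), and then call thread1.start(). After that you can use thread1.is_alive() to check whether the thread is still running. - starfry
Testing with thread1.is_alive() returns False once the function exits. - cameronroytaylor
I found this very useful: create as many threads as cores and let them execute a (large) number of tasks (in this case, calling a shell program):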
import Queue
import threading
import multiprocessing
import subprocess
q = Queue.Queue()
for i in range(30): # Put 30 tasks in the queue
    q.put(i)
def worker():
    while True:
        item = q.get()
        # Execute a task: call a shell program and wait until it completes
        subprocess.call("echo " + str(item), shell=True)
        q.task_done()
cpus = multiprocessing.cpu_count() # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()
q.join() # Block until all tasks are done
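The same queue/task_done/join pattern in Python 3 might look like the sketch below. The shell call is replaced with a trivial computation so that the results can be collected and checked; the worker count of 4 and the squaring task are arbitrary placeholders.

```python
import queue
import threading

q = queue.Queue()
for i in range(30):  # put 30 tasks in the queue
    q.put(i)

results = []
results_lock = threading.Lock()


def worker():
    while True:
        item = q.get()
        try:
            with results_lock:
                results.append(item * item)  # placeholder for the real task
        finally:
            q.task_done()  # always mark the item done, even if the task fails


for _ in range(4):  # arbitrary worker count for the sketch
    threading.Thread(target=worker, daemon=True).start()

q.join()  # blocks until task_done() has been called once per queued item
print(len(results))
```

Calling task_done() in a finally block matters: if a task raised before task_done(), q.join() would wait forever.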
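I saw a lot of examples here where no real work was being performed, and they were mostly CPU-bound. Here is an example of a CPU-bound task that computes all prime numbers between 10 million and 10.05 million. I have used all four methods here: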
import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def time_stuff(fn):
    """
    Measure time of execution of a function
    """
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
    return wrapper
def find_primes_in(nmin, nmax):
    """
    Compute a list of prime numbers between the given minimum and maximum arguments
    """
    primes = []
    # Loop from minimum to maximum
    for current in range(nmin, nmax + 1):
        # Take the square root of the current number
        sqrt_n = int(math.sqrt(current))
        found = False
        # Check if any number from 2 to the square root divides the current number under consideration
        for number in range(2, sqrt_n + 1):
            # If divisible we have found a factor, hence this is not a prime number, let's move to the next one
            if current % number == 0:
                found = True
                break
        # If not divisible, add this number to the list of primes that we have found so far
        if not found:
            primes.append(current)
    # I am merely printing the length of the array containing all the primes, but feel free to do what you want
    print(len(primes))
@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Use the main process and main thread to compute everything in this case
    """
    find_primes_in(nmin, nmax)
@time_stuff
def threading_prime_finder(nmin, nmax):
    """
    If the minimum is 1000 and the maximum is 2000 and we have four workers,
    1000 - 1250 to worker 1
    1250 - 1500 to worker 2
    1500 - 1750 to worker 3
    1750 - 2000 to worker 4
    so let's split the minimum and maximum values according to the number of workers
    """
    nrange = nmax - nmin
    threads = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        # Start the thread with the minimum and maximum split up to compute
        # Parallel computation will not work here due to the GIL since this is a CPU-bound task
        t = threading.Thread(target = find_primes_in, args = (start, end))
        threads.append(t)
        t.start()
    # Don't forget to wait for the threads to finish
    for t in threads:
        t.join()
@time_stuff
def processing_prime_finder(nmin, nmax):
    """
    Split the minimum, maximum interval similar to the threading method above, but use processes this time
    """
    nrange = nmax - nmin
    processes = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        p = multiprocessing.Process(target = find_primes_in, args = (start, end))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
@time_stuff
def thread_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use a thread pool executor this time.
    This method is slightly faster than using pure threading as the pools manage threads more efficiently.
    This method is still slow due to the GIL limitations since we are doing a CPU-bound task.
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)
@time_stuff
def process_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use the process pool executor.
    This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations.
    RECOMMENDED METHOD FOR CPU-BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)
def main():
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)
if __name__ == "__main__":
    main()
Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds
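Two details of the benchmark are worth a sketch. The workers print their counts instead of returning them, and because find_primes_in treats its upper bound as inclusive, each shared boundary value is checked by two neighbouring workers. The version below is one possible way to address both, not the author's code: it uses half-open chunks and collects return values through futures. The names count_primes and split are hypothetical. Threads are used only to keep the sketch simple; ProcessPoolExecutor offers the same submit interface.

```python
from concurrent.futures import ThreadPoolExecutor


def count_primes(nmin, nmax):
    # Count primes in the half-open range [nmin, nmax)
    count = 0
    for n in range(max(nmin, 2), nmax):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def split(nmin, nmax, parts):
    # Half-open chunks that together cover [nmin, nmax) exactly once
    step = (nmax - nmin) // parts
    bounds = [nmin + i * step for i in range(parts)] + [nmax]
    return list(zip(bounds[:-1], bounds[1:]))


chunks = split(0, 1000, 8)
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(count_primes, lo, hi) for lo, hi in chunks]
    # result() blocks until the corresponding chunk is done
    total = sum(f.result() for f in futures)

print(total)
```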
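The if __name__ == '__main__': guard is required; otherwise the measurement spawns itself and prints An attempt has been made to start a new process before.... - Stein
Python 3 has built-in facilities for launching parallel tasks, which makes our work easier.
Here is an example:
ThreadPoolExecutor example (source)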
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
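The key point of the example above is that an exception raised inside a worker is stored on the future and re-raised by future.result(). The offline sketch below shows the same flow with a hypothetical load function in place of the network call, so the outcome is deterministic.

```python
import concurrent.futures


def load(name):
    # Hypothetical stand-in for load_url: fails for one specific input
    if name == "bad":
        raise ValueError("cannot load %s" % name)
    return name.upper()


names = ["a", "b", "bad"]
outcomes = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to the input that produced it
    future_to_name = {executor.submit(load, n): n for n in names}
    for future in concurrent.futures.as_completed(future_to_name):
        name = future_to_name[future]
        try:
            outcomes[name] = future.result()  # re-raises the worker's exception
        except Exception as exc:
            outcomes[name] = "error: %s" % exc

print(outcomes)
```

as_completed yields futures in completion order, which is why the results are keyed by name rather than appended to a list.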
ProcessPoolExecutor (source)
import concurrent.futures
import math
PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]
def is_prime(n):
    # Handle the small cases that the odd-divisor loop below would get wrong
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
    main()