dask.distributed中LocalCluster线程和进程的区别

13
以下是 dask.distributedLocalCluster 配置的区别?
Client(n_workers=4, processes=False, threads_per_worker=1)

对抗

Client(n_workers=1, processes=True, threads_per_worker=4)

他们都有四个线程在处理任务图,但第一个客户端有四个工人。那么,相较于一个拥有多个线程的单个工人,拥有多个作为线程的工人会带来什么好处呢?

编辑:只是澄清一下,我知道进程、线程和共享内存之间的区别,所以这个问题更加关注这两个客户端的配置差异。

3个回答

8

我受到Victor和Martin的回答的启发,想深入了解一下,因此这里是我理解的深度总结。(无法在评论中完成)

首先,请注意此版本dask中的调度程序输出并不直观。processes实际上是工作进程(worker)的数量,cores实际上是所有worker中线程(thread)的总数。

其次,Victor关于TCP地址以及添加/连接更多worker的评论是值得指出的。我不确定是否可以在 processes=False 的情况下向集群添加更多worker,但我认为答案可能是“是”。

现在,请考虑以下脚本:

from dask.distributed import Client

if __name__ == '__main__':
    with Client(processes=False) as client:  # Config 1
        print(client)
    with Client(processes=False, n_workers=4) as client:  # Config 2
        print(client)
    with Client(processes=False, n_workers=3) as client:  # Config 3
        print(client)
    with Client(processes=True) as client:  # Config 4
        print(client)
    with Client(processes=True, n_workers=3) as client:  # Config 5
        print(client)
    with Client(processes=True, n_workers=3,
                threads_per_worker=1) as client:  # Config 6
        print(client)

这是在我的笔记本电脑上(4个核心),使用dask版本2.3.0产生的输出:

<Client: scheduler='inproc://90.147.106.86/14980/1' processes=1 cores=4>
<Client: scheduler='inproc://90.147.106.86/14980/9' processes=4 cores=4>
<Client: scheduler='inproc://90.147.106.86/14980/26' processes=3 cores=6>
<Client: scheduler='tcp://127.0.0.1:51744' processes=4 cores=4>
<Client: scheduler='tcp://127.0.0.1:51788' processes=3 cores=6>
<Client: scheduler='tcp://127.0.0.1:51818' processes=3 cores=3>

以下是我对配置差异的理解:
  1. 调度程序和所有工作程序都作为客户端进程中的线程运行。(如Martin所说,这对于内省很有用。)由于未给出工作程序或每个工作程序线程的数量,因此 dask 调用其函数 nprocesses_nthreads() 来设置默认值(在 processes=False 的情况下,1 个进程和线程等于可用核心数)。
  2. 与第一种相同,但由于给定了 n_workers,因此 dask 选择线程/工作程序,使得总线程数等于核心数(即为 1)。同样,在打印输出中的 processes 不是完全正确 - 实际上,它是工作程序的数量(在这种情况下实际上是线程)。
  3. 与第二种相同,但由于 n_workers 不能被核心数整除,dask 选择 2 个线程/工作程序进行过量提交而不是欠量提交。
  4. 客户端、调度程序和所有工作程序都是单独的进程。Dask 选择默认数量的工作程序(等于核心数,因为它小于等于 4)和每个工作程序的默认线程数(1)。
  5. 与第 5 种相同的进程/线程配置,但由于与第 3 种相同的原因,总线程数被过量预约。
  6. 这个行为符合预期。

考虑到您在第2和第3个项目符号中的解释,如果processes = 3实际上意味着“3个线程”(项目符号2),那么每个线程有2个线程是什么意思?(客户端3的3个进程,6个核心)@jrinker - honor
1
我正在使用一台有4个可用核心的机器工作,并且我能够定义一个具有5个或更多n_workers的客户端,为什么会这样?这如何符合“进程”实际上是工作人员数量的概念,为什么不限于我的机器上的物理核心数? - Luis Chaves Rodriguez

6
您在混淆几件不同的事情:
- 进程和线程之间的平衡,不同的混合方式适用于不同的工作负载。每个工作进程使用更多的线程可以更好地共享内存资源并避免序列化; 较少的线程和更多的进程意味着更好地避免全局解释器锁(GIL)。 - 使用 processes=False,调度程序和工作进程都作为客户端的同一进程中的线程运行。同样,它们将共享内存资源,您甚至不必在客户端和调度程序之间序列化对象。然而,您将拥有许多线程,客户端可能会变得不太响应。这通常用于测试,因为可以直接检查调度程序和工作进程对象。

嗨,马丁,第二个要点是我正在寻找的信息 - 谢谢!我深入研究了几种不同的配置,并用自己的答案回复了,可能有太多细节了。希望我写的一切都是正确的,但如果你发现我在那里犯了一个错误,请告诉我,我会修正它。 - jrinker

5
当您使用processes=False时,您将限制集群仅通过您的计算机架构工作。
from dask.distributed import Client

# The address provided by processes=False is a In-process transport address. 
# This is used to perform communication between threads 
# Scheduler and workers are on the same machine.
client = Client(processes=False)
client
<Client: scheduler='inproc://10.0.0.168/31904/1' processes=1 cores=4>

# The address provided on processes=True is tcp protocol. 
# This is a network address. You can start workers from others machines
# just pointing the scheduler address to this tcp address 
# (All machines must be on the same network).
client = Client(processes=True)
client
<Client: scheduler='tcp://127.0.0.1:53592' processes=4 cores=4>

嗨Victor,我认为Martin的答案在技术上是正确的 - 区别不在于本地机器与集群,而在于工作进程(和调度程序)作为客户端进程中的线程运行。此外,我认为使用processes=False设置的集群也可以通过给定的地址连接并进行扩展/缩小,但我可能错了。但是你打印客户端对象的演示让我找到了正确的方向,非常感谢你。 - jrinker

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接