RAPIDS multi-GPU KMeans clustering hangs

I am new to Python and RAPIDS.AI, and I am trying to recreate the scikit-learn KMeans example on a multi-GPU setup (I have two GPUs) using Dask and RAPIDS. I am using the RAPIDS Docker image with Jupyter Notebook. The code shown below (which also uses the Iris dataset, shown further down) hangs, and the Jupyter notebook cell never finishes. I tried the %debug magic and the Dask dashboard, but could not reach any clear conclusion (the only one being that it may be related to device_m_csv.iloc, though I am not sure). Another possibility is that I forgot a wait(), compute() or persist() somewhere (honestly, I am not sure in which situations each of them should be used). To make the code easier to read, here is what it does:
  • First, make the required imports
  • Next, start with the KMeans algorithm (separator: #######################...)
  • Create a CUDA cluster with 2 workers, one per GPU (I have 2 GPUs), and 1 thread per worker (I have read this is the recommended value), and start a client
  • Read the dataset from CSV, splitting it into 2 partitions (chunksize = '2kB')
  • Split the previous dataset into the data (more commonly known as X) and the labels (more commonly known as y)
  • Instantiate cu_KMeans with Dask
  • Fit the model
  • Predict the values
  • Check the score obtained

Sorry for not being able to provide more data, but I could not obtain it. I will gladly provide whatever other information is needed to resolve this question.

Where do you think the problem is, or what do you think it is?

Thank you very much in advance.

%%time

# Import libraries and show their versions
import numpy as np; print('NumPy Version:', np.__version__)
import pandas as pd; print('Pandas Version:', pd.__version__)
import sklearn; print('Scikit-Learn Version:', sklearn.__version__)
import nvstrings, nvcategory
import cupy; print('cuPY Version:', cupy.__version__)
import cudf; print('cuDF Version:', cudf.__version__)
import cuml; print('cuML Version:', cuml.__version__)
import dask; print('Dask Version:', dask.__version__)
import dask_cuda; print('DaskCuda Version:', dask_cuda.__version__)
import dask_cudf; print('DaskCuDF Version:', dask_cudf.__version__)
import matplotlib; print('MatPlotLib Version:', matplotlib.__version__)
import seaborn as sns; print('SeaBorn Version:', sns.__version__)
#import time
#import warnings

from dask import delayed
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, wait
from dask_ml.cluster import KMeans as skmKMeans
from dask_cuda import LocalCUDACluster

from sklearn import metrics
from sklearn.cluster import KMeans as skKMeans
from sklearn.metrics import adjusted_rand_score as sk_adjusted_rand_score, silhouette_score as sk_silhouette_score
from cuml.cluster import KMeans as cuKMeans
from cuml.dask.cluster.kmeans import KMeans as cumKMeans
from cuml.metrics import adjusted_rand_score as cu_adjusted_rand_score

# Configure matplotlib library
import matplotlib.pyplot as plt
%matplotlib inline

# Configure seaborn library
sns.set()
#sns.set(style="white", color_codes=True)
%config InlineBackend.figure_format = 'svg'

# Configure warnings
#warnings.filterwarnings("ignore")


####################################### KMEANS #############################################################
# Create local cluster
cluster = LocalCUDACluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)

# Identify number of workers
n_workers = len(client.has_what().keys())

# Read data in host memory
device_m_csv = dask_cudf.read_csv('./DataSet/iris.csv', header = 0, delimiter = ',', chunksize='2kB') # Get complete CSV. Chunksize is 2kb for getting 2 partitions
#x = host_data.iloc[:, [0,1,2,3]].values
device_m_data = device_m_csv.iloc[:, [0, 1, 2, 3]] # Get data columns
device_m_labels = device_m_csv.iloc[:, 4] # Get labels column

# Plot data
#sns.pairplot(device_csv.to_pandas(), hue='variety');

# Define variables
label_type = { 'Setosa': 1, 'Versicolor': 2, 'Virginica': 3 } # Dictionary of variables type

# Create KMeans
cu_m_kmeans = cumKMeans(init = 'k-means||',
                     n_clusters = len(device_m_labels.unique()),
                     oversampling_factor = 40,
                     random_state = 0)
# Fit data in KMeans
cu_m_kmeans.fit(device_m_data)

# Predict data
cu_m_kmeans_labels_predicted = cu_m_kmeans.predict(device_m_data).compute()

# Check score
#print('Cluster centers:\n',cu_m_kmeans.cluster_centers_)
#print('adjusted_rand_score: ', sk_adjusted_rand_score(device_m_labels, cu_m_kmeans.labels_))
#print('silhouette_score: ', sk_silhouette_score(device_m_data.to_pandas(), cu_m_kmeans_labels_predicted))

# Close local cluster
client.close()
cluster.close()

Iris dataset example:

(image: Iris dataset example)


EDIT 1

@Corey, this is the output produced using your code:

NumPy Version: 1.17.5
Pandas Version: 0.25.3
Scikit-Learn Version: 0.22.1
cuPY Version: 6.7.0
cuDF Version: 0.12.0
cuML Version: 0.12.0
Dask Version: 2.10.1
DaskCuda Version: 0+unknown
DaskCuDF Version: 0.12.0
MatPlotLib Version: 3.1.3
SeaBorn Version: 0.10.0
Cluster centers:
           0         1         2         3
0  5.006000  3.428000  1.462000  0.246000
1  5.901613  2.748387  4.393548  1.433871
2  6.850000  3.073684  5.742105  2.071053
adjusted_rand_score:  0.7302382722834697
silhouette_score:  0.5528190123564102
2 Answers


I modified your reproducible example slightly and was able to produce an output on the most recent version of RAPIDS.

Here is the output of the script.

(cuml_dev_2) cjnolet@deeplearn ~ $ python ~/kmeans_mnmg_reproduce.py 
NumPy Version: 1.18.1
Pandas Version: 0.25.3
Scikit-Learn Version: 0.22.2.post1
cuPY Version: 7.2.0
cuDF Version: 0.13.0a+3237.g61e4d9c
cuML Version: 0.13.0a+891.g4f44f7f
Dask Version: 2.11.0+28.g10db6ba
DaskCuda Version: 0+unknown
DaskCuDF Version: 0.13.0a+3237.g61e4d9c
MatPlotLib Version: 3.2.0
SeaBorn Version: 0.10.0
/share/software/miniconda3/envs/cuml_dev_2/lib/python3.7/site-packages/dask/array/random.py:27: FutureWarning: dask.array.random.doc_wraps is deprecated and will be removed in a future version
  FutureWarning,
/share/software/miniconda3/envs/cuml_dev_2/lib/python3.7/site-packages/distributed/dashboard/core.py:79: UserWarning: 
Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
  warnings.warn("\n" + msg)
bokeh.server.util - WARNING - Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
Cluster centers:
           0         1         2         3
0  5.883607  2.740984  4.388525  1.434426
1  5.006000  3.428000  1.462000  0.246000
2  6.853846  3.076923  5.715385  2.053846
adjusted_rand_score:  0.7163421126838475
silhouette_score:  0.5511916046195927

Here is the modified script that produced this output:
    # Import libraries and show their versions
    import numpy as np; print('NumPy Version:', np.__version__)
    import pandas as pd; print('Pandas Version:', pd.__version__)
    import sklearn; print('Scikit-Learn Version:', sklearn.__version__)
    import nvstrings, nvcategory
    import cupy; print('cuPY Version:', cupy.__version__)
    import cudf; print('cuDF Version:', cudf.__version__)
    import cuml; print('cuML Version:', cuml.__version__)
    import dask; print('Dask Version:', dask.__version__)
    import dask_cuda; print('DaskCuda Version:', dask_cuda.__version__)
    import dask_cudf; print('DaskCuDF Version:', dask_cudf.__version__)
    import matplotlib; print('MatPlotLib Version:', matplotlib.__version__)
    import seaborn as sns; print('SeaBorn Version:', sns.__version__)
    #import time
    #import warnings

    from dask import delayed
    import dask.dataframe as dd
    from dask.distributed import Client, LocalCluster, wait
    from dask_ml.cluster import KMeans as skmKMeans
    from dask_cuda import LocalCUDACluster

    from sklearn import metrics
    from sklearn.cluster import KMeans as skKMeans
    from sklearn.metrics import adjusted_rand_score as sk_adjusted_rand_score, silhouette_score as sk_silhouette_score
    from cuml.cluster import KMeans as cuKMeans
    from cuml.dask.cluster.kmeans import KMeans as cumKMeans
    from cuml.metrics import adjusted_rand_score as cu_adjusted_rand_score
    # Configure matplotlib library
    import matplotlib.pyplot as plt

    # Configure seaborn library
    sns.set()
    #sns.set(style="white", color_codes=True)
    # Configure warnings
    #warnings.filterwarnings("ignore")


    ####################################### KMEANS #############################################################
    # Create local cluster
    cluster = LocalCUDACluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)

    # Identify number of workers
    n_workers = len(client.has_what().keys())

    # Read data in host memory
    from sklearn.datasets import load_iris

    loader = load_iris()

    #x = host_data.iloc[:, [0,1,2,3]].values
    device_m_data = dask_cudf.from_cudf(cudf.from_pandas(pd.DataFrame(loader.data)), npartitions=2) # Get data columns
    device_m_labels = dask_cudf.from_cudf(cudf.from_pandas(pd.DataFrame(loader.target)), npartitions=2)

    # Plot data
    #sns.pairplot(device_csv.to_pandas(), hue='variety');

    # Define variables
    label_type = { 'Setosa': 1, 'Versicolor': 2, 'Virginica': 3 } # Dictionary of variables type

    # Create KMeans
    cu_m_kmeans = cumKMeans(init = 'k-means||',
                     n_clusters = len(np.unique(loader.target)),
                     oversampling_factor = 40,
                     random_state = 0)
    # Fit data in KMeans
    cu_m_kmeans.fit(device_m_data)

    # Predict data
    cu_m_kmeans_labels_predicted = cu_m_kmeans.predict(device_m_data).compute()

    # Check score
    print('Cluster centers:\n',cu_m_kmeans.cluster_centers_)
    print('adjusted_rand_score: ', sk_adjusted_rand_score(loader.target, cu_m_kmeans_labels_predicted.values.get()))
    print('silhouette_score: ', sk_silhouette_score(device_m_data.compute().to_pandas(), cu_m_kmeans_labels_predicted))

    # Close local cluster
    client.close()
    cluster.close()

Please provide the output of the versions of these libraries. I would recommend running the modified script and seeing whether it runs successfully for you. If it does not, we can dig further to find out whether this is related to Docker, to the RAPIDS version, or to something else.
If you have access to the command prompt where the Jupyter notebook is running, it might help to enable logging by passing verbose=True when you construct the KMeans object. This could help us isolate where it is getting stuck.

Thank you, Corey, your code works well! We get the cluster centers in a different order, but that is normal. So, as far as I can tell, the problem is the iloc operation on the dask_cudf dataframe, right? Also, if it is not too much trouble... could you explain to me when to use .compute(), wait() and .persist()? I have read about them, but I am not sure when each should or should not be used. For example, compute() is used under "# Predict data" (following the documentation), but I do not understand why it is needed there. I am not clear on these three concepts. Thanks again. - JuMoGar
I wrote a full answer below, because Stack Overflow did not give me enough characters to explain it completely in a comment. In short, you need to be aware of whether the operation you are executing is lazy or eager. Calling persist turns a lazy (delayed) execution into an eager one (futures), causing it to materialize. You should only wait on futures, because lazy objects do not materialize until they are converted into futures. Compute causes the results to materialize, but it also transfers them to the client. - Corey J. Nolet
Thank you very much for the answer, @Corey J. Nolet. I have just read it and it was very helpful :) - JuMoGar

The Dask documentation is really good and extensive, though I admit the amount of flexibility and capability it provides can sometimes be a little overwhelming. I find it helps to see Dask as an API for distributed computing that gives the user control over several different layers of execution, each layer providing more fine-grained control. compute(), wait() and persist() are concepts that come from the way the tasks underlying a series of distributed computations are scheduled. What all of these computations have in common is an execution graph that represents the remote tasks and their interdependencies. At some point, that execution graph gets scheduled on a set of workers. Dask provides two APIs, depending on whether the tasks underlying the graph are scheduled immediately (eagerly) or need to be triggered manually (lazily).
Both of these APIs build the execution graph as tasks are created that depend on the results of other tasks. The former uses the dask.futures API for immediate, asynchronous execution, where you may sometimes want to wait() on results before performing other operations. The dask.delayed API is used for lazy execution and requires calling methods such as compute() or persist() in order to begin the computation.
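As a minimal sketch of the lazy dask.delayed API (the add function below is only an illustration, not part of the original scripts):

```python
import dask
from dask import delayed

@delayed
def add(a, b):
    # Nothing runs when this is called; a task is merely added to the graph
    return a + b

lazy_sum = add(add(1, 2), 3)  # builds a small graph of two dependent tasks
result = lazy_sum.compute()   # triggers execution and returns the value to the caller
# result == 6
```

Until compute() is called, lazy_sum is just a Delayed object describing work to be done.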
Most often, users of libraries like RAPIDS are more concerned with manipulating their data and less concerned with how those manipulations get scheduled across a set of workers. The dask.dataframe and dask.array objects are built on top of the delayed and futures APIs. Most users interact with these data structures rather than with delayed and futures objects directly, but it is not a bad idea to know about them in case you ever need some data transformation beyond what the distributed dataframe and array objects provide.

dask.dataframedask.array 都会构建惰性执行图,只要可能就会提供一个 compute() 方法来使图形成物化并将结果返回给客户端。它们都还提供了一个 persist() 方法来在后台异步启动计算。如果您想在后台开始计算但不想将结果返回给客户端,则可以使用 wait()

I hope this helps.


Thank you very much, @Corey J. Nolet. Your explanation was very helpful, and I really appreciate the time you took to write this answer. I now understand the Dask concepts much better. Thanks again :) - JuMoGar
