Dask: 如何使用dask delayed并行化我的代码?

30

这是我首次涉足并行处理,我一直在研究Dask,但实际编写代码时遇到了问题。

我查看了他们的示例和文档,认为dask.delayed是最好的选择。我尝试使用delayed(function_name)来包装函数,或添加@delayed装饰器,但似乎无法正常工作。我更喜欢Dask而不是其他方法,因为它是用Python编写的,并且具有(应该的)简单性。我知道dask不能在for循环中运行,但他们说它可以在循环内部运行。

我的代码通过一个包含输入其他函数的函数来传递文件,如下所示:

from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames)"
    name = name.split('.')[0]
    ....

然后进行一些预处理,例如:

    preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)

然后我调用一个构造函数,并将pre_results传递给函数调用:

    fc = FunctionCalls()
    Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
                             input_data=pre_result1, model1=pre_result2)

我在这里做的是将文件传递到for循环中进行预处理,然后将文件传递到两个模型中。

有没有关于如何并行化此代码的想法或提示?我开始收到一些奇怪的错误,而我不知道如何修复代码。代码可以正常工作。我使用了许多pandas数据框、序列和numpy数组,并且我希望不必返回更改为dask.dataframes等的所有内容。

我的注释中的代码可能难以阅读。以下是格式化更好的代码。

在下面的代码中,当我键入print(mean_squared_error)时,我只得到:Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')

from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']

for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = delayed(mse)(observed, prediction)

2
如果您能够提供一个MCVE,您可能会得到更好的响应。 - MRocklin
谢谢。我已经删除了一些代码以更突出问题。如果有任何不清楚的地方,请告诉我。 - Monty
5
最好是提供一个最小化的失败示例,让他人也能够复现。你当前的问题是“我正在尝试这样做,但不起作用”。一个更好的问题应该是:“我完全按照以下几个步骤进行操作,它们足够复杂以展示问题,但也足够简单以便于您可以轻松地复制粘贴,并且在不阅读大量代码的情况下快速理解,我得到了如下的错误信息。” - MRocklin
将我的代码进一步简化真的没有用。我已经编辑过了,使它更容易理解。我试图进一步简化它,但似乎并没有抓住问题的核心。如果您愿意,我可以引导您到Github上查看。目前,在读取文件行时出现类型错误。TypeError: 未指定长度的延迟对象不可迭代 - Monty
我这里有一个示例代码(虽然对回答我的问题没有什么帮助...)from dask import delayed import pandas as pd from sklearn.metrics import mean_squared_error as mse filenames = ['file1.csv'] for count, name in enumerate(filenames):file1 = pd.read_csv(name) df = pd.DataFrame(file1) prediction = df['Close'][:-1] # second vec is the true values to compare observed = df['Close'][1:] mean_squared_error = delayed(mse)(observed, prediction) - Monty
2个回答

37
你需要调用dask.compute最终计算结果。请参见dask.delayed文档

顺序代码

import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

results = []
for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)  # isn't this already a dataframe?
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = mse(observed, prediction)  
    results.append(mean_squared_error)

并行代码

import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

delayed_results = []
for count, name in enumerate(filenames):
    df = dask.delayed(pd.read_csv)(name)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = dask.delayed(mse)(observed, prediction)
    delayed_results.append(mean_squared_error)

results = dask.compute(*delayed_results)

嗨@MRocklin,调用compute中的*delayed_results是做什么用的? - B_Miner
@B_Miner 我认为我们正在传递数组的地址,所以它会计算该地址之后的所有元素,直到耗尽。 - Asif Ali
@B_Miner Dask会列出要执行的任务清单。当您调用compute时,它会并行地执行这些任务。 - BND

18

在我看来,比起被采纳的答案,这段代码更加清晰易懂。

from dask import compute, delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

def compute_mse(file_name):
    df = pd.read_csv(file_name)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    return mse(observed, prediction)

delayed_results = [delayed(compute_mse)(file_name) for file_name in filenames]
mean_squared_errors = compute(*delayed_results, scheduler="processes")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接