这是我首次涉足并行处理,我一直在研究Dask,但实际编写代码时遇到了问题。
我查看了他们的示例和文档,认为dask.delayed是最好的选择。我尝试使用delayed(function_name)来包装函数,或添加@delayed装饰器,但似乎无法正常工作。我更喜欢Dask而不是其他方法,因为它是用Python编写的,并且具有(应该的)简单性。我知道dask不能在for循环中运行,但他们说它可以在循环内部运行。
我的代码通过一个包含输入其他函数的函数来传递文件,如下所示:
from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames)"
name = name.split('.')[0]
....
然后进行一些预处理,例如:
preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)
然后我调用一个构造函数,并将pre_results传递给函数调用:
fc = FunctionCalls()
Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
input_data=pre_result1, model1=pre_result2)
我在这里做的是将文件传递到for循环中进行预处理,然后将文件传递到两个模型中。
有没有关于如何并行化此代码的想法或提示?我开始收到一些奇怪的错误,而我不知道如何修复代码。代码可以正常工作。我使用了许多pandas数据框、序列和numpy数组,并且我希望不必返回更改为dask.dataframes等的所有内容。
我的注释中的代码可能难以阅读。以下是格式化更好的代码。
在下面的代码中,当我键入print(mean_squared_error)时,我只得到:Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')
from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']
for count, name in enumerate(filenames):
file1 = pd.read_csv(name)
df = pd.DataFrame(file1)
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = delayed(mse)(observed, prediction)
TypeError: 未指定长度的延迟对象不可迭代
- Montyfile1 = pd.read_csv(name) df = pd.DataFrame(file1) prediction = df['Close'][:-1] # second vec is the true values to compare observed = df['Close'][1:] mean_squared_error = delayed(mse)(observed, prediction)
- Monty