我有一组2000个已经训练好的随机回归树(使用scikit learn的Random Forest Regressor,其中
我的策略是将测试矩阵存储在内存中,并在所有进程之间共享。根据开发人员之一的声明,测试的内存需求为2*n_jobs*sizeof(X),而在我的情况下,另一个因素*4也很重要,因为RF内部将8位矩阵条目向上转换为float32。
按照数字来看,我认为测试需要:
14GB的内存来存储测试矩阵+50(=n_jobs)*20000(n_samples)*700(=n_features)*4(向上转型为float)*2字节=14 gb + 5.6 gb = ~21GB的内存。
n_estimators=1
)。在大型数据集(~100000*700000 = 70GB @ 8-bit)上使用multiprocessing
和共享内存并行训练树(50个核心)非常顺利。请注意,我在进行特征选择之前不使用RF的内置多核支持。
问题是:当并行测试一个大矩阵(~20000*700000)时,我总是会耗尽内存(我可以访问具有500 GB RAM的服务器)。我的策略是将测试矩阵存储在内存中,并在所有进程之间共享。根据开发人员之一的声明,测试的内存需求为2*n_jobs*sizeof(X),而在我的情况下,另一个因素*4也很重要,因为RF内部将8位矩阵条目向上转换为float32。
按照数字来看,我认为测试需要:
14GB的内存来存储测试矩阵+50(=n_jobs)*20000(n_samples)*700(=n_features)*4(向上转型为float)*2字节=14 gb + 5.6 gb = ~21GB的内存。
然而它总是爆炸到几百GB。 我错过了什么吗? (我正在使用最新版本的scikit-learn,所以旧的内存问题应该已经解决了)
一个观察结果:
在单个核心上运行时,测试的内存使用量在30到100 GB之间波动(由free
测量)
我的代码:
#----------------
#helper functions
def initializeRFtest(*args):
global df_test, pt_test #initialize test data and test labels as globals in shared memory
df_test, pt_test = args
def star_testTree(model_featidx):
return predTree(*model_featidx)
#end of helper functions
#-------------------
def RFtest(models, df_test, pt_test, features_idx, no_trees):
#test trees in parallel
ncores = 50
p = Pool(ncores, initializer=initializeRFtest, initargs=(df_test, pt_test))
args = itertools.izip(models, features_idx)
out_list = p.map(star_testTree, args)
p.close()
p.join()
return out_list
def predTree(model, feat_idx):
#get all indices of samples that meet feature subset requirement
nan_rows = np.unique(np.where(df_test.iloc[:,feat_idx] == settings.nan_enc)[0])
all_rows = np.arange(df_test.shape[0])
rows = all_rows[np.invert(np.in1d(all_rows, nan_rows))] #discard rows with missing values in the given features
#predict
pred = model.predict(df_test.iloc[rows,feat_idx])
return predicted
#main program
out = RFtest(models, df_test, pt_test, features_idx, no_trees)
编辑:
另一个观察结果:
当对测试数据进行分块处理时,程序运行顺畅,且内存使用量大为降低。这是我用来运行程序的方法。
更新后的predTree
函数代码片段:
def predTree(model, feat_idx):
# get all indices of samples that meet feature subset requirement
nan_rows = np.unique(np.where(test_df.iloc[:,feat_idx] == settings.nan_enc)[0])
all_rows = np.arange(test_df.shape[0])
rows = all_rows[np.invert(np.in1d(all_rows, nan_rows))] #discard rows with missing values in the given features
# predict height per valid sample
chunksize = 500
n_chunks = np.int(math.ceil(np.float(rows.shape[0])/chunksize))
pred = []
for i in range(n_chunks):
if n_chunks == 1:
pred_chunked = model.predict(test_df.iloc[rows[i*chunksize:], feat_idx])
pred.append(pred_chunked)
break
if i == n_chunks-1:
pred_chunked = model.predict(test_df.iloc[rows[i*chunksize:], feat_idx])
else:
pred_chunked = model.predict(test_df.iloc[rows[i*chunksize:(i+1)*chunksize], feat_idx])
print pred_chunked.shape
pred.append(pred_chunked)
pred = np.concatenate(pred)
# populate matrix
predicted = np.empty(test_df.shape[0])
predicted.fill(np.nan)
predicted[rows] = pred
return predicted