问题:
我想在以下两个数据集之间进行空间连接:
- 一个大的Spark数据框(500M行),其中包含点(例如道路上的点)
- 一个小的geojson文件(20000个形状),其中包含多边形(例如区域边界)。
到目前为止,我已经实现了如下代码,但是它运行缓慢(存在很多调度器延迟,可能是由于communes未广播造成的):
@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes(traces):
geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
gdf_traces = gpd.GeoDataFrame(traces, geometry=geometry, crs = communes.crs)
joined_df = gpd.sjoin(gdf_traces, communes, how='left', op='within')
return joined_df[columns]
pandas_udf接受作为pandas dataframe的“points” dataframe(跟踪点),使用geopandas将其转换为GeoDataFrame,并与“polygons” GeoDataFrame进行空间连接(因此从Geopandas的Rtree join中受益)。
问题:
有没有方法可以使它更快?我知道我的“communes” geodataframe在Spark Driver的内存中,每个工作进程都必须在每次调用udf时下载它,这是正确的吗?
不过,我不知道如何直接将这个GeoDataFrame提供给工作进程(就像广播连接一样)
有什么想法吗?