使用PySpark数据帧和多边形(GeoPandas)进行空间连接

9
问题:

我想在以下两个数据集之间进行空间连接:

  • 一个大的Spark数据框(500M行),其中包含(例如道路上的点)
  • 一个小的geojson文件(20000个形状),其中包含多边形(例如区域边界)。

到目前为止,我已经实现了如下代码,但是它运行缓慢(存在很多调度器延迟,可能是由于communes未广播造成的):

@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes(traces):   
    geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
    gdf_traces = gpd.GeoDataFrame(traces, geometry=geometry, crs = communes.crs)
    joined_df = gpd.sjoin(gdf_traces, communes, how='left', op='within')
    return joined_df[columns]

pandas_udf接受作为pandas dataframe的“points” dataframe(跟踪点),使用geopandas将其转换为GeoDataFrame,并与“polygons” GeoDataFrame进行空间连接(因此从Geopandas的Rtree join中受益)。

问题:

有没有方法可以使它更快?我知道我的“communes” geodataframe在Spark Driver的内存中,每个工作进程都必须在每次调用udf时下载它,这是正确的吗?

不过,我不知道如何直接将这个GeoDataFrame提供给工作进程(就像广播连接一样)

有什么想法吗?


2
你已经广播了communes吗?你应该广播communes,然后使用communes.value访问json。 - ndricca
1
那就是我最终做的。 - Luis Blanche
1个回答

7
一年后,这是我最终采取的做法,正如@ndricca所建议的那样,诀窍在于广播社区,但你不能直接广播一个GeoDataFrame,因此你必须将其作为Spark DataFrame加载,然后在广播之前将其转换为JSON。然后你可以使用shapely.wkt(一种将几何对象编码为文本的方式)在UDF中重新构建GeoDataFrame
另一个技巧是在groupby中使用盐来确保数据在集群中平均分配。
import geopandas as gpd
from shapely import wkt
from pyspark.sql.functions import broadcast
communes = gpd.load_file('...communes.geojson')
# Use a previously created spark session
traces= spark_session.read_csv('trajectoires.csv')
communes_spark = spark.createDataFrame(communes[['insee_comm', 'wkt']])
communes_json = provinces_spark.toJSON().collect()
communes_bc = spark.sparkContext.broadcast(communes_json)

@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes_bc(traces):
    communes = pd.DataFrame.from_records([json.loads(c) for c in communes_bc.value])
    polygons = [wkt.loads(w) for w in communes['wkt']]
    gdf_communes = gpd.GeoDataFrame(communes, geometry=polygons, crs=crs )
    geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
    gdf_traces = gpd.GeoDataFrame(traces , geometry=geometry, crs=crs)
    joined_df = gpd.sjoin(gdf_traces, gdf_communes, how='left', op='within')
    return joined_df[columns]
    

traces = traces.groupby(salt).apply(join_communes_bc)

你好,我正在尝试实现相同的函数,但是在使用pyarrow时遇到了一些问题。我按照网上建议的解决方案(将pyarrow降级到0.14.1并添加环境变量:ARROW_PRE_0_15_IPC_FORMAT = 1),但我始终会遇到与pyarrow相关的错误。你是否遇到过类似的错误? pyarrow.lib.ArrowInvalid: 输入对象不是NumPy数组。 - infinity911
没关系,我发现解决方案是永远不要返回wkt列。以某种方式或另一种方式会导致这个错误。 - infinity911
@infinity911 你需要将几何列转换为WKT格式。joined_df['geometry'] = joined_df['geometry'].to_wkt()。 - code_bug

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接