为什么我在使用多进程时遇到了递归错误?

3

我想使用多进程对大量地址进行地理编码。这是我的代码:

import multiprocessing
import geocoder

addresses = ['New York City, NY','Austin, TX', 'Los Angeles, CA', 'Boston, MA'] # and on and on

def geocode_worker(address):
    return geocoder.arcgis(address)

def main_process():
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    return pool.map(geocode_worker, addresses)

if __name__ == '__main__':
    main_process()

但是它给我返回了这个错误:

Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/anaconda3/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.7/multiprocessing/pool.py", line 470, in _handle_results
    task = get()
  File "/opt/anaconda3/lib/python3.7/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "/opt/anaconda3/lib/python3.7/site-packages/geocoder/base.py", line 599, in __getattr__
    if not self.ok:
  File "/opt/anaconda3/lib/python3.7/site-packages/geocoder/base.py", line 536, in ok
    return len(self) > 0
  File "/opt/anaconda3/lib/python3.7/site-packages/geocoder/base.py", line 422, in __len__
    return len(self._list)

错误的最后三行反复出现,然后回溯信息的最后一行是:
RecursionError: maximum recursion depth exceeded while calling a Python object

有人可以帮我弄清楚为什么吗?


2
错误的完整堆栈跟踪是什么? - Mark Tolonen
那这是否意味着地理编码器与多进程不兼容?或者我能否对地理编码过程进行一些处理以使其正常工作?当我尝试另一个地理编码器(geopy)时,我收到了“服务超时错误”。 - bms2202
1
这也意味着创建一个 [mcve]。 - Mark Tolonen
multiprocessing尝试在主进程中反序列化由geocoder.arcgis返回的结果时,问题就会发生。 geocoder中存在一个错误导致无限循环。 - dano
1
是的,问题在于 geocoder.arcgis 返回的对象无法被 pickle 序列化。您可以通过执行 pickle.loads(pickle.dumps(<geocoder.arcgs()返回的对象>)) 来重现它。最简单的解决方法是尽可能从返回的内容中提取所需的数据。 - dano
显示剩余3条评论
1个回答

4
问题是由于geocoder返回的ArcgisQuery对象无法进行pickle序列化,或者说它无法进行反序列化。反序列化过程中由于使用了__getattr__方法,导致进入了无限循环,该方法在内部尝试访问self.ok,最终依赖于定义了self._list。而在反序列化时,self._list没有被定义,因为它仅在__init__中定义,但是不会在反序列化时调用__init__。因此,在找不到self._list的情况下,它尝试使用__getattr__查找,再次尝试访问self.ok,从而创建一个无限循环。
您可以通过不直接传递ArcgisQuery对象本身而是传递其底层的__dict__来解决此问题,并在主进程中重新构建ArcgisQuery对象。
import multiprocessing
import geocoder
from geocoder.arcgis import ArcgisQuery

addresses = ['New York City, NY','Austin, TX', 'Los Angeles, CA', 'Boston, MA'] # and on and on

def geocode_worker(address):
    out = geocoder.arcgis(address)
    return out.__dict__ # Only return the object's __dict__

def main_process():
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    l = pool.map(geocode_worker, addresses)
    out = []
    for d in l:
        q = ArcgisQuery(d['location'])  # location is a required constructor arg
        q.__dict__.update(d)  # Load the rest of our state into the new object
        out.append(q)
    return out

if __name__ == '__main__':
    print(main_process())

如果您不需要整个“ArcgisQuery”对象,只需要其中的一些部分,则可以从工作进程中返回它们,以避免需要使用此技巧。
值得注意的是,看起来“geocoder”可以通过在“ArcgisQuery”或其基类上实现“__getstate__”和“__setstate__”来解决它的pickling问题。例如:
    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, state):
        self.__dict__.update(state)

非常感谢。我不需要整个对象,所以当我只调用其中的一部分时,它完美地工作了。 - bms2202

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接