Using Spacy with the Joblib library generates _pickle.PicklingError: Could not pickle the task to send it to the workers

23

I have a huge list of sentences (~7 million) and I want to extract nouns from them.

I used the joblib library to parallelize the extraction process, as follows:

import spacy
from tqdm import tqdm
from joblib import Parallel, delayed
nlp = spacy.load('en_core_web_sm')

class nouns:

    def get_nouns(self, text):
        doc = nlp(u"{}".format(text))
        return [token.text for token in doc if token.tag_ in ['NN', 'NNP', 'NNS', 'NNPS']]

    def parallelize(self, sentences):
        # works with n_jobs=1; raises PicklingError once n_jobs > 1
        results = Parallel(n_jobs=1)(delayed(self.get_nouns)(sent) for sent in tqdm(sentences))
        return results

if __name__ == '__main__':
    sentences = ['we went to the school yesterday',
                 'The weather is really cold',
                 'Can we catch the dog?',
                 'How old are you John?',
                 'I like diving and swimming',
                 'Can the world become united?']
    obj = nouns()
    print(obj.parallelize(sentences))

When `n_jobs` in the `parallelize` function is greater than 1, I get this long error:

100%|██████████| 6/6 [00:00<00:00, 200.00it/s]
joblib.externals.loky.process_executor._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\queues.py", line 150, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 243, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 236, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 267, in dump
    return Pickler.dump(self, obj)
  File "C:\Python35\lib\pickle.py", line 408, in dump
    self.save(obj)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 841, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 770, in save_list
    self._batch_appends(obj)
  File "C:\Python35\lib\pickle.py", line 797, in _batch_appends
    save(tmp[0])
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 725, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 718, in save_instancemethod
    self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
  File "C:\Python35\lib\pickle.py", line 599, in save_reduce
    save(args)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 725, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 395, in save_function
    self.save_function_tuple(obj)
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 594, in save_function_tuple
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 841, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 599, in save_reduce
    save(args)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 740, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 740, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 495, in save
    rv = reduce(self.proto)
  File "stringsource", line 2, in preshed.maps.PreshMap.__reduce_cython__
TypeError: self.c_map cannot be converted to a Python object for pickling
"""Exception in thread QueueFeederThread:
Traceback (most recent call last):
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\queues.py", line 150, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 243, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 236, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 267, in dump
    return Pickler.dump(self, obj)
  File "C:\Python35\lib\pickle.py", line 408, in dump
    self.save(obj)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 841, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 770, in save_list
    self._batch_appends(obj)
  File "C:\Python35\lib\pickle.py", line 797, in _batch_appends
    save(tmp[0])
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 725, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 718, in save_instancemethod
    self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
  File "C:\Python35\lib\pickle.py", line 599, in save_reduce
    save(args)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 725, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 395, in save_function
    self.save_function_tuple(obj)
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 594, in save_function_tuple
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 841, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 599, in save_reduce
    save(args)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 740, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 740, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 495, in save
    rv = reduce(self.proto)
  File "stringsource", line 2, in preshed.maps.PreshMap.__reduce_cython__
TypeError: self.c_map cannot be converted to a Python object for pickling

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Python35\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\Python35\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\queues.py", line 175, in _feed
    onerror(e, obj)
  File "C:\Python35\lib\site-packages\joblib\externals\loky\process_executor.py", line 310, in _on_queue_feeder_error
    self.thread_wakeup.wakeup()
  File "C:\Python35\lib\site-packages\joblib\externals\loky\process_executor.py", line 155, in wakeup
    self._writer.send_bytes(b"")
  File "C:\Python35\lib\multiprocessing\connection.py", line 183, in send_bytes
    self._check_closed()
  File "C:\Python35\lib\multiprocessing\connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed



The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File ".../playground.py", line 43, in <module>
    print(obj.Paralize(sentences))
  File ".../playground.py", line 32, in Paralize
    results = Parallel(n_jobs=2)(delayed(self.get_nouns)(sent) for sent in tqdm(sentences))
  File "C:\Python35\lib\site-packages\joblib\parallel.py", line 934, in __call__
    self.retrieve()
  File "C:\Python35\lib\site-packages\joblib\parallel.py", line 833, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "C:\Python35\lib\site-packages\joblib\_parallel_backends.py", line 521, in wrap_future_result
    return future.result(timeout=timeout)
  File "C:\Python35\lib\concurrent\futures\_base.py", line 405, in result
    return self.__get_result()
  File "C:\Python35\lib\concurrent\futures\_base.py", line 357, in __get_result
    raise self._exception
_pickle.PicklingError: Could not pickle the task to send it to the workers.

What is wrong with my code?


I've added an alternative solution below for parallelizing the Spacy processing in question. - Minions
If dill can pickle the code to be parallelized, wouldn't it also be appropriate to use multiprocess (i.e. multiprocessing built on dill)? - Mike McKerns
7 Answers

22

Same problem here. I solved it by changing the backend in Parallel from loky to threading.
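
A minimal sketch of that change, applied to the obj and sentences from the question (backend="threading" is standard joblib API; threads share the parent process's memory, so neither self nor the spacy model is ever pickled):

from joblib import Parallel, delayed

# Threads receive references rather than pickled copies, so the
# unpicklable spacy internals never pass through Pickle at all.
results = Parallel(n_jobs=2, backend="threading")(
    delayed(obj.get_nouns)(sent) for sent in sentences
)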


Thanks for the comment, it helped me a lot :) - Stepan K.
Are there any caveats to using the threading backend? Is there a performance penalty? - liang
Good question. I'm no joblib expert, but I found this in the sklearn docs (sklearn uses joblib under the hood): "It is recommended to use 'loky' to run functions that manipulate Python objects. 'threading' is a low-overhead alternative that is most efficient for functions that release the Global Interpreter Lock: e.g. I/O-bound code or CPU-bound code in a few calls to native code that explicitly releases the GIL." - Tommaso Di Noto
`Parallel()(delayed(sqrt)(i ** 2) for i in range(10))` - ajilesh
@liang `joblib`'s default behavior is process-based parallelism via `multiprocessing`, and `prefer="threading"` is a hint to use threads instead: https://joblib.readthedocs.io/en/latest/parallel.html#thread-based-parallelism-vs-process-based-parallelism - undefined

13

In case you still can't find a solution:

I resolved the error by changing

Parallel(n_jobs=8)

to

Parallel(n_jobs=8, prefer="threads")

and everything then ran smoothly. - chilifan
Working! Thanks. - Yirmi Oppenhime

12

Q: "What is wrong with my code?"

Well, most likely the problem does not come from the code itself, but from the "hidden" processing that appears once n_jobs directs (and joblib internally orchestrates) the preparation of that many exact copies of the main process, so that they can each work independently of one another (effectively escaping the GIL lock and mapping the multiple process flows onto physical hardware resources).

This step is responsible for making copies of all the Python objects, and it uses Pickle to do so. The Pickle module is known for its historical, principal limitations on what can be pickled and what cannot.

The error message confirms this:

TypeError: self.c_map cannot be converted to a Python object for pickling

You may try a little trick: replace Pickle with Mike McKerns' dill module, and test whether your "problematic" Python objects can be pickled with that module without throwing this error.

dill has the same API signature, so a plain import dill as pickle may help, leaving all the other code unchanged.
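
A minimal sketch of that test, assuming the nlp object from the question is in scope (dill mirrors pickle's dumps/loads API; whether the spacy pipeline survives the round trip depends on your spacy and dill versions):

import dill as pickle  # drop-in replacement with the same API

blob = pickle.dumps(nlp)           # stdlib pickle raises here; dill may not
nlp_restored = pickle.loads(blob)
print(type(nlp_restored))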

I had a similar need, where large models had to be distributed to and back from multiple processes, and dill was a way to go. Performance also improved.

Bonus: dill allows you to save/restore the full Python interpreter state!

This was a cool side effect of finding dill: after import dill as pickle, a call to pickle.dump_session(<aFile>) saves a complete copy of the Python interpreter session state. That state can be restored later if needed (post-crash recovery, fully saving and restoring the state of a trained ML model, saving the state of an incrementally-trained model and redistributing it to a deployed user base for remote restores, and so on).
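
A minimal sketch of that, assuming a writable session.pkl path (dump_session and load_session are part of dill's public API):

import dill

trained_state = {"weights": [0.1, 0.2, 0.3]}  # anything living in __main__

dill.dump_session('session.pkl')  # snapshot the whole interpreter session

# ... later, possibly in a fresh interpreter ...
dill.load_session('session.pkl')  # trained_state is back in scope
print(trained_state)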


Thanks for the brief explanation. I tried import dill as pickle and it did work, but performance did not improve. I think I'll have to find another way to parallelize the process. - Minions
Where do you add the import dill as pickle statement? I tried it both before and after from joblib import Parallel, delayed, but I still get the same _pickle.PicklingError. - Muhammad Adeel Zahid
@user_007 could you tell us where you added the import? Is it in one of the joblib files? - Isi

1

Another answer to my own question:

I didn't find a solution for using Joblib together with Spacy, but in order to parallelize the processing I found that Spacy ships a Pipeline facility (nlp.pipe) that can parse large numbers of documents using multiple threads.

I applied it to the same example above:

import time
import spacy

nlp = spacy.load('en_core_web_sm')

class nouns:

    def get_nouns(self, sentences):
        start = time.time()
        # n_threads=-1 used all cores (no-op in spaCy >= 2.1)
        docs = nlp.pipe(sentences, n_threads=-1)
        result = [' '.join([token.text for token in doc if token.tag_ in ['NN', 'NNP', 'NNS', 'NNPS']]) for doc in docs]
        print('Time Elapsed {} ms'.format((time.time() - start) * 1000))
        print(result)


if __name__ == '__main__':
    sentences = ['we went to the school yesterday',
                 'The weather is really cold',
                 'Can we catch the dog?',
                 'How old are you John?',
                 'I like diving and swimming',
                 'Can the world become united?']
    obj = nouns()
    obj.get_nouns(sentences)
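
Note that in newer spaCy releases n_threads is deprecated and ignored; from spaCy v2.2.2 on, in-pipeline multiprocessing is exposed through the n_process argument instead. A hedged sketch of the modern equivalent:

import spacy

nlp = spacy.load('en_core_web_sm')

# n_process forks workers inside spaCy itself, so joblib never has to
# pickle the model; batch_size controls how many docs each worker gets.
docs = nlp.pipe(sentences, n_process=2, batch_size=50)
nouns = [[t.text for t in doc if t.tag_ in ('NN', 'NNP', 'NNS', 'NNPS')]
         for doc in docs]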

0

I ran into a similar problem with another library, pymystem3:

from pymystem3 import Mystem
mystem = Mystem()

def preprocess_text(text):
    ...
    tokens = mystem.lemmatize(text)
    ...
    text = " ".join(tokens)
    return text

data_set = Parallel(n_jobs=-1)(delayed(preprocess_text)(article) for article in tqdm(articles))

The solution was to put the initialization inside the function:

def preprocess_text(text):
    ...
    mystem = Mystem()
    tokens = mystem.lemmatize(text)
    ...
    text = " ".join(tokens)
    return text

I guess you could try the same thing with nlp = spacy.load.
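
A minimal sketch of that idea for the question's code (reloading the model on every call would be very slow, so the usual refinement is to load it lazily, once per worker process):

import spacy

_nlp = None  # one model instance per worker process

def get_nouns(text):
    global _nlp
    if _nlp is None:
        # First call inside each worker loads the model there, so it
        # is never pickled and shipped between processes.
        _nlp = spacy.load('en_core_web_sm')
    doc = _nlp(text)
    return [t.text for t in doc if t.tag_ in ('NN', 'NNP', 'NNS', 'NNPS')]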


0

A case study of how I solved this:

Environment

  • Windows 10 x64
  • Python 3.9 or 3.10
  • joblib v1.1

Solution

# Examine the stack trace very carefully, you will see a line something like this:
TypeError: self.c_map cannot be converted to a Python object for pickling

This tells you which variable could not be serialized.

To fix it, choose one of the following options:

  1. Remove that variable from the function.
  2. Initialize that variable from scratch inside the function.

In my case, I had to use a mix of #1 and #2:

  1. Remove a variable that pointed to a class holding open file handles (nothing with an open file can be pickled).
  2. A class variable could not be pickled, so I re-initialized it inside the function (which removes the need to serialize that class and pass it to the new process).

Sample code

# [Bugfix]. Add next line to initialize this again to eliminate pmap pickle error. Stacktrace is your friend!
hdb = HivedbApi(base_dir=hivedb_base_dir, table_name=table_name, partition_type=PartitionType.HiveFilePerDate)
hdb.write(df_trades)

In your example, I would look inside get_nouns() for a variable that can't be serialized, guided by the stack trace (it tells you which variable it is choking on).

What didn't work

Nothing else on this page worked for me, including changing the backend to threading, changing the pickler to dill, annotating the function, changing Python versions, etc.

Bottom line:

Sometimes nothing can serialize a class, especially one holding open file handles. In that case, the only solutions are to (a) remove those variables from the target function, or (b) re-initialize them inside the target function.


0

Just want to add my two cents. Use @staticmethod instead of plain methods in your class and avoid the automatically injected self object, to prevent accidentally serializing a whole framework, as happened in my case (flask). Frameworks do a lot of injection behind the scenes, which blows up the serialization dependency graph.
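
A minimal sketch of that, applied to the question's class (whether the module-level nlp is re-created in the workers or dragged into the pickle still depends on the code living in an importable module rather than __main__):

import spacy
from joblib import Parallel, delayed

nlp = spacy.load('en_core_web_sm')

class nouns:

    @staticmethod
    def get_nouns(text):
        # No self parameter: only the function and its argument are
        # pickled, not the whole instance and everything it references.
        doc = nlp(text)
        return [t.text for t in doc if t.tag_ in ('NN', 'NNP', 'NNS', 'NNPS')]

    def parallelize(self, sentences):
        return Parallel(n_jobs=2)(
            delayed(nouns.get_nouns)(sent) for sent in sentences
        )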

