如何正确地使用自定义转换器并进行sklearn流水线的pickling(序列化)

36
我正在尝试使用pickle保存一个sklearn机器学习模型,并在另一个项目中加载它。该模型包含进行特征编码、缩放等处理的流水线。当我想在流水线中使用自己编写的transformer进行更高级的任务时,问题就出现了。
假设我有两个项目:
train_project:其中包含src.feature_extraction.transformers.py中的定制transformer。 use_project:其中包含其他事物的src目录,或者根本没有src目录。
如果在“train_project”中使用joblib.dump()保存pipeline,然后在“use_project”中使用joblib.load()加载它,它将无法找到“src.feature_extraction.transformers”之类的内容,并抛出异常:
ModuleNotFoundError: No module named 'src.feature_extraction'
我还应该补充一点,从一开始我的意图就是简化模型的使用,使程序员可以像使用其他模型一样加载模型,传递非常简单且易于阅读的特征,所有特征的“魔术”预处理(例如梯度提升)都在实际模型内部发生。
我考虑创建/dependencies/xxx_model/目录,将所有所需的类和函数存储在两个项目的根目录下(将代码从“train_project”复制到“use_project”),以使项目结构相同并可以加载transformer。但我认为这种解决方案非常不优雅,因为它会强制使用模型的任何项目的结构。
我想只需在“use_project”中重新创建pipeline和所有transformers,并以某种方式加载“train_project”中的拟合值即可。
最好的解决方案是如果保存的文件包含所有所需信息并且不需要依赖项,我真的很震惊sklearn.Pipelines似乎没有这种可能性——如果我不能在以后加载已拟合的对象,那么管道的拟合有什么意义呢?是的,如果我仅使用sklearn类而不创建自定义类,则它将起作用,但非自定义类没有所有所需的功能。
示例代码:
train_project
src.feature_extraction.transformers.py
from sklearn.pipeline import TransformerMixin
class FilterOutBigValuesTransformer(TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        self.biggest_value = X.c1.max()
        return self

    def transform(self, X):
        return X.loc[X.c1 <= self.biggest_value]

训练项目

main.py

from sklearn.externals import joblib
from sklearn.preprocessing import MinMaxScaler
from src.feature_extraction.transformers import FilterOutBigValuesTransformer

pipeline = Pipeline([
    ('filter', FilterOutBigValuesTransformer()),
    ('encode', MinMaxScaler()),
])
X=load_some_pandas_dataframe()
pipeline.fit(X)
joblib.dump(pipeline, 'path.x')
test_project
main.py
from sklearn.externals import joblib

pipeline = joblib.load('path.x')

期望的结果是管道已正确加载,可以使用转换方法。

实际结果是加载文件时出现异常。


1
我有同样的问题,我会分享我到目前为止尝试过的。交替使用joblib和pickle,重新导入我的自定义featureUnion子类。如果你找到了解决方法,请在这里发布。 - Irtaza
8个回答

16

我发现一个相当简单的解决方案。假设你正在使用Jupyter笔记本进行训练:

  1. 创建一个.py文件,定义自定义转换器并将其导入到Jupyter笔记本中。

这是custom_transformer.py文件

from sklearn.pipeline import TransformerMixin

class FilterOutBigValuesTransformer(TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        self.biggest_value = X.c1.max()
        return self

    def transform(self, X):
        return X.loc[X.c1 <= self.biggest_value]
  1. .py文件中导入这个类并使用joblib保存你的模型。
import joblib
from custom_transformer import FilterOutBigValuesTransformer
from sklearn.externals import joblib
from sklearn.preprocessing import MinMaxScaler

pipeline = Pipeline([
    ('filter', FilterOutBigValuesTransformer()),
    ('encode', MinMaxScaler()),
])

X=load_some_pandas_dataframe()
pipeline.fit(X)

joblib.dump(pipeline, 'pipeline.pkl')

在不同的Python脚本中加载.pkl文件时,您将需要导入.py文件才能使其正常工作。
import joblib
from utils import custom_transformer # decided to save it in a utils directory

pipeline = joblib.load('pipeline.pkl')


4

显然,当你在两个不同的文件中拆分定义保存代码部分时,就会出现这个问题。所以我找到了这个解决方法,对我有用。

它包括以下步骤:

假设我们有你的2个项目/仓库:train_project和use_project

train_project:

  • 在你的train_project上创建一个jupyter notebook或.py文件

  • 在那个文件中,让我们定义每个自定义转换器的类,并从sklearn导入所有其他需要设计管道的工具。然后让我们在同一个文件中编写保存pickle的代码。(不要创建外部.py文件src.feature_extraction.transformers来定义自定义转换器)。

  • 然后通过运行那个文件来适应和转储你的管道。

在use_project上:

  • 创建一个名为customthings.py的文件,其中包含所有函数和转换器的定义。

  • 创建另一个file_where_load.py文件,在其中加载pickle。在里面,确保你已经从customthings.py中导入了所有定义。确保函数和类有与你在train_project中使用的相同的名称

我希望这对于所有遇到同样问题的人都起作用。


3

感谢Ture Friese提到的cloudpickle >=2.0.0,但这里有一个适用于您情况的示例。

import cloudpickle

cloudpickle.register_pickle_by_value(FilterOutBigValuesTransformer)
with open('./pipeline.cloudpkl', mode='wb') as file:
    pipeline.dump(
        obj=Pipe
        , file=file
    )

register_pickle_by_value()是关键,因为它将确保在序列化主对象(pipeline)时包括您的自定义模块(src.feature_extraction.transformers)。但是,这不适用于递归模块依赖关系,例如如果FilterOutBigValuesTransformer还包含另一个import语句。


2

前段时间我也遇到了同样的问题,感到十分惊讶。但是有多种方法可以解决这个问题。

最佳实践解决方案:

正如其他人所提到的,最佳实践解决方案是将管道的所有依赖项移动到一个单独的Python包中,并将该包定义为模型环境的依赖项。

然后,在部署模型时必须重新创建环境。在简单的情况下,可以手动完成,例如通过virtualenv或Poetry。但是,模型存储和版本控制框架(MLflow是其中之一)通常提供一种定义所需Python环境的方法(例如通过conda.yaml)。它们通常可以在部署时自动重新创建环境。

将代码放入main的解决方案:

事实上,类和函数声明是可以序列化的,但只有在__main__中的声明才会被序列化。 __main__是脚本的入口点,即运行的文件。因此,如果所有自定义代码以及其所有依赖项都在该文件中,则稍后可以在不包括代码的Python环境中加载自定义对象。这种方法解决了问题,但是谁想要将所有代码都放在__main__中呢?(请注意,此属性也适用于cloudpickle)

"mainify"解决方案:

还有一种方法是在保存之前“mainify”类或函数对象。我前段时间遇到了同样的问题,并编写了一个执行此操作的函数。它本质上是在__main__中重新定义现有对象的代码。其应用很简单:将对象传递给函数,然后序列化对象,就可以在任何地方加载它了。像这样:

# ------ In file1.py: ------    
    
class Foo():
    pass

# ------ In file2.py: ------
from file1 import Foo    

foo = Foo()
foo = mainify(foo)

import dill
    
with open('path/file.dill', 'wb') as f
   dill.dump(foo, f)

我在下面发布了函数代码。请注意,我已经使用dill进行了测试,但我认为它也适用于pickle。
还要注意,原始想法不是我的,而是来自一篇我现在找不到的博客文章。当我找到它时,我将添加参考/确认。 编辑:由Oege Dijk发布的博客文章启发了我的代码。
def mainify(obj, warn_if_exist=True):
    ''' If obj is not defined in __main__ then redefine it in main. Allows dill 
    to serialize custom classes and functions such that they can later be loaded
    without them being declared in the load environment.

    Parameters
    ---------
    obj           : Object to mainify (function or class instance)
    warn_if_exist : Bool, default True. Throw exception if function (or class) of
                    same name as the mainified function (or same name as mainified
                    object's __class__) was already defined in __main__. If False
                    don't throw exception and instead use what was defined in
                    __main__. See Limitations.
    Limitations
    -----------
    Assumes `obj` is either a function or an instance of a class.                
    ''' 
    if obj.__module__ != '__main__':                                                
        
        import __main__       
        is_func = True if isinstance(obj, types.FunctionType) else False                                            
        
        # Check if obj with same name is already defined in __main__ (for funcs)
        # or if class with same name as obj's class is already defined in __main__.
        # If so, simply return the func with same name from __main__ (for funcs)
        # or assign the class of same name to obj and return the modified obj        
        if is_func:
            on = obj.__name__
            if on in __main__.__dict__.keys():
                if warn_if_exist:
                    raise RuntimeError(f'Function with __name__ `{on}` already defined in __main__')
                return __main__.__dict__[on]
        else:
            ocn = obj.__class__.__name__
            if ocn  in __main__.__dict__.keys():
                if warn_if_exist:
                    raise RuntimeError(f'Class with obj.__class__.__name__ `{ocn}` already defined in __main__')
                obj.__class__ = __main__.__dict__[ocn]                
                return obj
                                
        # Get source code and compile
        source = inspect.getsource(obj if is_func else obj.__class__)
        compiled = compile(source, '<string>', 'exec')                    
        # "declare" in __main__, keeping track which key of __main__ dict is the new one        
        pre = list(__main__.__dict__.keys()) 
        exec(compiled, __main__.__dict__)
        post = list(__main__.__dict__.keys())                        
        new_in_main = list(set(post) - set(pre))[0]
        
        # for function return mainified version, else assign new class to obj and return object
        if is_func:
            obj = __main__.__dict__[new_in_main]            
        else:            
            obj.__class__ = __main__.__dict__[new_in_main]
                
    return obj

2
我已经创建了一个解决方案。虽然我不认为它是完整的答案,但它让我摆脱了问题。
解决方案的条件如下:
1.管道需要仅包含两种类型的转换器: - sklearn 转换器 - 自定义转换器,但只能有以下类型的属性: - 数字 - 字符串 - 列表 - 字典
或者这些的任意组合,例如字符串和数字的列表字典。重要的是属性可以进行 JSON 序列化。
2. 管道步骤的名称需要唯一(即使存在管道嵌套)
简而言之,模型将作为目录存储,并使用 joblib 转储文件、自定义转换器的 JSON 文件以及其他关于模型的信息的 JSON 文件。
我创建了一个函数,遍历管道步骤并检查转换器的 __module__ 属性。
如果在其中找到 sklearn,则在指定步骤的名称下运行 joblib.dump 函数,将其保存到某个选定的模型目录中。
否则(__module__ 中没有 sklearn),它会将转换器的 __dict__ 添加到 result_dict 中,键等于指定步骤的名称。最后,我将 result_dict 转储为 result_dict.json 的模型目录下。
如果需要进入某个转换器,例如因为管道内部有一个管道,则可能需要通过向函数开头添加一些规则来递归运行此函数,但始终重要的是在主管道和子管道之间始终具有唯一的步骤/转换器名称。
如果需要其他信息来创建模型管道,则将这些信息保存在 model_info.json 中。
然后,如果您想加载模型以供使用: 您需要在目标项目中创建(而不是拟合)相同的管道。如果管道创建有些动态,并且您需要来自源项目的信息,则从 model_info.json 中加载它。
您可以复制用于序列化的函数并:
- 用 joblib.load 语句替换所有 joblib.dump 语句,将加载对象的 __dict__ 分配给已经存在于管道中的对象的 __dict__ - 将添加 __dict__ 到 result_dict 的所有地方替换为将适当值从 result_dict 分配给对象 __dict__(记得从文件中预先加载 result_dict)
运行此修改后的函数后,以前未拟合的管道应具有所有转换器属性,这些属性是由拟合加载的,整个管道已准备好进行预测。
我不喜欢这种解决方案的主要原因是它需要目标项目中的管道代码,并且需要自定义转换器的所有属性都可以进行 JSON 序列化,但我把它留在这里供其他遇到类似问题的人使用,也许有人能想出更好的解决方案。

1
根据我的研究,最好的解决方案是创建一个包含您训练好的管道和所有文件的Python包。然后,在希望使用它的项目中pip安装它,并使用from <package name> import <pipeline name>导入管道。

1

-1

使用 sys.path.append 调用 transform.py 文件的位置可能会解决问题。

import sys
sys.path.append("src/feature_extraction/transformers")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接