Python: 如何在生产中加速机器学习预测?

3

我已经在scikit-learn中创建了一个机器学习模型,现在需要将其用于生产环境中的实时数据。例如,特征看起来像这样:

  date          event_id  user_id     feature1    feature2    featureX...
  2017-01-27    100       5555        1.23        2           2.99
  2017-01-27    100       4444        2.55        5           3.16
  2017-01-27    100       3333        0.45        3           1.69
  2017-01-27    105       1212        3.96        4           0.0
  2017-01-27    105       2424        1.55        2           5.56
  2017-01-27    105       3636        0.87        4           10.28

因此,每天都有不同的事件。在事件开始之前,我基本上通过从数据库中提取它们并使用存储的scikit模型计算预测结果来将它们存储在数据框中:

df_X = df.drop(['date', 'event_id', 'user_id'], axis=1)
loaded_model = joblib.load("model.joblib.dat")
prediction = loaded_model.predict_proba(df_X)

然后我将预测结果与df匹配,并根据需要将其作为输出发送到API或文件中。
当事件开始时,我从API获取不断更新的featureX。为了进行更新,我使用循环遍历每个event_id和user_id,并使用新的featureX值更新df,重新计算并再次发送到输出。
为此,我做了类似于以下的操作:
# get list of unique event ids
events = set(df['event_id'].tolist())

try:
    while True:
        start = time.time()
        for event in events:
            featureX = request.get(API_URL + event)
            featureX_json = featureX.json()

            for user in featureX_json['users']:
                df.loc[df.user_id == user['user_id'],
                       'featureX'] = user['featureX']

        df_X = df.drop(['date', 'event_id', 'user_id'], axis=1)
        df['prediction'] = loaded_model.predict_proba(df_X)

        # send to API or write to file

        end = time.time()
        print('recalculation time {} secs'.format(end - start))

except KeyboardInterrupt:
    print('exiting !')

这对我来说很有效,但整个预测更新在服务器上需要大约4秒的时间,而我需要它在1秒内完成。我正在尝试找出可以在while循环中更改以获得所需加速的内容。

event_id = 100的请求下,已添加了json样例,URL为http://myapi/api/event_users/<event_id>

{
    "count": 3,
    "users": [
        {
            "user_id": 4444,
            "featureY": 34,
            "featureX": 4.49,
            "created": "2017-01-17T13:00:09.065498Z"
        },
        {
            "user_id": 3333,
            "featureY": 22,
            "featureX": 1.09,
            "created": "2017-01-17T13:00:09.065498Z"
        },
         {
            "user_id": 5555,
            "featureY": 58,
            "featureX": 9.54,
            "created": "2017-01-17T13:00:09.065498Z"
        }
    ]
}

你能提供一个featureX_json的样本数据集吗? - MaxU - stand with Ukraine
1
@MaxU 我已经在问题中添加了一个样本数据集。 - sfactor
3个回答

1
# get list of unique event ids
events = df['event_id'].unique().tolist()

try:
    while True:     # i don't understand why do you need this loop...
        start = time.time()
        for event in events:
            featureX = request.get(API_URL + event)
            tmp = pd.DataFrame(featureX.json()['users'])

            df.loc[(df.event_id == event), 'featureX'] = \
                df.loc[df.event_id == event, 'user_id'] \
                  .map(tmp.set_index('user_id').featureX)

        df_X = df.drop(['date', 'event_id', 'user_id'], axis=1)
        df['prediction'] = loaded_model.predict_proba(df_X)

        # send to API or write to file

        end = time.time()
        print('recalculation time {} secs'.format(end - start))

except KeyboardInterrupt:
    print('exiting !')

演示: 对于 event_id == 100

首先让我们从您的JSON对象创建一个DF:

tmp = pd.DataFrame(featureX_json['users'])

In [33]: tmp
Out[33]:
                       created  featureX  featureY  user_id
0  2017-01-17T13:00:09.065498Z      4.49        34     4444
1  2017-01-17T13:00:09.065498Z      1.09        22     3333
2  2017-01-17T13:00:09.065498Z      9.54        58     5555

现在我们可以摆脱 for user in featureX_json['users']: 循环:

In [29]: df.loc[df.event_id == 100, 'featureX'] = \
             df.loc[df.event_id == 100, 'user_id'].map(tmp.set_index('user_id').featureX)

In [30]: df
Out[30]:
         date  event_id  user_id  feature1  feature2  featureX
0  2017-01-27       100     5555      1.23         2      9.54   # 2.99 -> 9.54
1  2017-01-27       100     4444      2.55         5      4.49   # 3.16 -> 4.49
2  2017-01-27       100     3333      0.45         3      1.09   # 1.69 -> 1.09
3  2017-01-27       105     1212      3.96         4      0.00
4  2017-01-27       105     2424      1.55         2      5.56
5  2017-01-27       105     3636      0.87         4     10.28

0
你可以尝试使用算法的加速实现,例如scikit-learn-intelex - https://github.com/intel/scikit-learn-intelex。 这是一个免费的软件AI加速器,可在各种应用程序中提供10-100倍的加速。
该库将为训练和预测都提供巨大的性能改进。

您可以实现的加速示例

首先安装包

pip install scikit-learn-intelex

然后在你的Python脚本中添加

from sklearnex import patch_sklearn
patch_sklearn()

0

建议订阅某种消息队列,例如Kafka。这样,您可以在更新FeatureX时消费它,而不是无休止地在循环中进行批量API调用,然后迭代整个数据源等。

关于预测,使用更可扩展的方法可能更有意义。您可以将数据框分成块,并向可扩展的高吞吐量预测API发出异步请求。使用此方法,您仅受网络延迟和同时可以进行多少请求的限制。如果预测API能够处理数千/数万/数十万个请求/秒,则您的预测时间可以缩短到不到一秒钟(可能只有几百毫秒)。

我的服务mlrequest是一个低延迟、高吞吐量、高可用性的机器学习API,非常适合解决这类问题。我们可以处理并扩展到许多许多每秒预测。Scikit Learn模型和Pandas Dataframes将在下一个版本中得到支持(即将推出)。以下是一个简单的训练和预测示例。您可以获得免费API密钥,每月可获得50,000个模型事务。

安装 mlrequest Python 客户端

$pip install mlrequest

训练一个模型并将其部署到全球5个数据中心就像这样简单:
from mlrequest import Classifier
classifier = Classifier('my-api-key')
features = {'feature1': 'val1','feature2': 100}
training_data = [{'features': features, 'label': 1}, ...]
r = classifier.learn(training_data=training_data, model_name='my-model', class_count=2) 

预测

features = [{'feature1': 'val1', 'feature2': 77}, ...]
r = classifier.predict(features=features, model_name='my-model', class_count=2)
r.predict_result

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接