Python中类似于R的magrittr包中的%>%的函数管道

135

在R语言中(感谢magrittr),您现在可以通过%>%使用更加函数式的管道语法执行操作。这意味着,您不需要编写以下代码:

> as.Date("2014-01-01")
> as.character((sqrt(12)^2)

你也可以这样做:

> "2014-01-01" %>% as.Date 
> 12 %>% sqrt %>% .^2 %>% as.character

对我来说,这种方式更易读,并且适用于数据框之外的用例。 Python语言是否支持类似的功能呢?


2
很好的问题。我特别关注函数有更多参数的情况。就像在 How dplyr replaced my most common R idioms 的结尾处所示,crime_by_state %>% filter(State=="New York", Year==2005) ... - Piotr Migdal
1
当然,一个人可以使用许多lambda、map和reduce来完成它(而且这样做很简单),但简洁和可读性是主要的关键点。 - Piotr Migdal
14
涉及的软件包是magrittr。 - piccolbo
1
是的,出于同样的原因,每个编写的R包都是由Hadley撰写的。他更加知名。(这里有一点嫉妒) - piccolbo
1
查看解决此问题的 https://dev59.com/3FwX5IYBdhLWcg3wjwJZ 的答案。 - Piotr Migdal
显示剩余3条评论
16个回答

6

我想分享一下我的看法。我个人使用fn包进行函数式编程。您的示例可以翻译为:

from fn import F, _
from math import sqrt

(F(sqrt) >> _**2 >> str)(12)

F是一个包装类,带有函数式语法糖,用于部分应用和组合。 _是匿名函数的Scala风格构造函数(类似于Python的lambda);它表示一个变量,因此您可以将多个_对象组合在一个表达式中,以获得具有更多参数的函数(例如,_ + _相当于lambda a, b: a + b)。F(sqrt) >> _**2 >> str的结果是一个Callable对象,可以使用多次。


正是我所需要的 - 甚至提到了Scala作为一个例子。现在正在尝试它。 - WestCoastProjects
@javadba,我很高兴你觉得这个有用。请注意,_并不是完全灵活的:它不支持所有Python运算符。此外,如果您计划在交互式会话中使用_,您应该使用另一个名称导入它(例如from fn import _ as var),因为大多数(如果不是全部)交互式Python shell使用_来表示最后一个未分配的返回值,从而遮蔽了导入的对象。 - Eli Korvigo

5

这里有一个非常好的pipe模块https://pypi.org/project/pipe/。它重载了|运算符并提供了许多管道函数,例如add、first、where、tail等。

>>> [1, 2, 3, 4] | where(lambda x: x % 2 == 0) | add
6

>>> sum([1, [2, 3], 4] | traverse)
10

此外,编写自己的管道函数非常容易。
@Pipe
def p_sqrt(x):
    return sqrt(x)

@Pipe
def p_pr(x):
    print(x)

9 | p_sqrt | p_pr

4

一种替代方案是使用工作流工具dask。虽然它的语法不如...

var
| do this
| then do that

即使您使用dask,它仍允许变量沿着链路流动,并且使用dask可以在可能的情况下提供并行化的额外好处。

以下是我如何使用dask来实现管道链式模式:

import dask

def a(foo):
    return foo + 1
def b(foo):
    return foo / 2
def c(foo,bar):
    return foo + bar

# pattern = 'name_of_behavior': (method_to_call, variables_to_pass_in, variables_can_be_task_names)
workflow = {'a_task':(a,1),
            'b_task':(b,'a_task',),
            'c_task':(c,99,'b_task'),}

#dask.visualize(workflow) #visualization available. 

dask.get(workflow,'c_task')

# returns 100

在使用过Elixir之后,我想在Python中使用管道模式。虽然这不是完全相同的模式,但它很相似,并且像我所说的那样,具有并行化的附加优势;如果您告诉Dask先运行工作流程中不依赖于其他任务的任务,则它们将并行运行。

如果您希望更简单的语法,可以将其包装在一个可以为您处理任务命名的东西中。当然,在这种情况下,您需要所有函数都将管道作为第一个参数,并且您将失去任何并行化的好处。但是,如果您对此没有问题,可以执行以下操作:

def dask_pipe(initial_var, functions_args):
    '''
    call the dask_pipe with an init_var, and a list of functions
    workflow, last_task = dask_pipe(initial_var, {function_1:[], function_2:[arg1, arg2]})
    workflow, last_task = dask_pipe(initial_var, [function_1, function_2])
    dask.get(workflow, last_task)
    '''
    workflow = {}
    if isinstance(functions_args, list):
        for ix, function in enumerate(functions_args):
            if ix == 0:
                workflow['task_' + str(ix)] = (function, initial_var)
            else:
                workflow['task_' + str(ix)] = (function, 'task_' + str(ix - 1))
        return workflow, 'task_' + str(ix)
    elif isinstance(functions_args, dict):
        for ix, (function, args) in enumerate(functions_args.items()):
            if ix == 0:
                workflow['task_' + str(ix)] = (function, initial_var)
            else:
                workflow['task_' + str(ix)] = (function, 'task_' + str(ix - 1), *args )
        return workflow, 'task_' + str(ix)

# piped functions
def foo(df):
    return df[['a','b']]
def bar(df, s1, s2):
    return df.columns.tolist() + [s1, s2]
def baz(df):
    return df.columns.tolist()

# setup 
import dask
import pandas as pd
df = pd.DataFrame({'a':[1,2,3],'b':[1,2,3],'c':[1,2,3]})

现在,有了这个包装器,您可以按照以下任一语法模式创建管道:
# wf, lt = dask_pipe(initial_var, [function_1, function_2])
# wf, lt = dask_pipe(initial_var, {function_1:[], function_2:[arg1, arg2]})

像这样:

# test 1 - lists for functions only:
workflow, last_task =  dask_pipe(df, [foo, baz])
print(dask.get(workflow, last_task)) # returns ['a','b']

# test 2 - dictionary for args:
workflow, last_task = dask_pipe(df, {foo:[], bar:['string1', 'string2']})
print(dask.get(workflow, last_task)) # returns ['a','b','string1','string2']

这个的一个问题是你不能将函数作为参数传递 :( - MetaStack

2
管道功能可以通过将pandas方法与点组合来实现。以下是一个示例。
加载示例数据框:
import seaborn    
iris = seaborn.load_dataset("iris")
type(iris)
# <class 'pandas.core.frame.DataFrame'>

用点号说明pandas方法的组成:

(iris.query("species == 'setosa'")
     .sort_values("petal_width")
     .head())

如果需要的话,您可以向熊猫数据框添加新方法(例如在这里所做的那样):

pandas.DataFrame.new_method  = new_method

1

只需使用cool

首先,运行python -m pip install cool。 然后,运行python

from cool import F

range(10) | F(filter, lambda x: x % 2) | F(sum) == 25

您可以阅读https://github.com/abersheeran/cool获取更多用法。


0

我受http://tomerfiliba.com/blog/Infix-Operators/启发,想分享我的两分钱意见。

class FuncPipe:
  class Arg:
    def __init__(self, arg):
      self.arg = arg
    def __or__(self, func):
      return func(self.arg)

  def __ror__(self, arg):
    return self.Arg(arg)
pipe = FuncPipe()

那么

1 |pipe| \
  (lambda x: return x+1) |pipe| \
  (lambda x: return 2*x)

返回

4 

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接