What is the equivalent of a Scala case class in PySpark?

How do you use or implement something equivalent to a case class in PySpark?

Python's collections.namedtuple is very similar. - Alex Hall
@AlexHall So you're ultimately saying you can use some generic Python class... there is no optimized Spark case-class counterpart that ships with PySpark? - conner.xyz
I don't know PySpark well, this is just general Python advice. - Alex Hall
No, there isn't, because without static typing case classes (or Product types in general) are not that useful. Usually plain Python tuples are good enough. Namedtuples are great, but they need to be distributed to the worker processes. - zero323
2 Answers

As Alex Hall mentioned, a real equivalent of a named product type is a namedtuple. Unlike Row, suggested in the other answer, it has a number of useful properties:
  • Has a well-defined shape and can be reliably used for structural pattern matching:

    >>> from collections import namedtuple
    >>>
    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> foobar = FooBar(42, -42)
    >>> foo, bar = foobar
    >>> foo
    42
    >>> bar
    -42
    

    In contrast, Rows are not reliable when used with keyword arguments:

    >>> from pyspark.sql import Row
    >>>
    >>> foobar = Row(foo=42, bar=-42)
    >>> foo, bar = foobar
    >>> foo
    -42
    >>> bar
    42
    

    although if defined with positional arguments:

    >>> FooBar = Row("foo", "bar")
    >>> foobar = FooBar(42, -42)
    >>> foo, bar = foobar
    >>> foo
    42
    >>> bar
    -42
    

    the order is preserved.

  • Define proper types

    >>> from functools import singledispatch
    >>> 
    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> type(FooBar)
    <class 'type'>
    >>> isinstance(FooBar(42, -42), FooBar)
    True
    

    and can be used whenever type handling is required, especially with singledispatch:

    >>> import math
    >>>
    >>> Circle = namedtuple("Circle", ["x", "y", "r"])
    >>> Rectangle = namedtuple("Rectangle", ["x1", "y1", "x2", "y2"])
    >>>
    >>> @singledispatch
    ... def area(x):
    ...     raise NotImplementedError
    ... 
    ... 
    >>> @area.register(Rectangle)
    ... def _(x):
    ...     return abs(x.x1 - x.x2) * abs(x.y1 - x.y2)
    ... 
    ... 
    >>> @area.register(Circle)
    ... def _(x):
    ...     return math.pi * x.r ** 2
    ... 
    ... 
    >>>
    >>> area(Rectangle(0, 0, 4, 4))
    16
    >>> area(Circle(0, 0, 4))
    50.26548245743669
    

    and multiple dispatch:

    >>> from multipledispatch import dispatch
    >>> from numbers import Rational
    >>>
    >>> @dispatch(Rectangle, Rational)
    ... def scale(x, y):
    ...     return Rectangle(x.x1, x.y1, x.x2 * y, x.y2 * y)
    ... 
    ... 
    >>> @dispatch(Circle, Rational)
    ... def scale(x, y):
    ...     return Circle(x.x, x.y, x.r * y)
    ...
    ...
    >>> scale(Rectangle(0, 0, 4, 4), 2)
    Rectangle(x1=0, y1=0, x2=8, y2=8)
    >>> scale(Circle(0, 0, 11), 2)
    Circle(x=0, y=0, r=22)
    

    and combined with the first property, they can be used in a wide range of pattern matching scenarios. namedtuples also support standard inheritance and type hints (see the typing.NamedTuple sketch after this list).

    Rows don't:

    >>> FooBar = Row("foo", "bar")
    >>> type(FooBar)
    <class 'pyspark.sql.types.Row'>
    >>> isinstance(FooBar(42, -42), FooBar)  # Expected failure
    Traceback (most recent call last):
    ...
    TypeError: isinstance() arg 2 must be a type or tuple of types
    >>> BarFoo = Row("bar", "foo")
    >>> isinstance(FooBar(42, -42), type(BarFoo))
    True
    >>> isinstance(BarFoo(42, -42), type(FooBar))
    True
    
  • Provide a highly optimized representation. Unlike Row objects, tuples don't use __dict__ or carry field names with each instance. As a result they can be an order of magnitude faster to initialize:

    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> %timeit FooBar(42, -42)
    587 ns ± 5.28 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
    

    compared to different Row constructors:

    >>> %timeit Row(foo=42, bar=-42)
    3.91 µs ± 7.67 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
    >>> FooBar = Row("foo", "bar")
    >>> %timeit FooBar(42, -42)
    2 µs ± 25.4 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
    

    and are significantly more memory efficient (a very important property when working with large-scale data):

    >>> import sys
    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> sys.getsizeof(FooBar(42, -42))
    64
    

    compared to equivalent Row

    >>> sys.getsizeof(Row(foo=42, bar=-42))
    72
    

    Finally, attribute access is an order of magnitude faster with namedtuple:

    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> foobar = FooBar(42, -42)
    >>> %timeit foobar.foo
    102 ns ± 1.33 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)
    

    compared to equivalent operation on Row object:

    >>> foobar = Row(foo=42, bar=-42)
    >>> %timeit foobar.foo
    2.58 µs ± 26.9 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
    
  • Last but not least, namedtuples are properly supported in Spark SQL:

    >>> Record = namedtuple("Record", ["id", "name", "value"])
    >>> spark.createDataFrame([Record(1, "foo", 42)])
    DataFrame[id: bigint, name: string, value: bigint]
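
As noted under the second point above, namedtuples also work with standard inheritance and type hints. A minimal sketch using typing.NamedTuple (the class and method names here are purely illustrative):

    >>> from typing import NamedTuple
    >>>
    >>> class TypedFooBar(NamedTuple):
    ...     foo: int
    ...     bar: int
    ...
    ...     def swapped(self) -> "TypedFooBar":
    ...         # illustrative helper to show that methods work as on any class
    ...         return TypedFooBar(self.bar, self.foo)
    ... 
    >>> TypedFooBar(42, -42).swapped()
    TypedFooBar(foo=-42, bar=42)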
    

Summary:

Clearly, Row is a very poor substitute for an actual product type and should be avoided unless it is enforced by the Spark API.

At the same time, it should be clear that pyspark.sql.Row is not intended to be a replacement for a case class: it is a direct equivalent of org.apache.spark.sql.Row, a type that is quite far from an actual product and behaves like Seq[Any] (with names added, depending on the subclass). Both the Python and Scala implementations were introduced as a useful, albeit clumsy, interface between external code and the internal Spark SQL representation.
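
A small sketch of this tuple-with-names behaviour (the field order shown follows the keyword-argument examples above, where the PySpark version in use sorts Row fields created from kwargs):

    >>> from pyspark.sql import Row
    >>>
    >>> foobar = Row(foo=42, bar=-42)
    >>> isinstance(foobar, tuple)    # a Row is a plain tuple subclass
    True
    >>> list(foobar)                 # sequence-like access, as with Seq[Any]
    [-42, 42]
    >>> foobar.foo, foobar["foo"]    # field names are layered on top
    (42, 42)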

See also:

  • It would be a shame not to mention the awesome MacroPy developed by Li Haoyi and its port (MacroPy3) by Alberto Berti:

    >>> import macropy.console
    0=[]=====> MacroPy Enabled <=====[]=0
    >>> from macropy.case_classes import macros, case
    >>> @case
    ... class FooBar(foo, bar): pass
    ... 
    >>> foobar = FooBar(42, -42)
    >>> foo, bar = foobar
    >>> foo
    42
    >>> bar
    -42
    

    which comes with a rich set of other features including, but not limited to, advanced pattern matching and neat lambda expression syntax.

  • Python dataclasses (Python 3.7+).
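
A minimal sketch of the dataclass counterpart of the FooBar examples above (frozen=True is just one option, giving case-class-like immutability):

    >>> from dataclasses import dataclass
    >>>
    >>> @dataclass(frozen=True)      # frozen=True makes instances immutable and hashable
    ... class FooBar:
    ...     foo: int
    ...     bar: int
    ... 
    >>> foobar = FooBar(42, -42)
    >>> foobar
    FooBar(foo=42, bar=-42)
    >>> foobar.foo
    42

Note that, unlike namedtuples, dataclass instances are not tuples, so they don't unpack positionally out of the box; dataclasses.astuple can be used when that is needed.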


This is a great answer! - Sean Vieira

If you go to the Inferring the Schema Using Reflection section of the sql-programming-guide, you will see case class being defined as:

The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Seqs or Arrays.

with the example:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

In the same section, if you switch to python (pyspark), you will see Row being used and described as:

Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, and the types are inferred by looking at the first row.

For example:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
schemaPeople = sqlContext.createDataFrame(people)

So, the conclusion of that explanation is that Row can be used in pyspark the way a case class is used in Scala.
