What is the Scala case class equivalent in PySpark

2019-01-17 23:28发布

问题:

How would you go about employing and/or implementing a case class equivalent in PySpark?

回答1:

As mentioned by Alex Hall a real equivalent of named product type, is a namedtuple.

Unlike Row, suggested in the other answer, it has a number of useful properties:

  • Has well defined shape and can be reliably used for structural pattern matching:

    >>> from collections import namedtuple
    >>>
    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> foobar = FooBar(42, -42)
    >>> foo, bar = foobar
    >>> foo
    42
    >>> bar
    -42
    

    In contrast Rows are not reliable when used with keyword arguments:

    >>> from pyspark.sql import Row
    >>>
    >>> foobar = Row(foo=42, bar=-42)
    >>> foo, bar = foobar
    >>> foo
    -42
    >>> bar
    42
    

    although if defined with positional arguments:

    >>> FooBar = Row("foo", "bar")
    >>> foobar = FooBar(42, -42)
    >>> foo, bar = foobar
    >>> foo
    42
    >>> bar
    -42
    

    the order is preserved.

  • Define proper types

    >>> from functools import singledispatch
    >>> 
    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> type(FooBar)
    <class 'type'>
    >>> isinstance(FooBar(42, -42), FooBar)
    True
    

    and can be used whenever type handling is required, especially with single:

    >>> Circle = namedtuple("Circle", ["x", "y", "r"])
    >>> Rectangle = namedtuple("Rectangle", ["x1", "y1", "x2", "y2"])
    >>>
    >>> @singledispatch
    ... def area(x):
    ...     raise NotImplementedError
    ... 
    ... 
    >>> @area.register(Rectangle)
    ... def _(x):
    ...     return abs(x.x1 - x.x2) * abs(x.y1 - x.y2)
    ... 
    ... 
    >>> @area.register(Circle)
    ... def _(x):
    ...     return math.pi * x.r ** 2
    ... 
    ... 
    >>>
    >>> area(Rectangle(0, 0, 4, 4))
    16
    >>> >>> area(Circle(0, 0, 4))
    50.26548245743669
    

    and multiple dispatch:

    >>> from multipledispatch import dispatch
    >>> from numbers import Rational
    >>>
    >>> @dispatch(Rectangle, Rational)
    ... def scale(x, y):
    ...     return Rectangle(x.x1, x.y1, x.x2 * y, x.y2 * y)
    ... 
    ... 
    >>> @dispatch(Circle, Rational)
    ... def scale(x, y):
    ...     return Circle(x.x, x.y, x.r * y)
    ...
    ...
    >>> scale(Rectangle(0, 0, 4, 4), 2)
    Rectangle(x1=0, y1=0, x2=8, y2=8)
    >>> scale(Circle(0, 0, 11), 2)
    Circle(x=0, y=0, r=22)
    

    and combined with the first property, there can be used in wide ranges of pattern matching scenarios. namedtuples also support standard inheritance and type hints.

    Rows don't:

    >>> FooBar = Row("foo", "bar")
    >>> type(FooBar)
    <class 'pyspark.sql.types.Row'>
    >>> isinstance(FooBar(42, -42), FooBar)  # Expected failure
    Traceback (most recent call last):
    ...
    TypeError: isinstance() arg 2 must be a type or tuple of types
    >>> BarFoo = Row("bar", "foo")
    >>> isinstance(FooBar(42, -42), type(BarFoo))
    True
    >>> isinstance(BarFoo(42, -42), type(FooBar))
    True
    
  • Provide highly optimized representation. Unlike Row objects, tuple don't use __dict__ and carry field names with each instance. As a result there are can be order of magnitude faster to initialize:

    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> %timeit FooBar(42, -42)
    587 ns ± 5.28 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
    

    compared to different Row constructors:

    >>> %timeit Row(foo=42, bar=-42)
    3.91 µs ± 7.67 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
    >>> FooBar = Row("foo", "bar")
    >>> %timeit FooBar(42, -42)
    2 µs ± 25.4 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
    

    and are significantly more memory efficient (very important property when working with large scale data):

    >>> import sys
    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> sys.getsizeof(FooBar(42, -42))
    64
    

    compared to equivalent Row

    >>> sys.getsizeof(Row(foo=42, bar=-42))
    72
    

    Finally attribute access is order of magnitude faster with namedtuple:

    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> foobar = FooBar(42, -42)
    >>> %timeit foobar.foo
    102 ns ± 1.33 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)
    

    compared to equivalent operation on Row object:

    >>> foobar = Row(foo=42, bar=-42)
    >>> %timeit foobar.foo
    2.58 µs ± 26.9 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
    
  • Last but not least namedtuples are properly supported in Spark SQL

    >>> Record = namedtuple("Record", ["id", "name", "value"])
    >>> spark.createDataFrame([Record(1, "foo", 42)])
    DataFrame[id: bigint, name: string, value: bigint]
    

Summary:

It should be clear that Row is a very poor substitute for an actual product type, and should be avoided unless enforced by Spark API.

It should be also clear that pyspark.sql.Row is not intended to be a replacement of a case class when you consider that, it is direct equivalent of org.apache.spark.sql.Row - type which is pretty far from an actual product, and behaves like Seq[Any] (depending on a subclass, with names added). Both Python and Scala implementations were introduced as an useful, albeit awkward interface between external code and internal Spark SQL representation.

See also:

  • It would be a shame not to mention awesome MacroPy developed by Li Haoyi and its port (MacroPy3) by Alberto Berti:

    >>> import macropy.console
    0=[]=====> MacroPy Enabled <=====[]=0
    >>> from macropy.case_classes import macros, case
    >>> @case
    ... class FooBar(foo, bar): pass
    ... 
    >>> foobar = FooBar(42, -42)
    >>> foo, bar = foobar
    >>> foo
    42
    >>> bar
    -42
    

    which comes with a rich set of other features including, but not limited to, advanced pattern matching and neat lambda expression syntax.

  • Python dataclasses (Python 3.7+).



回答2:

If you go to sql-programming-guide in Inferring the Schema Using Reflection section, you will see case class being defined as

case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays.

with example as

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

In the same section, if you switch to python i.e. pyspark, you will see Row being used and defined as

Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, and the types are inferred by looking at the first row.

with example as

from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
schemaPeople = sqlContext.createDataFrame(people)

So the conclusion of the explanation is that Row can be used as case class in pyspark