How would you go about employing and/or implementing a case class equivalent in PySpark?
Question:
Answer 1:
As mentioned by Alex Hall, a real equivalent of a named product type is a namedtuple.
Unlike Row, suggested in the other answer, it has a number of useful properties:
It has a well-defined shape and can be reliably used for structural pattern matching:

>>> from collections import namedtuple
>>>
>>> FooBar = namedtuple("FooBar", ["foo", "bar"])
>>> foobar = FooBar(42, -42)
>>> foo, bar = foobar
>>> foo
42
>>> bar
-42
In contrast, Rows are not reliable when used with keyword arguments (in legacy PySpark they sort their fields alphabetically by name):

>>> from pyspark.sql import Row
>>>
>>> foobar = Row(foo=42, bar=-42)
>>> foo, bar = foobar
>>> foo
-42
>>> bar
42
although, if defined with positional arguments:

>>> FooBar = Row("foo", "bar")
>>> foobar = FooBar(42, -42)
>>> foo, bar = foobar
>>> foo
42
>>> bar
-42

the order is preserved.
It defines proper types:

>>> from functools import singledispatch
>>>
>>> FooBar = namedtuple("FooBar", ["foo", "bar"])
>>> type(FooBar)
<class 'type'>
>>> isinstance(FooBar(42, -42), FooBar)
True
and can be used whenever type handling is required, especially with single dispatch:

>>> import math
>>>
>>> Circle = namedtuple("Circle", ["x", "y", "r"])
>>> Rectangle = namedtuple("Rectangle", ["x1", "y1", "x2", "y2"])
>>>
>>> @singledispatch
... def area(x):
...     raise NotImplementedError
...
>>> @area.register(Rectangle)
... def _(x):
...     return abs(x.x1 - x.x2) * abs(x.y1 - x.y2)
...
>>> @area.register(Circle)
... def _(x):
...     return math.pi * x.r ** 2
...
>>> area(Rectangle(0, 0, 4, 4))
16
>>> area(Circle(0, 0, 4))
50.26548245743669
and multiple dispatch:
>>> from multipledispatch import dispatch
>>> from numbers import Rational
>>>
>>> @dispatch(Rectangle, Rational)
... def scale(x, y):
...     return Rectangle(x.x1, x.y1, x.x2 * y, x.y2 * y)
...
>>> @dispatch(Circle, Rational)
... def scale(x, y):
...     return Circle(x.x, x.y, x.r * y)
...
>>> scale(Rectangle(0, 0, 4, 4), 2)
Rectangle(x1=0, y1=0, x2=8, y2=8)
>>> scale(Circle(0, 0, 11), 2)
Circle(x=0, y=0, r=22)
and, combined with the first property, they can be used in a wide range of pattern-matching scenarios.
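As a small illustration of how those two properties combine - dispatch on the type, then rely on stable unpacking of the fields - here is a hypothetical perimeter function (not part of the original answer, reusing the Circle and Rectangle definitions above):

>>> @singledispatch
... def perimeter(x):
...     raise NotImplementedError
...
>>> @perimeter.register(Rectangle)
... def _(x):
...     x1, y1, x2, y2 = x          # field order is fixed, so unpacking is safe
...     return 2 * (abs(x1 - x2) + abs(y1 - y2))
...
>>> @perimeter.register(Circle)
... def _(x):
...     return 2 * math.pi * x.r
...
>>> perimeter(Rectangle(0, 0, 4, 4))
16
>>> perimeter(Circle(0, 0, 4))
25.132741228718345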
namedtuples also support standard inheritance and type hints (illustrated after the snippet below). Rows don't:

>>> FooBar = Row("foo", "bar")
>>> type(FooBar)
<class 'pyspark.sql.types.Row'>
>>> isinstance(FooBar(42, -42), FooBar)  # Expected failure
Traceback (most recent call last):
...
TypeError: isinstance() arg 2 must be a type or tuple of types
>>> BarFoo = Row("bar", "foo")
>>> isinstance(FooBar(42, -42), type(BarFoo))
True
>>> isinstance(BarFoo(42, -42), type(FooBar))
True
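To illustrate the namedtuple side of that claim, a minimal sketch (not part of the original answer; Point and FooBarTyped are made-up names) using only the standard library:

>>> from typing import NamedTuple
>>>
>>> class Point(namedtuple("Point", ["x", "y"])):   # standard inheritance
...     __slots__ = ()
...     def norm(self):
...         return (self.x ** 2 + self.y ** 2) ** 0.5
...
>>> Point(3, 4).norm()
5.0
>>> class FooBarTyped(NamedTuple):                  # type-hinted variant
...     foo: int
...     bar: int
...
>>> FooBarTyped(42, -42).foo
42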
It provides a highly optimized representation. Unlike Row objects, tuples don't use __dict__ and don't carry field names with each instance. As a result they can be an order of magnitude faster to initialize:

>>> FooBar = namedtuple("FooBar", ["foo", "bar"])
>>> %timeit FooBar(42, -42)
587 ns ± 5.28 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
compared to different Row constructors:

>>> %timeit Row(foo=42, bar=-42)
3.91 µs ± 7.67 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
>>> FooBar = Row("foo", "bar")
>>> %timeit FooBar(42, -42)
2 µs ± 25.4 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
and are significantly more memory efficient (a very important property when working with large-scale data):

>>> import sys
>>> FooBar = namedtuple("FooBar", ["foo", "bar"])
>>> sys.getsizeof(FooBar(42, -42))
64
compared to an equivalent Row:

>>> sys.getsizeof(Row(foo=42, bar=-42))
72
Finally, attribute access is an order of magnitude faster with namedtuple:

>>> FooBar = namedtuple("FooBar", ["foo", "bar"])
>>> foobar = FooBar(42, -42)
>>> %timeit foobar.foo
102 ns ± 1.33 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)
compared to the equivalent operation on a Row object:

>>> foobar = Row(foo=42, bar=-42)
>>> %timeit foobar.foo
2.58 µs ± 26.9 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
Last but not least, namedtuples are properly supported in Spark SQL:

>>> Record = namedtuple("Record", ["id", "name", "value"])
>>> spark.createDataFrame([Record(1, "foo", 42)])
DataFrame[id: bigint, name: string, value: bigint]
Summary:
It should be clear that Row is a very poor substitute for an actual product type, and should be avoided unless enforced by the Spark API.
It should also be clear that pyspark.sql.Row is not intended to be a replacement for a case class when you consider that it is the direct equivalent of org.apache.spark.sql.Row - a type which is pretty far from an actual product and behaves like Seq[Any] (with names added, depending on the subclass). Both the Python and Scala implementations were introduced as a useful, albeit awkward, interface between external code and the internal Spark SQL representation.
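To make the Seq[Any]-with-names behaviour, and the places where the API hands you Rows whether you want them or not, a bit more concrete, here is a minimal sketch (not from the original answer; exact reprs may differ between Spark versions):

>>> FooBar = Row("foo", "bar")      # positional definition, field order preserved
>>> row = FooBar(42, -42)
>>> row[0]                          # sequence-style (Seq[Any]) access
42
>>> row.foo                         # name-based access layered on top
42
>>> row.asDict()
{'foo': 42, 'bar': -42}
>>> spark.createDataFrame([row]).first()   # actions such as first() or collect() always return Rows
Row(foo=42, bar=-42)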
See also:
It would be a shame not to mention the awesome MacroPy developed by Li Haoyi and its port (MacroPy3) by Alberto Berti:

>>> import macropy.console
0=[]=====> MacroPy Enabled <=====[]=0
>>> from macropy.case_classes import macros, case
>>> @case
... class FooBar(foo, bar): pass
...
>>> foobar = FooBar(42, -42)
>>> foo, bar = foobar
>>> foo
42
>>> bar
-42
which comes with a rich set of other features including, but not limited to, advanced pattern matching and neat lambda expression syntax.
Python dataclasses (Python 3.7+).
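A minimal sketch of that option (not part of the original answer; whether createDataFrame accepts dataclass instances directly depends on your Spark version, so this sticks to plain Python behaviour):

>>> from dataclasses import dataclass, astuple
>>>
>>> @dataclass(frozen=True)
... class FooBar:
...     foo: int
...     bar: int
...
>>> foobar = FooBar(42, -42)
>>> foobar.foo
42
>>> foo, bar = astuple(foobar)      # a dataclass is not itself a tuple, so unpack via astuple
>>> bar
-42
>>> FooBar(42, -42) == foobar       # value-based equality, as with a case class
True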
Answer 2:
If you go to the Inferring the Schema Using Reflection section of the sql-programming-guide, you will see case class being defined as
case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays.
with the example:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
In the same section, if you switch to Python, i.e. pyspark, you will see Row being used and defined as
Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, and the types are inferred by looking at the first row.
with the example:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
schemaPeople = sqlContext.createDataFrame(people)
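The guide then registers the DataFrame so the schema inferred from the Rows can be queried with SQL; roughly (paraphrased from the same 1.x-era guide, so the exact calls are an assumption here - newer versions use createOrReplaceTempView instead):

schemaPeople.registerTempTable("people")
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")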
So the conclusion of the explanation is that Row can be used as a case class in pyspark.