Timedelta in Pyspark Dataframes - TypeError

Posted 2019-10-29 16:20

I am working with Spark 2.3 and Python 3.6, with pyspark 2.3.1.

I have a Spark DataFrame in which each entry is a single work step, and I want to merge several consecutive rows into one work session. This is done by the function getSessions below, which I believe works correctly.

I also created an RDD containing all the information I want: each entry is a Row object with the required columns, and the types look fine (some data disguised):

rddSessions_flattened.take(1)

# [Row(counter=1, end=datetime.datetime(2017, 11, 6, 9, 15, 20), end_id=2758327, no_of_entries=5, shortID=u'DISGUISED', start=datetime.datetime(2017, 11, 6, 9, 13, 59), start_id=INTEGERNUMBER, strDuration='0:01:21', tNumber=u'DISGUISED', timeDuration=datetime.timedelta(0, 81))]

But if I now try to turn the RDD into a DataFrame, I get a TypeError:

df = rddSessions_flattened.toDF()

df.show()

# TypeError: not supported type: <type 'datetime.timedelta'>

(The full error message is posted at the very end.)

Any thoughts on what went wrong here and how to fix it?

  • I basically rely on Spark to infer the schema; I assumed this would work, since the spark.sql.types module has a TimestampType class.
  • How would I define the schema programmatically? The Apache Spark Programming Guide is not clear to me on this point either (one possible sketch follows this list).
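
For reference, a minimal sketch of a programmatically defined schema for the SData fields, using StructType and StructField from pyspark.sql.types. One assumption is baked in: Spark 2.x has no Python-facing SQL type for datetime.timedelta, so timeDuration is mapped to LongType (whole seconds) here rather than to the timedelta the Row currently holds.

from pyspark.sql.types import (StructType, StructField, IntegerType,
                               LongType, StringType, TimestampType)

sessionSchema = StructType([
    StructField('counter', IntegerType(), True),
    StructField('start', TimestampType(), True),
    StructField('start_id', LongType(), True),
    StructField('end', TimestampType(), True),
    StructField('end_id', LongType(), True),
    StructField('strDuration', StringType(), True),
    StructField('timeDuration', LongType(), True),   # assumption: timedelta stored as whole seconds
    StructField('no_of_entries', IntegerType(), True),
])

# df = spark.createDataFrame(rdd_in_seconds, sessionSchema)   # rdd_in_seconds is hypothetical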

Thanks for your ideas!


from pyspark.sql import Row

def getSessions(values, threshold=threshold):
    """
    Create sessions for one person on one case

    Arguments:
        values:      sorted list of tuples (datetime, id)
        threshold:   time delta object; max time of a session

    Return:
        sessions:    list of sessions (SData)
    """

    SData = Row(
        'counter'
        , 'start'
        , 'start_id'
        , 'end'
        , 'end_id'
        , 'strDuration'
        , 'timeDuration'
        , 'no_of_entries'
    )

    counter = 1
    no_of_rows = 1
    sessions = []   # list of sessions

    session_start = values[0][0]   # first entry of the first tuple in the list
    start_row_id = values[0][1]
    session_end = session_start
    end_row_id = start_row_id

    for row_time, row_id in values[1:]:
        if row_time - session_start > threshold:
            # new session found, so append previous session
            sessions.append(SData(
                counter
                , session_start
                , start_row_id
                , session_end
                , end_row_id
                , str(session_end - session_start)
                , session_end - session_start
                , no_of_rows
                )
            )
            # get the information for the next session
            counter += 1
            no_of_rows = 1
            session_start = row_time
            start_row_id = row_id
        else:
            no_of_rows += 1

        # regardless if new session or not: session_end reset to current entry
        session_end = row_time
        end_row_id = row_id

    # very last session has to be recorded as there is no "next" row
    sessions.append(SData(
        counter
        , session_start
        , start_row_id
        , session_end
        , end_row_id
        , str(session_end - session_start)
        , session_end - session_start
        , no_of_rows
        )
    )
    return sessions
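
For context, a hypothetical sketch of how getSessions might be applied per person and case. The names rddSteps, eventTime, and id are assumptions, not taken from the question, and the real pipeline evidently also attaches shortID and tNumber to each session Row:

from datetime import timedelta

threshold = timedelta(minutes=5)   # assumed cut-off; the actual value is not given

# assumed input: an RDD of work-step Rows with fields shortID, tNumber, eventTime, id
rddSessions_flattened = (
    rddSteps
    .map(lambda r: ((r.shortID, r.tNumber), (r.eventTime, r.id)))
    .groupByKey()
    .flatMap(lambda kv: getSessions(sorted(kv[1]), threshold=threshold))
)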

The full error message:

not supported type: <type 'datetime.timedelta'>
Traceback (most recent call last):
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 58, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 687, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 384, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 364, in _inferSchema
    schema = _infer_schema(first, names=names)
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1096, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1070, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'datetime.timedelta'>

Answer 1:

TimestampType is not the same thing as pd.timedelta. The former corresponds to pd.timestamp; for the latter, Spark has a somewhat similar type, CalendarIntervalType, but as you have seen, automatic schema inference does not work for it, and making it work is non-trivial. If possible, I suggest you convert the pd.timedelta to an integer number of seconds or milliseconds and use that downstream: it still represents the time interval in a unit of your choice, but is much easier to work with.
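
Concretely, that conversion could look like the following minimal sketch, run before toDF(); the helper name timedelta_to_seconds is made up for illustration:

from pyspark.sql import Row

def timedelta_to_seconds(row):
    # rebuild the Row with the timedelta replaced by its whole number of seconds
    d = row.asDict()
    d['timeDuration'] = int(d['timeDuration'].total_seconds())
    return Row(**d)

df = rddSessions_flattened.map(timedelta_to_seconds).toDF()
df.show()

Note that a Row built from keyword arguments sorts its fields alphabetically in Spark 2.x, so the column order may change, but the column names are preserved.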



Source: Timedelta in Pyspark Dataframes - TypeError