I am working with Spark 2.3, Python 3.6 and pyspark 2.3.1.
I have a Spark DataFrame in which each entry is a work step, and I want to merge consecutive rows into work sessions. This is supposed to happen in the function getSessions below; I believe it works.
I then created an RDD containing all the information I want - each entry is a Row object with the required columns, and the types look fine (some data disguised):
rddSessions_flattened.take(1)
# [Row(counter=1, end=datetime.datetime(2017, 11, 6, 9, 15, 20), end_id=2758327, no_of_entries=5, shortID=u'DISGUISED', start=datetime.datetime(2017, 11, 6, 9, 13, 59), start_id=INTEGERNUMBER, strDuration='0:01:21', tNumber=u'DISGUISED', timeDuration=datetime.timedelta(0, 81))]
If I now try to turn the RDD into a Spark DataFrame, I get a TypeError.
df = rddSessions_flattened.toDF()
df.show()
# TypeError: not supported type: type 'datetime.timedelta'
(full error message posted at the very end)
Any thoughts on what went wrong and how to fix it?
- I am basically relying on Spark to infer the schema; I assumed this would work, since the spark.sql.types module has a TimestampType class.
- How would I define the schema programmatically? The Apache Spark Programming Guide is not clear to me on this point either.
Thanks for your ideas!
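For context on the error: Spark 2.x schema inference has no SQL type that corresponds to datetime.timedelta, so one common workaround is to turn the duration into plain seconds before calling toDF(). A minimal sketch; the helper name strip_timedelta and the commented-out RDD usage are my own assumptions, not code from the question:

```python
import datetime

def strip_timedelta(d):
    """Replace any datetime.timedelta value in a row dict with float seconds.

    Spark's schema inference has no SQL type for timedelta, so the value
    has to become a supported type (here: a double) before toDF() is called.
    """
    return {k: (v.total_seconds() if isinstance(v, datetime.timedelta) else v)
            for k, v in d.items()}

# Hypothetical usage on the RDD from the question (requires pyspark):
# from pyspark.sql import Row
# df = (rddSessions_flattened
#       .map(lambda r: Row(**strip_timedelta(r.asDict())))
#       .toDF())

print(strip_timedelta({'counter': 1,
                       'timeDuration': datetime.timedelta(0, 81)}))
# {'counter': 1, 'timeDuration': 81.0}
```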
from pyspark.sql import Row

def getSessions(values, threshold=threshold):
    """
    Create sessions for one person on one case
    Arguments:
        values: sorted list of tuples (datetime, id)
        threshold: time delta object; max time of a session
    Return:
        sessions: list of sessions (SData)
    """
    SData = Row(
        'counter'
        , 'start'
        , 'start_id'
        , 'end'
        , 'end_id'
        , 'strDuration'
        , 'timeDuration'
        , 'no_of_entries'
    )
    counter = 1
    no_of_rows = 1
    sessions = []                 # list of sessions
    session_start = values[0][0]  # first entry of the first tuple in the list
    start_row_id = values[0][1]
    session_end = session_start
    end_row_id = start_row_id
    for row_time, row_id in values[1:]:
        if row_time - session_start > threshold:
            # new session found, so append the previous session
            sessions.append(SData(
                counter
                , session_start
                , start_row_id
                , session_end
                , end_row_id
                , str(session_end - session_start)
                , session_end - session_start
                , no_of_rows
            ))
            # reset the counters for the next session
            counter += 1
            no_of_rows = 1
            session_start = row_time
            start_row_id = row_id
        else:
            no_of_rows += 1
        # regardless of whether a new session started: session_end is reset to the current entry
        session_end = row_time
        end_row_id = row_id
    # the very last session has to be recorded, as there is no "next" row
    sessions.append(SData(
        counter
        , session_start
        , start_row_id
        , session_end
        , end_row_id
        , str(session_end - session_start)
        , session_end - session_start
        , no_of_rows
    ))
    return sessions
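Regarding the second question: instead of relying on inference, the schema can be defined explicitly with StructType and passed to createDataFrame. A sketch only, assuming the SData field order above and assuming timeDuration has already been converted to seconds (Spark 2.x has no SQL type for timedelta either way); the concrete types for the id columns are guesses:

```python
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, LongType, DoubleType,
                               TimestampType)

# Field names and order must match the fields of each Row in the RDD.
schema = StructType([
    StructField('counter',       IntegerType(),   True),
    StructField('start',         TimestampType(), True),
    StructField('start_id',      LongType(),      True),
    StructField('end',           TimestampType(), True),
    StructField('end_id',        LongType(),      True),
    StructField('strDuration',   StringType(),    True),
    StructField('timeDuration',  DoubleType(),    True),  # seconds, not timedelta
    StructField('no_of_entries', IntegerType(),   True),
])

# df = spark.createDataFrame(rddSessions_flattened, schema=schema)
```

With an explicit schema, Spark skips sampling the RDD for type inference entirely, so it fails fast (or not at all) instead of raising the TypeError seen above.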
The full error message:
not supported type: <type 'datetime.timedelta'>
Traceback (most recent call last):
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 58, in toDF
return sparkSession.createDataFrame(self, schema, sampleRatio)
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 687, in createDataFrame
rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 384, in _createFromRDD
struct = self._inferSchema(rdd, samplingRatio, names=schema)
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 364, in _inferSchema
schema = _infer_schema(first, names=names)
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1096, in _infer_schema
fields = [StructField(k, _infer_type(v), True) for k, v in items]
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1070, in _infer_type
raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'datetime.timedelta'>