What I am trying to do is simply convert the fields
year, month, day, hour, minute
(which are of integer type, as shown below) into a single string.
I have a DataFrame df_src of type:
<class 'pyspark.sql.dataframe.DataFrame'>
and here is its schema:
root
|-- src_ip: string (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
|-- hour: integer (nullable = true)
|-- minute: integer (nullable = true)
Earlier, I also declared this function:
def parse_df_to_string(year, month, day, hour=0, minute=0):
second = 0
return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(year, month, day, hour, minute, second)
I tested it, and it works like a charm:
print parse_df_to_string(2016, 10, 15, 21)
print type(parse_df_to_string(2016, 10, 15, 21))
2016-10-15 21:00:00
<type 'str'>
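(Side note: the same string could also be built with the standard datetime module; a pure-Python sketch, where parse_df_to_string_dt is a hypothetical name:)
from datetime import datetime

def parse_df_to_string_dt(year, month, day, hour=0, minute=0):
    # produces the same output for valid dates, via strftime instead of str.format
    return datetime(year, month, day, hour, minute).strftime("%Y-%m-%d %H:%M:%S")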
So I did the analogous thing with the Spark API, using udf:
from pyspark.sql.functions import udf
u_parse_df_to_string = udf(parse_df_to_string)
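(For what it's worth, the return type can also be passed explicitly; a sketch, which should be equivalent since StringType is already the default for udf:)
from pyspark.sql.types import StringType

# equivalent registration with an explicit return type
u_parse_df_to_string = udf(parse_df_to_string, StringType())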
Finally, this query:
df_src.select('*',
u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
).show()
raises the following error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-126-770b587e10e6> in <module>()
25 # Could not make this part wor..
26 df_src.select('*',
---> 27 u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
28 ).show()
/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
285 +---+-----+
286 """
--> 287 print(self._jdf.showString(n, truncate))
288
289 def __repr__(self):
/opt/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934
935 for temp_arg in temp_args:
/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
...
Py4JJavaError: An error occurred while calling o5074.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: parse_df_to_string(input[1, int, true], input[2, int, true], input[3, int, true], input[4, int, true], input[5, int, true])
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
at org.apache.spark.sql.execution.python.PythonUDF.doGenCode(PythonUDF.scala:27)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:101)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740)
...
I tried many things, for instance calling the method with only one parameter and argument, but nothing helped.
One approach that did work, though, was creating a new DataFrame with an extra column, as follows:
from pyspark.sql.functions import concat, col, lit

df_src_grp_hr_d = df_src.select('*', concat(
    col("year"),
    lit("-"),
    col("month"),
    lit("-"),
    col("day"),
    lit(" "),
    col("hour"),
    lit(":0")).alias('time'))
After that, I could cast the column to a timestamp:
df_src_grp_hr_to_timestamp = df_src_grp_hr_d.select(
df_src_grp_hr_d['src_ip'],
df_src_grp_hr_d['year'],
df_src_grp_hr_d['month'],
df_src_grp_hr_d['day'],
df_src_grp_hr_d['hour'],
df_src_grp_hr_d['time'].cast('timestamp'))
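(A tidier variant of this workaround would be to zero-pad the pieces with format_string and parse the result with unix_timestamp, so the string matches the pattern exactly; a sketch I have not run on this data, and df_ts is a hypothetical name:)
from pyspark.sql.functions import format_string, unix_timestamp, col

# build a zero-padded 'yyyy-MM-dd HH:mm:ss' string, then parse it into a timestamp
df_ts = df_src.withColumn(
    'time',
    format_string('%04d-%02d-%02d %02d:%02d:00',
                  col('year'), col('month'), col('day'),
                  col('hour'), col('minute'))
).withColumn('ts', unix_timestamp(col('time'), 'yyyy-MM-dd HH:mm:ss').cast('timestamp'))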