我想与这里定义的一些表达(添加新列https://www.mien.in/2018/03/25/reshaping-dataframe-using-pivot-and-melt-in-apache-spark-and-大熊猫/#枢轴式火花 )。 虽然这样做,我的爆炸()函数更改通过添加反勾寻求的列名(“`”)开头,并在每列,然后给出了错误的结尾:
Cannot resolve column name `Column_name` from [Column_name, Column_name2]
我试着阅读文档和SO几个其他的问题,但他们不解决这个问题。
我试图登录的不同步骤,为了给读者一些清晰度。
该错误是在该行: _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
输出explode(...)
可以在这里( https://pastebin.com/LU9p53th )
功能片段是:
def melt_df(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str = "variable", value_name: str = "value") -> DataFrame:
"""Convert :class:`DataFrame` from wide to long format."""
print("Value name is {} and value vars is {}".format(
value_name, value_vars
))
# df2 = df2.select([col(k).alias(actual_cols[k]) for k in keys_de_cols])
# Create array<struct<variable: str, value: ...>>
_vars_and_vals = array(*(
struct(lit(c).alias(var_name), col(c).alias(value_name))
for c in value_vars))
print("Explode: ")
print(explode(_vars_and_vals))
# Add to the DataFrame and explode
_tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
print("_tmp:")
print(_tmp)
sys.exit()
cols = id_vars + [
col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
return _tmp.select(*cols)
而整个代码为:
import sys
from datetime import datetime
from itertools import chain
from typing import Iterable
from pyspark.context import SparkContext
from pyspark.sql import (DataFrame, DataFrameReader, DataFrameWriter, Row,
SparkSession)
from pyspark.sql.functions import *
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql.types import *
spark = SparkSession.builder.appName('navydish').getOrCreate()
last_correct_constant = 11
output_file = "april19_1.csv"
input_file_name = "input_for_aviral.csv"
def melt_df(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str = "variable", value_name: str = "value") -> DataFrame:
"""Convert :class:`DataFrame` from wide to long format."""
print("Value name is {} and value vars is {}".format(
value_name, value_vars
))
# df2 = df2.select([col(k).alias(actual_cols[k]) for k in keys_de_cols])
# Create array<struct<variable: str, value: ...>>
_vars_and_vals = array(*(
struct(lit(c).alias(var_name), col(c).alias(value_name))
for c in value_vars))
print("Explode: ")
print(explode(_vars_and_vals))
# Add to the DataFrame and explode
_tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
print("_tmp:")
print(_tmp)
sys.exit()
cols = id_vars + [
col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
return _tmp.select(*cols)
def getrows(df, rownums=None):
return df.rdd.zipWithIndex().filter(
lambda x: x[1] in rownums).map(lambda x: x[0])
df = spark.read.csv(
input_file_name,
header=True
)
df2 = df
for _col in df.columns:
if _col.startswith("_c"):
df = df.drop(_col)
if int(_col.split("_c")[-1]) > last_correct_constant:
df2 = df2.drop(_col)
else:
# removes the reqd cols, keeps the messed up ones only.
df2 = df2.drop(_col)
actual_cols = getrows(df2, rownums=[0]).collect()[0].asDict()
keys_de_cols = actual_cols.keys()
# df2 = df2.select([col(x).alias("right_" + str(x)) for x in right_cols])
df2 = df2.select([col(k).alias(actual_cols[k]) for k in keys_de_cols])
periods = []
periods_cols = getrows(df, rownums=[0]).collect()[0].asDict()
for k, v in periods_cols.items():
if v not in periods:
periods.append(v)
# periods = list(set(periods))
expected_columns_from_df = [
'Value Offtake(000 Rs.)',
'Sales Volume (Volume(LITRES))'
]
for _col in df.columns:
if _col.startswith('Value Offtake(000 Rs.)') or _col.startswith('Sales Volume (Volume(LITRES))'):
continue
df = df.drop(_col)
df2 = df2.withColumn("id", monotonically_increasing_id())
df = df.withColumn("id", monotonically_increasing_id())
df = df2.join(df, "id", "inner").drop("id")
print("After merge, cols of final dataframe are: ")
for _col in df.columns:
print(_col)
# creating a list of all constant columns
id_vars = []
for i in range(len(df.columns)):
if i < 12:
id_vars.append(df.columns[i])
# creating a list of Values from expected columns
value_vars = []
for _col in df.columns:
if _col.startswith(expected_columns_from_df[0]):
value_vars.append(_col)
value_vars = id_vars + value_vars
print("Sending this value vars to melt:")
print(value_vars)
# the name of the column in the resulting DataFrame, Value Offtake(000 Rs.)
var_name = expected_columns_from_df[0]
# final value for which we want to melt, Periods
value_name = "Periods"
df = melt_df(
df,
id_vars, value_vars,
var_name, value_name
)
print("The final headers of the resultant dataframe are: ")
print(df.columns)
整个错误是在这里( https://pastebin.com/9cUupTy3 )
我明白了一个需要的数据,但我想,如果能明确的方式,额外的不必要的引号(“`”)是可以避免的,我可以工作爆裂的工作。