How to remove automatically added back ticks while using explode() in pyspark?

Posted 2019-10-30 04:33

I want to add new columns with some expressions as defined here (https://www.mien.in/2018/03/25/reshaping-dataframe-using-pivot-and-melt-in-apache-spark-and-pandas/#pivot-in-spark). While doing so, my explode() function changes the column names it looks up by adding backticks ("`") at the start and end of each column, and then gives the error:

Cannot resolve column name `Column_name` from [Column_name, Column_name2]

I tried reading the docs and several other questions on SO, but none of them address this issue.
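From what I can tell, the backticks in the error are just Spark escaping identifiers that contain special characters such as ".". A minimal sketch (a toy example, separate from my real data) that reproduces the resolution failure with a dotted column name:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("backtick_demo").getOrCreate()
# A column name containing "." — like 'Value Offtake(000 Rs.)' in my data.
demo = spark.createDataFrame([(1, 2)], ["plain", "Value Offtake(000 Rs.)"])

demo.select(col("`Value Offtake(000 Rs.)`")).show()  # resolves: the name is escaped
# demo.select(col("Value Offtake(000 Rs.)")).show()  # fails: "." is parsed as struct-field access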

I have tried to log the different steps in order to give the reader some clarity.

The error occurs at this line: _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

The output of explode(...) can be seen here (https://pastebin.com/LU9p53th).

The function snippet is:

def melt_df(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str = "variable", value_name: str = "value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""
    print("Value name is {} and value vars is {}".format(
        value_name, value_vars
    ))
    # df2 = df2.select([col(k).alias(actual_cols[k]) for k in keys_de_cols])
    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))
    print("Explode: ")
    print(explode(_vars_and_vals))
    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
    print("_tmp:")
    print(_tmp)
    sys.exit()
    cols = id_vars + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)
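If the problem is the unescaped dot in the column names, then escaping each name with backticks when building the struct might avoid it — a sketch of that one change, assuming none of the names themselves contain a backtick:

# Sketch: backtick-escape each source column name before passing it to col(),
# so names containing "." resolve literally instead of as field access.
_vars_and_vals = array(*(
    struct(lit(c).alias(var_name), col("`{}`".format(c)).alias(value_name))
    for c in value_vars))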

And the whole code is:

import sys
from datetime import datetime
from itertools import chain
from typing import Iterable

from pyspark.context import SparkContext
from pyspark.sql import (DataFrame, DataFrameReader, DataFrameWriter, Row,
                         SparkSession)
from pyspark.sql.functions import *
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql.types import *

spark = SparkSession.builder.appName('navydish').getOrCreate()
last_correct_constant = 11
output_file = "april19_1.csv"
input_file_name = "input_for_aviral.csv"


def melt_df(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str = "variable", value_name: str = "value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""
    print("Value name is {} and value vars is {}".format(
        value_name, value_vars
    ))
    # df2 = df2.select([col(k).alias(actual_cols[k]) for k in keys_de_cols])
    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))
    print("Explode: ")
    print(explode(_vars_and_vals))
    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
    print("_tmp:")
    print(_tmp)
    sys.exit()
    cols = id_vars + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)


def getrows(df, rownums=None):
    # Pick out the rows at the given indexes by zipping each row with its index.
    return df.rdd.zipWithIndex().filter(
        lambda x: x[1] in rownums).map(lambda x: x[0])


df = spark.read.csv(
    input_file_name,
    header=True
)
df2 = df

for _col in df.columns:
    if _col.startswith("_c"):
        df = df.drop(_col)
        if int(_col.split("_c")[-1]) > last_correct_constant:
            df2 = df2.drop(_col)
    else:
        # removes the reqd cols, keeps the messed up ones only.
        df2 = df2.drop(_col)

actual_cols = getrows(df2, rownums=[0]).collect()[0].asDict()

keys_de_cols = actual_cols.keys()

# df2 = df2.select([col(x).alias("right_" + str(x)) for x in right_cols])
df2 = df2.select([col(k).alias(actual_cols[k]) for k in keys_de_cols])


periods = []
periods_cols = getrows(df, rownums=[0]).collect()[0].asDict()
for k, v in periods_cols.items():
    if v not in periods:
        periods.append(v)
# periods = list(set(periods))

expected_columns_from_df = [
    'Value Offtake(000 Rs.)',
    'Sales Volume (Volume(LITRES))'
]

for _col in df.columns:
    if _col.startswith('Value Offtake(000 Rs.)') or _col.startswith('Sales Volume (Volume(LITRES))'):
        continue
    df = df.drop(_col)

df2 = df2.withColumn("id", monotonically_increasing_id())
df = df.withColumn("id", monotonically_increasing_id())
df = df2.join(df, "id", "inner").drop("id")
print("After merge, cols of final dataframe are: ")
for _col in df.columns:
    print(_col)

# creating a list of all constant columns
id_vars = []
for i in range(len(df.columns)):
    if i < 12:
        id_vars.append(df.columns[i])

# creating a list of Values from expected columns
value_vars = []
for _col in df.columns:
    if _col.startswith(expected_columns_from_df[0]):
        value_vars.append(_col)
value_vars = id_vars + value_vars

print("Sending this value vars to melt:")
print(value_vars)

# the name of the column in the resulting DataFrame, Value Offtake(000 Rs.)
var_name = expected_columns_from_df[0]

# final value for which we want to melt, Periods
value_name = "Periods"

df = melt_df(
    df,
    id_vars, value_vars,
    var_name, value_name
)

print("The final headers of the resultant dataframe are: ")
print(df.columns)

The whole error is here (https://pastebin.com/9cUupTy3).

I understand the data that is needed, but I think if there were a clear way to avoid the extra, unneeded backticks ("`"), I could get explode() working.
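Alternatively, a sketch of a pre-melt workaround I am considering: rename the columns to strip the characters Spark treats specially (".", "(", ")", spaces) before calling melt_df, so no escaping is needed downstream. The sanitize helper here is hypothetical:

import re

def sanitize(name):
    # Replace every character outside [0-9a-zA-Z_] with "_"; note this can
    # collide if two names differ only in their special characters.
    return re.sub(r"[^0-9a-zA-Z_]", "_", name)

df = df.toDF(*[sanitize(c) for c in df.columns])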

Source: How to remove automatically added back ticks while using explode() in pyspark?