I want to add a new column with some expression as defined here(https://www.mien.in/2018/03/25/reshaping-dataframe-using-pivot-and-melt-in-apache-spark-and-pandas/#pivot-in-spark). While doing so, my explode() function changes column names to be sought by adding back ticks(" ` ") at the beginning and at the end of each column which then gives out the error:
Cannot resolve column name `Column_name` from [Column_name, Column_name2]
I tried reading the documentation and few other questions on SO but they don't address this issue.
I tried logging the different steps, in order to give the reader some clarity.
The error is at the line:
_tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
The output of explode(...)
is available here(https://pastebin.com/LU9p53th)
The function snippet is:
def melt_df(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str = "variable", value_name: str = "value") -> DataFrame:
"""Convert :class:`DataFrame` from wide to long format."""
print("Value name is {} and value vars is {}".format(
value_name, value_vars
))
# df2 = df2.select([col(k).alias(actual_cols[k]) for k in keys_de_cols])
# Create array<struct<variable: str, value: ...>>
_vars_and_vals = array(*(
struct(lit(c).alias(var_name), col(c).alias(value_name))
for c in value_vars))
print("Explode: ")
print(explode(_vars_and_vals))
# Add to the DataFrame and explode
_tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
print("_tmp:")
print(_tmp)
sys.exit()
cols = id_vars + [
col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
return _tmp.select(*cols)
Whereas the whole code is:
import sys
from datetime import datetime
from itertools import chain
from typing import Iterable
from pyspark.context import SparkContext
from pyspark.sql import (DataFrame, DataFrameReader, DataFrameWriter, Row,
SparkSession)
from pyspark.sql.functions import *
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql.types import *
spark = SparkSession.builder.appName('navydish').getOrCreate()
last_correct_constant = 11
output_file = "april19_1.csv"
input_file_name = "input_for_aviral.csv"
def melt_df(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str = "variable", value_name: str = "value") -> DataFrame:
"""Convert :class:`DataFrame` from wide to long format."""
print("Value name is {} and value vars is {}".format(
value_name, value_vars
))
# df2 = df2.select([col(k).alias(actual_cols[k]) for k in keys_de_cols])
# Create array<struct<variable: str, value: ...>>
_vars_and_vals = array(*(
struct(lit(c).alias(var_name), col(c).alias(value_name))
for c in value_vars))
print("Explode: ")
print(explode(_vars_and_vals))
# Add to the DataFrame and explode
_tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
print("_tmp:")
print(_tmp)
sys.exit()
cols = id_vars + [
col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
return _tmp.select(*cols)
def getrows(df, rownums=None):
return df.rdd.zipWithIndex().filter(
lambda x: x[1] in rownums).map(lambda x: x[0])
df = spark.read.csv(
input_file_name,
header=True
)
df2 = df
for _col in df.columns:
if _col.startswith("_c"):
df = df.drop(_col)
if int(_col.split("_c")[-1]) > last_correct_constant:
df2 = df2.drop(_col)
else:
# removes the reqd cols, keeps the messed up ones only.
df2 = df2.drop(_col)
actual_cols = getrows(df2, rownums=[0]).collect()[0].asDict()
keys_de_cols = actual_cols.keys()
# df2 = df2.select([col(x).alias("right_" + str(x)) for x in right_cols])
df2 = df2.select([col(k).alias(actual_cols[k]) for k in keys_de_cols])
periods = []
periods_cols = getrows(df, rownums=[0]).collect()[0].asDict()
for k, v in periods_cols.items():
if v not in periods:
periods.append(v)
# periods = list(set(periods))
expected_columns_from_df = [
'Value Offtake(000 Rs.)',
'Sales Volume (Volume(LITRES))'
]
for _col in df.columns:
if _col.startswith('Value Offtake(000 Rs.)') or _col.startswith('Sales Volume (Volume(LITRES))'):
continue
df = df.drop(_col)
df2 = df2.withColumn("id", monotonically_increasing_id())
df = df.withColumn("id", monotonically_increasing_id())
df = df2.join(df, "id", "inner").drop("id")
print("After merge, cols of final dataframe are: ")
for _col in df.columns:
print(_col)
# creating a list of all constant columns
id_vars = []
for i in range(len(df.columns)):
if i < 12:
id_vars.append(df.columns[i])
# creating a list of Values from expected columns
value_vars = []
for _col in df.columns:
if _col.startswith(expected_columns_from_df[0]):
value_vars.append(_col)
value_vars = id_vars + value_vars
print("Sending this value vars to melt:")
print(value_vars)
# the name of the column in the resulting DataFrame, Value Offtake(000 Rs.)
var_name = expected_columns_from_df[0]
# final value for which we want to melt, Periods
value_name = "Periods"
df = melt_df(
df,
id_vars, value_vars,
var_name, value_name
)
print("The final headers of the resultant dataframe are: ")
print(df.columns)
The whole error is here(https://pastebin.com/9cUupTy3)
I understand one would need the data but I guess if one could clarify the working of explode in a way that the extra unwanted quotes(" ` ") can be avoided, I can work.
.
) inside the column names. Try running after renaming the column or use backticks " ` " (your unwanted quotes) to refer to the column, see: #37252153 – Koser