How to remove automatically added backticks while using explode() in PySpark?

I want to add a new column with some expression, as described here (https://www.mien.in/2018/03/25/reshaping-dataframe-using-pivot-and-melt-in-apache-spark-and-pandas/#pivot-in-spark). While doing so, the explode() step changes the column names it looks up, adding backticks (`) at the beginning and end of each name, which then gives the error:

Cannot resolve column name `Column_name` from [Column_name, Column_name2]

I tried reading the documentation and a few other questions on SO, but they don't address this issue.

I logged the different steps in order to give the reader some clarity.

The error is at the line: _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

The output of explode(...) is available here (https://pastebin.com/LU9p53th).
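
For reference, a minimal sketch of what I think triggers this, assuming the culprit is the dot inside column names like Value Offtake(000 Rs.) (the data here is made up):

# Minimal sketch (assumption: the dot in the column name is the culprit).
# col() splits unquoted names on dots, so "Value Offtake(000 Rs.)" is read
# as field ")" of a column "Value Offtake(000 Rs" and fails to resolve;
# Spark then echoes the name wrapped in backticks in its error message.
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, col, explode, lit, struct

spark = SparkSession.builder.appName("backtick_repro").getOrCreate()
df = spark.createDataFrame([(1, 2.0)], ["id", "Value Offtake(000 Rs.)"])

# df.select(col("Value Offtake(000 Rs.)"))         # fails at analysis time
df.select(col("`Value Offtake(000 Rs.)`")).show()  # backtick-quoting works

df.withColumn("_vars_and_vals", explode(array(struct(
    lit("Value Offtake(000 Rs.)").alias("variable"),
    col("`Value Offtake(000 Rs.)`").alias("value"))))).show(truncate=False)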

The function snippet is:

def melt_df(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str = "variable", value_name: str = "value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""
    print("Value name is {} and value vars is {}".format(
        value_name, value_vars
    ))
    # df2 = df2.select([col(k).alias(actual_cols[k]) for k in keys_de_cols])
    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))
    print("Explode: ")
    print(explode(_vars_and_vals))
    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
    print("_tmp:")
    print(_tmp)
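    # debugging stop: the select below never runs while this is here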
    sys.exit()
    cols = id_vars + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)
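
With plain column names the same function works for me; a toy sketch (made-up data, and with the debugging sys.exit() removed):

# Toy usage sketch: simple names, no dots/parentheses, so no backtick error.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("melt_toy").getOrCreate()
toy = spark.createDataFrame([("a", 1, 2), ("b", 3, 4)], ["key", "m1", "m2"])
melt_df(toy, id_vars=["key"], value_vars=["m1", "m2"],
        var_name="metric", value_name="val").show()
# +---+------+---+
# |key|metric|val|
# +---+------+---+
# |  a|    m1|  1|
# |  a|    m2|  2|
# |  b|    m1|  3|
# |  b|    m2|  4|
# +---+------+---+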

The whole code is:

import sys
from datetime import datetime
from itertools import chain
from typing import Iterable

from pyspark.context import SparkContext
from pyspark.sql import (DataFrame, DataFrameReader, DataFrameWriter, Row,
                         SparkSession)
from pyspark.sql.functions import *
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql.types import *

spark = SparkSession.builder.appName('navydish').getOrCreate()
last_correct_constant = 11
output_file = "april19_1.csv"
input_file_name = "input_for_aviral.csv"


def melt_df(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str = "variable", value_name: str = "value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""
    print("Value name is {} and value vars is {}".format(
        value_name, value_vars
    ))
    # df2 = df2.select([col(k).alias(actual_cols[k]) for k in keys_de_cols])
    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))
    print("Explode: ")
    print(explode(_vars_and_vals))
    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
    print("_tmp:")
    print(_tmp)
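    # debugging stop: the select below never runs while this is here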
    sys.exit()
    cols = id_vars + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)


def getrows(df, rownums=None):
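    # pair each Row with its position via zipWithIndex, keep the requested
    # positions, then strip the index off again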
    return df.rdd.zipWithIndex().filter(
        lambda x: x[1] in rownums).map(lambda x: x[0])


df = spark.read.csv(
    input_file_name,
    header=True
)
df2 = df

for _col in df.columns:
    if _col.startswith("_c"):
        df = df.drop(_col)
        if int(_col.split("_c")[-1]) > last_correct_constant:
            df2 = df2.drop(_col)
    else:
        # removes the reqd cols, keeps the messed up ones only.
        df2 = df2.drop(_col)

actual_cols = getrows(df2, rownums=[0]).collect()[0].asDict()

keys_de_cols = actual_cols.keys()

# df2 = df2.select([col(x).alias("right_" + str(x)) for x in right_cols])
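# rename the placeholder _cN columns to the real header values taken from row 0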
df2 = df2.select([col(k).alias(actual_cols[k]) for k in keys_de_cols])


periods = []
periods_cols = getrows(df, rownums=[0]).collect()[0].asDict()
for k, v in periods_cols.items():
    if v not in periods:
        periods.append(v)
# periods = list(set(periods))

expected_columns_from_df = [
    'Value Offtake(000 Rs.)',
    'Sales Volume (Volume(LITRES))'
]

for _col in df.columns:
    if _col.startswith('Value Offtake(000 Rs.)') or _col.startswith('Sales Volume (Volume(LITRES))'):
        continue
    df = df.drop(_col)

df2 = df2.withColumn("id", monotonically_increasing_id())
df = df.withColumn("id", monotonically_increasing_id())
df = df2.join(df, "id", "inner").drop("id")
print("After merge, cols of final dataframe are: ")
for _col in df.columns:
    print(_col)

# creating a list of all constant columns
id_vars = []
for i in range(len(df.columns)):
    if i < 12:
        id_vars.append(df.columns[i])

# creating a list of Values from expected columns
value_vars = []
for _col in df.columns:
    if _col.startswith(expected_columns_from_df[0]):
        value_vars.append(_col)
value_vars = id_vars + value_vars

print("Sending this value vars to melt:")
print(value_vars)

# the name of the column in the resulting DataFrame, Value Offtake(000 Rs.)
var_name = expected_columns_from_df[0]

# final value for which we want to melt, Periods
value_name = "Periods"

df = melt_df(
    df,
    id_vars, value_vars,
    var_name, value_name
)

print("The final headers of the resultant dataframe are: ")
print(df.columns)

The whole error is here (https://pastebin.com/9cUupTy3).

I understand one would normally need the data, but if someone could clarify how explode() works and how the extra unwanted quotes (`) can be avoided, I can work from there.

Sudanic answered 21/4, 2019 at 10:10

Comments (2):
Can you use the built-in stack function? A la #42466068 (Radiophotograph)
The issue could be due to the dot (.) inside the column names. Try renaming the columns, or use backticks (`), your unwanted quotes, to refer to the column; see #37252153 (Koser)
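
For reference, the stack() route suggested in the first comment might look like this (a sketch with placeholder column names m1/m2; real names containing dots or parentheses still need backtick quoting inside the SQL string):

# Sketch of the built-in stack() alternative: stack(2, label1, col1,
# label2, col2) emits two (variable, value) rows per input row, so no
# intermediate array-of-structs is needed.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("stack_sketch").getOrCreate()
toy = spark.createDataFrame([("a", 1, 2)], ["key", "m1", "m2"])
toy.selectExpr(
    "key",
    "stack(2, 'm1', `m1`, 'm2', `m2`) as (variable, value)"
).show()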
