Pyspark: show histogram of a data frame column

In a pandas DataFrame, I use the following code to plot a histogram of a column:

my_df.hist(column = 'field_1')

Is there something that can achieve the same goal with a PySpark DataFrame? (I am in a Jupyter Notebook.) Thanks!

Lahomalahore answered 25/8, 2016 at 20:35 Comment(0)
56

Unfortunately I don't think that there's a clean plot() or hist() function in the PySpark DataFrames API, but I'm hoping that things will eventually go in that direction.

For the time being, you could compute the histogram in Spark, and plot the computed histogram as a bar chart. Example:

import pandas as pd
import pyspark.sql as sparksql

# Let's use UCLA's college admission dataset
file_name = "https://stats.idre.ucla.edu/stat/data/binary.csv"

# Creating a pandas dataframe from Sample Data
df_pd = pd.read_csv(file_name)

sql_context = sparksql.SQLContext(sc)

# Creating a Spark DataFrame from a pandas dataframe
df_spark = sql_context.createDataFrame(df_pd)

df_spark.show(5)

This is what the data looks like:

Out[]:    +-----+---+----+----+
          |admit|gre| gpa|rank|
          +-----+---+----+----+
          |    0|380|3.61|   3|
          |    1|660|3.67|   3|
          |    1|800| 4.0|   1|
          |    1|640|3.19|   4|
          |    0|520|2.93|   4|
          +-----+---+----+----+
          only showing top 5 rows


# This is what we want
df_pd.hist('gre');

Histogram as plotted with df_pd.hist()

# Doing the heavy lifting in Spark: we can leverage the `histogram` function from the RDD API

gre_histogram = df_spark.select('gre').rdd.flatMap(lambda x: x).histogram(11)

# Loading the Computed Histogram into a Pandas Dataframe for plotting
pd.DataFrame(
    list(zip(*gre_histogram)), 
    columns=['bin', 'frequency']
).set_index(
    'bin'
).plot(kind='bar');

Histogram computed by using RDD.histogram()
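
For anyone wondering about the intermediate steps (see the comments below), here is a sketch of what the two key calls do. select('gre').rdd yields an RDD of single-field Row objects, and flatMap unpacks each Row into a plain number; histogram(11) then returns a pair of lists, the 12 bucket boundaries and the 11 bin counts, which is why it zips cleanly into the two-column DataFrame above:

# Unpack each single-field Row into a plain value
values = df_spark.select('gre').rdd.flatMap(lambda x: x)

# histogram(11) returns (boundaries, counts): 12 boundaries, 11 counts
boundaries, counts = values.histogram(11)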

Javanese answered 6/10, 2016 at 8:58 Comment(4)
I'm getting an error when generating a dataframe from the zip iterator. Given the pyspark histogram, creating the pandas dataframe is a bit cleaner and works for me with pd.DataFrame(list(zip(*gre_histogram)), columns=['bin', 'frequency'])Ailsun
gre_histogram = spark_df.select('gre').rdd.flatMap(lambda x: x).histogram(11) is the winning line, combo this guy with the matplotlib answer belowEnact
I usually work with DataFrame and have no experience with RDDs. Why is it necessary to apply flatMap() here?Jongjongleur
For anyone who is wondering what 11 means, it is the number of bins.Nip
27

You can now use the pyspark_dist_explore package to leverage the matplotlib hist function for Spark DataFrames:

from pyspark_dist_explore import hist
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
hist(ax, my_df.select('field_1'), bins = 20, color=['red'])

This library uses the RDD histogram function to calculate bin values.
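
To actually render the figure, the usual matplotlib call applies; in a Databricks notebook you would use display instead, as a commenter notes below:

plt.show()        # regular Jupyter notebook or script
# display(fig)    # Databricks notebook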

Clint answered 17/7, 2017 at 20:18 Comment(3)
import matplotlib.pyplot as pltContumacious
What is data_frame?Unwarranted
If using databricks notebook, just add display(fig) at the endSchmeltzer
6

Another solution, without the need for extra imports, which should also be efficient. First, use a window partition:

import pyspark.sql.functions as F
import pyspark.sql as SQL
win = SQL.Window.partitionBy('column_of_values')

Then all you need is a count aggregation partitioned by the window:

df.select(F.count('column_of_values').over(win).alias('histogram'))

The aggregation happens on each partition of the cluster and does not require an extra round-trip to the driver.
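
Note that, as the comments below point out, this counts rows per window rather than per value range. A minimal sketch that adds an explicit binning step first (assuming a numeric column, and a hypothetical bin width bin_size you would choose yourself):

import pyspark.sql.functions as F

bin_size = 10  # hypothetical bin width, pick one that suits your data
binned = df.withColumn('bin', F.floor(F.col('column_of_values') / bin_size) * bin_size)
binned.groupBy('bin').count().orderBy('bin').show()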

Hamer answered 6/11, 2017 at 12:48 Comment(5)
My results showed as 'DataFrame[histogram: bigint]' Do you know why it didn't generate the actual plot? thanksPermissible
It seems to me as if this is the schema of the plot (which is OK), does it contains actual data?Hamer
Yes, it contains the actual data. I tried different ways, but couldn't generate the figure :-(Permissible
Nowhere in this code does it plot a figure... the return of this code is simply a PySpark DataFrame with one column named "histogram".Disjunct
This is not clear. the partitionBy needs a partition for a group by or something, you can't make histogram bins on the fly using this.Enchanter
2

The histogram method for RDDs returns the bin ranges and the bin counts. Here's a function that takes this histogram data and plots it as a histogram.

import numpy as np
import matplotlib.pyplot as mplt
import matplotlib.ticker as mtick

def plotHistogramData(data):
    # data is the (bucket boundaries, counts) pair returned by RDD.histogram()
    binSides, binCounts = data

    N = len(binCounts)
    ind = np.arange(N)
    width = 1

    fig, ax = mplt.subplots()
    rects1 = ax.bar(ind+0.5, binCounts, width, color='b')

    ax.set_ylabel('Frequencies')
    ax.set_title('Histogram')
    ax.set_xticks(np.arange(N+1))
    ax.set_xticklabels(binSides)
    ax.xaxis.set_major_formatter(mtick.FormatStrFormatter('%.2e'))
    ax.yaxis.set_major_formatter(mtick.FormatStrFormatter('%.2e'))

    mplt.show()

(This code assumes that bins have equal length.)
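
A usage sketch (assuming a DataFrame df with a numeric column 'field_1'; both names are placeholders), feeding it the output of the RDD histogram call from the accepted answer:

data = df.select('field_1').rdd.flatMap(lambda x: x).histogram(10)
plotHistogramData(data)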

Fridge answered 11/10, 2017 at 16:52 Comment(2)
binSides, binCounts = data What's the input format of data?Permissible
for this is working using as input aggData.select(columnName).rdd.flatMap(lambda x: x).histogram(10). Question: how would I draw, on the x, instead of the bins value(1,2,3,...), the average value inside the bin?Cardiovascular
1

This piece of code makes a new column that assigns each value to an equal-width bin, and then groups the data by this column. The result can be plotted as a bar plot to see a histogram.

import pyspark.sql.functions as F

bins = 10
# Interpolate bins into the SQL string; a bare `bins` inside expr() raises "cannot resolve 'bins'"
hist_df = df.withColumn("factor", F.expr(f"round(field_1/{bins})*{bins}")).groupBy("factor").count()
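
To actually see the plot (a sketch, assuming a notebook environment with pandas available), collect the small aggregated result and bar-plot it:

hist_pd = hist_df.orderBy('factor').toPandas()
hist_pd.plot(kind='bar', x='factor', y='count');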
Stansbury answered 30/1, 2021 at 14:22 Comment(2)
While this code may answer the question, providing additional context regarding why and/or how this code answers the question improves its long-term value.Autobus
I get: AnalysisException: cannot resolve 'bins' given input columns: [field_1]; line 1 pos 19Egression
1

With this approach, not only can you get distribution results in PySpark, you can also easily control the groups, the number of bins (nbins), or a custom bin interval (binn), without importing pandas or any plotting library.

import pyspark.sql.functions as F
from pyspark.sql import Window

def pyspark_histogram(df, col, group=None, nbins=100, binn=None):
    if not group:
        group = []
    w = Window.partitionBy(group)
    
    df = (
        df
        .withColumn("hist_div", F.lit(binn) if binn else (F.max(col).over(w)-F.min(col).over(w))/nbins)
        .withColumn(col, F.floor(F.col(col)/F.col("hist_div"))*F.col("hist_div"))
        .groupBy(group + [col])
        .agg(F.count("*").alias("count"))
        .withColumn("sum", F.sum("count").over(w))
        .withColumn("percent(%)", F.round(F.col("count")*100.0/F.col("sum"), 1))
        .drop("sum")
        .orderBy(group + [col])
    )
    return df
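
A usage sketch (assuming a DataFrame df with a numeric column 'field_1'; the column name is hypothetical):

result = pyspark_histogram(df, 'field_1', nbins=20)
result.show()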
Ury answered 17/11, 2023 at 7:16 Comment(0)
0

This is straightforward and works well.

df.groupby(
  '<group-index>'
).count().select(
  'count'
).rdd.flatMap(
  lambda x: x
).histogram(20)
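
Since histogram(20) returns the (boundaries, counts) pair, one way to plot it in a notebook (a sketch, assuming pandas is available and keeping the answer's '<group-index>' placeholder; see also the accepted answer) is:

import pandas as pd

boundaries, counts = (
    df.groupby('<group-index>').count().select('count')
      .rdd.flatMap(lambda x: x).histogram(20)
)
# boundaries has one more entry than counts; use the left bin edges as labels
pd.DataFrame({'bin': boundaries[:-1], 'frequency': counts}).set_index('bin').plot(kind='bar');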
Bookmaker answered 26/3, 2019 at 21:35 Comment(2)
Nice. How would you plot this?Kink
You'd have to export data and do it somewhere else if you're in the shell, or render within a Jupyter/Zeppelin/Sagemaker notebook if you're in that type of interactive environment.Bookmaker
0

This is my approach:

import pyspark.sql.functions as f

def plotHist(df, variable, minValue, maxValue, bins = 10):
  factor = (bins - 1) / (maxValue - minValue)
  return (
    df.withColumn('cappedValue', f.least(f.lit(maxValue), f.greatest(f.lit(minValue), variable)))
    .withColumn('buckets', f.round(((f.col('cappedValue')) - minValue)*factor)/factor + minValue)
    .groupBy('buckets').count()
  )
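
Despite the name, this returns an aggregated DataFrame rather than drawing anything. A usage sketch (assuming a numeric column 'field_1' spanning roughly 0 to 100; the name and bounds are hypothetical):

hist_df = plotHist(df, 'field_1', minValue=0, maxValue=100, bins=10)
hist_df.orderBy('buckets').show()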
Cordeelia answered 13/7, 2023 at 15:38 Comment(1)
As it’s currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center.Yettayetti