Alternative ways to apply a user defined aggregate function in pyspark

I am trying to apply a user-defined aggregate function to a Spark DataFrame in order to compute additive smoothing; see the code below:

import findspark
findspark.init()
import pyspark as ps
from pyspark.sql import SQLContext
from pyspark.sql.functions import collect_list, udf

try:
    sc
except NameError:
    sc = ps.SparkContext()
    sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([['A', 1],
                                 ['A', 1],
                                 ['A', 0],
                                 ['B', 0],
                                 ['B', 0],
                                 ['B', 1]], schema=['name', 'val'])


def smooth_mean(x):
    return (sum(x)+5)/(len(x)+5)

smooth_mean_udf = udf(smooth_mean)

df.groupBy('name').agg(collect_list('val').alias('val'))\
.withColumn('val', smooth_mean_udf('val')).show()

Does it make sense to do it this way? To my understanding this does not scale well, since I am using a udf. I also can't find clear documentation on how collect_list works: the "collect" part of the name seems to indicate that data is collected to the driver, but I assume the data is actually collected per key across the various worker nodes?

Thanks in advance for any feedback.

Solution:

To my understanding this does not scale

Your understanding is correct, and the biggest problem here is collect_list, which is just good old groupByKey: it shuffles the complete list of values for each key to a single executor (not the driver). The Python udf has much less impact, but it doesn't make sense to use one for simple arithmetic operations.

Just use standard aggregations:

from pyspark.sql.functions import sum as sum_, count

(df
    .groupBy("name")
    .agg(((sum_("val") + 5) / (count("val") + 5)).alias("val"))
    .show())

# +----+-----+
# |name|  val|
# +----+-----+
# |   B| 0.75|
# |   A|0.875|
# +----+-----+
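As a quick sanity check of the arithmetic (plain Python, no Spark needed), group B has values [0, 0, 1] and group A has [1, 1, 0], matching the question's smooth_mean:

```python
def smooth_mean(vals, alpha=5):
    # additive smoothing: (sum + alpha) / (count + alpha)
    return (sum(vals) + alpha) / (len(vals) + alpha)

print(smooth_mean([0, 0, 1]))  # B: 6 / 8 = 0.75
print(smooth_mean([1, 1, 0]))  # A: 7 / 8 = 0.875
```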