




How to use Broadcast Variable in Spark?



In this post, we will see how to use Broadcast Variables in Spark. Broadcast variables can be tricky if the concepts behind them are not clearly understood, and that leads to errors when they are used further down the line. Broadcast variables are typically used to implement a map-side join, i.e. a join performed inside a map operation: lookup tables or reference data are distributed to every node in the cluster via broadcast, and they are then read inside map() to do the join implicitly.

When you broadcast some data, the data is copied to all the executors only once (so we avoid shipping the same data again and again with every task). Broadcasting therefore makes your Spark application faster when you have a large value to use in tasks, or when there are many more tasks than executors. To use broadcast variables correctly, note the points below and cross-check them against your usage.
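To make the map-side join described above concrete, here is a minimal sketch. The names (sc for an existing SparkContext, productNames, orders) are illustrative and not part of the original example.

// Assumes `sc` is an existing SparkContext; all names below are illustrative.
val productNames: Map[Int, String] = Map(1 -> "Laptop", 2 -> "Phone", 3 -> "Tablet")

// Ship the lookup table to every executor once
val productNamesBc = sc.broadcast(productNames)

// orders: RDD[(orderId, productId)]
val orders = sc.parallelize(Seq((101, 1), (102, 3), (103, 2)))

// Map-side join: read the broadcast copy inside map() to attach the product name
val ordersWithNames = orders.map { case (orderId, productId) =>
  (orderId, productNamesBc.value.getOrElse(productId, "UNKNOWN"))
}

ordersWithNames.collect().foreach(println)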

  • Broadcast Type errors - A broadcast variable is not necessarily an RDD or a collection; its type is simply whatever type you assign to it. Think of a broadcast variable as a read-only local variable that is local to every machine. Every worker has a copy of whatever you broadcast, so you do not need to worry about tying it to specific RDD values. It is simply a data structure, like a global variable, that is read (and never written) by all the workers.
Consider the example


val var2 = sc.broadcast(var1)

In this case, the type of the broadcast variable var2 is whatever the type of var1 is. A broadcast variable can wrap any class (an Integer, a Map, a custom object, etc.); it is by no means a Scala collection. The best time to use a broadcast variable is when you have a fairly large read-only object that most of the tasks working on the RDD are going to need.
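For example (a small sketch; sc is an existing SparkContext and the values are illustrative), the Broadcast type parameter simply mirrors whatever was passed in:

// The Broadcast type parameter follows the type of the value passed in
val intBc: org.apache.spark.broadcast.Broadcast[Int] = sc.broadcast(42)
val mapBc: org.apache.spark.broadcast.Broadcast[Map[String, Double]] =
  sc.broadcast(Map("pi" -> 3.14159, "e" -> 2.71828))

// Read the underlying value (on the driver or inside tasks) via .value
println(intBc.value)        // 42
println(mapBc.value("pi"))  // 3.14159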

  • Broadcast Join Errors - Do not use standard broadcast variables to handle distributed data structures. To perform a broadcast join on a DataFrame, the DataFrame needs to be prepped for broadcasting beforehand with the broadcast() function. See the example below -
 


import org.apache.spark.sql.functions.broadcast
import spark.implicits._   // needed for the $"..." column syntax

val productType: DataFrame = <SOME_DATAFRAME>

val tmpDf: DataFrame = broadcast(
  productType.withColumnRenamed("PROD_ID", "PROD_ID_SOLD").as("soldProducts")
)

dataTable.as("somedf").join(
  broadcast(tmpDf), $"somedf.SOLD_PROD" === $"soldProducts.PROD_ID_SOLD", "inner")

Note that the broadcast marker on the distributed data structure is evaluated locally before the join is called; this is what ensures that tmpDf is actually broadcast when the join runs afterwards.

  • Broadcast Join Plans - If you want to see the plan of the broadcast join, use explain(). Example for reference -
 


Df1.join(
    broadcast(Df2),
    Df1("col1") <=> Df2("col2")
).explain()
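If the broadcast takes effect, the physical plan printed by explain() should typically show a BroadcastHashJoin (together with a BroadcastExchange) rather than a SortMergeJoin.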

 

  • To release a broadcast variable, first unpersist it and then destroy it.

broadcastVar.unpersist()

broadcastVar.destroy()
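As a rough sketch of the difference (broadcastVar is assumed to be an existing broadcast variable that is no longer needed): unpersist() only removes the cached copies from the executors, and the value is re-broadcast automatically if the variable is used again, while destroy() permanently releases all resources on the driver and executors.

// Remove the cached copies from the executors; optionally block until done.
// The value is re-broadcast automatically if the variable is used again.
broadcastVar.unpersist(blocking = true)

// Permanently release all resources on the driver and the executors.
// The broadcast variable cannot be used after this call.
broadcastVar.destroy()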

       
