How To Fix - Data Skewness in Spark (Salting Method)



In this post, we will see how to fix data skew in Spark using the salting method. Data skew is an uneven (non-uniform) distribution of data across partitions. In real-life production scenarios we often have to handle data that is far from ideal, so it is important to be equipped for such cases. We will look at data skew from the perspective of a two-table join.

  • Let's say we have two tables, A and B, that we are trying to join on a specific column (the key).

  • For joins and other aggregations, Spark has to co-locate all the records of a single key in a single partition.

  • So the records of a particular key will always end up in the same partition.

  • Similarly, every other key and its records are distributed to the other partitions.

  • Now imagine that one key has many more records than the other keys. The corresponding partition becomes very large, or SKEWED, compared to the other partitions, and whichever executor processes that SPECIFIC partition needs comparatively more time.

  • The overall Spark job then stalls waiting on that one task, CPU utilization stays low because the other executors sit idle, and sometimes the oversized partition even causes memory issues.

  • That, in a nutshell, is what data skew is and how it hurts performance in Spark.
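To make the effect concrete, here is a minimal sketch in plain Scala (no Spark needed) that mimics hash partitioning. The object `SkewDemo`, the helper `partitionSizes` and the record counts are made up for illustration, and `hashCode` modulo the partition count only stands in for Spark's real partitioner, which hashes keys differently.

```scala
object SkewDemo {
  // Hypothetical helper: send each key to partition (hash mod numPartitions),
  // the way a hash partitioner would, and count the records per partition.
  def partitionSizes(keys: Seq[String], numPartitions: Int): Map[Int, Int] =
    keys
      .groupBy(k => java.lang.Math.floorMod(k.hashCode, numPartitions))
      .map { case (partition, records) => partition -> records.size }

  def main(args: Array[String]): Unit = {
    // Made-up data: key "x" has far more records than "y" and "z".
    val keys = Seq.fill(38)("x") ++ Seq.fill(6)("y") ++ Seq.fill(6)("z")
    partitionSizes(keys, 3).toSeq.sorted.foreach { case (p, n) =>
      println(s"partition $p: $n records")
    }
    // prints:
    // partition 0: 38 records
    // partition 1: 6 records
    // partition 2: 6 records
  }
}
```

All records of "x" land in one partition, so the task that owns it does most of the work while the other two finish early.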
Now let's see how to fix the data skew issue. The first technique is salting, or key salting. The idea is to modify the existing key so that the data is distributed more evenly. What we do in this technique is -

  • Table A - Large Table
    • Extend the existing key by appending a separator character plus ONE random number drawn from some range

        e.g. Existing-Key + "_" + Random number from 1 to 10

  • Table B - Medium Table
    • Use an explode operation on the key, so that every row is replicated once for each possible salt value, as shown below

Explode(Existing-Key , Range(1,10))  -> x_1, x_2, .............,x_10
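The two steps above can be sketched with plain Scala collections standing in for the two tables. This is only an illustration under assumptions: the object `SaltedJoinDemo` and its helpers `saltLarge`, `explodeSmall` and `join` are hypothetical names, rows are simple (key, value) pairs, and the salt range is 1 to 10 as in the example above. The point it tries to show is that a join on the salted keys returns the same rows as a join on the original keys.

```scala
import scala.util.Random

object SaltedJoinDemo {
  // Large side: append "_" plus ONE random salt in 1..numSalts to every key.
  def saltLarge[V](rows: Seq[(String, V)], numSalts: Int, rng: Random): Seq[(String, V)] =
    rows.map { case (k, v) => (s"${k}_${rng.nextInt(numSalts) + 1}", v) }

  // Other side: replicate every row once per possible salt value (the "explode").
  def explodeSmall[V](rows: Seq[(String, V)], numSalts: Int): Seq[(String, V)] =
    rows.flatMap { case (k, v) => (1 to numSalts).map(salt => (s"${k}_$salt", v)) }

  // Plain inner join on the key; works for salted and unsalted inputs alike.
  def join[A, B](left: Seq[(String, A)], right: Seq[(String, B)]): Seq[(A, B)] = {
    val rightByKey = right.groupBy(_._1)
    left.flatMap { case (k, a) =>
      rightByKey.getOrElse(k, Seq.empty).map { case (_, b) => (a, b) }
    }
  }

  def main(args: Array[String]): Unit = {
    val rng = new Random(42) // fixed seed so runs are repeatable
    val big = Seq.fill(20)("x").zipWithIndex ++ Seq(("y", 100), ("z", 200))
    val small = Seq(("x", "X"), ("y", "Y"), ("z", "Z"))

    val plain = join(big, small)
    val salted = join(saltLarge(big, 10, rng), explodeSmall(small, 10))

    // Same joined rows either way; only the distribution of keys changed.
    println(plain.sorted == salted.sorted)
  }
}
```

Because each large-table row carries exactly one salt and the other table carries every salt, each row still finds exactly the partners it had before. The cost is that the exploded table grows by a factor equal to the number of salts.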

Let's understand what exactly happens UNDER THE HOOD when you use the salting method (the code is further below).

  • Assume we are joining Table A and Table B using column X, which is the key.

  • Let's say column X has 3 distinct values (keys): x, y and z.

  • Assume many more records exist for key x than for keys y and z.


Before Salting - ("=" represents records - Notice key x has more no. of records)

x ======================================
y ======
z ======

  • This makes the data skewed. Whichever node or executor processes the records for key=x has to handle far more records and data, so it takes much longer than the nodes or executors that process keys y and z, which finish quickly because those keys have fewer records.

  • To handle this, salting splits key=x into, say, x_1, x_2, ... etc. (see the code below for how this is done). ALL the records associated with JUST ONE KEY=x now get split across a LARGER NUMBER of keys (x_1, x_2, ...).
    • The split is done by appending a random value (1 to 10 in this case) to key=x, so the key "x" now has more variants, viz. x_1, x_2, x_3 ... x_10.
    • Key "x" was the cause of the data skew because of the larger number of records belonging to it. Instead of "x" we now have 10 new keys, so all the records of the single key "x" get spread across those 10 new keys. This distributes the data more evenly.
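The spreading effect described in the bullets above can be checked with a small, Spark-free sketch. The names `SaltSpreadDemo`, `saltKeys` and `countPerKey` and the record counts are hypothetical, chosen only for illustration. Like the Spark code later in this post, the sketch salts every key, not just the hot one.

```scala
import scala.util.Random

object SaltSpreadDemo {
  // Append "_" plus a random salt in 1..numSalts to every key.
  def saltKeys(keys: Seq[String], numSalts: Int, rng: Random): Seq[String] =
    keys.map(k => s"${k}_${rng.nextInt(numSalts) + 1}")

  // Number of records per distinct key.
  def countPerKey(keys: Seq[String]): Map[String, Int] =
    keys.groupBy(identity).map { case (k, rows) => k -> rows.size }

  def main(args: Array[String]): Unit = {
    // Made-up skewed input: 1000 records for "x", 50 each for "y" and "z".
    val keys = Seq.fill(1000)("x") ++ Seq.fill(50)("y") ++ Seq.fill(50)("z")
    val salted = saltKeys(keys, 10, new Random(7))

    println(s"largest key before salting: ${countPerKey(keys).values.max} records")
    println(s"largest key after salting:  ${countPerKey(salted).values.max} records")
  }
}
```

With 10 salts the largest salted key should hold roughly a tenth of the records that "x" held on its own. The exact numbers depend on the random draws.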

After Salting - ("=" represents records - Notice the Uniformity now across all keys)

x_1 ====
x_2 ====
x_3 ====
x_4 ====
x_5 ====
x_6 ====
x_7 ====
x_8 ====
x_9 ====
x_10 ====
y ======
z ======

  • Now when you process the data in Spark (a join or any other operation), the records of key "x" are spread across many tasks instead of one, so the data skew is largely removed.

The code to implement the salting method is given below -


// SALTING TECHNIQUE - SKEWNESS REMOVAL CODE

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, concat, explode, floor, lit, rand}


// Assumes the key column is named Col_X in BOTH tables.
def removeDataSkew(bigLeftTable: DataFrame, Col_X: String, rightTable: DataFrame,
                   numSalts: Int = 10): (DataFrame, DataFrame) = {
    // Large table: append "_" + one random salt in 1..numSalts to the key (10000 is the rand seed)
    val df1 = bigLeftTable
             .withColumn(Col_X, concat(
                         col(Col_X), lit("_"),
                         (floor(rand(10000) * numSalts) + 1).cast("string")))
    // Other table: replicate each row once per salt value, then build the same salted key
    val df2 = rightTable
             .withColumn("newExplodedCol",
                          explode(array((1 to numSalts).map(lit(_)): _*)))
             .withColumn(Col_X, concat(
                         col(Col_X), lit("_"), col("newExplodedCol").cast("string")))
    (df1, df2)   // THESE ARE THE NEW DFs FROM WHICH SKEWNESS IS REMOVED
}


// CALLING THE SALTING LOGIC TO REMOVE ORIGINAL DATA SKEW
// After removing skewness, we get the un-skewed new DFs

val (newDf1, newDf2) = removeDataSkew(oldDf1, "_1", oldDf2)  // "_1" is the left column (the key)


// NEW JOIN AFTER REMOVING DATA SKEWNESS (THROUGH SALTING TECHNIQUE)

newDf1.join(newDf2,
newDf1.col("_1") <=> newDf2.col("_1")   // join on the salted key
)
.show(10, false)

Hope this post helps you understand how to fix data skew in Spark using the salting method. Please share the post if it helped you.
