How to Handle Bad or Corrupt Records in Apache Spark?



In this post, we will see how to handle bad or corrupt records in Apache Spark. When reading data from a file source, Apache Spark might run into problems if the file contains bad or corrupted records, that is, records Spark is unable to parse against the expected schema. Let's see the options we have to handle such bad or corrupted records.

Option 1 - Using badRecordsPath:

To handle such bad or corrupted records/files, we can set an option called "badRecordsPath" while sourcing the data. Note that badRecordsPath is provided by Databricks Runtime; it is not an option of open-source Apache Spark. With this option, Spark processes only the correct records, and the corrupted or bad records are excluded from the processing logic as explained below. The path is used to record two kinds of problems -

  • Bad records (CSV and JSON sources): information about each bad record is stored in exception files under the path.
  • Bad files (all file-based built-in sources, for example Parquet): files that cannot be read are recorded there as well.

Transient errors that occur while reading files, such as network issues or IO exceptions, are also ignored but recorded under the badRecordsPath, and Spark continues to run the tasks. Let's see an example -


//Consider an input csv file with below data
Country, Rank
France,1
Canada,2
Netherlands,Netherlands

val df = spark.read
              .option("header", "true")   // first line holds column names, so it is not a bad record
              .option("badRecordsPath", "/tmp/badRecordsPath")
              .schema("Country String, Rank Integer")
              .csv("/tmp/inputFile.csv")

df.show()

When we run the above code, two things are worth noting: which records end up in df, and what is written to the exception file (a JSON file). Points to note -

  • We have two correct records, "France,1" and "Canada,2". df.show() displays only these records.
  • The other record, "Netherlands,Netherlands", is a bad or corrupt record as per the schema (its Rank is not an integer), so it is redirected to an exception file.
    • The exception file is written under /tmp/badRecordsPath, the path set with the badRecordsPath option, inside a timestamped subdirectory.
    • The exception file contains the bad record, the path of the file containing the record, and the exception/reason message.
    • We can use a JSON reader to process the exception file.
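If you want to look at the exception files outside Spark, a few lines of plain Python are enough. The sketch below is our own helper (load_bad_records is a hypothetical name, not part of any Spark or Databricks API). It assumes each exception file holds one JSON object per line and simply walks every subdirectory, so it does not rely on the exact timestamped layout; the field names you then read (for example record and reason) are an assumption based on the description above, so check them against the files your runtime actually writes.

```python
import json
import os


def load_bad_records(root):
    """Return every JSON object found in the exception files under `root`.

    Assumes one JSON object per line. All subdirectories are walked, so the
    exact (timestamped) layout below badRecordsPath does not matter.
    """
    records = []
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # visit subdirectories in a stable order
        for name in sorted(filenames):
            if name.startswith((".", "_")):
                continue  # skip hidden or marker files, e.g. .crc checksums or _SUCCESS
            with open(os.path.join(dirpath, name), encoding="utf-8") as fh:
                for line in fh:
                    line = line.strip()
                    if line:  # ignore blank lines
                        records.append(json.loads(line))
    return records
```

Calling load_bad_records("/tmp/badRecordsPath") returns a list of dicts, one per bad record, which you can print or filter as needed. A path that does not exist simply yields an empty list.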
 

Option 2 - Using Permissive Mode:

In this option, Spark loads and processes both the correct records and the corrupted/bad records, i.e. Spark is "permissive" even about records it cannot parse. PERMISSIVE is also the default mode. However, the values Spark produces for these bad records are not accurate: since Spark cannot parse them but still has to load them, the fields it cannot convert are set to null. Let's see an example -


//Consider an input csv file with below data
Country, Rank
France,1
Canada,2
Netherlands,Netherlands

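val df = spark.read
         .option("header", "true")       // first line holds column names, not data
         .option("mode", "PERMISSIVE")   // the default mode, set explicitly here
         .schema("Country String, Rank Integer")
         .csv("/tmp/inputFile.csv")

df.show()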

Spark cannot correctly process the third record, since it contains the corrupted value "Netherlands" instead of an Integer in the Rank column. But Spark still loads that row, and df.show() displays it with a null Rank.
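To make the PERMISSIVE behaviour concrete, here is a small plain-Python model of what happens to each row of the sample file. It is only an illustration under our own assumptions (the schema Country STRING, Rank INT, and a header line that is skipped), not Spark's implementation; read_permissive and parse_int are hypothetical names. Every data row is kept, and a Rank value that cannot be parsed as an integer becomes None, which Spark shows as null.

```python
import csv
import io

# The sample file from the post; the first line is the header.
SAMPLE_CSV = "Country, Rank\nFrance,1\nCanada,2\nNetherlands,Netherlands\n"


def parse_int(text):
    """Return text as an int, or None when it is not a valid integer."""
    try:
        return int(text)
    except ValueError:
        return None


def read_permissive(text):
    """Model of PERMISSIVE mode for the schema (Country STRING, Rank INT)."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # header=true: the first line holds column names, not data
    # Every data row is kept; an unparsable Rank becomes None (null in Spark).
    return [(country, parse_int(rank)) for country, rank in reader]


for row in read_permissive(SAMPLE_CSV):
    print(row)
```

This prints ('France', 1), ('Canada', 2) and ('Netherlands', None), one per line.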

Option 3 - Using Dropmalformed Mode:

Spark completely ignores bad or corrupted records when you use "DROPMALFORMED" mode. Whenever Spark encounters a non-parsable record, it simply excludes that record and continues processing from the next one, so only the correct records are kept. Let's see an example -


//Consider an input csv file with below data
Country, Rank
France,1
Canada,2
Netherlands,Netherlands

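val df = spark.read
         .option("header", "true")          // first line holds column names, not data
         .option("mode", "DROPMALFORMED")
         .schema("Country String, Rank Integer")
         .csv("/tmp/inputFile.csv")

df.show()   // only the France and Canada rows remain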
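A small plain-Python model shows what DROPMALFORMED does with the sample file: any row whose Rank cannot be parsed as an integer is skipped, so only France and Canada remain. It is a sketch under our own assumptions (schema Country STRING, Rank INT, header line skipped), not Spark's implementation; read_dropmalformed is a hypothetical helper name.

```python
import csv
import io

# The sample file from the post; the first line is the header.
SAMPLE_CSV = "Country, Rank\nFrance,1\nCanada,2\nNetherlands,Netherlands\n"


def read_dropmalformed(text):
    """Model of DROPMALFORMED mode for the schema (Country STRING, Rank INT)."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header line
    rows = []
    for country, rank in reader:
        try:
            rows.append((country, int(rank)))
        except ValueError:
            continue  # malformed row: drop it and carry on with the next one
    return rows


print(read_dropmalformed(SAMPLE_CSV))
```

The design difference from PERMISSIVE is only in the except branch: the bad row is discarded instead of being kept with a null.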

 

Option 4 - Using Failfast Mode:

If all the data is mandatory and must be correct, so that no bad or corrupt record may be skipped or redirected (in other words, the Spark job has to throw an exception even for a single corrupt record), then we can use FAILFAST mode. In this mode, Spark throws an exception and halts the data loading process as soon as it finds a bad or corrupted record. Let's see an example -


//Consider an input csv file with below data
Country, Rank
France,1
Canada,2
Netherlands,Netherlands

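val df = spark.read
         .option("header", "true")     // first line holds column names, not data
         .option("mode", "FAILFAST")
         .schema("Country String, Rank Integer")
         .csv("/tmp/inputFile.csv")

df.show()   // throws once the malformed "Netherlands,Netherlands" row is read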
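Since the schema is supplied and nothing has to be inferred, Spark should read the file lazily here, so the FAILFAST exception surfaces when an action such as df.show() runs rather than at spark.read. The plain-Python model below mimics the fail-fast contract under our own assumptions (schema Country STRING, Rank INT, header line skipped): the first malformed row aborts the whole load with an exception, and no partial result is returned. MalformedRecordError and read_failfast are hypothetical names for this sketch; the exception Spark actually raises is a different class.

```python
import csv
import io

# The sample file from the post; the first line is the header.
SAMPLE_CSV = "Country, Rank\nFrance,1\nCanada,2\nNetherlands,Netherlands\n"


class MalformedRecordError(ValueError):
    """Hypothetical error type for this model (Spark raises its own exception)."""


def read_failfast(text):
    """Model of FAILFAST mode for the schema (Country STRING, Rank INT)."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header line
    rows = []
    for fields in reader:
        country, rank = fields
        try:
            rows.append((country, int(rank)))
        except ValueError as exc:
            # One bad record is enough to abort the whole load; no partial result.
            raise MalformedRecordError(
                f"line {reader.line_num}: {','.join(fields)!r}"
            ) from exc
    return rows


try:
    read_failfast(SAMPLE_CSV)
except MalformedRecordError as err:
    print("load aborted:", err)
```

In a real job the equivalent pattern is to let the exception stop the pipeline, or to catch it around the action and report which input file was rejected.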

 

Option 5 - Using columnNameOfCorruptRecord:

In PERMISSIVE mode, the columnNameOfCorruptRecord option names a column that receives the raw text of each malformed record. Spark creates this column implicitly during parsing and then drops it, so if you want to retain the column, you have to add it explicitly to the schema as a string column. Let's see an example in PySpark -


//Consider an input csv file with below data
Country, Rank
France,1
Canada,2
Netherlands,Netherlands

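# PySpark; assumes an active SparkSession named `spark` (as in the pyspark shell)
dataSchema = "Country String, Rank Integer, CORRUPTED String"

df = spark.read.csv('/tmp/inputFile.csv', header=True, schema=dataSchema,
                    enforceSchema=True, mode='PERMISSIVE',
                    columnNameOfCorruptRecord='CORRUPTED')

df.show()   # show() prints the table itself and returns None, so no print() needed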

When you run the above snippet -

  • You can see the corrupted record ("Netherlands,Netherlands") in the "CORRUPTED" column.
  • For the correct records, the corresponding CORRUPTED value will be null.
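Here is a plain-Python model of the rows you can expect, assuming the schema Country STRING, Rank INT, CORRUPTED STRING and a skipped header line. It is an illustration, not Spark's code, and read_with_corrupt_column is a hypothetical name. The model keeps the parsable Country value of the bad row and sets its Rank to None; whether Spark keeps such partially parsed fields can depend on the Spark version, so compare with df.show() on your own cluster.

```python
import csv
import io

# The sample file from the post; the first line is the header.
SAMPLE_CSV = "Country, Rank\nFrance,1\nCanada,2\nNetherlands,Netherlands\n"


def read_with_corrupt_column(text):
    """Model of PERMISSIVE mode with a CORRUPTED column in the schema
    (Country STRING, Rank INT, CORRUPTED STRING)."""
    rows = []
    for raw in io.StringIO(text).read().splitlines()[1:]:  # skip the header line
        country, rank = next(csv.reader([raw]))
        try:
            rows.append((country, int(rank), None))  # good row: CORRUPTED is None
        except ValueError:
            rows.append((country, None, raw))  # bad row: keep the raw line text
    return rows


for row in read_with_corrupt_column(SAMPLE_CSV):
    print(row)
```

A common follow-up is to filter on the corrupt column, for example keeping rows where CORRUPTED is null for further processing and sending the others to a review table.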

Hope this post helps. If you liked this post, share it.
