Introduction
Slowly Changing Dimension (SCD) Type 2 is a data management technique used in data warehousing to track historical changes in dimension tables. Unlike SCD Type 1, which overwrites old data, SCD Type 2 preserves the history of changes by creating multiple records for the same natural key (e.g., customer ID). Each record represents a different version of the data, with indicators of when each version was valid.
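For example (the values below are purely illustrative), if customer 101 moves from London to Paris, a Type 2 dimension keeps both rows:

customer_id | city   | start_date | end_date
101         | London | 2021-05-10 | 2023-03-14
101         | Paris  | 2023-03-15 | 9999-12-31

The old row is closed off with an end date, and the new row becomes the current version, flagged here with the sentinel end date 9999-12-31, the same convention we will use below.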
Problem Statement
In this blog post, we tackle the challenge of tracking historical data changes by exploring how PySpark, a robust data processing library for Python, can simplify the implementation of SCD Type 2. We will walk through a practical example and demonstration to show how to implement SCD Type 2 effectively in PySpark, preserving historical data while maintaining current records.
Data Preparation
We kickstart our journey with the initial CSV files for the source and target DataFrames, named source_data.csv and existing_data.csv.
The columns in the file existing_data.csv (the target table) are listed below.
id: A unique identifier for each individual.
name: The name of the individual.
course: The course in which the individual is enrolled.
start_date: The date when the individual started the course.
end_date: The date when the course enrollment ended. The value 9999-12-31 indicates that the course is still active.
The columns in the file source_data.csv (the source table) are listed below.
id: A unique identifier for each individual.
name: The name of the individual.
course: The course in which the individual is enrolled.
We will leverage Databricks Community Edition to solve the problem.
Let's begin by creating our datasets and reading them into PySpark DataFrames.
STEP 1: Create and Store the Datasets.
1. Creating source_data.csv:
dbutils.fs.put("/FileStore/leet_code_csvs/source_data.csv",'''
id,name,course
1,Rohit,Data Science
3,Mansi,Commerce
4,Muskan,CS
6,Sahil,Electronics''',True)
2. Creating existing_data.csv:
dbutils.fs.put("/FileStore/leet_code_csvs/existing_data.csv",'''
id,name,course,start_date,end_date
1,Rohit,Data Eng,2022-01-01,9999-12-31
2,Ravi,English,2022-01-01,9999-12-31
3,Mansi,Hindi,2022-01-01,9999-12-31
4,Vanshu,Math,2022-01-01,2022-01-05
4,Vanshu,Physics,2022-01-01,9999-12-31''',True)
STEP 2: Read the Data into PySpark DataFrames and View the Data
df_exist=spark.read.option("header",True).option('inferSchema',True).csv("/FileStore/leet_code_csvs/existing_data.csv")
df_exist.show()
df_source=spark.read.option("header",True).option('inferSchema',True).csv("/FileStore/leet_code_csvs/source_data.csv")
df_source.show()
STEP 3: Write both DataFrames to Delta tables.
df_exist.write.mode('overwrite').saveAsTable('target')
df_source.write.mode('overwrite').saveAsTable('source')
STEP 4: Read the historical and current versions of the records into two separate DataFrames for further processing.
df_target=spark.sql('''select * from target where end_date='9999-12-31' ''')
df_hist_recs=spark.sql('''select * from target where end_date!='9999-12-31' ''')
df_source=spark.sql('''select * from source ''')
df_target.show()
df_hist_recs.show()
Note: In df_target we load only the current version of each record (end_date = '9999-12-31') for processing, excluding any historical records. The historical versions are loaded into df_hist_recs; with the sample data, that is just the expired Vanshu/Math row, while df_target holds the four active rows.
STEP 5: Now we will perform an outer join between df_source and df_target to get all new and updated records.
from pyspark.sql.functions import col,current_date,lit,when
df_outer=df_target.alias('target').join(df_source.alias('source'),['id'],'outer')
df_outer.show()
Note:
The records that match on both the source and target sides are the updated records.
The records that are null on the target side and not null on the source side are the newly inserted records.
The records that are null on the source side and not null on the target side are the records with no change.
A minimal sketch to inspect these three cases follows below.
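If you want to verify these three cases before continuing, here is a minimal, optional sketch (not part of the main flow). It reuses the name-based null checks applied in the later steps; the change_type column name is just for illustration.
# Optional: tag each joined row so the three cases are easy to inspect
# (change_type is an illustrative column name, not used later).
df_outer.withColumn(
    'change_type',
    when(col('target.name').isNull(), lit('insert'))
    .when(col('source.name').isNull(), lit('no_change'))
    .otherwise(lit('update'))
).select('id', 'change_type').show()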
STEP 6: Keep the records that have no change in a separate DataFrame and drop them from df_outer for further processing.
The records that are null on the source side are the records with no change. We will use PySpark's built-in coalesce function to identify nulls on the source side.
Note: the source columns are aliased with source, so here we identify the rows where all source columns are null using the coalesce function.
We load the records with no change into df_no_change and drop them from df_outer, so that df_outer contains only new and updated records.
from pyspark.sql.functions import coalesce
# Rows where every source column is null have no change; coalesce over the
# source columns returns null for exactly those rows.
df_no_change = df_outer.withColumn(
    'has_null',
    coalesce(*[f'source.{c}' for c in df_source.columns if c != 'id'])
).filter(col('has_null').isNull()).drop('has_null')
df_outer=df_outer.exceptAll(df_no_change)
df_no_change=df_no_change.select('target.*')
df_no_change.show()
df_outer.show()
STEP 7: Update the end date to yesterday's date for the updated records.
The records that have no nulls on either the source or the target side are the updated records, and we need to set their end date to yesterday's date so that the old version is closed off.
# Close off the old version: set end_date to yesterday for matched records.
# na.drop() removes the insert-only rows, which still have nulls on the target side.
df_updated_recs_hist = df_outer.withColumn(
    'end_date',
    when((col('source.name').isNotNull()) & (col('target.name').isNotNull()),
         current_date() - lit(1))
    .otherwise(col('end_date'))
).select('id', 'target.*', 'end_date').na.drop()
df_updated_recs_hist.show()
We have updated the end date to yesterday's date and selected end_date along with all the columns from the target table (aliased as target).
These records are loaded into df_updated_recs_hist to preserve the history.
STEP 8: Select the new version of the updated records and set the start and end dates.
The records that have no nulls on either side are the updated records, and for them we select the new values from the source columns (aliased as source).
We also set the start date to today's date and the end date to the open-ended sentinel date '9999-12-31'.
# Build the new current version of each updated record from the source values.
df_updated_recs = (df_outer
    .filter((col('source.name').isNotNull()) & (col('target.name').isNotNull()))
    .select('id', 'source.*')
    .withColumn('start_date', current_date())
    .withColumn('end_date', lit('9999-12-31').cast('date')))
df_updated_recs.show()
STEP 9: Insert the records that are present in the source but not in the target.
The records that are null on the target side and not null on the source side are the newly inserted records.
We use the same coalesce trick to identify nulls on the target side, and again set the start date to today's date and the end date to the sentinel date '9999-12-31'.
# New records: all target columns are null, so coalesce over them returns null.
df_inserted = (df_outer
    .withColumn('has_nulls',
                coalesce(*[f'target.{c}' for c in df_target.columns if c != 'id']))
    .filter(col('has_nulls').isNull())
    .select('id', 'source.*')
    .withColumn('start_date', current_date())
    .withColumn('end_date', lit('9999-12-31').cast('date')))
df_inserted.show()
STEP 10: Merge all the intermediate DataFrames to create a final DataFrame containing both the historical and the current versions of the records.
We will merge all the intermediate DataFrames using PySpark's union function.
df_final=df_updated_recs_hist.union(df_updated_recs).union(df_inserted).union(df_no_change).union(df_hist_recs)
df_updated_recs_hist: the history version of the records present on both the target and source sides, with the end date closed off at yesterday's date.
df_updated_recs: the new version of the updated records, with the start and end dates set.
df_inserted: the new records arriving from the source, with the start and end dates set.
df_no_change: the records with no change, which remain the current version.
df_hist_recs: the records that were already historical.
Finally, we write the result back to the target Delta table.
df_final.write.mode('overwrite').saveAsTable('target')
spark.sql('select * from target').show()
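As a quick sanity check (a minimal sketch, not part of the original flow), you can verify that every id now has exactly one active record; an empty result means the SCD Type 2 invariant holds. The active_versions alias is just for illustration.
# Any id returned here would have more than one active version, which should not happen.
spark.sql('''
select id, count(*) as active_versions
from target
where end_date = '9999-12-31'
group by id
having count(*) > 1
''').show()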
Thank you! Please share your solution in the comments section below.