Introduction
In this blog post, we'll look at Slowly Changing Dimension (SCD) Type 1 and how to implement it with PySpark.
Problem Statement
Dimension tables in a data warehouse have to be kept in sync with source systems whose records change over time. In this blog post, we address that challenge for the simplest case, SCD Type 1, by showing how PySpark, the Python API for Apache Spark, can detect changed records and overwrite them in the target. We'll work through a practical, step-by-step example.
Data Preparation
We start with three CSV files for the source, changed-source and target data frames, named source.csv, new_source.csv and target.csv. Each file contains the following columns:
CustomerID: Unique identifier for each customer.
Name: Customer Name.
Age: Customer's Age.
Gender: Customer's Gender.
Email: Customer's Email Address.
NOTE: In this practical we apply SCD Type 1 to the Age and Email fields only. Changes are detected by comparing whole rows (Step 5), but only these two fields are overwritten in the target (Step 10).
We use Databricks Community Edition for this exercise; dbutils and display() used below are Databricks notebook utilities.
Let's begin by creating our datasets and reading them into PySpark DataFrames.
STEP 1 : Create and Store Datasets.
1. Creating source.csv:
dbutils.fs.put('/FileStore/rohit/source.csv','''
CustomerID,Name,Age,Gender,Email
101,John,35,Male,john@example.com
102,Alice,28,Female,alice@example.com
103,Bob,40,Male,bob@example.com''',True)
2. Creating target.csv:
dbutils.fs.put('/FileStore/rohit/target.csv','''
CustomerID,Name,Age,Gender,Email
101,John,35,Male,john@example.com
102,Alice,28,Female,alice@example.com
103,Bob,40,Male,bob@example.com''',True)
3. Creating new_source.csv :
Note: In the new source file, Alice's Age is updated to 30 and John's Email changes to john123@example.com.
dbutils.fs.put('/FileStore/rohit/new_source.csv','''
CustomerID,Name,Age,Gender,Email
101,John,35,Male,john123@example.com
102,Alice,30,Female,alice@example.com
103,Bob,40,Male,bob@example.com''',True)
STEP 2: Read the Data into PySpark DataFrames and View It
1. Reading and displaying source.csv :
df_src=spark.read.option('header',True).option('inferSchema',True).csv('/FileStore/rohit/source.csv')
df_src.show()
2. Reading and displaying target.csv :
df_tgt=spark.read.option('header',True).option('inferSchema',True).csv('/FileStore/rohit/target.csv')
df_tgt.show()
Note: Initially, the source and target have the same data.
3. Reading and displaying new_source.csv :
Note: This is the new source data, which contains the changes.
new_source=spark.read.option('header',True).option('inferSchema',True).csv('/FileStore/rohit/new_source.csv')
new_source.show()
STEP 3 : SCD TYPE 1 Explanation :
SCD Type 1 (Slowly Changing Dimension Type 1) is a data-warehousing technique for updating dimension tables by overwriting existing attribute values with new ones. In other words, it replaces old data with new data and keeps no history of changes. This keeps data management simple and suits cases where only the most recent value matters, for example correcting a customer's email address. The trade-off is that previous values are lost, so it is a poor fit when past states must be reported on.
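To make the overwrite semantics concrete, here is a minimal plain-Python sketch (no Spark involved) that models a dimension table as a dict keyed by CustomerID. The function name apply_scd_type1 is hypothetical; it simply overwrites the tracked attributes of matching keys and keeps no history.

```python
from typing import Dict, Iterable

Row = Dict[str, object]


def apply_scd_type1(target: Dict[int, Row], changes: Iterable[Row],
                    key: str, tracked: Iterable[str]) -> Dict[int, Row]:
    """Return a new target where tracked attributes of matching keys are overwritten.

    Hypothetical helper for illustration: old values are discarded (no history),
    and keys absent from the target are ignored, mirroring the scope of this post.
    """
    tracked = list(tracked)
    result = {k: dict(v) for k, v in target.items()}  # copy so the input is untouched
    for row in changes:
        k = row[key]
        if k in result:
            for c in tracked:
                result[k][c] = row[c]  # overwrite: the previous value is lost
    return result


target = {
    101: {"CustomerID": 101, "Name": "John", "Age": 35, "Email": "john@example.com"},
    102: {"CustomerID": 102, "Name": "Alice", "Age": 28, "Email": "alice@example.com"},
}
changes = [{"CustomerID": 102, "Name": "Alice", "Age": 30, "Email": "alice@example.com"}]

updated = apply_scd_type1(target, changes, key="CustomerID", tracked=["Age", "Email"])
print(updated[102]["Age"])  # 30
```

After the call, nothing records that Alice's Age was ever 28 in the updated table; that loss of history is exactly what distinguishes Type 1.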
STEP 4 : Import Necessary Libraries :
from pyspark.sql.functions import sha2, concat_ws, col, when
STEP 5 : Hash each row of the new_source and target data frames :
df_src_hashed=new_source.withColumn('hash_val_row',sha2(concat_ws(' ',*[col(i) for i in df_src.columns]),256))
df_tgt_hashed=df_tgt.withColumn('hash_val_row',sha2(concat_ws(' ',*[col(i) for i in df_src.columns]),256))
We create two new data frames, df_src_hashed and df_tgt_hashed, by adding a column named 'hash_val_row' to the existing data frames new_source and df_tgt.
The sha2 function computes a SHA-2 hash of its input. It takes two arguments: the column to hash and the bit length of the SHA-2 variant (256 selects SHA-256). The result is returned as a hexadecimal string.
Inside sha2, concat_ws concatenates the values of all columns into a single string, separated by a space (null values are skipped). This string is what gets hashed.
The list comprehension *[col(i) for i in df_src.columns] builds a column expression for every column name and unpacks them as arguments to concat_ws. The same list is used for both data frames, which works because new_source, df_src and df_tgt all share the same columns.
Finally, withColumn adds the 'hash_val_row' column, containing each row's SHA-256 hash, to both data frames.
df_src_hashed.show()
df_tgt_hashed.show()
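The hash that Spark computes can be approximated in plain Python, which is handy for checking by hand which rows will be flagged. This sketch assumes the same recipe as above: column values rendered as strings, joined with a single space, UTF-8 encoded, and hashed with SHA-256 into a lowercase hex string. The helper row_hash is hypothetical, and using str() for each value is an assumption that matches Spark's string cast only for simple types such as the strings and integers in this post.

```python
import hashlib
from typing import Optional, Sequence


def row_hash(values: Sequence[Optional[object]], sep: str = " ") -> str:
    """Hypothetical plain-Python analogue of sha2(concat_ws(sep, *cols), 256).

    None values are skipped, as concat_ws skips nulls; other values are
    converted with str(), an assumption that holds for strings and integers.
    """
    joined = sep.join(str(v) for v in values if v is not None)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


old_john = (101, "John", 35, "Male", "john@example.com")
new_john = (101, "John", 35, "Male", "john123@example.com")
bob = (103, "Bob", 40, "Male", "bob@example.com")

print(row_hash(old_john) == row_hash(new_john))  # False: the email changed
print(row_hash(bob) == row_hash(bob))            # True: identical rows hash identically
print(len(row_hash(bob)))                        # 64 hex characters for SHA-256
```

One caveat of joining with a space: values that themselves contain spaces can produce the same joined string for different rows, e.g. ("a b", "c") and ("a", "b c"). Because nulls are skipped, ("a", None, "b") and (None, "a", "b") also hash identically. A separator that cannot appear in the data reduces the first risk.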
STEP 6 : Finding changed records in the new data :
df_changed_recs=df_src_hashed.alias('a')\
.join(df_tgt_hashed.alias('b'),((col('a.CustomerID')==col('b.CustomerID')) & (col('a.hash_val_row')!=col('b.hash_val_row'))))\
.select("a.*")
The alias method is used to alias each data frame with a name ('a' for df_src_hashed and 'b' for df_tgt_hashed). This is done to avoid ambiguity when referring to columns from different data frames in the join condition.
join() performs an inner join by default, so only rows that satisfy the join condition on both sides are kept. The condition requires the 'CustomerID' of data frame 'a' (df_src_hashed) to match the 'CustomerID' of data frame 'b' (df_tgt_hashed), and the two 'hash_val_row' values to differ, which indicates that at least one column of the record has changed.
The select("a.*") call keeps only the columns of data frame 'a' (df_src_hashed), so df_changed_recs contains the new versions of the records that have changed.
df_changed_recs.show()
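The join condition in this step can be mirrored with dictionaries keyed by CustomerID. This also makes one property of the inner join visible: a CustomerID present only in the new source has no partner in the target, so it is not returned (handling such inserts is outside the scope of this post). The function changed_keys is hypothetical, and the short strings below are placeholders standing in for hash_val_row values.

```python
from typing import Dict, List


def changed_keys(src_hashes: Dict[int, str], tgt_hashes: Dict[int, str]) -> List[int]:
    """Keys present on both sides whose row hashes differ.

    Mirrors the inner join on CustomerID with the extra condition
    a.hash_val_row != b.hash_val_row; keys missing from either side are dropped.
    """
    return sorted(k for k, h in src_hashes.items()
                  if k in tgt_hashes and tgt_hashes[k] != h)


tgt = {101: "h_john_old", 102: "h_alice_old", 103: "h_bob"}
src = {101: "h_john_new", 102: "h_alice_new", 103: "h_bob", 104: "h_new_customer"}

print(changed_keys(src, tgt))  # [101, 102]; 104 has no target row, so it is not returned
```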
STEP 7 : Create a function for renaming the columns in a data frame:
def columnRename(df,suffix,replace=False):
    if replace:
        for col_n in df.columns:
            df=df.withColumnRenamed(col_n,f"{col_n.replace('_'+suffix,'')}")
    else:
        for col_n in df.columns:
            df=df.withColumnRenamed(col_n,f"{col_n}_{suffix}")
    return df
def columnRename(df, suffix, replace=False):: This line defines a function named columnRename with three parameters: df, the data frame whose columns are to be renamed; suffix, the suffix to append to (or remove from) the column names; and replace, an optional boolean (default False) that selects removal instead of appending.
if replace:: This line checks whether replace is True. If so, the function removes the existing '_' + suffix from each column name instead of adding one.
df = df.withColumnRenamed(col_n, f"{col_n.replace('_'+suffix, '')}"): Inside this loop, each column is renamed by replacing '_' + suffix with an empty string, using Python's str.replace method. For example, 'Age_tgt' becomes 'Age'.
The withColumnRenamed method takes two arguments, the current column name (col_n) and the new column name, and returns a new data frame, which is assigned back to df.
else:: If replace is False, this block appends the suffix to the column names instead.
df = df.withColumnRenamed(col_n, f"{col_n}_{suffix}"): For each column, the function appends an underscore and the suffix to the name with an f-string, so 'Age' becomes 'Age_tgt'.
return df: Finally, the modified data frame is returned, with its columns renamed by either appending or removing the suffix, depending on the value of replace.
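As an alternative to calling withColumnRenamed once per column, PySpark's DataFrame.toDF(*cols) returns a new DataFrame with all columns renamed positionally in a single call. The sketch below separates the name computation (pure Python, easy to test) from the PySpark call. Both function names are hypothetical. Unlike the original, which uses str.replace and would remove '_tgt' anywhere in a name, this version strips the suffix only when it appears at the end.

```python
from typing import List


def renamed_columns(columns: List[str], suffix: str, replace: bool = False) -> List[str]:
    """Compute new column names: append '_<suffix>', or strip a trailing '_<suffix>'."""
    tag = "_" + suffix
    if replace:
        return [c[: -len(tag)] if c.endswith(tag) else c for c in columns]
    return [c + tag for c in columns]


def column_rename(df, suffix: str, replace: bool = False):
    """Rename every column of a PySpark DataFrame in one toDF call.

    Assumes df is a pyspark.sql.DataFrame; toDF(*names) renames columns by position.
    """
    return df.toDF(*renamed_columns(df.columns, suffix, replace))


cols = ["CustomerID", "Name", "Age", "Gender", "Email", "hash_val_row"]
print(renamed_columns(cols, "tgt"))
# ['CustomerID_tgt', 'Name_tgt', 'Age_tgt', 'Gender_tgt', 'Email_tgt', 'hash_val_row_tgt']
print(renamed_columns(renamed_columns(cols, "tgt"), "tgt", replace=True) == cols)  # True
```

Keeping the name logic in its own function means it can be checked without a Spark session; column_rename(df_tgt_hashed, 'tgt') would then play the role of columnRename(df_tgt_hashed, 'tgt').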
STEP 8: RENAME THE COLUMNS IN DATAFRAMES:
df_tgt_hashed=columnRename(df_tgt_hashed,'tgt')
df_changed_recs=columnRename(df_changed_recs,'src')
print(df_tgt_hashed.columns)
print(df_changed_recs.columns)
STEP 9: MERGE THE CHANGED DATA WITH TARGET DATAFRAME.
df_final=df_tgt_hashed.join(df_changed_recs,(col('CustomerID_tgt')==col('CustomerID_src')),'left')
display(df_final)
This line of code performs a left join operation between two DataFrames: df_tgt_hashed and df_changed_recs.
The join condition is (col('CustomerID_tgt') == col('CustomerID_src')), which matches the 'CustomerID_tgt' column of df_tgt_hashed against the 'CustomerID_src' column of df_changed_recs.
The third argument, 'left', makes this a left join: every row of the left DataFrame (df_tgt_hashed) is included in the result, whether or not there is a matching row in the right DataFrame (df_changed_recs). For target rows with no match, all columns coming from df_changed_recs are null.
The result of this join operation is assigned to the DataFrame df_final.
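The reason Step 10 can test the *_src columns for null is this left join: every target row is kept, and for a CustomerID with no changed record, the *_src columns come back null. Here is a plain-Python sketch with dictionaries; the left_join helper is hypothetical, and None plays the role of Spark's null.

```python
from typing import Dict, List, Optional

Row = Dict[str, object]


def left_join(left: List[Row], right: List[Row], left_key: str, right_key: str,
              right_cols: List[str]) -> List[Row]:
    """Keep every left row; add right_cols from the matching right row, or None if none matches.

    Assumes right_key values are unique, as CustomerID is in df_changed_recs.
    """
    index = {r[right_key]: r for r in right}
    out = []
    for left_row in left:
        match: Optional[Row] = index.get(left_row[left_key])
        row = dict(left_row)
        for c in right_cols:
            row[c] = match[c] if match is not None else None
        out.append(row)
    return out


tgt = [{"CustomerID_tgt": 101, "Age_tgt": 35}, {"CustomerID_tgt": 103, "Age_tgt": 40}]
changed = [{"CustomerID_src": 101, "Age_src": 35}]

joined = left_join(tgt, changed, "CustomerID_tgt", "CustomerID_src", ["CustomerID_src", "Age_src"])
for row in joined:
    print(row)
# {'CustomerID_tgt': 101, 'Age_tgt': 35, 'CustomerID_src': 101, 'Age_src': 35}
# {'CustomerID_tgt': 103, 'Age_tgt': 40, 'CustomerID_src': None, 'Age_src': None}
```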
STEP 10: FINAL RESULT :
scd_type1_cols=['Age','Email']
def scd_type1_implement(df_final):
    for cols in scd_type1_cols:
        df_final=df_final.withColumn(f"{cols}_tgt",when(col(f'{cols}_src').isNotNull(),col(f'{cols}_src')).otherwise(col(f"{cols}_tgt")))
    return df_final.select(*[i for i in df_final.columns if ('tgt' in i) and ('hash' not in i)])
df_final=scd_type1_implement(df_final)
df_final=columnRename(df_final,'tgt',True)
display(df_final)
scd_type1_cols = ['Age', 'Email']: This line defines a list scd_type1_cols containing the names of columns to be considered for Slowly Changing Dimension (SCD) Type 1 updates.
def scd_type1_implement(df_final):: This line defines a function scd_type1_implement that takes a DataFrame df_final as input. This function implements the logic for SCD Type 1 updates.
for cols in scd_type1_cols:: This line iterates over each column specified in the scd_type1_cols list.
df_final = df_final.withColumn(f"{cols}_tgt", when(col(f'{cols}_src').isNotNull(), col(f'{cols}_src')).otherwise(col(f"{cols}_tgt"))): Inside the loop, each tracked column (Age or Email) in the target is updated from the source when the source value is not null.
If the corresponding source column (e.g., 'Age_src' or 'Email_src') is not null, its value is copied into the target column ('Age_tgt' or 'Email_tgt'). Otherwise, the existing target value is retained.
return df_final.select(*[i for i in df_final.columns if ('tgt' in i) and ('hash' not in i)]): After updating the target columns, this line selects only the target columns (*_tgt) from the DataFrame df_final and excludes columns containing 'hash' in their names.
df_final = scd_type1_implement(df_final): This line calls the scd_type1_implement function to apply SCD Type 1 logic to the DataFrame df_final.
df_final = columnRename(df_final, 'tgt', True): This line calls the columnRename function to rename the target columns in the DataFrame df_final, replacing the existing '_tgt' suffix with an empty string (i.e., removing the suffix).
df_final.show()
The target data frame now reflects the changes: John's Email is john123@example.com and Alice's Age is 30, while Bob's record is unchanged.
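The update rule in Step 10 can be checked by hand on this post's data. The sketch below takes rows as they look after the Step 9 left join (Gender and the hash columns are omitted for brevity), applies the same rule as scd_type1_implement (take the _src value when it is not null, otherwise keep _tgt), and strips the _tgt suffix. The helpers apply_type1 and joined_row are hypothetical. In PySpark the same per-column rule could also be written as coalesce(col(f'{c}_src'), col(f'{c}_tgt')), since coalesce returns its first non-null argument.

```python
from typing import Dict, List, Optional, Sequence

Row = Dict[str, Optional[object]]

SCD_TYPE1_COLS = ["Age", "Email"]
COLS = ["CustomerID", "Name", "Age", "Email"]  # Gender and hash columns omitted for brevity


def joined_row(tgt: Sequence[object], src: Optional[Sequence[object]] = None) -> Row:
    """Build one post-join row; src=None models a target row with no changed record."""
    row: Row = {f"{c}_tgt": v for c, v in zip(COLS, tgt)}
    row.update({f"{c}_src": (src[i] if src is not None else None) for i, c in enumerate(COLS)})
    return row


def apply_type1(joined: List[Row], tracked: List[str]) -> List[Row]:
    """Overwrite tracked *_tgt values with non-null *_src values, then keep only
    the *_tgt columns (excluding any hash column) with the suffix removed."""
    result = []
    for row in joined:
        row = dict(row)  # do not mutate the caller's rows
        for c in tracked:
            src = row.get(f"{c}_src")
            if src is not None:
                row[f"{c}_tgt"] = src
        result.append({k[: -len("_tgt")]: v for k, v in row.items()
                       if k.endswith("_tgt") and "hash" not in k})
    return result


joined = [
    joined_row((101, "John", 35, "john@example.com"), (101, "John", 35, "john123@example.com")),
    joined_row((102, "Alice", 28, "alice@example.com"), (102, "Alice", 30, "alice@example.com")),
    joined_row((103, "Bob", 40, "bob@example.com")),
]

final = apply_type1(joined, SCD_TYPE1_COLS)
for row in final:
    print(row)
# {'CustomerID': 101, 'Name': 'John', 'Age': 35, 'Email': 'john123@example.com'}
# {'CustomerID': 102, 'Name': 'Alice', 'Age': 30, 'Email': 'alice@example.com'}
# {'CustomerID': 103, 'Name': 'Bob', 'Age': 40, 'Email': 'bob@example.com'}
```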
Thank you! Please share your own solutions in the comments section below.