# Build Slowly Changing Dimensions Type 2 (SCD2) with Apache Spark and Apache Hudi on Amazon EMR

## Prerequisites


1. An [AWS account](https://aws.amazon.com/premiumsupport/knowledge-center/create-and-activate-aws-account/).
2. To run the code samples, [create an EMR cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-setting-up.html) with an [EMR notebook](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks-create.html). Use EMR release 5.28.0 or later, the first release that includes Apache Hudi.
3. To enable Apache Hudi on EMR, follow the setup described in [How to use Hudi with Amazon EMR Notebooks](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-work-with-dataset.html).
4. [Create an S3 bucket](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/create-bucket.html) to store the Hudi files, and replace `MY-BUCKET` in the code with its name.



## Enable Hudi libraries

```python
import subprocess

# Copy the Hudi Spark bundle that ships with EMR into HDFS,
# so that the Spark session configured next can load it via spark.jars
hudi_lib_dir = '/apps/hudi/lib'
hudi_bundle = hudi_lib_dir + '/hudi-spark-bundle.jar'

subprocess.call(["hdfs", "dfs", "-mkdir", "-p", hudi_lib_dir])
subprocess.call(["hdfs", "dfs", "-copyFromLocal", "/usr/lib/hudi/hudi-spark-bundle.jar", hudi_bundle])
subprocess.call(["hdfs", "dfs", "-chmod", "755", hudi_bundle])
```


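The `%%configure -f` magic restarts the Spark session with the Hudi bundle on the classpath, the Kryo serializer, and `spark.sql.hive.convertMetastoreParquet` disabled, the settings used in the EMR documentation for Hudi.

```
%%configure -f
{ "conf": {
    "spark.jars": "hdfs:///apps/hudi/lib/hudi-spark-bundle.jar",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.hive.convertMetastoreParquet": "false"
}}
```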



```python
from pyspark.sql.types import (
    StringType,
    StructField,
    StructType,
    IntegerType,
    DoubleType,
    DateType,
    BooleanType,
    TimestampType
)
```


## Define customer schema

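```python
# Customer dimension: one row per version of a customer
dim_customer_schema = StructType([
    StructField('customer_id', StringType(), False),    # business (natural) key
    StructField('first_name', StringType(), True),
    StructField('last_name', StringType(), True),
    StructField('city', StringType(), True),
    StructField('country', StringType(), True),
    StructField('eff_start_date', DateType(), True),     # first day this version is valid
    StructField('eff_end_date', DateType(), True),       # first day it is no longer valid; 2999-12-31 = open
    StructField('timestamp', TimestampType(), True),     # used as the Hudi precombine field
    StructField('is_current', BooleanType(), True),      # true only for the latest version
])
```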

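`eff_start_date`, `eff_end_date`, and `is_current` carry the SCD2 bookkeeping: each row is one version of a customer, valid from `eff_start_date` up to but not including `eff_end_date`, with `2999-12-31` standing in for "no end yet". The plain-Python sketch below models that contract under two assumptions: validity intervals are half-open, as in the lookup joins later in this notebook, and `is_current` can be derived from the sentinel end date (the notebook stores it as a column instead). `CustomerVersion` and `is_valid_at` are hypothetical names, not part of PySpark or Hudi.

```python
from dataclasses import dataclass
from datetime import date, datetime, time

# Sentinel end date the notebook uses for the open (current) version
OPEN_END = date(2999, 12, 31)


@dataclass(frozen=True)
class CustomerVersion:
    customer_id: str
    city: str
    country: str
    eff_start_date: date
    eff_end_date: date = OPEN_END

    @property
    def is_current(self) -> bool:
        # Derived here; the notebook stores is_current explicitly
        return self.eff_end_date == OPEN_END

    def is_valid_at(self, ts: datetime) -> bool:
        # Half-open interval [start, end): dates compare as midnight timestamps,
        # mirroring the date-versus-timestamp comparison in the Spark join
        start = datetime.combine(self.eff_start_date, time.min)
        end = datetime.combine(self.eff_end_date, time.min)
        return start <= ts < end


old = CustomerVersion("2", "Seattle", "US", date(2020, 10, 14), date(2021, 3, 12))
new = CustomerVersion("2", "Paris", "FR", date(2021, 3, 12))
print(old.is_valid_at(datetime(2021, 3, 11, 23, 59)))  # True
print(new.is_valid_at(datetime(2021, 3, 12, 0, 0)))    # True
```

Because the end date is exclusive, the old and the new version never both match the same instant.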

## Create customer records

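For demonstration purposes only, I use the current time in microseconds as the surrogate key, `customer_dim_key`, for the customer records. Keys generated this way are not guaranteed to be unique, so do not use this approach in production.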

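```python
from pyspark.sql.functions import udf
import time

# Demo-only surrogate key: the current epoch time in microseconds, as a string.
# Marked non-deterministic so the optimizer does not assume repeated calls return the same value.
random_udf = udf(lambda: str(int(time.time() * 1000000)), StringType()).asNondeterministic()
```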

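Time-based keys are unique only if no two rows receive the same microsecond, which nothing guarantees once Spark evaluates the UDF for many rows, possibly on several executors at once. The plain-Python comparison below is a sketch, not the notebook's method: it assumes a random UUID is acceptable as a surrogate key, and `timestamp_key` and `uuid_key` are hypothetical helpers.

```python
import time
import uuid


def timestamp_key() -> str:
    # Same idea as the notebook's random_udf: current time in microseconds
    return str(int(time.time() * 1_000_000))


def uuid_key() -> str:
    # Random (version 4) UUID; collisions are vanishingly unlikely
    return str(uuid.uuid4())


n = 10_000
ts_keys = [timestamp_key() for _ in range(n)]
uuid_keys = [uuid_key() for _ in range(n)]

# In a tight loop, timestamp keys usually repeat; UUID keys should not
print("distinct timestamp keys:", len(set(ts_keys)), "of", n)
print("distinct UUID keys:", len(set(uuid_keys)), "of", n)
```

Within Spark, the built-in SQL function `uuid()` (for example through `pyspark.sql.functions.expr("uuid()")`) yields a comparable per-row value. It is also non-deterministic, so the DataFrame still needs to be cached, as the notebook does, before the keys are reused.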

```python
from datetime import datetime

# Two initial customer versions; 2999-12-31 marks an open-ended (current) version
customer_dim_df = spark.createDataFrame([
    ('1', 'John', 'Smith', 'London', 'UK',
     datetime.strptime('2020-09-27', '%Y-%m-%d'), datetime.strptime('2999-12-31', '%Y-%m-%d'),
     datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'), True),
    ('2', 'Susan', 'Chas', 'Seattle', 'US',
     datetime.strptime('2020-10-14', '%Y-%m-%d'), datetime.strptime('2999-12-31', '%Y-%m-%d'),
     datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'), True),
], dim_customer_schema)

# Attach the surrogate key and cache, so later actions reuse the same generated keys
customer_hudi_df = customer_dim_df.withColumn("customer_dim_key", random_udf())
customer_hudi_df.cache()

customer_hudi_df.show(5, False)
```


```
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|city   |country|eff_start_date|eff_end_date|timestamp          |is_current|customer_dim_key|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|1          |John      |Smith    |London |UK     |2020-09-27    |2999-12-31  |2020-12-08 09:15:32|true      |1615561177547777|
|2          |Susan     |Chas     |Seattle|US     |2020-10-14    |2999-12-31  |2020-12-08 09:15:32|true      |1615561178046248|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
```

## Store customer records

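```python
# Replace MY-BUCKET with the name of your S3 bucket
customers_table_path = 's3://MY-BUCKET/customers/'
customers_table_name = 'customers_table'

# The "<field>:<type>" format is parsed only by Hudi's CustomKeyGenerator. With the
# default key generator used here, Hudi finds no column named "country:SIMPLE" and
# places every record in its fallback "default" partition, which is why the table
# is read from customers_table_path + 'default/'.
partition_key = "country:SIMPLE"

# Small shuffle parallelism is enough for the demo data; Hive sync is disabled
hudi_options = {
    'hoodie.insert.shuffle.parallelism': '2',
    'hoodie.upsert.shuffle.parallelism': '2',
    'hoodie.delete.shuffle.parallelism': '2',
    'hoodie.bulkinsert.shuffle.parallelism': '2',
    'hoodie.datasource.hive_sync.enable': 'false',
    'hoodie.datasource.hive_sync.assume_date_partitioning': 'true'
}

# Initial load: customer_dim_key is the record key, timestamp the precombine field
(customer_hudi_df.write.format('org.apache.hudi')
    .options(**hudi_options)
    .option('hoodie.table.name', customers_table_name)
    .option('hoodie.datasource.write.recordkey.field', 'customer_dim_key')
    .option('hoodie.datasource.write.partitionpath.field', partition_key)
    .option('hoodie.datasource.write.precombine.field', 'timestamp')
    .option('hoodie.datasource.write.operation', 'insert')
    .option('hoodie.datasource.hive_sync.table', customers_table_name)
    .mode('overwrite')
    .save(customers_table_path))
```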

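The initial insert here and the upsert in the "Customers UPSERT" section pass almost identical options; only the write operation and the save mode differ. A minimal sketch of a hypothetical helper, `hudi_write_options`, that assembles the option map in one place. The keys and values are copied from this notebook; the helper only builds a Python dict and does not call Spark or Hudi.

```python
def hudi_write_options(table_name: str, operation: str,
                       record_key: str = "customer_dim_key",
                       partition_field: str = "country:SIMPLE",
                       precombine_field: str = "timestamp") -> dict:
    """Assemble the Hudi write options this notebook passes to df.write.

    Hypothetical helper: it only builds a dict and does not touch Spark or Hudi.
    """
    if operation not in ("insert", "upsert"):
        raise ValueError("this notebook only uses 'insert' and 'upsert'")
    return {
        # Settings shared by every write (the notebook's hudi_options)
        "hoodie.insert.shuffle.parallelism": "2",
        "hoodie.upsert.shuffle.parallelism": "2",
        "hoodie.delete.shuffle.parallelism": "2",
        "hoodie.bulkinsert.shuffle.parallelism": "2",
        "hoodie.datasource.hive_sync.enable": "false",
        "hoodie.datasource.hive_sync.assume_date_partitioning": "true",
        # Per-table and per-write settings (the notebook's .option(...) calls)
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.partitionpath.field": partition_field,
        "hoodie.datasource.write.precombine.field": precombine_field,
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.hive_sync.table": table_name,
    }


insert_opts = hudi_write_options("customers_table", "insert")
upsert_opts = hudi_write_options("customers_table", "upsert")

# Only the operation differs between the initial load and the upsert
print(sorted(k for k in insert_opts if insert_opts[k] != upsert_opts[k]))
# ['hoodie.datasource.write.operation']
```

In the notebook, the initial load would then become `customer_hudi_df.write.format('org.apache.hudi').options(**insert_opts).mode('overwrite').save(customers_table_path)`, and the upsert the same call with `upsert_opts` and `mode('append')`.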

## Read customers Hudi table

```python
# Snapshot read of the customers Hudi table (its "default" partition)
customer_hudi_df = spark.read.format("hudi").load(customers_table_path + 'default/')

customer_hudi_df.createOrReplaceTempView(customers_table_name)

spark.sql('SELECT customer_id, first_name, last_name, city, country, '
          'eff_start_date, eff_end_date, customer_dim_key, is_current '
          'FROM ' + customers_table_name).show(3, False)
```


```
+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+
|customer_id|first_name|last_name|city   |country|eff_start_date|eff_end_date|customer_dim_key|is_current|
+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+
|1          |John      |Smith    |London |UK     |2020-09-27    |2999-12-31  |1615561177547777|true      |
|2          |Susan     |Chas     |Seattle|US     |2020-10-14    |2999-12-31  |1615561178046248|true      |
+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+
```

## Create sales

```python
fact_sales_schema = StructType([
    StructField('item_id', StringType(), True),
    StructField('quantity', IntegerType(), True),
    StructField('price', DoubleType(), True),
    StructField('timestamp', TimestampType(), True),
    StructField('customer_id', StringType(), True)
])

sales_fact_df = spark.createDataFrame([
    ('100', 25, 123.46, datetime.strptime('2020-11-17 09:15:32', '%Y-%m-%d %H:%M:%S'), '1'),
    ('101', 300, 123.46, datetime.strptime('2020-10-28 09:15:32', '%Y-%m-%d %H:%M:%S'), '1'),
    ('102', 5, 1038.0, datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'), '2'),
], fact_sales_schema)

sales_fact_df.show()
```


```
+-------+--------+------+-------------------+-----------+
|item_id|quantity| price|          timestamp|customer_id|
+-------+--------+------+-------------------+-----------+
|    100|      25|123.46|2020-11-17 09:15:32|          1|
|    101|     300|123.46|2020-10-28 09:15:32|          1|
|    102|       5|1038.0|2020-12-08 09:15:32|          2|
+-------+--------+------+-------------------+-----------+
```

## Customer dimension key lookup



```python
from pyspark.sql.functions import when

# Point-in-time lookup: a sale matches the customer version valid at the sale timestamp
join_cond = [sales_fact_df.customer_id == customer_hudi_df.customer_id,
             sales_fact_df.timestamp >= customer_hudi_df.eff_start_date,
             sales_fact_df.timestamp < customer_hudi_df.eff_end_date]

# Sales without a matching customer version get the '-1' (unknown customer) key
customers_dim_key_df = (sales_fact_df
                        .join(customer_hudi_df, join_cond, 'leftouter')
                        .select(sales_fact_df['*'],
                                when(customer_hudi_df.customer_dim_key.isNull(), '-1')
                                .otherwise(customer_hudi_df.customer_dim_key)
                                .alias('customer_dim_key')))
```


```python
customers_dim_key_df.show()
```


```
+-------+--------+------+-------------------+-----------+----------------+
|item_id|quantity| price|          timestamp|customer_id|customer_dim_key|
+-------+--------+------+-------------------+-----------+----------------+
|    100|      25|123.46|2020-11-17 09:15:32|          1|1615561177547777|
|    101|     300|123.46|2020-10-28 09:15:32|          1|1615561177547777|
|    102|       5|1038.0|2020-12-08 09:15:32|          2|1615561178046248|
+-------+--------+------+-------------------+-----------+----------------+
```
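The join above is a point-in-time lookup: each sale picks the customer version whose `[eff_start_date, eff_end_date)` interval contains the sale timestamp, and falls back to `'-1'`, an "unknown customer" key, when no version matches. The same rule in plain Python, as a sketch using the two customer versions shown earlier; `lookup_dim_key` is a hypothetical helper, and a linear scan stands in for Spark's join. It assumes the versions of one customer never overlap; with overlapping intervals, the Spark join would emit one output row per matching version.

```python
from datetime import date, datetime, time

# (customer_id, eff_start_date, eff_end_date, customer_dim_key), from the customers table
CUSTOMER_VERSIONS = [
    ("1", date(2020, 9, 27), date(2999, 12, 31), "1615561177547777"),
    ("2", date(2020, 10, 14), date(2999, 12, 31), "1615561178046248"),
]
UNKNOWN_KEY = "-1"  # same fallback value as the when(...).otherwise(...) expression


def lookup_dim_key(customer_id, sale_ts, versions=CUSTOMER_VERSIONS):
    """Return the dim key of the version valid at sale_ts, else UNKNOWN_KEY."""
    for cid, start, end, dim_key in versions:
        # Dates compare as midnight timestamps, as in the Spark join condition
        start_ts = datetime.combine(start, time.min)
        end_ts = datetime.combine(end, time.min)
        if cid == customer_id and start_ts <= sale_ts < end_ts:
            return dim_key
    return UNKNOWN_KEY


print(lookup_dim_key("1", datetime(2020, 11, 17, 9, 15, 32)))  # 1615561177547777
print(lookup_dim_key("2", datetime(2020, 12, 8, 9, 15, 32)))   # 1615561178046248
print(lookup_dim_key("1", datetime(2020, 9, 1)))               # -1 (before John's first version)
```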

## Save sales

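```python
sales_table_path = 's3://MY-BUCKET/sales/'
sales_table_name = 'sales_table'

# The sales fact table is plain Parquet (not Hudi); it references customers by customer_dim_key
(customers_dim_key_df.write.format('parquet')
    .mode('overwrite')
    .save(sales_table_path))
```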


## Read sales table

```python
# The sales table is plain Parquet, so no Hudi reader is needed
sales_df = spark.read.parquet(sales_table_path)

sales_df.createOrReplaceTempView(sales_table_name)

spark.sql('SELECT item_id, quantity, price, timestamp, customer_id, customer_dim_key '
          'FROM ' + sales_table_name).show(5, False)
```


```
+-------+--------+------+-------------------+-----------+----------------+
|item_id|quantity|price |timestamp          |customer_id|customer_dim_key|
+-------+--------+------+-------------------+-----------+----------------+
|100    |25      |123.46|2020-11-17 09:15:32|1          |1615561177547777|
|101    |300     |123.46|2020-10-28 09:15:32|1          |1615561177547777|
|102    |5       |1038.0|2020-12-08 09:15:32|2          |1615561178046248|
+-------+--------+------+-------------------+-----------+----------------+
```

## Get number of sales per country

```python
spark.sql(
    'SELECT ct.country, '
    'SUM(st.quantity) AS sales_quantity, '
    'COUNT(*) AS count_sales '
    'FROM sales_table st '
    'INNER JOIN customers_table ct ON st.customer_dim_key = ct.customer_dim_key '
    'GROUP BY ct.country').show()
```


```
+-------+--------------+-----------+
|country|sales_quantity|count_sales|
+-------+--------------+-----------+
|     US|             5|          1|
|     UK|           325|          2|
+-------+--------------+-----------+
```
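The report can be checked by hand: UK has sales 100 and 101 (25 + 300 = 325 units, 2 sales) and US has sale 102 (5 units, 1 sale). The plain-Python sketch below recomputes it from the rows shown above. The fourth sale, keyed `'-1'`, is hypothetical and shows a property of the `INNER JOIN`: sales that found no customer version during the lookup silently drop out of the report.

```python
from collections import Counter

# customer_dim_key -> country, from the customers table
dim_country = {"1615561177547777": "UK", "1615561178046248": "US"}

# (item_id, quantity, customer_dim_key): sales 100-102 from the sales table,
# plus a hypothetical sale "999" that found no customer version during the lookup
sales = [
    ("100", 25, "1615561177547777"),
    ("101", 300, "1615561177547777"),
    ("102", 5, "1615561178046248"),
    ("999", 7, "-1"),
]

quantity, count = Counter(), Counter()
for _item_id, qty, dim_key in sales:
    country = dim_country.get(dim_key)
    if country is None:
        continue  # INNER JOIN semantics: no matching customer row, no output row
    quantity[country] += qty
    count[country] += 1

for country in sorted(quantity):
    print(country, quantity[country], count[country])
# UK 325 2
# US 5 1
```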

## Susan moves from the US to France, and a new customer, Bastian, is added

```python
from datetime import date

# Bastian is a new customer; Susan has moved from Seattle to Paris.
# Both versions become effective today and stay open until 2999-12-31.
today = date.today()

new_customer_dim_df = spark.createDataFrame([
    ('3', 'Bastian', 'Back', 'Berlin', 'GE',
     today, datetime.strptime('2999-12-31', '%Y-%m-%d'),
     datetime.strptime('2020-12-09 09:15:32', '%Y-%m-%d %H:%M:%S'), True),
    ('2', 'Susan', 'Chas', 'Paris', 'FR',
     today, datetime.strptime('2999-12-31', '%Y-%m-%d'),
     datetime.strptime('2020-12-09 10:15:32', '%Y-%m-%d %H:%M:%S'), True),
], dim_customer_schema)

# Attach surrogate keys and cache, as for the initial load
new_customer_dim_df = new_customer_dim_df.withColumn("customer_dim_key", random_udf())
new_customer_dim_df.cache()

new_customer_dim_df.show()
```


```
+-----------+----------+---------+------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|  city|country|eff_start_date|eff_end_date|          timestamp|is_current|customer_dim_key|
+-----------+----------+---------+------+-------+--------------+------------+-------------------+----------+----------------+
|          3|   Bastian|     Back|Berlin|     GE|    2021-03-12|  2999-12-31|2020-12-09 09:15:32|      true|1615561211194211|
|          2|     Susan|     Chas| Paris|     FR|    2021-03-12|  2999-12-31|2020-12-09 10:15:32|      true|1615561211350200|
+-----------+----------+---------+------+-------+--------------+------------+-------------------+----------+----------------+
```
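The upsert that follows opens a new version for every incoming row; it does not check whether anything changed, so resending an unchanged customer would close the current version and open an identical one. A common refinement, sketched here in plain Python as an assumption rather than part of this notebook, is to compare the tracked attributes with the current version first and to pass only new and changed customers on to the upsert. `TRACKED` and `classify_changes` are hypothetical names, and the third incoming row (an unchanged resend of John) is made up for the illustration.

```python
# Attributes whose change should produce a new SCD2 version (an assumption)
TRACKED = ("first_name", "last_name", "city", "country")


def classify_changes(current: dict, incoming: list) -> dict:
    """Split incoming rows into new, changed and unchanged customers.

    current maps customer_id -> the row dict of that customer's current version.
    """
    result = {"new": [], "changed": [], "unchanged": []}
    for row in incoming:
        existing = current.get(row["customer_id"])
        if existing is None:
            result["new"].append(row)
        elif any(existing[attr] != row[attr] for attr in TRACKED):
            result["changed"].append(row)
        else:
            result["unchanged"].append(row)
    return result


current = {
    "1": {"customer_id": "1", "first_name": "John", "last_name": "Smith", "city": "London", "country": "UK"},
    "2": {"customer_id": "2", "first_name": "Susan", "last_name": "Chas", "city": "Seattle", "country": "US"},
}
incoming = [
    {"customer_id": "3", "first_name": "Bastian", "last_name": "Back", "city": "Berlin", "country": "GE"},
    {"customer_id": "2", "first_name": "Susan", "last_name": "Chas", "city": "Paris", "country": "FR"},
    {"customer_id": "1", "first_name": "John", "last_name": "Smith", "city": "London", "country": "UK"},
]

groups = classify_changes(current, incoming)
print({k: [r["customer_id"] for r in v] for k, v in groups.items()})
# {'new': ['3'], 'changed': ['2'], 'unchanged': ['1']}
```

In Spark, the same split would typically be a left join of the incoming rows to the current versions on `customer_id`, followed by a comparison of the tracked columns.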

## Customers UPSERT

```python
from pyspark.sql.functions import lit

# Match each incoming customer with its current version in the Hudi table
join_cond = [customer_hudi_df.customer_id == new_customer_dim_df.customer_id,
             customer_hudi_df.is_current == True]

# Find customer records to update: close the current version on the day the new one starts
customers_to_update_df = (customer_hudi_df
                          .join(new_customer_dim_df, join_cond)
                          .select(customer_hudi_df.customer_id,
                                  customer_hudi_df.first_name,
                                  customer_hudi_df.last_name,
                                  customer_hudi_df.city,
                                  customer_hudi_df.country,
                                  customer_hudi_df.eff_start_date,
                                  new_customer_dim_df.eff_start_date.alias("eff_end_date"),
                                  customer_hudi_df.customer_dim_key,
                                  customer_hudi_df.timestamp)
                          .withColumn('is_current', lit(False)))

# Union with new customer records (columns are matched by name)
merged_customers_df = new_customer_dim_df.unionByName(customers_to_update_df)

# Upsert: rows whose customer_dim_key already exists are overwritten, new keys are inserted
(merged_customers_df.write.format('org.apache.hudi')
    .options(**hudi_options)
    .option('hoodie.table.name', customers_table_name)
    .option('hoodie.datasource.write.recordkey.field', 'customer_dim_key')
    .option('hoodie.datasource.write.partitionpath.field', partition_key)
    .option('hoodie.datasource.write.precombine.field', 'timestamp')
    .option('hoodie.datasource.write.operation', 'upsert')
    .option('hoodie.datasource.hive_sync.table', customers_table_name)
    .mode('append')
    .save(customers_table_path))
```

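The cell above does the SCD2 bookkeeping in two steps: every current version with an incoming replacement is copied with `eff_end_date` set to the new version's `eff_start_date` and `is_current` set to false, and those copies are unioned with the incoming rows. A plain-Python sketch of the same two steps, assuming rows are dicts keyed by the notebook's column names and at most one incoming row per customer; `scd2_merge` is a hypothetical helper, and only the columns needed for the illustration are included.

```python
from datetime import date

OPEN_END = date(2999, 12, 31)


def scd2_merge(current_rows: list, incoming_rows: list) -> list:
    """Return the rows to upsert: incoming versions plus closed-out old versions."""
    incoming_by_id = {r["customer_id"]: r for r in incoming_rows}
    closed = []
    for old in current_rows:
        new = incoming_by_id.get(old["customer_id"])
        if old["is_current"] and new is not None:
            # Copy the old version (same customer_dim_key) and close it
            closed.append({**old, "eff_end_date": new["eff_start_date"], "is_current": False})
    return list(incoming_rows) + closed


current_rows = [
    {"customer_id": "1", "country": "UK", "eff_start_date": date(2020, 9, 27),
     "eff_end_date": OPEN_END, "is_current": True, "customer_dim_key": "1615561177547777"},
    {"customer_id": "2", "country": "US", "eff_start_date": date(2020, 10, 14),
     "eff_end_date": OPEN_END, "is_current": True, "customer_dim_key": "1615561178046248"},
]
incoming_rows = [
    {"customer_id": "3", "country": "GE", "eff_start_date": date(2021, 3, 12),
     "eff_end_date": OPEN_END, "is_current": True, "customer_dim_key": "1615561211194211"},
    {"customer_id": "2", "country": "FR", "eff_start_date": date(2021, 3, 12),
     "eff_end_date": OPEN_END, "is_current": True, "customer_dim_key": "1615561211350200"},
]

for row in scd2_merge(current_rows, incoming_rows):
    print(row["customer_id"], row["country"], row["eff_end_date"], row["is_current"])
# 3 GE 2999-12-31 True
# 2 FR 2999-12-31 True
# 2 US 2021-03-12 False
```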

## Read customers Hudi table

```python
# Re-read the customers Hudi table to see the result of the upsert
customer_hudi_df = spark.read.format("hudi").load(customers_table_path + 'default/')

customer_hudi_df.createOrReplaceTempView(customers_table_name)

spark.sql('SELECT customer_id, first_name, last_name, city, country, '
          'eff_start_date, eff_end_date, customer_dim_key, is_current '
          'FROM ' + customers_table_name).show()
```


```
+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+
|customer_id|first_name|last_name|   city|country|eff_start_date|eff_end_date|customer_dim_key|is_current|
+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+
|          1|      John|    Smith| London|     UK|    2020-09-27|  2999-12-31|1615561177547777|      true|
|          2|     Susan|     Chas|Seattle|     US|    2020-10-14|  2021-03-12|1615561178046248|     false|
|          2|     Susan|     Chas|  Paris|     FR|    2021-03-12|  2999-12-31|1615561211350200|      true|
|          3|   Bastian|     Back| Berlin|     GE|    2021-03-12|  2999-12-31|1615561211194211|      true|
+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+
```
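The four rows above follow from how an upsert uses the record key: the closed-out Susan row carries an existing `customer_dim_key`, so it replaces the stored row, the two new keys are inserted, and John's row is left untouched. A simplified plain-Python model of that behavior, assuming Hudi's default payload semantics: duplicate keys within one batch are first collapsed to the record with the larger precombine `timestamp`, then the incoming record replaces the stored one. The `upsert` function is hypothetical and ignores partitions, indexing, and file layout.

```python
def upsert(table: dict, batch: list, key: str = "customer_dim_key",
           precombine: str = "timestamp") -> dict:
    """Simplified model of an upsert into a table stored as {record_key: row}."""
    # Collapse duplicate keys inside the batch, keeping the larger precombine value
    deduped = {}
    for row in batch:
        prev = deduped.get(row[key])
        if prev is None or row[precombine] > prev[precombine]:
            deduped[row[key]] = row
    # Incoming rows replace stored rows with the same key; new keys are inserted
    merged = dict(table)
    merged.update(deduped)
    return merged


# State after the initial insert (ISO timestamp strings compare chronologically)
table = {
    "1615561177547777": {"customer_dim_key": "1615561177547777", "country": "UK",
                         "is_current": True, "timestamp": "2020-12-08 09:15:32"},
    "1615561178046248": {"customer_dim_key": "1615561178046248", "country": "US",
                         "is_current": True, "timestamp": "2020-12-08 09:15:32"},
}
# merged_customers_df: two new versions plus Susan's closed-out US version
batch = [
    {"customer_dim_key": "1615561211194211", "country": "GE",
     "is_current": True, "timestamp": "2020-12-09 09:15:32"},
    {"customer_dim_key": "1615561211350200", "country": "FR",
     "is_current": True, "timestamp": "2020-12-09 10:15:32"},
    {"customer_dim_key": "1615561178046248", "country": "US",
     "is_current": False, "timestamp": "2020-12-08 09:15:32"},
]

result = upsert(table, batch)
for dim_key, row in sorted(result.items()):
    print(dim_key, row["country"], row["is_current"])
# 1615561177547777 UK True
# 1615561178046248 US False
# 1615561211194211 GE True
# 1615561211350200 FR True
```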

## Add new sales for Susan

```python
# Two new sales for Susan, both made today
today_str = datetime.today().strftime('%Y-%m-%d')

sales_fact_df = spark.createDataFrame([
    ('103', 250, 12.3, datetime.strptime(today_str + ' 12:15:42', '%Y-%m-%d %H:%M:%S'), '2'),
    ('104', 3, 1021.0, datetime.strptime(today_str + ' 06:35:32', '%Y-%m-%d %H:%M:%S'), '2'),
], fact_sales_schema)

sales_fact_df.show()
```


```
+-------+--------+------+-------------------+-----------+
|item_id|quantity| price|          timestamp|customer_id|
+-------+--------+------+-------------------+-----------+
|    103|     250|  12.3|2021-03-12 12:15:42|          2|
|    104|       3|1021.0|2021-03-12 06:35:32|          2|
+-------+--------+------+-------------------+-----------+
```

## Customer dimension key lookup


```python
from pyspark.sql.functions import when

join_cond = [sales_fact_df.customer_id == customer_hudi_df.customer_id,
             sales_fact_df.timestamp >= customer_hudi_df.eff_start_date,
             sales_fact_df.timestamp < customer_hudi_df.eff_end_date]

customers_dim_key_df = (sales_fact_df
                        .join(customer_hudi_df, join_cond, 'leftouter')
                        .select(sales_fact_df['*'],
                                when(customer_hudi_df.customer_dim_key.isNull(), '-1')
                                .otherwise(customer_hudi_df.customer_dim_key)
                                .alias('customer_dim_key')))

customers_dim_key_df.show()
```


```
+-------+--------+------+-------------------+-----------+----------------+
|item_id|quantity| price|          timestamp|customer_id|customer_dim_key|
+-------+--------+------+-------------------+-----------+----------------+
|    103|     250|  12.3|2021-03-12 12:15:42|          2|1615561211350200|
|    104|       3|1021.0|2021-03-12 06:35:32|          2|1615561211350200|
+-------+--------+------+-------------------+-----------+----------------+
```
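Both new sales resolve to Susan's Paris version, including item 104 at 06:35:32. Because `eff_start_date` is a date, a new version is effective from midnight, so every sale on the day of a change is attributed to the new attributes, whatever time the change really happened. The plain-Python sketch below contrasts that with a timestamp-precision boundary; it assumes, hypothetically, that the move took effect at 10:00 on 2021-03-12, the change date shown in the outputs above.

```python
from datetime import date, datetime, time

change_day = date(2021, 3, 12)               # eff_start_date of Susan's Paris version
change_at = datetime(2021, 3, 12, 10, 0, 0)  # hypothetical time the move took effect

# Susan's two new sales from the table above
sales = {
    "103": datetime(2021, 3, 12, 12, 15, 42),
    "104": datetime(2021, 3, 12, 6, 35, 32),
}


def version_by_day(ts):
    # Date precision (this notebook): the new version starts at midnight
    return "FR" if ts >= datetime.combine(change_day, time.min) else "US"


def version_by_timestamp(ts):
    # Timestamp precision (alternative): the new version starts at change_at
    return "FR" if ts >= change_at else "US"


for item_id, ts in sales.items():
    print(item_id, version_by_day(ts), version_by_timestamp(ts))
# 103 FR FR
# 104 FR US
```

Day granularity may be perfectly acceptable for a given report; timestamp precision would mean storing the `eff_*` columns as `TimestampType` and setting them from the time of the change.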

## Store new sales

```python
sales_table_path = 's3://MY-BUCKET/sales/'
sales_table_name = 'sales_table'

# Append the new sales to the existing Parquet files
(customers_dim_key_df.write.format('parquet')
    .mode('append')
    .save(sales_table_path))
```


## Read sales table

```python
# The sales table is plain Parquet, so no Hudi reader is needed
sales_df = spark.read.parquet(sales_table_path)

sales_df.createOrReplaceTempView(sales_table_name)

spark.sql('SELECT item_id, quantity, price, timestamp, customer_id, customer_dim_key '
          'FROM ' + sales_table_name).show(5, False)
```


```
+-------+--------+------+-------------------+-----------+----------------+
|item_id|quantity|price |timestamp          |customer_id|customer_dim_key|
+-------+--------+------+-------------------+-----------+----------------+
|100    |25      |123.46|2020-11-17 09:15:32|1          |1615561177547777|
|103    |250     |12.3  |2021-03-12 12:15:42|2          |1615561211350200|
|101    |300     |123.46|2020-10-28 09:15:32|1          |1615561177547777|
|102    |5       |1038.0|2020-12-08 09:15:32|2          |1615561178046248|
|104    |3       |1021.0|2021-03-12 06:35:32|2          |1615561211350200|
+-------+--------+------+-------------------+-----------+----------------+
```

## Get number of sales per country

```python
spark.sql(
    'SELECT ct.country, '
    'SUM(st.quantity) AS sales_quantity, '
    'COUNT(*) AS count_sales '
    'FROM sales_table st '
    'INNER JOIN customers_table ct ON st.customer_dim_key = ct.customer_dim_key '
    'GROUP BY ct.country').show()
```


```
+-------+--------------+-----------+
|country|sales_quantity|count_sales|
+-------+--------------+-----------+
|     US|             5|          1|
|     UK|           325|          2|
|     FR|           253|          2|
+-------+--------------+-----------+
```
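The final report keeps Susan's December sale under US and her two March sales under FR, because each sale joined to the customer version valid when it happened. Joining on `customer_id` against only the current versions (a Type 1 view of the dimension) would move all of her sales to FR. The plain-Python comparison below uses the five sales and four customer versions shown above, copied by hand; it illustrates the difference and is not part of the notebook.

```python
from collections import defaultdict

# customer_dim_key -> (customer_id, country, is_current), from the customers table
versions = {
    "1615561177547777": ("1", "UK", True),
    "1615561178046248": ("2", "US", False),
    "1615561211350200": ("2", "FR", True),
    "1615561211194211": ("3", "GE", True),
}
# (quantity, customer_dim_key) for items 100-104, from the sales table
sales = [
    (25, "1615561177547777"),
    (300, "1615561177547777"),
    (5, "1615561178046248"),
    (250, "1615561211350200"),
    (3, "1615561211350200"),
]

current_country = {cid: country for cid, country, cur in versions.values() if cur}

as_was = defaultdict(int)  # SCD2: join on customer_dim_key (version valid at sale time)
as_is = defaultdict(int)   # Type 1 style: join on customer_id, current versions only
for qty, dim_key in sales:
    customer_id, country, _ = versions[dim_key]
    as_was[country] += qty
    as_is[current_country[customer_id]] += qty

print(dict(sorted(as_was.items())))  # {'FR': 253, 'UK': 325, 'US': 5}
print(dict(sorted(as_is.items())))   # {'FR': 258, 'UK': 325}
```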