# transaction dataset schema transaction_schema = StructType([ StructField('msno', StringType()), StructField('payment_method_id', IntegerType()), StructField('payment_plan_days', IntegerType()), StructField('plan_list_price', IntegerType()), StructField('actual_amount_paid', IntegerType()), StructField('is_auto_renew', IntegerType()), StructField('transaction_date', DateType()), StructField('membership_expire_date', DateType()), StructField('is_cancel', IntegerType()) ]) # read data from csv transactions = ( spark .read .csv( '/mnt/kkbox/transactions', schema=transaction_schema, header=True, dateFormat='yyyyMMdd' ) ) # persist in delta lake format ( transactions .write .format('delta') .partitionBy('transaction_date') .mode('overwrite') .save('/mnt/kkbox/silver/transactions') ) # create table object to make delta lake queriable spark.sql(''' CREATE TABLE kkbox.transactions USING DELTA LOCATION '/mnt/kkbox/silver/transactions' ''')
Out[9]: DataFrame[]
# members dataset schema member_schema = StructType([ StructField('msno', StringType()), StructField('city', IntegerType()), StructField('bd', IntegerType()), StructField('gender', StringType()), StructField('registered_via', IntegerType()), StructField('registration_init_time', DateType()) ]) # read data from csv members = ( spark .read .csv( '/mnt/kkbox/members/members_v3.csv', schema=member_schema, header=True, dateFormat='yyyyMMdd' ) ) # persist in delta lake format ( members .write .format('delta') .mode('overwrite') .save('/mnt/kkbox/silver/members') ) # create table object to make delta lake queriable spark.sql(''' CREATE TABLE kkbox.members USING DELTA LOCATION '/mnt/kkbox/silver/members' ''')
Out[10]: DataFrame[]
%sql -- drop payment_plan_days of zero or less -- and select largest exp date on a trans date SELECT a.msno, a.transaction_date as trans_at, MAX(a.membership_expire_date) as expires_at FROM kkbox.transactions a WHERE a.payment_plan_days > 0 AND a.msno='WAMleIuG124oDSSdhcZIECwLcHHHbk4or0y6gxkK7t8=' GROUP BY a.msno, a.transaction_date ORDER BY msno, trans_at, expires_at
%sql -- correct expiration dates that have been backdated SELECT x.msno, x.trans_at, CASE -- if expiration date is prior to transaction date, then expiration date = transaction date WHEN x.expires_at < x.trans_at THEN trans_at ELSE expires_at END as expires_at FROM ( SELECT a.msno, a.transaction_date as trans_at, MAX(a.membership_expire_date) as expires_at FROM kkbox.transactions a WHERE a.payment_plan_days > 0 GROUP BY a.msno, a.transaction_date ) x WHERE x.msno='WAMleIuG124oDSSdhcZIECwLcHHHbk4or0y6gxkK7t8=' ORDER BY msno, trans_at, expires_at
%sql -- add dummy transaction entries for 2017-04-01 to prevent misclassification of churn in subsequent steps SELECT x.msno, x.trans_at, CASE WHEN x.expires_at < x.trans_at THEN trans_at ELSE expires_at END as expires_at FROM ( SELECT a.msno, a.transaction_date as trans_at, MAX(a.membership_expire_date) as expires_at FROM kkbox.transactions a WHERE a.payment_plan_days > 0 GROUP BY a.msno, a.transaction_date UNION ALL SELECT DISTINCT -- dummy entries to protect churn calculations msno, TO_DATE('2017-04-01') as trans_at, TO_DATE('2017-04-01') as expires_at FROM kkbox.transactions ) x WHERE x.msno='WAMleIuG124oDSSdhcZIECwLcHHHbk4or0y6gxkK7t8=' ORDER BY msno, trans_at, expires_at
%sql -- compare membership-relevant transactions to prior transactions to identify expiration changes WITH trans AS ( -- membership event dataset (from previous section) SELECT -- ----------------------------------------------- x.msno, x.trans_at, CASE WHEN x.expires_at < x.trans_at THEN trans_at ELSE expires_at END as expires_at FROM ( SELECT a.msno, a.transaction_date as trans_at, MAX(a.membership_expire_date) as expires_at FROM kkbox.transactions a WHERE a.payment_plan_days > 0 GROUP BY a.msno, a.transaction_date UNION ALL SELECT DISTINCT msno, TO_DATE('2017-04-01') as trans_at, TO_DATE('2017-04-01') as expires_at FROM kkbox.transactions ) x ) -- ----------------------------------------------- SELECT msno, LAG(expires_at) OVER(PARTITION BY msno ORDER BY trans_at) as previous_expires_at, trans_at, expires_at, CASE -- idnetify meaningful events that change expiration date WHEN (LAG(expires_at, 1) OVER(PARTITION BY msno ORDER BY trans_at)) IS NULL THEN 1 -- new customer registration WHEN (LAG(expires_at, 1) OVER(PARTITION BY msno ORDER BY trans_at)) != expires_at THEN 1 -- change in expiration date ELSE 0 END as meaningful_event FROM trans WHERE msno='WAMleIuG124oDSSdhcZIECwLcHHHbk4or0y6gxkK7t8=' ORDER BY msno, trans_at, expires_at
%sql -- identify churn events as those there the next transaction is 30+ days from prior expiration date WITH trans AS ( SELECT x.msno, x.trans_at, CASE WHEN x.expires_at < x.trans_at THEN trans_at ELSE expires_at END as expires_at FROM ( SELECT a.msno, a.transaction_date as trans_at, MAX(a.membership_expire_date) as expires_at FROM kkbox.transactions a WHERE a.payment_plan_days > 0 GROUP BY a.msno, a.transaction_date UNION ALL SELECT DISTINCT msno, TO_DATE('2017-04-01') as trans_at, TO_DATE('2017-04-01') as expires_at FROM kkbox.transactions ) x ) SELECT msno, previous_expires_at, trans_at, expires_at, CASE -- identify churn events WHEN DATEDIFF(trans_at, previous_expires_at) > 30 THEN 1 ELSE 0 END as churn_event FROM ( SELECT msno, LAG(expires_at) OVER(PARTITION BY msno ORDER BY trans_at) as previous_expires_at, trans_at, expires_at, CASE WHEN (LAG(expires_at, 1) OVER(PARTITION BY msno ORDER BY trans_at)) IS NULL THEN 1 WHEN (LAG(expires_at, 1) OVER(PARTITION BY msno ORDER BY trans_at)) != expires_at THEN 1 ELSE 0 END as meaningful_event FROM trans ) WHERE meaningful_event=1 AND msno='WAMleIuG124oDSSdhcZIECwLcHHHbk4or0y6gxkK7t8=' ORDER BY msno, trans_at, expires_at
%sql -- move churn flag to prior transaction record WITH trans AS ( SELECT x.msno, x.trans_at, CASE WHEN x.expires_at < x.trans_at THEN trans_at ELSE expires_at END as expires_at FROM ( SELECT a.msno, a.transaction_date as trans_at, MAX(a.membership_expire_date) as expires_at FROM kkbox.transactions a WHERE a.payment_plan_days > 0 GROUP BY a.msno, a.transaction_date UNION ALL SELECT DISTINCT msno, TO_DATE('2017-04-01') as trans_at, TO_DATE('2017-04-01') as expires_at FROM kkbox.transactions ) x ) SELECT msno, previous_expires_at, trans_at, expires_at, LEAD(churn_event, 1) OVER (PARTITION BY msno ORDER BY trans_at) as churned -- adjust churn flag assignment FROM ( SELECT msno, previous_expires_at, trans_at, expires_at, CASE WHEN DATEDIFF(trans_at, previous_expires_at) > 30 THEN 1 ELSE 0 END as churn_event FROM ( SELECT msno, LAG(expires_at) OVER(PARTITION BY msno ORDER BY trans_at) as previous_expires_at, trans_at, expires_at, CASE WHEN (LAG(expires_at, 1) OVER(PARTITION BY msno ORDER BY trans_at)) IS NULL THEN 1 WHEN (LAG(expires_at, 1) OVER(PARTITION BY msno ORDER BY trans_at)) != expires_at THEN 1 ELSE 0 END as meaningful_event FROM trans ) WHERE meaningful_event=1 ) WHERE msno='WAMleIuG124oDSSdhcZIECwLcHHHbk4or0y6gxkK7t8=' ORDER BY msno, trans_at, expires_at
%sql -- remove the dummy transactions added earlier WITH trans AS ( SELECT x.msno, x.trans_at, CASE WHEN x.expires_at < x.trans_at THEN trans_at ELSE expires_at END as expires_at FROM ( SELECT a.msno, a.transaction_date as trans_at, MAX(a.membership_expire_date) as expires_at FROM kkbox.transactions a WHERE a.payment_plan_days > 0 GROUP BY a.msno, a.transaction_date UNION ALL SELECT DISTINCT msno, TO_DATE('2017-04-01') as trans_at, TO_DATE('2017-04-01') as expires_at FROM kkbox.transactions ) x ) SELECT * FROM ( SELECT msno, previous_expires_at, trans_at, expires_at, LEAD(churn_event, 1) OVER (PARTITION BY msno ORDER BY trans_at) as churned FROM ( SELECT msno, previous_expires_at, trans_at, expires_at, CASE WHEN DATEDIFF(trans_at, previous_expires_at) > 30 THEN 1 ELSE 0 END as churn_event FROM ( SELECT msno, LAG(expires_at) OVER(PARTITION BY msno ORDER BY trans_at) as previous_expires_at, trans_at, expires_at, CASE WHEN (LAG(expires_at, 1) OVER(PARTITION BY msno ORDER BY trans_at)) IS NULL THEN 1 WHEN (LAG(expires_at, 1) OVER(PARTITION BY msno ORDER BY trans_at)) != expires_at THEN 1 ELSE 0 END as meaningful_event FROM trans ) WHERE meaningful_event=1 ) ) WHERE churned IS NOT NULL -- remove dummy transaction entries AND msno='WAMleIuG124oDSSdhcZIECwLcHHHbk4or0y6gxkK7t8=' ORDER BY msno, trans_at, expires_at
Our goal over the next few notebooks is to examine how two core Survival Analysis techniques can be applied to better understand patterns around customer attrition in a subscription model. In this notebook, we will prepare a publicly available dataset for analysis. This data will then serve as the basis for some exploratory analysis and modeling intended to assist us in understanding and predicting customer churn.
Last refresh: Never