# функция для создания пользовательских профилей def get_profiles(sessions, orders, ad_costs): # находим параметры первых посещений profiles = ( sessions.sort_values(by=['user_id', 'session_start']) .groupby('user_id') .agg( { 'session_start': 'first', 'channel': 'first', 'device': 'first', 'region': 'first', } ) .rename(columns={'session_start': 'first_ts'}) .reset_index() ) # для когортного анализа определяем дату первого посещения # и первый день месяца, в который это посещение произошло profiles['dt'] = profiles['first_ts'].dt.date profiles['dt'] = pd.to_datetime(profiles['dt'], format="%Y-%m-%d") profiles['month'] = profiles['first_ts'].astype('datetime64[M]') # добавляем признак платящих пользователей profiles['payer'] = profiles['user_id'].isin(orders['user_id'].unique()) # считаем количество уникальных пользователей # с одинаковыми источником и датой привлечения new_users = ( profiles.groupby(['dt', 'channel']) .agg({'user_id': 'nunique'}) .rename(columns={'user_id': 'unique_users'}) .reset_index() ) # объединяем траты на рекламу и число привлечённых пользователей ad_costs = ad_costs.merge(new_users, on=['dt', 'channel'], how='left') # делим рекламные расходы на число привлечённых пользователей # результаты сохраним в столбец acquisition_cost (CAC) ad_costs['acquisition_cost'] = ad_costs['costs'] / ad_costs['unique_users'] # добавляем стоимость привлечения в профили profiles = profiles.merge( ad_costs[['dt', 'channel', 'acquisition_cost']], on=['dt', 'channel'], how='left', ) # стоимость привлечения органических пользователей равна нулю profiles['acquisition_cost'] = profiles['acquisition_cost'].fillna(0) return profiles # функция для расчёта конверсии def get_conversion( profiles, purchases, observation_date, horizon_days, dimensions=[], ignore_horizon=False, ): # исключаем пользователей, не «доживших» до горизонта анализа last_suitable_acquisition_date = observation_date if not ignore_horizon: last_suitable_acquisition_date = observation_date - timedelta( days=horizon_days - 1 ) result_raw = profiles.query('dt <= @last_suitable_acquisition_date') # определяем дату и время первой покупки для каждого пользователя first_purchases = ( purchases.sort_values(by=['user_id', 'event_dt']) .groupby('user_id') .agg({'event_dt': 'first'}) .reset_index() ) # собираем «сырые» данные для расчёта удержания result_raw = result_raw.merge( first_purchases[['user_id', 'event_dt']], on='user_id', how='left' ) result_raw['lifetime'] = ( result_raw['event_dt'] - result_raw['first_ts'] ).dt.days if len(dimensions) == 0: result_raw['cohort'] = 'All users' dimensions = dimensions + ['cohort'] # функция для группировки таблицы по желаемым признакам def group_by_dimensions(df, dims, horizon_days): result = df.pivot_table( index=dims, columns='lifetime', values='user_id', aggfunc='nunique' ) result = result.fillna(0).cumsum(axis = 1) cohort_sizes = ( df.groupby(dims) .agg({'user_id': 'nunique'}) .rename(columns={'user_id': 'cohort_size'}) ) result = cohort_sizes.merge(result, on=dims, how='left').fillna(0) # делим каждую «ячейку» в строке на размер когорты # и получаем conversion rate result = result.div(result['cohort_size'], axis=0) result = result[['cohort_size'] + list(range(horizon_days))] result['cohort_size'] = cohort_sizes return result # получаем таблицу конверсии result_grouped = group_by_dimensions(result_raw, dimensions, horizon_days) if 'cohort' in dimensions: dimensions = [] # получаем таблицу динамики конверсии result_in_time = group_by_dimensions( result_raw, dimensions + ['dt'], horizon_days ) # возвращаем обе таблицы и сырые данные return result_raw, result_grouped, result_in_time def get_ltv( profiles, # Шаг 1. Получить профили и данные о покупках purchases, observation_date, horizon_days, dimensions=[], ignore_horizon=False, ): # исключаем пользователей, не «доживших» до горизонта анализа last_suitable_acquisition_date = observation_date if not ignore_horizon: last_suitable_acquisition_date = observation_date - timedelta( days=horizon_days - 1 ) result_raw = profiles.query('dt <= @last_suitable_acquisition_date') # Шаг 2. Добавить данные о покупках в профили result_raw = result_raw.merge( # добавляем в профили время совершения покупок и выручку purchases[['user_id', 'event_dt', 'revenue']], on='user_id', how='left', ) # Шаг 3. Рассчитать лайфтайм пользователя для каждой покупки result_raw['lifetime'] = ( result_raw['event_dt'] - result_raw['first_ts'] ).dt.days # группируем по cohort, если в dimensions ничего нет if len(dimensions) == 0: result_raw['cohort'] = 'All users' dimensions = dimensions + ['cohort'] # функция для группировки таблицы по желаемым признакам def group_by_dimensions(df, dims, horizon_days): # Шаг 3. Построить таблицу выручки # строим «треугольную» таблицу result = df.pivot_table( index=dims, columns='lifetime', values='revenue', # в ячейках — выручка за каждый лайфтайм aggfunc='sum', ) # Шаг 4. Посчитать сумму выручки с накоплением result = result.fillna(0).cumsum(axis=1) # Шаг 5. Вычислить размеры когорт cohort_sizes = ( df.groupby(dims) .agg({'user_id': 'nunique'}) .rename(columns={'user_id': 'cohort_size'}) ) # Шаг 6. Объединить размеры когорт и таблицу выручки result = cohort_sizes.merge(result, on=dims, how='left').fillna(0) # Шаг 7. Посчитать LTV # делим каждую «ячейку» в строке на размер когорты result = result.div(result['cohort_size'], axis=0) # исключаем все лайфтаймы, превышающие горизонт анализа result = result[['cohort_size'] + list(range(horizon_days))] # восстанавливаем размеры когорт result['cohort_size'] = cohort_sizes # сохраняем в датафрейм данные пользователей и значения CAC, # добавив параметры из dimensions cac = df[['user_id', 'acquisition_cost'] + dims].drop_duplicates() # считаем средний CAC по параметрам из dimensions cac = ( cac.groupby(dims) .agg({'acquisition_cost': 'mean'}) .rename(columns={'acquisition_cost': 'cac'}) ) # считаем ROI: делим LTV на CAC roi = result.div(cac['cac'], axis=0) # удаляем строки с бесконечным ROI roi = roi[~roi['cohort_size'].isin([np.inf])] # восстанавливаем размеры когорт в таблице ROI roi['cohort_size'] = cohort_sizes # добавляем CAC в таблицу ROI roi['cac'] = cac['cac'] # в финальной таблице оставляем размеры когорт, CAC # и ROI в лайфтаймы, не превышающие горизонт анализа roi = roi[['cohort_size', 'cac'] + list(range(horizon_days))] # возвращаем таблицы LTV и ROI return result, roi # получаем таблицы LTV и ROI result_grouped, roi_grouped = group_by_dimensions( result_raw, dimensions, horizon_days ) # для таблиц динамики убираем 'cohort' из dimensions if 'cohort' in dimensions: dimensions = [] # получаем таблицы динамики LTV и ROI result_in_time, roi_in_time = group_by_dimensions( result_raw, dimensions + ['dt'], horizon_days ) return ( result_raw, # сырые данные result_grouped, # таблица LTV result_in_time, # таблица динамики LTV roi_grouped, # таблица ROI roi_in_time, # таблица динамики ROI ) def get_retention( profiles, sessions, observation_date, horizon_days, dimensions=[], ignore_horizon=False, ): # добавляем столбец payer в передаваемый dimensions список dimensions = ['payer'] + dimensions # исключаем пользователей, не «доживших» до горизонта анализа last_suitable_acquisition_date = observation_date if not ignore_horizon: last_suitable_acquisition_date = observation_date - timedelta( days=horizon_days - 1 ) result_raw = profiles.query('dt <= @last_suitable_acquisition_date') # собираем «сырые» данные для расчёта удержания result_raw = result_raw.merge( sessions[['user_id', 'session_start']], on='user_id', how='left' ) result_raw['lifetime'] = ( result_raw['session_start'] - result_raw['first_ts'] ).dt.days # функция для группировки таблицы по желаемым признакам def group_by_dimensions(df, dims, horizon_days): result = df.pivot_table( index=dims, columns='lifetime', values='user_id', aggfunc='nunique' ) cohort_sizes = ( df.groupby(dims) .agg({'user_id': 'nunique'}) .rename(columns={'user_id': 'cohort_size'}) ) result = cohort_sizes.merge(result, on=dims, how='left').fillna(0) result = result.div(result['cohort_size'], axis=0) result = result[['cohort_size'] + list(range(horizon_days))] result['cohort_size'] = cohort_sizes return result # получаем таблицу удержания result_grouped = group_by_dimensions(result_raw, dimensions, horizon_days) # получаем таблицу динамики удержания result_in_time = group_by_dimensions( result_raw, dimensions + ['dt'], horizon_days ) # возвращаем обе таблицы и сырые данные return result_raw, result_grouped, result_in_time # функция для сглаживания фрейма def filter_data(df, window): # для каждого столбца применяем скользящее среднее for column in df.columns.values: df[column] = df[column].rolling(window).mean() return df # функция для визуализации удержания def plot_retention(retention, retention_history, horizon, window=7): # задаём размер сетки для графиков plt.figure(figsize=(15, 10)) # исключаем размеры когорт и удержание первого дня retention = retention.drop(columns=['cohort_size', 0]) # в таблице динамики оставляем только нужный лайфтайм retention_history = retention_history.drop(columns=['cohort_size'])[ [horizon - 1] ] # если в индексах таблицы удержания только payer, # добавляем второй признак — cohort if retention.index.nlevels == 1: retention['cohort'] = 'All users' retention = retention.reset_index().set_index(['cohort', 'payer']) # в таблице графиков — два столбца и две строки, четыре ячейки # в первой строим кривые удержания платящих пользователей ax1 = plt.subplot(2, 2, 1) retention.query('payer == True').droplevel('payer').T.plot( grid=True, ax=ax1 ) plt.legend() plt.xlabel('Лайфтайм') plt.title('Удержание платящих пользователей') # во второй ячейке строим кривые удержания неплатящих # вертикальная ось — от графика из первой ячейки ax2 = plt.subplot(2, 2, 2, sharey=ax1) retention.query('payer == False').droplevel('payer').T.plot( grid=True, ax=ax2 ) plt.legend() plt.xlabel('Лайфтайм') plt.title('Удержание неплатящих пользователей') # в третьей ячейке — динамика удержания платящих ax3 = plt.subplot(2, 2, 3) # получаем названия столбцов для сводной таблицы columns = [ name for name in retention_history.index.names if name not in ['dt', 'payer'] ] # фильтруем данные и строим график filtered_data = retention_history.query('payer == True').pivot_table( index='dt', columns=columns, values=horizon - 1, aggfunc='mean' ) filter_data(filtered_data, window).plot(grid=True, ax=ax3) plt.xlabel('Дата привлечения') plt.title( 'Динамика удержания платящих пользователей на {}-й день'.format( horizon ) ) # в чётвертой ячейке — динамика удержания неплатящих ax4 = plt.subplot(2, 2, 4, sharey=ax3) # фильтруем данные и строим график filtered_data = retention_history.query('payer == False').pivot_table( index='dt', columns=columns, values=horizon - 1, aggfunc='mean' ) filter_data(filtered_data, window).plot(grid=True, ax=ax4) plt.xlabel('Дата привлечения') plt.title( 'Динамика удержания неплатящих пользователей на {}-й день'.format( horizon ) ) plt.tight_layout() plt.show() # функция для визуализации конверсии def plot_conversion(conversion, conversion_history, horizon, window=7): # задаём размер сетки для графиков plt.figure(figsize=(15, 5)) # исключаем размеры когорт conversion = conversion.drop(columns=['cohort_size']) # в таблице динамики оставляем только нужный лайфтайм conversion_history = conversion_history.drop(columns=['cohort_size'])[ [horizon - 1] ] # первый график — кривые конверсии ax1 = plt.subplot(1, 2, 1) conversion.T.plot(grid=True, ax=ax1) plt.legend() plt.xlabel('Лайфтайм') plt.title('Конверсия пользователей') # второй график — динамика конверсии ax2 = plt.subplot(1, 2, 2, sharey=ax1) columns = [ # столбцами сводной таблицы станут все столбцы индекса, кроме даты name for name in conversion_history.index.names if name not in ['dt'] ] filtered_data = conversion_history.pivot_table( index='dt', columns=columns, values=horizon - 1, aggfunc='mean' ) filter_data(filtered_data, window).plot(grid=True, ax=ax2) plt.xlabel('Дата привлечения') plt.title('Динамика конверсии пользователей на {}-й день'.format(horizon)) plt.tight_layout() plt.show() # функция для визуализации LTV и ROI def plot_ltv_roi(ltv, ltv_history, roi, roi_history, horizon, window=7): # задаём сетку отрисовки графиков plt.figure(figsize=(20, 10)) # из таблицы ltv исключаем размеры когорт ltv = ltv.drop(columns=['cohort_size']) # в таблице динамики ltv оставляем только нужный лайфтайм ltv_history = ltv_history.drop(columns=['cohort_size'])[[horizon - 1]] # стоимость привлечения запишем в отдельный фрейм cac_history = roi_history[['cac']] # из таблицы roi исключаем размеры когорт и cac roi = roi.drop(columns=['cohort_size', 'cac']) # в таблице динамики roi оставляем только нужный лайфтайм roi_history = roi_history.drop(columns=['cohort_size', 'cac'])[ [horizon - 1] ] # кривые ltv ax1 = plt.subplot(2, 3, 1) ltv.T.plot(grid=True, ax=ax1) plt.legend() plt.xlabel('Лайфтайм') plt.title('LTV') # динамика ltv ax2 = plt.subplot(2, 3, 2, sharey=ax1) # столбцами сводной таблицы станут все столбцы индекса, кроме даты columns = [name for name in ltv_history.index.names if name not in ['dt']] filtered_data = ltv_history.pivot_table( index='dt', columns=columns, values=horizon - 1, aggfunc='mean' ) filter_data(filtered_data, window).plot(grid=True, ax=ax2) plt.xlabel('Дата привлечения') plt.title('Динамика LTV пользователей на {}-й день'.format(horizon)) # динамика cac ax3 = plt.subplot(2, 3, 3, sharey=ax1) # столбцами сводной таблицы станут все столбцы индекса, кроме даты columns = [name for name in cac_history.index.names if name not in ['dt']] filtered_data = cac_history.pivot_table( index='dt', columns=columns, values='cac', aggfunc='mean' ) filter_data(filtered_data, window).plot(grid=True, ax=ax3) plt.xlabel('Дата привлечения') plt.title('Динамика стоимости привлечения пользователей') # кривые roi ax4 = plt.subplot(2, 3, 4) roi.T.plot(grid=True, ax=ax4) plt.axhline(y=1, color='red', linestyle='--', label='Уровень окупаемости') plt.legend() plt.xlabel('Лайфтайм') plt.title('ROI') # динамика roi ax5 = plt.subplot(2, 3, 5, sharey=ax4) # столбцами сводной таблицы станут все столбцы индекса, кроме даты columns = [name for name in roi_history.index.names if name not in ['dt']] filtered_data = roi_history.pivot_table( index='dt', columns=columns, values=horizon - 1, aggfunc='mean' ) filter_data(filtered_data, window).plot(grid=True, ax=ax5) plt.axhline(y=1, color='red', linestyle='--', label='Уровень окупаемости') plt.xlabel('Дата привлечения') plt.title('Динамика ROI пользователей на {}-й день'.format(horizon)) plt.tight_layout() plt.show() # считаем LTV и ROI ltv_raw, ltv_grouped, ltv_history, roi_grouped, roi_history = get_ltv( profiles, orders, observation_date, horizon_days ) # строим графики plot_ltv_roi(ltv_grouped, ltv_history, roi_grouped, roi_history, horizon_days) --------------------------------------------------------------------------- KeyError Traceback (most recent call last) Input In [80], in () 1 # считаем LTV и ROI ----> 2 ltv_raw, ltv_grouped, ltv_history, roi_grouped, roi_history = get_ltv( 3 profiles, orders, observation_date, horizon_days 4 ) 6 # строим графики 7 plot_ltv_roi(ltv_grouped, ltv_history, roi_grouped, roi_history, horizon_days) Input In [63], in get_ltv(profiles, purchases, observation_date, horizon_days, dimensions, ignore_horizon) 97 return result, roi 99 # получаем таблицы LTV и ROI --> 100 result_grouped, roi_grouped = group_by_dimensions( 101 result_raw, dimensions, horizon_days 102 ) 104 # для таблиц динамики убираем 'cohort' из dimensions 105 if 'cohort' in dimensions: Input In [63], in get_ltv..group_by_dimensions(df, dims, horizon_days) 64 result = result.div(result['cohort_size'], axis=0) 65 # исключаем все лайфтаймы, превышающие горизонт анализа ---> 66 result = result[['cohort_size'] + list(range(horizon_days))] 67 # восстанавливаем размеры когорт 68 result['cohort_size'] = cohort_sizes File ~/Desktop/anaconda3/lib/python3.9/site-packages/pandas/core/frame.py:3511, in DataFrame.__getitem__(self, key) 3509 if is_iterator(key): 3510 key = list(key) -> 3511 indexer = self.columns._get_indexer_strict(key, "columns")[1] 3513 # take() does not accept boolean indexers 3514 if getattr(indexer, "dtype", None) == bool: File ~/Desktop/anaconda3/lib/python3.9/site-packages/pandas/core/indexes/base.py:5796, in Index._get_indexer_strict(self, key, axis_name) 5793 else: 5794 keyarr, indexer, new_indexer = self._reindex_non_unique(keyarr) -> 5796 self._raise_if_missing(keyarr, indexer, axis_name) 5798 keyarr = self.take(indexer) 5799 if isinstance(key, Index): 5800 # GH 42790 - Preserve name from an Index File ~/Desktop/anaconda3/lib/python3.9/site-packages/pandas/core/indexes/base.py:5859, in Index._raise_if_missing(self, key, indexer, axis_name) 5856 raise KeyError(f"None of [{key}] are in the [{axis_name}]") 5858 not_found = list(ensure_index(key)[missing_mask.nonzero()[0]].unique()) -> 5859 raise KeyError(f"{not_found} not in index") KeyError: '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13] not in index' conversion_raw, conversion_grouped, conversion_history = get_conversion( profiles, orders, observation_date, horizon_days ) plot_conversion(conversion_grouped, conversion_history, horizon_days) --------------------------------------------------------------------------- KeyError Traceback (most recent call last) Input In [79], in () ----> 1 conversion_raw, conversion_grouped, conversion_history = get_conversion( 2 profiles, orders, observation_date, horizon_days 3 ) 5 plot_conversion(conversion_grouped, conversion_history, horizon_days) Input In [62], in get_conversion(profiles, purchases, observation_date, horizon_days, dimensions, ignore_horizon) 60 return result 62 # получаем таблицу конверсии ---> 63 result_grouped = group_by_dimensions(result_raw, dimensions, horizon_days) 66 if 'cohort' in dimensions: 67 dimensions = [] Input In [62], in get_conversion..group_by_dimensions(df, dims, horizon_days) 55 # делим каждую «ячейку» в строке на размер когорты 56 # и получаем conversion rate 57 result = result.div(result['cohort_size'], axis=0) ---> 58 result = result[['cohort_size'] + list(range(horizon_days))] 59 result['cohort_size'] = cohort_sizes 60 return result File ~/Desktop/anaconda3/lib/python3.9/site-packages/pandas/core/frame.py:3511, in DataFrame.__getitem__(self, key) 3509 if is_iterator(key): 3510 key = list(key) -> 3511 indexer = self.columns._get_indexer_strict(key, "columns")[1] 3513 # take() does not accept boolean indexers 3514 if getattr(indexer, "dtype", None) == bool: File ~/Desktop/anaconda3/lib/python3.9/site-packages/pandas/core/indexes/base.py:5796, in Index._get_indexer_strict(self, key, axis_name) 5793 else: 5794 keyarr, indexer, new_indexer = self._reindex_non_unique(keyarr) -> 5796 self._raise_if_missing(keyarr, indexer, axis_name) 5798 keyarr = self.take(indexer) 5799 if isinstance(key, Index): 5800 # GH 42790 - Preserve name from an Index File ~/Desktop/anaconda3/lib/python3.9/site-packages/pandas/core/indexes/base.py:5859, in Index._raise_if_missing(self, key, indexer, axis_name) 5856 raise KeyError(f"None of [{key}] are in the [{axis_name}]") 5858 not_found = list(ensure_index(key)[missing_mask.nonzero()[0]].unique()) -> 5859 raise KeyError(f"{not_found} not in index") KeyError: '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13] not in index'