# функция для создания пользовательских профилей
def get_profiles(sessions, orders, ad_costs):
# находим параметры первых посещений
profiles = (
sessions.sort_values(by=['user_id', 'session_start'])
.groupby('user_id')
.agg(
{
'session_start': 'first',
'channel': 'first',
'device': 'first',
'region': 'first',
}
)
.rename(columns={'session_start': 'first_ts'})
.reset_index()
)
# для когортного анализа определяем дату первого посещения
# и первый день месяца, в который это посещение произошло
profiles['dt'] = profiles['first_ts'].dt.date
profiles['dt'] = pd.to_datetime(profiles['dt'], format="%Y-%m-%d")
profiles['month'] = profiles['first_ts'].astype('datetime64[M]')
# добавляем признак платящих пользователей
profiles['payer'] = profiles['user_id'].isin(orders['user_id'].unique())
# считаем количество уникальных пользователей
# с одинаковыми источником и датой привлечения
new_users = (
profiles.groupby(['dt', 'channel'])
.agg({'user_id': 'nunique'})
.rename(columns={'user_id': 'unique_users'})
.reset_index()
)
# объединяем траты на рекламу и число привлечённых пользователей
ad_costs = ad_costs.merge(new_users, on=['dt', 'channel'], how='left')
# делим рекламные расходы на число привлечённых пользователей
# результаты сохраним в столбец acquisition_cost (CAC)
ad_costs['acquisition_cost'] = ad_costs['costs'] / ad_costs['unique_users']
# добавляем стоимость привлечения в профили
profiles = profiles.merge(
ad_costs[['dt', 'channel', 'acquisition_cost']],
on=['dt', 'channel'],
how='left',
)
# стоимость привлечения органических пользователей равна нулю
profiles['acquisition_cost'] = profiles['acquisition_cost'].fillna(0)
return profiles
# функция для расчёта конверсии
def get_conversion(
profiles,
purchases,
observation_date,
horizon_days,
dimensions=[],
ignore_horizon=False,
):
# исключаем пользователей, не «доживших» до горизонта анализа
last_suitable_acquisition_date = observation_date
if not ignore_horizon:
last_suitable_acquisition_date = observation_date - timedelta(
days=horizon_days - 1
)
result_raw = profiles.query('dt <= @last_suitable_acquisition_date')
# определяем дату и время первой покупки для каждого пользователя
first_purchases = (
purchases.sort_values(by=['user_id', 'event_dt'])
.groupby('user_id')
.agg({'event_dt': 'first'})
.reset_index()
)
# собираем «сырые» данные для расчёта удержания
result_raw = result_raw.merge(
first_purchases[['user_id', 'event_dt']], on='user_id', how='left'
)
result_raw['lifetime'] = (
result_raw['event_dt'] - result_raw['first_ts']
).dt.days
if len(dimensions) == 0:
result_raw['cohort'] = 'All users'
dimensions = dimensions + ['cohort']
# функция для группировки таблицы по желаемым признакам
def group_by_dimensions(df, dims, horizon_days):
result = df.pivot_table(
index=dims, columns='lifetime', values='user_id', aggfunc='nunique'
)
result = result.fillna(0).cumsum(axis = 1)
cohort_sizes = (
df.groupby(dims)
.agg({'user_id': 'nunique'})
.rename(columns={'user_id': 'cohort_size'})
)
result = cohort_sizes.merge(result, on=dims, how='left').fillna(0)
# делим каждую «ячейку» в строке на размер когорты
# и получаем conversion rate
result = result.div(result['cohort_size'], axis=0)
result = result[['cohort_size'] + list(range(horizon_days))]
result['cohort_size'] = cohort_sizes
return result
# получаем таблицу конверсии
result_grouped = group_by_dimensions(result_raw, dimensions, horizon_days)
if 'cohort' in dimensions:
dimensions = []
# получаем таблицу динамики конверсии
result_in_time = group_by_dimensions(
result_raw, dimensions + ['dt'], horizon_days
)
# возвращаем обе таблицы и сырые данные
return result_raw, result_grouped, result_in_time
def get_ltv(
profiles, # Шаг 1. Получить профили и данные о покупках
purchases,
observation_date,
horizon_days,
dimensions=[],
ignore_horizon=False,
):
# исключаем пользователей, не «доживших» до горизонта анализа
last_suitable_acquisition_date = observation_date
if not ignore_horizon:
last_suitable_acquisition_date = observation_date - timedelta(
days=horizon_days - 1
)
result_raw = profiles.query('dt <= @last_suitable_acquisition_date')
# Шаг 2. Добавить данные о покупках в профили
result_raw = result_raw.merge(
# добавляем в профили время совершения покупок и выручку
purchases[['user_id', 'event_dt', 'revenue']],
on='user_id',
how='left',
)
# Шаг 3. Рассчитать лайфтайм пользователя для каждой покупки
result_raw['lifetime'] = (
result_raw['event_dt'] - result_raw['first_ts']
).dt.days
# группируем по cohort, если в dimensions ничего нет
if len(dimensions) == 0:
result_raw['cohort'] = 'All users'
dimensions = dimensions + ['cohort']
# функция для группировки таблицы по желаемым признакам
def group_by_dimensions(df, dims, horizon_days):
# Шаг 3. Построить таблицу выручки
# строим «треугольную» таблицу
result = df.pivot_table(
index=dims,
columns='lifetime',
values='revenue', # в ячейках — выручка за каждый лайфтайм
aggfunc='sum',
)
# Шаг 4. Посчитать сумму выручки с накоплением
result = result.fillna(0).cumsum(axis=1)
# Шаг 5. Вычислить размеры когорт
cohort_sizes = (
df.groupby(dims)
.agg({'user_id': 'nunique'})
.rename(columns={'user_id': 'cohort_size'})
)
# Шаг 6. Объединить размеры когорт и таблицу выручки
result = cohort_sizes.merge(result, on=dims, how='left').fillna(0)
# Шаг 7. Посчитать LTV
# делим каждую «ячейку» в строке на размер когорты
result = result.div(result['cohort_size'], axis=0)
# исключаем все лайфтаймы, превышающие горизонт анализа
result = result[['cohort_size'] + list(range(horizon_days))]
# восстанавливаем размеры когорт
result['cohort_size'] = cohort_sizes
# сохраняем в датафрейм данные пользователей и значения CAC,
# добавив параметры из dimensions
cac = df[['user_id', 'acquisition_cost'] + dims].drop_duplicates()
# считаем средний CAC по параметрам из dimensions
cac = (
cac.groupby(dims)
.agg({'acquisition_cost': 'mean'})
.rename(columns={'acquisition_cost': 'cac'})
)
# считаем ROI: делим LTV на CAC
roi = result.div(cac['cac'], axis=0)
# удаляем строки с бесконечным ROI
roi = roi[~roi['cohort_size'].isin([np.inf])]
# восстанавливаем размеры когорт в таблице ROI
roi['cohort_size'] = cohort_sizes
# добавляем CAC в таблицу ROI
roi['cac'] = cac['cac']
# в финальной таблице оставляем размеры когорт, CAC
# и ROI в лайфтаймы, не превышающие горизонт анализа
roi = roi[['cohort_size', 'cac'] + list(range(horizon_days))]
# возвращаем таблицы LTV и ROI
return result, roi
# получаем таблицы LTV и ROI
result_grouped, roi_grouped = group_by_dimensions(
result_raw, dimensions, horizon_days
)
# для таблиц динамики убираем 'cohort' из dimensions
if 'cohort' in dimensions:
dimensions = []
# получаем таблицы динамики LTV и ROI
result_in_time, roi_in_time = group_by_dimensions(
result_raw, dimensions + ['dt'], horizon_days
)
return (
result_raw, # сырые данные
result_grouped, # таблица LTV
result_in_time, # таблица динамики LTV
roi_grouped, # таблица ROI
roi_in_time, # таблица динамики ROI
)
def get_retention(
profiles,
sessions,
observation_date,
horizon_days,
dimensions=[],
ignore_horizon=False,
):
# добавляем столбец payer в передаваемый dimensions список
dimensions = ['payer'] + dimensions
# исключаем пользователей, не «доживших» до горизонта анализа
last_suitable_acquisition_date = observation_date
if not ignore_horizon:
last_suitable_acquisition_date = observation_date - timedelta(
days=horizon_days - 1
)
result_raw = profiles.query('dt <= @last_suitable_acquisition_date')
# собираем «сырые» данные для расчёта удержания
result_raw = result_raw.merge(
sessions[['user_id', 'session_start']], on='user_id', how='left'
)
result_raw['lifetime'] = (
result_raw['session_start'] - result_raw['first_ts']
).dt.days
# функция для группировки таблицы по желаемым признакам
def group_by_dimensions(df, dims, horizon_days):
result = df.pivot_table(
index=dims, columns='lifetime', values='user_id', aggfunc='nunique'
)
cohort_sizes = (
df.groupby(dims)
.agg({'user_id': 'nunique'})
.rename(columns={'user_id': 'cohort_size'})
)
result = cohort_sizes.merge(result, on=dims, how='left').fillna(0)
result = result.div(result['cohort_size'], axis=0)
result = result[['cohort_size'] + list(range(horizon_days))]
result['cohort_size'] = cohort_sizes
return result
# получаем таблицу удержания
result_grouped = group_by_dimensions(result_raw, dimensions, horizon_days)
# получаем таблицу динамики удержания
result_in_time = group_by_dimensions(
result_raw, dimensions + ['dt'], horizon_days
)
# возвращаем обе таблицы и сырые данные
return result_raw, result_grouped, result_in_time
# функция для сглаживания фрейма
def filter_data(df, window):
# для каждого столбца применяем скользящее среднее
for column in df.columns.values:
df[column] = df[column].rolling(window).mean()
return df
# функция для визуализации удержания
def plot_retention(retention, retention_history, horizon, window=7):
# задаём размер сетки для графиков
plt.figure(figsize=(15, 10))
# исключаем размеры когорт и удержание первого дня
retention = retention.drop(columns=['cohort_size', 0])
# в таблице динамики оставляем только нужный лайфтайм
retention_history = retention_history.drop(columns=['cohort_size'])[
[horizon - 1]
]
# если в индексах таблицы удержания только payer,
# добавляем второй признак — cohort
if retention.index.nlevels == 1:
retention['cohort'] = 'All users'
retention = retention.reset_index().set_index(['cohort', 'payer'])
# в таблице графиков — два столбца и две строки, четыре ячейки
# в первой строим кривые удержания платящих пользователей
ax1 = plt.subplot(2, 2, 1)
retention.query('payer == True').droplevel('payer').T.plot(
grid=True, ax=ax1
)
plt.legend()
plt.xlabel('Лайфтайм')
plt.title('Удержание платящих пользователей')
# во второй ячейке строим кривые удержания неплатящих
# вертикальная ось — от графика из первой ячейки
ax2 = plt.subplot(2, 2, 2, sharey=ax1)
retention.query('payer == False').droplevel('payer').T.plot(
grid=True, ax=ax2
)
plt.legend()
plt.xlabel('Лайфтайм')
plt.title('Удержание неплатящих пользователей')
# в третьей ячейке — динамика удержания платящих
ax3 = plt.subplot(2, 2, 3)
# получаем названия столбцов для сводной таблицы
columns = [
name
for name in retention_history.index.names
if name not in ['dt', 'payer']
]
# фильтруем данные и строим график
filtered_data = retention_history.query('payer == True').pivot_table(
index='dt', columns=columns, values=horizon - 1, aggfunc='mean'
)
filter_data(filtered_data, window).plot(grid=True, ax=ax3)
plt.xlabel('Дата привлечения')
plt.title(
'Динамика удержания платящих пользователей на {}-й день'.format(
horizon
)
)
# в чётвертой ячейке — динамика удержания неплатящих
ax4 = plt.subplot(2, 2, 4, sharey=ax3)
# фильтруем данные и строим график
filtered_data = retention_history.query('payer == False').pivot_table(
index='dt', columns=columns, values=horizon - 1, aggfunc='mean'
)
filter_data(filtered_data, window).plot(grid=True, ax=ax4)
plt.xlabel('Дата привлечения')
plt.title(
'Динамика удержания неплатящих пользователей на {}-й день'.format(
horizon
)
)
plt.tight_layout()
plt.show()
# функция для визуализации конверсии
def plot_conversion(conversion, conversion_history, horizon, window=7):
# задаём размер сетки для графиков
plt.figure(figsize=(15, 5))
# исключаем размеры когорт
conversion = conversion.drop(columns=['cohort_size'])
# в таблице динамики оставляем только нужный лайфтайм
conversion_history = conversion_history.drop(columns=['cohort_size'])[
[horizon - 1]
]
# первый график — кривые конверсии
ax1 = plt.subplot(1, 2, 1)
conversion.T.plot(grid=True, ax=ax1)
plt.legend()
plt.xlabel('Лайфтайм')
plt.title('Конверсия пользователей')
# второй график — динамика конверсии
ax2 = plt.subplot(1, 2, 2, sharey=ax1)
columns = [
# столбцами сводной таблицы станут все столбцы индекса, кроме даты
name for name in conversion_history.index.names if name not in ['dt']
]
filtered_data = conversion_history.pivot_table(
index='dt', columns=columns, values=horizon - 1, aggfunc='mean'
)
filter_data(filtered_data, window).plot(grid=True, ax=ax2)
plt.xlabel('Дата привлечения')
plt.title('Динамика конверсии пользователей на {}-й день'.format(horizon))
plt.tight_layout()
plt.show()
# функция для визуализации LTV и ROI
def plot_ltv_roi(ltv, ltv_history, roi, roi_history, horizon, window=7):
# задаём сетку отрисовки графиков
plt.figure(figsize=(20, 10))
# из таблицы ltv исключаем размеры когорт
ltv = ltv.drop(columns=['cohort_size'])
# в таблице динамики ltv оставляем только нужный лайфтайм
ltv_history = ltv_history.drop(columns=['cohort_size'])[[horizon - 1]]
# стоимость привлечения запишем в отдельный фрейм
cac_history = roi_history[['cac']]
# из таблицы roi исключаем размеры когорт и cac
roi = roi.drop(columns=['cohort_size', 'cac'])
# в таблице динамики roi оставляем только нужный лайфтайм
roi_history = roi_history.drop(columns=['cohort_size', 'cac'])[
[horizon - 1]
]
# кривые ltv
ax1 = plt.subplot(2, 3, 1)
ltv.T.plot(grid=True, ax=ax1)
plt.legend()
plt.xlabel('Лайфтайм')
plt.title('LTV')
# динамика ltv
ax2 = plt.subplot(2, 3, 2, sharey=ax1)
# столбцами сводной таблицы станут все столбцы индекса, кроме даты
columns = [name for name in ltv_history.index.names if name not in ['dt']]
filtered_data = ltv_history.pivot_table(
index='dt', columns=columns, values=horizon - 1, aggfunc='mean'
)
filter_data(filtered_data, window).plot(grid=True, ax=ax2)
plt.xlabel('Дата привлечения')
plt.title('Динамика LTV пользователей на {}-й день'.format(horizon))
# динамика cac
ax3 = plt.subplot(2, 3, 3, sharey=ax1)
# столбцами сводной таблицы станут все столбцы индекса, кроме даты
columns = [name for name in cac_history.index.names if name not in ['dt']]
filtered_data = cac_history.pivot_table(
index='dt', columns=columns, values='cac', aggfunc='mean'
)
filter_data(filtered_data, window).plot(grid=True, ax=ax3)
plt.xlabel('Дата привлечения')
plt.title('Динамика стоимости привлечения пользователей')
# кривые roi
ax4 = plt.subplot(2, 3, 4)
roi.T.plot(grid=True, ax=ax4)
plt.axhline(y=1, color='red', linestyle='--', label='Уровень окупаемости')
plt.legend()
plt.xlabel('Лайфтайм')
plt.title('ROI')
# динамика roi
ax5 = plt.subplot(2, 3, 5, sharey=ax4)
# столбцами сводной таблицы станут все столбцы индекса, кроме даты
columns = [name for name in roi_history.index.names if name not in ['dt']]
filtered_data = roi_history.pivot_table(
index='dt', columns=columns, values=horizon - 1, aggfunc='mean'
)
filter_data(filtered_data, window).plot(grid=True, ax=ax5)
plt.axhline(y=1, color='red', linestyle='--', label='Уровень окупаемости')
plt.xlabel('Дата привлечения')
plt.title('Динамика ROI пользователей на {}-й день'.format(horizon))
plt.tight_layout()
plt.show()
# считаем LTV и ROI
ltv_raw, ltv_grouped, ltv_history, roi_grouped, roi_history = get_ltv(
profiles, orders, observation_date, horizon_days
)
# строим графики
plot_ltv_roi(ltv_grouped, ltv_history, roi_grouped, roi_history, horizon_days)
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
Input In [80], in <cell line: 2>()
1 # считаем LTV и ROI
----> 2 ltv_raw, ltv_grouped, ltv_history, roi_grouped, roi_history = get_ltv(
3 profiles, orders, observation_date, horizon_days
4 )
6 # строим графики
7 plot_ltv_roi(ltv_grouped, ltv_history, roi_grouped, roi_history, horizon_days)
Input In [63], in get_ltv(profiles, purchases, observation_date, horizon_days, dimensions, ignore_horizon)
97 return result, roi
99 # получаем таблицы LTV и ROI
--> 100 result_grouped, roi_grouped = group_by_dimensions(
101 result_raw, dimensions, horizon_days
102 )
104 # для таблиц динамики убираем 'cohort' из dimensions
105 if 'cohort' in dimensions:
Input In [63], in get_ltv.<locals>.group_by_dimensions(df, dims, horizon_days)
64 result = result.div(result['cohort_size'], axis=0)
65 # исключаем все лайфтаймы, превышающие горизонт анализа
---> 66 result = result[['cohort_size'] + list(range(horizon_days))]
67 # восстанавливаем размеры когорт
68 result['cohort_size'] = cohort_sizes
File ~/Desktop/anaconda3/lib/python3.9/site-packages/pandas/core/frame.py:3511, in DataFrame.__getitem__(self, key)
3509 if is_iterator(key):
3510 key = list(key)
-> 3511 indexer = self.columns._get_indexer_strict(key, "columns")[1]
3513 # take() does not accept boolean indexers
3514 if getattr(indexer, "dtype", None) == bool:
File ~/Desktop/anaconda3/lib/python3.9/site-packages/pandas/core/indexes/base.py:5796, in Index._get_indexer_strict(self, key, axis_name)
5793 else:
5794 keyarr, indexer, new_indexer = self._reindex_non_unique(keyarr)
-> 5796 self._raise_if_missing(keyarr, indexer, axis_name)
5798 keyarr = self.take(indexer)
5799 if isinstance(key, Index):
5800 # GH 42790 - Preserve name from an Index
File ~/Desktop/anaconda3/lib/python3.9/site-packages/pandas/core/indexes/base.py:5859, in Index._raise_if_missing(self, key, indexer, axis_name)
5856 raise KeyError(f"None of [{key}] are in the [{axis_name}]")
5858 not_found = list(ensure_index(key)[missing_mask.nonzero()[0]].unique())
-> 5859 raise KeyError(f"{not_found} not in index")
KeyError: '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13] not in index'
conversion_raw, conversion_grouped, conversion_history = get_conversion(
profiles, orders, observation_date, horizon_days
)
plot_conversion(conversion_grouped, conversion_history, horizon_days)
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
Input In [79], in <cell line: 1>()
----> 1 conversion_raw, conversion_grouped, conversion_history = get_conversion(
2 profiles, orders, observation_date, horizon_days
3 )
5 plot_conversion(conversion_grouped, conversion_history, horizon_days)
Input In [62], in get_conversion(profiles, purchases, observation_date, horizon_days, dimensions, ignore_horizon)
60 return result
62 # получаем таблицу конверсии
---> 63 result_grouped = group_by_dimensions(result_raw, dimensions, horizon_days)
66 if 'cohort' in dimensions:
67 dimensions = []
Input In [62], in get_conversion.<locals>.group_by_dimensions(df, dims, horizon_days)
55 # делим каждую «ячейку» в строке на размер когорты
56 # и получаем conversion rate
57 result = result.div(result['cohort_size'], axis=0)
---> 58 result = result[['cohort_size'] + list(range(horizon_days))]
59 result['cohort_size'] = cohort_sizes
60 return result
File ~/Desktop/anaconda3/lib/python3.9/site-packages/pandas/core/frame.py:3511, in DataFrame.__getitem__(self, key)
3509 if is_iterator(key):
3510 key = list(key)
-> 3511 indexer = self.columns._get_indexer_strict(key, "columns")[1]
3513 # take() does not accept boolean indexers
3514 if getattr(indexer, "dtype", None) == bool:
File ~/Desktop/anaconda3/lib/python3.9/site-packages/pandas/core/indexes/base.py:5796, in Index._get_indexer_strict(self, key, axis_name)
5793 else:
5794 keyarr, indexer, new_indexer = self._reindex_non_unique(keyarr)
-> 5796 self._raise_if_missing(keyarr, indexer, axis_name)
5798 keyarr = self.take(indexer)
5799 if isinstance(key, Index):
5800 # GH 42790 - Preserve name from an Index
File ~/Desktop/anaconda3/lib/python3.9/site-packages/pandas/core/indexes/base.py:5859, in Index._raise_if_missing(self, key, indexer, axis_name)
5856 raise KeyError(f"None of [{key}] are in the [{axis_name}]")
5858 not_found = list(ensure_index(key)[missing_mask.nonzero()[0]].unique())
-> 5859 raise KeyError(f"{not_found} not in index")
KeyError: '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13] not in index'