kpi-dashboard/updater/update_app_metrics.py
Iliyas 5d82f7b7f3 feat(Метрики МП): monthly cumulative model + YoY trend sparklines
- updater: group by report_period_id (YYYYmm), current month capped to
  yesterday symmetrically for both years; per-year cumulative reset each Jan;
  adsl CTE now date-bounded for equal-period comparison
- JSON adds months/month_labels + per-metric cur_cum/prev_cum
- page: per-card cumulative sparkline (2025 dashed vs 2026 solid) + legend
2026-06-16 18:00:51 +05:00

284 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Ежедневное обновление статистики «Метрики МП» из Impala.
Модель данных:
• Грузим помесячно (группировка по report_period_id = YYYYmm), начиная с января
прошлого года и до текущего месяца текущего года.
• Текущий (незавершённый) месяц обрезаем по entry_date до ВЧЕРАШНЕГО дня —
симметрично для обоих лет (например, на 16 июня берём данные по 15 июня
включительно и в 2026, и в 2025), чтобы сравнение было «равный период».
• Внутри каждого года считаем нарастающий итог (кумулятив) по месяцам; на старте
нового года накопление сбрасывается.
• Сравнение в карточке = кумулятив-на-дату текущего года vs прошлого года.
Результат пишется в ../app_stats/app_metrics.json и пушится в ветку pages.
Подключение к Impala, конфиг и git-push переиспользуются из update_kpi.py.
"""
import sys
import json
import datetime as dt
from pathlib import Path
import update_kpi as base # общие функции: load_config, connect_impala, git_commit_push, ...
# Logger and repository root are shared with update_kpi rather than redefined here.
log = base.log
REPO_DIR = base.REPO_DIR
# Location of the generated JSON, built from REPO_DIR.
OUT_PATH = REPO_DIR / "app_stats" / "app_metrics.json"
# The same file expressed relative to REPO_DIR; main() hands it to base.git_commit_push.
OUT_REL = "app_stats/app_metrics.json"
# ─── Metrics: SQL column key → human-readable title (in SELECT order) ───
METRICS = [
    ("my_services", "Мои услуги"),
    ("traffic", "Детализация трафика"),
    ("payments", "Платежи"),
    ("orders", "Заявки"),
    ("loyalty", "Лояльность"),
    ("pay", "Оплата"),
    ("billing_detail", "Детали счета"),
    ("viktorina", "Викторина KT Club"),
    ("partners", "Акции партнеров"),
    ("tv_plus", "TV+"),
    ("boosters", "Бустеры"),
    ("roaming", "Роуминг"),
    ("pereoform", "Переоформление"),
    ("aitu_music", "Aitu Music"),
    ("online_booking", "Онлайн очередь"),
    ("my_docs", "Мои документы"),
    ("dz_statement", "Справка о ДЗ"),
    ("new_boosters_roaming_kcell", "Новая линейка бустеров и роумингов Кселл"),
    ("adsl", "ADSL отключение услуги"),
    ("law_and_order", "Закон и порядок"),
    ("acs", "ACS"),
    ("kaspi_freedom_pay", "Прием платежей через Freedom и Kaspi"),
    ("csat", "CSAT"),
    ("multicustomer", "Мультикастомер"),
    ("tv_plus_setup", "Настройка TV+"),
    ("static_ip", "Статический IP"),
    ("turbo_button", "Turbo кнопка"),
    ("real_estate_docs", "Справка о недвижимости"),
]

# Column keys only, in the same order as METRICS.
METRIC_KEYS = [pair[0] for pair in METRICS]

# Russian month names indexed by month number; slot 0 is an empty placeholder.
# Genitive forms (used after a day number) and short forms (used as labels).
_MONTHS_RU_GEN = [
    "",
    *"января февраля марта апреля мая июня "
     "июля августа сентября октября ноября декабря".split(),
]
_MONTHS_RU_SHORT = [
    "",
    *"Янв Фев Мар Апр Май Июн Июл Авг Сен Окт Ноя Дек".split(),
]
def date_bounds(today: dt.date | None = None):
"""Границы выборки. Текущий месяц обрезается по вчерашнему дню (симметрично по годам)."""
today = today or dt.date.today()
end_cur = today - dt.timedelta(days=1) # вчера
cur_year = today.year
prev_year = cur_year - 1
cap_month, cap_day = end_cur.month, end_cur.day
start_cur = dt.date(cur_year, 1, 1)
start_prev = dt.date(prev_year, 1, 1)
try:
end_prev = dt.date(prev_year, cap_month, cap_day)
except ValueError: # 29 февраля в невисокосный год
end_prev = dt.date(prev_year, cap_month, 28)
return {
"cur_year": cur_year, "prev_year": prev_year,
"cap_month": cap_month, "cap_day": cap_day,
"start_cur": start_cur, "end_cur": end_cur,
"start_prev": start_prev, "end_prev": end_prev,
}
def build_sql(b: dict) -> str:
    """Render the monthly metrics query for the given date bounds.

    The query has two CTEs: ``t`` counts app events per ``report_period_id``
    (filtered by ``entry_date``), and ``a`` counts ADSL orders per
    year*100+month of ``created_at``. The final select left-joins ``a`` onto
    ``t`` and orders by period.

    Args:
        b: Bounds mapping; reads the ``start_cur``, ``end_cur``,
            ``start_prev`` and ``end_prev`` dates (inclusive ends).

    Returns:
        The SQL text with surrounding whitespace stripped.
    """
    iso = "%Y-%m-%d"
    one_day = dt.timedelta(days=1)

    cur_from = format(b["start_cur"], iso)
    cur_to = format(b["end_cur"], iso)
    prev_from = format(b["start_prev"], iso)
    prev_to = format(b["end_prev"], iso)
    # created_at is a timestamp, so its upper bound is "< the following day".
    cur_to_excl = format(b["end_cur"] + one_day, iso)
    prev_to_excl = format(b["end_prev"] + one_day, iso)

    query = f"""
with t as (
select report_period_id,
count(case when event_type = 'OPENWSCREENMYSERVICES' then 1 end) as my_services,
count(case when event_type = 'OPENWINDOWDETALIZTION' then 1 end) as traffic,
count(case when event_type = 'OPENWINDOWPAYMENT' then 1 end) as payments,
count(case when event_type = 'OPENSCREENAPPEALS' then 1 end) as orders,
count(case when event_type in ('banner_auth', 'banner_unauth', 'loyalty_banner_slider_auth', 'loyalty_banner_slider_unauth', 'get_bonus_opened', 'bonus_opened','promo_partners_opened','company_promo_opened') then 1 end) as loyalty,
count(case when event_type = 'WINDOWPAYMENT' then 1 end) as pay,
count(case when event_type = 'OPENWINDOWBILLING' then 1 end) as billing_detail,
count(case when event_type = 'game_page' then 1 end) as viktorina,
count(case when event_type = 'promo_partners_opened' then 1 end) as partners,
count(case when event_type = 'OPENWINDOWTVPLUS' then 1 end) as tv_plus,
count(case when event_type = 'MOBCONNECTIONOPENWINDOWADDITIONALTRAFFIC' then 1 end) as boosters,
count(case when event_type = 'OPENWINDOWROAMING' then 1 end) as roaming,
count(case when event_type = 'reregistration_comm_start' then 1 end) as pereoform,
count(case when event_type = 'aitu_music_banner_clicked' then 1 end) as aitu_music,
count(case when event_type = 'ONLINE_BOOKING_SERVICES' then 1 end) as online_booking,
count(case when event_type in ('EMPTYLISTDOCS', 'HASLISTDOCS') then 1 end) as my_docs,
count(case when event_type = 'PDFSTATEMENT' then 1 end) as dz_statement,
count(case when event_type in ('booster_success_screen_kcell', 'ROAMINGPACKAGEMOBILEKCELL') then 1 end) as new_boosters_roaming_kcell,
count(case when event_type = 'law_and_order_service_clicked' then 1 end) as law_and_order,
count(case when event_type = 'ACS_DEVICE_SELECTION_OPEN' then 1 end) as acs,
count(case when event_type in ('PAYMENTWASSUCCESSFULFREEDOM', 'PAYWITHKASPI') then 1 end) as kaspi_freedom_pay,
count(case when event_type = 'csat_screen_sent' then 1 end) as csat,
count(case when event_type = 'multicustomer_completed_screen_viewed' then 1 end) as multicustomer,
count(case when event_type = 'tv_plus_setup_success_viewed' then 1 end) as tv_plus_setup,
count(case when event_type = 'static_ip_connect_success_viewed' then 1 end) as static_ip,
count(case when event_type = 'turbo_activation_success_viewed' then 1 end) as turbo_button,
count(case when event_type = 'real_estate_docs_screen_shown' then 1 end) as real_estate_docs
from drb.drb_iliyas_amplitude_metrics_full
where entry_date between '{cur_from}' and '{cur_to}'
or entry_date between '{prev_from}' and '{prev_to}'
group by report_period_id
)
, a as (
select (year(created_at) * 100 + month(created_at)) as report_period_id,
count(order_id) as adsl
from telecomkz.telecomkz_retention_service_prod_tariff_change_validations
where (created_at >= '{cur_from}' and created_at < '{cur_to_excl}')
or (created_at >= '{prev_from}' and created_at < '{prev_to_excl}')
group by 1
)
select t.report_period_id, my_services, traffic, payments, orders, loyalty, pay, billing_detail, viktorina, partners, tv_plus, boosters, roaming, pereoform, aitu_music, online_booking, my_docs, dz_statement, new_boosters_roaming_kcell, adsl, law_and_order, acs, kaspi_freedom_pay, csat, multicustomer,
tv_plus_setup, static_ip, turbo_button, real_estate_docs
from t
left join a on t.report_period_id = a.report_period_id
order by t.report_period_id
"""
    return query.strip()
def fetch_monthly(conn, sql):
    """Run the monthly metrics query and normalise the rows into dicts.

    Args:
        conn: Open database connection; only ``conn.cursor()`` is used.
        sql: Query text to execute.

    Returns:
        A list with one dict per result row: ``report_period_id`` as int plus
        one int per key in ``METRIC_KEYS``. NULL values, and keys whose column
        is absent from the result set, become 0.
    """
    cur = conn.cursor()
    try:
        log.info("Выполнение запроса метрик МП (помесячно)...")
        cur.execute(sql)
        rows = cur.fetchall()
        names = [d[0].lower() for d in cur.description]
    finally:
        # Release the cursor even when execute/fetch raises.
        cur.close()

    # Column name -> position, so values are read by name rather than by order.
    idx = {n: i for i, n in enumerate(names)}
    out = []
    for r in rows:
        rec = {"report_period_id": int(r[idx["report_period_id"]])}
        for key in METRIC_KEYS:
            v = r[idx[key]] if key in idx else None
            rec[key] = int(v) if v is not None else 0
        out.append(rec)

    log.info("Получено помесячных строк: %d (периоды: %s)",
             len(out), ", ".join(str(x["report_period_id"]) for x in out))
    return out
def _cumsum(arr):
    """Return the running totals of ``arr`` as a new list (empty in, empty out)."""
    def _running():
        total = 0
        for value in arr:
            total += value
            yield total

    return list(_running())
def build_payload(rows, b: dict):
    """Assemble the JSON payload from the monthly rows.

    Rows are placed into per-metric, per-year month slots covering months
    1..``cap_month``; rows for other years or later months are ignored. Each
    metric then gets its within-year running totals and the comparison of the
    latest current-year total against the previous-year total.

    Args:
        rows: Dicts with ``report_period_id`` (YYYYmm) and one value per
            key in ``METRIC_KEYS``.
        b: Bounds mapping; reads ``cur_year``, ``prev_year``, ``cap_month``,
            ``start_cur`` and ``end_cur``.

    Returns:
        The payload dict (timestamp, years, period label, range, months,
        month labels and the per-metric list).
    """
    cur_year = b["cur_year"]
    prev_year = b["prev_year"]
    years = (cur_year, prev_year)

    months = list(range(1, b["cap_month"] + 1))
    month_labels = [_MONTHS_RU_SHORT[m] for m in months]
    slot_of = {month: pos for pos, month in enumerate(months)}

    # key -> {year -> [monthly values aligned with `months`]}
    monthly = {
        key: {year: [0] * len(months) for year in years}
        for key in METRIC_KEYS
    }
    for rec in rows:
        year, month = divmod(rec["report_period_id"], 100)
        if year in years and month in slot_of:
            pos = slot_of[month]
            for key in METRIC_KEYS:
                monthly[key][year][pos] = rec[key]

    metrics = []
    for key, label in METRICS:
        cur_cum = _cumsum(monthly[key][cur_year])
        prev_cum = _cumsum(monthly[key][prev_year])
        cur_total = cur_cum[-1] if cur_cum else 0
        prev_total = prev_cum[-1] if prev_cum else 0
        # No previous-year baseline: flag as new instead of dividing by zero.
        if prev_total == 0:
            is_new, growth = True, None
        else:
            is_new, growth = False, (cur_total - prev_total) / prev_total
        metrics.append({
            "key": key, "label": label,
            "cur": cur_total, "prev": prev_total,
            "growth": growth, "is_new": is_new,
            "cur_cum": cur_cum, "prev_cum": prev_cum,
        })

    end_cur = b["end_cur"]
    period_label = f"с 1 января по {end_cur.day} {_MONTHS_RU_GEN[end_cur.month]}"
    date_range = {"start": f"{b['start_cur']:%Y-%m-%d}", "end": f"{end_cur:%Y-%m-%d}"}
    return {
        "generated_at": dt.datetime.now().isoformat(timespec="seconds"),
        "cur_year": cur_year,
        "prev_year": prev_year,
        "period_label": period_label,
        "range": date_range,
        "months": months,
        "month_labels": month_labels,
        "cumulative": True,
        "metrics": metrics,
    }
def write_if_changed(payload) -> bool:
    """Write the payload to ``OUT_PATH`` unless only the timestamp differs.

    Old and new content are compared as key-sorted JSON with ``generated_at``
    removed; text that cannot be processed that way is compared verbatim.

    Args:
        payload: JSON-serialisable dict; ``metrics`` and ``months`` are read
            for the log line.

    Returns:
        True when the file was (re)written, False when nothing changed.
    """
    def _comparable(raw):
        # Canonical form without the volatile timestamp.
        try:
            parsed = json.loads(raw)
            parsed.pop("generated_at", None)
            return json.dumps(parsed, ensure_ascii=False, sort_keys=True)
        except Exception:
            return raw

    new_text = json.dumps(payload, ensure_ascii=False, indent=2) + "\n"
    if OUT_PATH.exists():
        old_text = OUT_PATH.read_text(encoding="utf-8")
    else:
        old_text = ""

    if _comparable(old_text) != _comparable(new_text):
        OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
        OUT_PATH.write_text(new_text, encoding="utf-8")
        log.info("app_metrics.json обновлён (%d метрик, %d мес.).",
                 len(payload["metrics"]), len(payload["months"]))
        return True

    log.info("Метрики МП не изменились — коммит не требуется.")
    return False
def main() -> int:
    """Query the metrics, rebuild the JSON and push it when it changed.

    Returns:
        0 on success (including the "nothing changed" case); 1 when the
        result contains no rows for the current year, in which case the JSON
        file is left untouched.
    """
    log.info("=" * 60)
    log.info("Старт обновления Метрик МП")

    cfg = base.load_config()
    bounds = date_bounds()
    log.info("Период (тек.): %s..%s | (пред.): %s..%s | месяцев: %d",
             bounds["start_cur"], bounds["end_cur"],
             bounds["start_prev"], bounds["end_prev"], bounds["cap_month"])
    query = build_sql(bounds)

    base.patch_thrift_ssl()
    conn = base.connect_impala(cfg)
    try:
        rows = fetch_monthly(conn, query)
    finally:
        # Best-effort close: a failure here must not mask the query outcome.
        try:
            conn.close()
        except Exception:
            pass

    # Refuse to overwrite the JSON when the current year is missing entirely.
    cur_year = bounds["cur_year"]
    years_seen = {row["report_period_id"] // 100 for row in rows}
    if cur_year not in years_seen:
        log.error("В ответе нет данных за %s — JSON НЕ перезаписан.", cur_year)
        return 1

    if write_if_changed(build_payload(rows, bounds)):
        commit_msg = f"data: update app metrics {dt.date.today():%Y-%m-%d}"
        base.git_commit_push(cfg, [OUT_REL], commit_msg)

    log.info("Готово (Метрики МП).")
    return 0
if __name__ == "__main__":
    # Any unhandled error is logged with its traceback and becomes exit code 1.
    try:
        exit_code = main()
    except Exception as exc:  # noqa: BLE001
        log.exception("ОШИБКА (Метрики МП): %s", exc)
        exit_code = 1
    sys.exit(exit_code)