kpi-dashboard/updater/update_app_metrics.py
Iliyas b3ec72ca34 feat(Метрики МП): top KPI cards, grid/list toggle, clickable cards with chart modal
- 4 top KPI cards (registrations, downloads by OS+total, MAU, DAU) from
  drb_iliyas_telecomkz_daily_info + telecomkz_mau_stats; JSON gets top{} block
- grid/list view toggle (list = full-width cards), persisted in localStorage
- click any card -> modal with Chart.js line chart + per-month table; tooltip
  shows monthly delta for both years; ESC/overlay/✕ to close
2026-06-17 13:09:00 +05:00

401 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Ежедневное обновление статистики «Метрики МП» из Impala.
Модель данных:
• Грузим помесячно (группировка по report_period_id = YYYYmm), начиная с января
прошлого года и до текущего месяца текущего года.
• Текущий (незавершённый) месяц обрезаем по entry_date до ВЧЕРАШНЕГО дня —
симметрично для обоих лет (например, на 16 июня берём данные по 15 июня
включительно и в 2026, и в 2025), чтобы сравнение было «равный период».
• Внутри каждого года считаем нарастающий итог (кумулятив) по месяцам; на старте
нового года накопление сбрасывается.
• Сравнение в карточке = кумулятив-на-дату текущего года vs прошлого года.
Результат пишется в ../app_stats/app_metrics.json и пушится в ветку pages.
Подключение к Impala, конфиг и git-push переиспользуются из update_kpi.py.
"""
import sys
import json
import datetime as dt
from pathlib import Path
import update_kpi as base # общие функции: load_config, connect_impala, git_commit_push, ...
log = base.log
REPO_DIR = base.REPO_DIR
OUT_PATH = REPO_DIR / "app_stats" / "app_metrics.json"
OUT_REL = "app_stats/app_metrics.json"
# ─── Метрики: ключ в SQL → человекочитаемое название (в порядке SELECT) ───
METRICS = [
("my_services", "Мои услуги"),
("traffic", "Детализация трафика"),
("payments", "Платежи"),
("orders", "Заявки"),
("loyalty", "Лояльность"),
("pay", "Оплата"),
("billing_detail", "Детали счета"),
("viktorina", "Викторина KT Club"),
("partners", "Акции партнеров"),
("tv_plus", "TV+"),
("boosters", "Бустеры"),
("roaming", "Роуминг"),
("pereoform", "Переоформление"),
("aitu_music", "Aitu Music"),
("online_booking", "Онлайн очередь"),
("my_docs", "Мои документы"),
("dz_statement", "Справка о ДЗ"),
("new_boosters_roaming_kcell", "Новая линейка бустеров и роумингов Кселл"),
("adsl", "ADSL отключение услуги"),
("law_and_order", "Закон и порядок"),
("acs", "ACS"),
("kaspi_freedom_pay", "Прием платежей через Freedom и Kaspi"),
("csat", "CSAT"),
("multicustomer", "Мультикастомер"),
("tv_plus_setup", "Настройка TV+"),
("static_ip", "Статический IP"),
("turbo_button", "Turbo кнопка"),
("real_estate_docs", "Справка о недвижимости"),
]
METRIC_KEYS = [k for k, _ in METRICS]
_MONTHS_RU_GEN = ["", "января", "февраля", "марта", "апреля", "мая", "июня",
"июля", "августа", "сентября", "октября", "ноября", "декабря"]
_MONTHS_RU_SHORT = ["", "Янв", "Фев", "Мар", "Апр", "Май", "Июн",
"Июл", "Авг", "Сен", "Окт", "Ноя", "Дек"]
def date_bounds(today: dt.date | None = None):
"""Границы выборки. Текущий месяц обрезается по вчерашнему дню (симметрично по годам)."""
today = today or dt.date.today()
end_cur = today - dt.timedelta(days=1) # вчера
cur_year = today.year
prev_year = cur_year - 1
cap_month, cap_day = end_cur.month, end_cur.day
start_cur = dt.date(cur_year, 1, 1)
start_prev = dt.date(prev_year, 1, 1)
try:
end_prev = dt.date(prev_year, cap_month, cap_day)
except ValueError: # 29 февраля в невисокосный год
end_prev = dt.date(prev_year, cap_month, 28)
return {
"cur_year": cur_year, "prev_year": prev_year,
"cap_month": cap_month, "cap_day": cap_day,
"start_cur": start_cur, "end_cur": end_cur,
"start_prev": start_prev, "end_prev": end_prev,
}
def build_sql(b: dict) -> str:
# created_at — timestamp, поэтому верхнюю границу берём как «< следующий день»
cur_end_excl = b["end_cur"] + dt.timedelta(days=1)
prev_end_excl = b["end_prev"] + dt.timedelta(days=1)
return f"""
with t as (
select report_period_id,
count(case when event_type = 'OPENWSCREENMYSERVICES' then 1 end) as my_services,
count(case when event_type = 'OPENWINDOWDETALIZTION' then 1 end) as traffic,
count(case when event_type = 'OPENWINDOWPAYMENT' then 1 end) as payments,
count(case when event_type = 'OPENSCREENAPPEALS' then 1 end) as orders,
count(case when event_type in ('banner_auth', 'banner_unauth', 'loyalty_banner_slider_auth', 'loyalty_banner_slider_unauth', 'get_bonus_opened', 'bonus_opened','promo_partners_opened','company_promo_opened') then 1 end) as loyalty,
count(case when event_type = 'WINDOWPAYMENT' then 1 end) as pay,
count(case when event_type = 'OPENWINDOWBILLING' then 1 end) as billing_detail,
count(case when event_type = 'game_page' then 1 end) as viktorina,
count(case when event_type = 'promo_partners_opened' then 1 end) as partners,
count(case when event_type = 'OPENWINDOWTVPLUS' then 1 end) as tv_plus,
count(case when event_type = 'MOBCONNECTIONOPENWINDOWADDITIONALTRAFFIC' then 1 end) as boosters,
count(case when event_type = 'OPENWINDOWROAMING' then 1 end) as roaming,
count(case when event_type = 'reregistration_comm_start' then 1 end) as pereoform,
count(case when event_type = 'aitu_music_banner_clicked' then 1 end) as aitu_music,
count(case when event_type = 'ONLINE_BOOKING_SERVICES' then 1 end) as online_booking,
count(case when event_type in ('EMPTYLISTDOCS', 'HASLISTDOCS') then 1 end) as my_docs,
count(case when event_type = 'PDFSTATEMENT' then 1 end) as dz_statement,
count(case when event_type in ('booster_success_screen_kcell', 'ROAMINGPACKAGEMOBILEKCELL') then 1 end) as new_boosters_roaming_kcell,
count(case when event_type = 'law_and_order_service_clicked' then 1 end) as law_and_order,
count(case when event_type = 'ACS_DEVICE_SELECTION_OPEN' then 1 end) as acs,
count(case when event_type in ('PAYMENTWASSUCCESSFULFREEDOM', 'PAYWITHKASPI') then 1 end) as kaspi_freedom_pay,
count(case when event_type = 'csat_screen_sent' then 1 end) as csat,
count(case when event_type = 'multicustomer_completed_screen_viewed' then 1 end) as multicustomer,
count(case when event_type = 'tv_plus_setup_success_viewed' then 1 end) as tv_plus_setup,
count(case when event_type = 'static_ip_connect_success_viewed' then 1 end) as static_ip,
count(case when event_type = 'turbo_activation_success_viewed' then 1 end) as turbo_button,
count(case when event_type = 'real_estate_docs_screen_shown' then 1 end) as real_estate_docs
from drb.drb_iliyas_amplitude_metrics_full
where entry_date between '{b['start_cur']:%Y-%m-%d}' and '{b['end_cur']:%Y-%m-%d}'
or entry_date between '{b['start_prev']:%Y-%m-%d}' and '{b['end_prev']:%Y-%m-%d}'
group by report_period_id
)
, a as (
select (year(created_at) * 100 + month(created_at)) as report_period_id,
count(order_id) as adsl
from telecomkz.telecomkz_retention_service_prod_tariff_change_validations
where (created_at >= '{b['start_cur']:%Y-%m-%d}' and created_at < '{cur_end_excl:%Y-%m-%d}')
or (created_at >= '{b['start_prev']:%Y-%m-%d}' and created_at < '{prev_end_excl:%Y-%m-%d}')
group by 1
)
select t.report_period_id, my_services, traffic, payments, orders, loyalty, pay, billing_detail, viktorina, partners, tv_plus, boosters, roaming, pereoform, aitu_music, online_booking, my_docs, dz_statement, new_boosters_roaming_kcell, adsl, law_and_order, acs, kaspi_freedom_pay, csat, multicustomer,
tv_plus_setup, static_ip, turbo_button, real_estate_docs
from t
left join a on t.report_period_id = a.report_period_id
order by t.report_period_id
""".strip()
# ─── Верхние KPI: регистрации / скачивания / MAU / DAU ───
TOP_SQL = """
with t as (
select report_period_id, entry_date, sum(registered) over (order by entry_date) as registered_total, sum(ios_installs) over (order by entry_date) as ios_installs, sum(android_installs) over (order by entry_date) as android_installs,
sum(ios_installs+android_installs) over (order by entry_date) as sum_installs
from drb.drb_iliyas_telecomkz_daily_info
)
, t1 as (
select period as report_period_id,
cast(date_add(DATE '1970-01-01', entry_date) as date) AS entry_date,
dau_wo_none_plat as dau, mau_wo_none_plat as mau
from drb.drb_iliyas_telecomkz_mau_stats
where cast(date_add(DATE '1970-01-01', entry_date) as date) < cast(now() as date)
)
select t.report_period_id, t.entry_date, t.registered_total, t.ios_installs, t.android_installs, t.sum_installs, t1.mau, t1.dau
from t
left join t1 on t.entry_date = t1.entry_date
""".strip()
TOP_TREND_MONTHS = 13 # сколько последних месяцев показывать в тренде верхних карточек
def fetch_top(conn):
cur = conn.cursor()
log.info("Выполнение запроса верхних KPI...")
cur.execute(TOP_SQL)
rows = cur.fetchall()
names = [d[0].lower() for d in cur.description]
cur.close()
idx = {n: i for i, n in enumerate(names)}
def g(r, name):
v = r[idx[name]] if name in idx else None
return v
recs = []
for r in rows:
recs.append({
"rpid": int(g(r, "report_period_id")) if g(r, "report_period_id") is not None else None,
"date": str(g(r, "entry_date")),
"registered_total": g(r, "registered_total"),
"ios_installs": g(r, "ios_installs"),
"android_installs": g(r, "android_installs"),
"sum_installs": g(r, "sum_installs"),
"mau": g(r, "mau"),
"dau": g(r, "dau"),
})
recs.sort(key=lambda x: x["date"])
log.info("Верхние KPI: дней %d, по %s", len(recs), recs[-1]["date"] if recs else "")
return recs
def build_top(recs):
if not recs:
return None
last = recs[-1]
def last_nonnull(field):
for r in reversed(recs):
if r[field] is not None:
return int(r[field]), r["date"]
return 0, last["date"]
mau_v, _ = last_nonnull("mau")
dau_v, _ = last_nonnull("dau")
# помесячная агрегация: month-end для кумулятивных, среднее для активности
months = {} # rpid -> aggregate
for r in recs:
m = months.setdefault(r["rpid"], {
"last_date": "", "registered_total": 0, "sum_installs": 0,
"mau_sum": 0, "mau_cnt": 0, "dau_sum": 0, "dau_cnt": 0,
})
if r["date"] >= m["last_date"]:
m["last_date"] = r["date"]
if r["registered_total"] is not None:
m["registered_total"] = int(r["registered_total"])
if r["sum_installs"] is not None:
m["sum_installs"] = int(r["sum_installs"])
if r["mau"] is not None:
m["mau_sum"] += int(r["mau"]); m["mau_cnt"] += 1
if r["dau"] is not None:
m["dau_sum"] += int(r["dau"]); m["dau_cnt"] += 1
ordered = sorted(m for m in months if m is not None)
ordered = ordered[-TOP_TREND_MONTHS:]
labels, reg_s, inst_s, mau_s, dau_s = [], [], [], [], []
for rpid in ordered:
a = months[rpid]
y, mo = rpid // 100, rpid % 100
labels.append(f"{_MONTHS_RU_SHORT[mo]} {y % 100:02d}")
reg_s.append(a["registered_total"])
inst_s.append(a["sum_installs"])
mau_s.append(round(a["mau_sum"] / a["mau_cnt"]) if a["mau_cnt"] else 0)
dau_s.append(round(a["dau_sum"] / a["dau_cnt"]) if a["dau_cnt"] else 0)
def _i(v):
return int(v) if v is not None else 0
return {
"as_of": last["date"],
"registered_total": _i(last["registered_total"]),
"installs_total": _i(last["sum_installs"]),
"installs_ios": _i(last["ios_installs"]),
"installs_android": _i(last["android_installs"]),
"mau": mau_v,
"dau": dau_v,
"trend_labels": labels,
"registered_series": reg_s,
"installs_series": inst_s,
"mau_series": mau_s,
"dau_series": dau_s,
}
def fetch_monthly(conn, sql):
cur = conn.cursor()
log.info("Выполнение запроса метрик МП (помесячно)...")
cur.execute(sql)
rows = cur.fetchall()
names = [d[0].lower() for d in cur.description]
cur.close()
idx = {n: i for i, n in enumerate(names)}
out = []
for r in rows:
rec = {"report_period_id": int(r[idx["report_period_id"]])}
for key in METRIC_KEYS:
v = r[idx[key]] if key in idx else None
rec[key] = int(v) if v is not None else 0
out.append(rec)
log.info("Получено помесячных строк: %d (периоды: %s)",
len(out), ", ".join(str(x["report_period_id"]) for x in out))
return out
def _cumsum(arr):
acc, out = 0, []
for v in arr:
acc += v
out.append(acc)
return out
def build_payload(rows, b: dict):
cur_year, prev_year, cap_month = b["cur_year"], b["prev_year"], b["cap_month"]
months = list(range(1, cap_month + 1))
month_labels = [_MONTHS_RU_SHORT[m] for m in months]
# key -> {year -> [monthly values по месяцам months]}
monthly = {k: {cur_year: [0] * len(months), prev_year: [0] * len(months)} for k in METRIC_KEYS}
for rec in rows:
rpid = rec["report_period_id"]
y, m = rpid // 100, rpid % 100
if y not in (cur_year, prev_year) or m not in months:
continue
i = months.index(m)
for k in METRIC_KEYS:
monthly[k][y][i] = rec[k]
metrics = []
for key, label in METRICS:
cur_cum = _cumsum(monthly[key][cur_year])
prev_cum = _cumsum(monthly[key][prev_year])
cur_v = cur_cum[-1] if cur_cum else 0
prev_v = prev_cum[-1] if prev_cum else 0
is_new = prev_v == 0
growth = None if is_new else (cur_v - prev_v) / prev_v
metrics.append({
"key": key, "label": label,
"cur": cur_v, "prev": prev_v,
"growth": growth, "is_new": is_new,
"cur_cum": cur_cum, "prev_cum": prev_cum,
})
end_cur = b["end_cur"]
period_label = f"с 1 января по {end_cur.day} {_MONTHS_RU_GEN[end_cur.month]}"
return {
"generated_at": dt.datetime.now().isoformat(timespec="seconds"),
"cur_year": cur_year,
"prev_year": prev_year,
"period_label": period_label,
"range": {"start": f"{b['start_cur']:%Y-%m-%d}", "end": f"{end_cur:%Y-%m-%d}"},
"months": months,
"month_labels": month_labels,
"cumulative": True,
"metrics": metrics,
}
def write_if_changed(payload) -> bool:
text = json.dumps(payload, ensure_ascii=False, indent=2) + "\n"
old = OUT_PATH.read_text(encoding="utf-8") if OUT_PATH.exists() else ""
def strip_ts(s):
try:
d = json.loads(s)
d.pop("generated_at", None)
return json.dumps(d, ensure_ascii=False, sort_keys=True)
except Exception:
return s
if strip_ts(old) == strip_ts(text):
log.info("Метрики МП не изменились — коммит не требуется.")
return False
OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
OUT_PATH.write_text(text, encoding="utf-8")
log.info("app_metrics.json обновлён (%d метрик, %d мес.).",
len(payload["metrics"]), len(payload["months"]))
return True
def main() -> int:
log.info("=" * 60)
log.info("Старт обновления Метрик МП")
cfg = base.load_config()
b = date_bounds()
log.info("Период (тек.): %s..%s | (пред.): %s..%s | месяцев: %d",
b["start_cur"], b["end_cur"], b["start_prev"], b["end_prev"], b["cap_month"])
sql = build_sql(b)
base.patch_thrift_ssl()
conn = base.connect_impala(cfg)
try:
rows = fetch_monthly(conn, sql)
top = build_top(fetch_top(conn))
finally:
try:
conn.close()
except Exception:
pass
cur_year = b["cur_year"]
if not any(r["report_period_id"] // 100 == cur_year for r in rows):
log.error("В ответе нет данных за %s — JSON НЕ перезаписан.", cur_year)
return 1
payload = build_payload(rows, b)
payload["top"] = top
if write_if_changed(payload):
base.git_commit_push(cfg, [OUT_REL],
f"data: update app metrics {dt.date.today():%Y-%m-%d}")
log.info("Готово (Метрики МП).")
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except Exception as e: # noqa: BLE001
log.exception("ОШИБКА (Метрики МП): %s", e)
sys.exit(1)