kpi-dashboard/updater/update_kpi.py
Iliyas ccf82b026f feat: add Метрики МП page + app-metrics updater
- new page app_stats/index.html (login-gated, same style/nav)
- app_stats/app_metrics.json data (year-over-year comparison, NEW badges)
- updater/update_app_metrics.py: adaptive SQL (Jan 1 -> yesterday vs prev year)
- run both updaters from run_update.bat; refactor shared git push
2026-06-16 17:34:19 +05:00

316 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Ежедневное обновление drb_iliyas_kpi_2026.csv из Impala и пуш в Gitea (ветка pages).
Что делает скрипт:
1. Подключается к корпоративному Impala (см. Impala_connection.md).
2. Выполняет SQL-запрос KPI.
3. Перезаписывает ../drb_iliyas_kpi_2026.csv в точно том же формате,
что и исходный файл (разделитель «;», строки в кавычках, CRLF, точка как
десятичный разделитель, сортировка по убыванию периода/даты).
4. Если данные изменились — коммитит и пушит CSV в ветку pages.
Настройки подключения и git-токен берутся из config.yaml (не коммитится).
Запускается раз в сутки через Планировщик задач Windows (см. README.md).
"""
import sys
import io
import ssl
import logging
import subprocess
from pathlib import Path
from datetime import datetime
import yaml
# ─────────────────────────── Пути ───────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_DIR = SCRIPT_DIR.parent
CSV_PATH = REPO_DIR / "drb_iliyas_kpi_2026.csv"
CONFIG_PATH = SCRIPT_DIR / "config.yaml"
LOG_PATH = SCRIPT_DIR / "update.log"
# ──────────────────────── SQL-запрос ────────────────────────
SQL_QUERY = """
select report_period_id, entry_date, abons, registered_total2 as registered_total,
registered_pct2 as registered_pct, mau_daily3 as mau_daily,
mau_per_registered3 as mau_per_registered,
comms_pct as traditional_comms_pct,
comms_pct_prev as prev_yesr_traditional_comms_pct,
traditional_comms_pct as traditional_comms_decrease_pct,
cumulative_digital_rap_total, cumulative_rap_total, fd_rap_pct,
fd_orders, fd_orders_goal, fd_pct as fd_orders_pct,
cum_fd_orders, cum_fd_orders_goal, cum_fd_pct as cum_fd_orders_pct
from drb.drb_iliyas_kpi_2025
where report_period_id > 202600
""".strip()
# ─── Колонки CSV в нужном порядке и их тип для форматирования ───
# str → значение в двойных кавычках
# int → целое без кавычек
# float → число с точкой, полная точность (repr), без кавычек
COLUMNS = [
("report_period_id", "int"),
("entry_date", "str"),
("abons", "int"),
("registered_total", "int"),
("registered_pct", "float"),
("mau_daily", "int"),
("mau_per_registered", "float"),
("traditional_comms_pct", "float"),
("prev_yesr_traditional_comms_pct", "float"),
("traditional_comms_decrease_pct", "float"),
("cumulative_digital_rap_total", "float"),
("cumulative_rap_total", "float"),
("fd_rap_pct", "float"),
("fd_orders", "int"),
("fd_orders_goal", "int"),
("fd_orders_pct", "float"),
("cum_fd_orders", "int"),
("cum_fd_orders_goal", "int"),
("cum_fd_orders_pct", "float"),
]
# ─────────────────────── Логирование ────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-7s %(message)s",
handlers=[
logging.FileHandler(LOG_PATH, encoding="utf-8"),
logging.StreamHandler(sys.stdout),
],
)
log = logging.getLogger("kpi-updater")
# ═══════════════════════ Конфигурация ════════════════════════
def load_config() -> dict:
if not CONFIG_PATH.exists():
log.error("Не найден %s. Скопируйте config.example.yaml -> config.yaml и заполните.", CONFIG_PATH)
sys.exit(2)
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
# ═══════════════════════ SSL monkey-patch ════════════════════
def patch_thrift_ssl() -> None:
"""Разрешает устаревшие cipher suites сервера Impala (см. Impala_connection.md)."""
import thrift.transport.TSSLSocket as _mod
_orig = _mod.TSSLSocket.__init__
def _patched(self, *a, **kw):
_orig(self, *a, **kw)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
ctx.set_ciphers("DEFAULT:@SECLEVEL=0")
self._context = ctx
_mod.TSSLSocket.__init__ = _patched
# ═══════════════════════ Impala ══════════════════════════════
def connect_impala(cfg: dict):
from impala.dbapi import connect
imp = cfg["impala"]
hosts = [imp["host"]]
if imp.get("fallback_host"):
hosts.append(imp["fallback_host"])
last_err = None
for host in hosts:
try:
log.info("Подключение к Impala: %s:%s", host, imp.get("port", 21050))
conn = connect(
host=host,
port=int(imp.get("port", 21050)),
database=imp.get("database", "drb"),
user=imp["user"],
password=imp["password"],
use_ssl=True,
auth_mechanism="PLAIN",
timeout=int(imp.get("timeout", 60)),
)
log.info("Подключение установлено (%s)", host)
return conn
except Exception as e: # noqa: BLE001
last_err = e
log.warning("Не удалось подключиться к %s: %s", host, e)
log.error("Не удалось подключиться ни к одному хосту Impala.")
raise last_err
def fetch_rows(conn):
cur = conn.cursor()
log.info("Выполнение запроса...")
cur.execute(SQL_QUERY)
rows = cur.fetchall()
col_names = [d[0].lower() for d in cur.description]
cur.close()
log.info("Получено строк: %d", len(rows))
# name -> index, чтобы не зависеть от порядка колонок в ответе
idx = {name: i for i, name in enumerate(col_names)}
missing = [c for c, _ in COLUMNS if c.lower() not in idx]
if missing:
raise RuntimeError(f"В ответе Impala нет колонок: {missing}")
return rows, idx
# ═══════════════════════ Форматирование CSV ══════════════════
def _fmt(value, kind: str) -> str:
if value is None:
return "" # пустое поле без кавычек
if kind == "str":
return '"' + str(value).replace('"', '""') + '"'
if kind == "int":
return str(int(value))
if kind == "float":
return repr(float(value)) # полная точность round-trip, как в исходнике
raise ValueError(kind)
def build_csv_text(rows, idx) -> str:
# Сортировка: период по убыванию, затем дата по убыванию (как в исходном файле)
pid_i = idx["report_period_id"]
date_i = idx["entry_date"]
rows_sorted = sorted(
rows,
key=lambda r: (r[pid_i] if r[pid_i] is not None else -1,
str(r[date_i]) if r[date_i] is not None else ""),
reverse=True,
)
buf = io.StringIO()
header = ";".join('"' + name + '"' for name, _ in COLUMNS)
buf.write(header + "\r\n")
for r in rows_sorted:
cells = [_fmt(r[idx[name]], kind) for name, kind in COLUMNS]
buf.write(";".join(cells) + "\r\n")
return buf.getvalue()
def write_csv_if_changed(text: str) -> bool:
old = ""
if CSV_PATH.exists():
with open(CSV_PATH, "r", encoding="utf-8", newline="") as f:
old = f.read()
if old == text:
log.info("Данные не изменились — запись и коммит не требуются.")
return False
with open(CSV_PATH, "w", encoding="utf-8", newline="") as f:
f.write(text)
log.info("CSV перезаписан (%d байт).", len(text.encode("utf-8")))
return True
# ═══════════════════════ Git ═════════════════════════════════
def _run_git(args, check=True, mask=None):
cmd = ["git", "-C", str(REPO_DIR)] + args
printable = " ".join(cmd)
if mask:
printable = printable.replace(mask, "***")
log.info("$ %s", printable)
res = subprocess.run(cmd, capture_output=True, text=True)
out = (res.stdout or "") + (res.stderr or "")
if mask:
out = out.replace(mask, "***")
if out.strip():
log.info(out.strip())
if check and res.returncode != 0:
raise RuntimeError(f"git {' '.join(args)} -> код {res.returncode}")
return res.returncode, out
def push_url(cfg: dict, mask_token: str):
"""Возвращает URL для fetch/push с токеном (если задан), иначе имя remote 'origin'."""
git = cfg.get("git", {}) or {}
token = git.get("token")
rc, out = _run_git(["remote", "get-url", git.get("remote", "origin")], check=True)
url = out.strip().splitlines()[-1].strip()
if token:
user = git.get("username", "git")
# https://host/path -> https://user:token@host/path
if url.startswith("https://"):
rest = url[len("https://"):]
return f"https://{user}:{token}@{rest}"
return url
def git_commit_push(cfg: dict, rel_paths, message: str):
"""Коммитит указанные файлы (пути относительно корня репо) и пушит в ветку.
Переиспользуется и для KPI, и для метрик МП. Если изменений нет — выходит молча.
"""
git = cfg.get("git", {}) or {}
branch = git.get("branch", "pages")
token = (git.get("token") or "")
url = push_url(cfg, token)
_run_git(["add", "--", *rel_paths])
rc, out = _run_git(["status", "--porcelain", "--", *rel_paths], check=True)
if not out.strip():
log.info("Нет изменений (%s) для коммита.", ", ".join(rel_paths))
return
_run_git(["commit", "-m", message])
# Пуш с автоматическим rebase при гонке (веб-приложение тоже пушит ai-cache.json через API)
for attempt in range(1, 4):
_run_git(["fetch", url, branch], check=True, mask=token or None)
# --autostash: не падать, если в дереве есть посторонние незакоммиченные правки
_run_git(["rebase", "--autostash", "FETCH_HEAD"], check=True)
rc, out = _run_git(["push", url, f"HEAD:{branch}"], check=False, mask=token or None)
if rc == 0:
log.info("Пуш выполнен успешно (попытка %d).", attempt)
return
log.warning("Пуш отклонён (попытка %d), повтор после rebase...", attempt)
raise RuntimeError("Не удалось запушить изменения после 3 попыток.")
def commit_and_push(cfg: dict):
# Коммитим только CSV — daily-диффы остаются чистыми.
git_commit_push(cfg, [CSV_PATH.name], f"data: update KPI {datetime.now():%Y-%m-%d}")
# ═══════════════════════ main ════════════════════════════════
def main() -> int:
log.info("=" * 60)
log.info("Старт обновления KPI")
cfg = load_config()
patch_thrift_ssl()
conn = connect_impala(cfg)
try:
rows, idx = fetch_rows(conn)
finally:
try:
conn.close()
except Exception: # noqa: BLE001
pass
if not rows:
log.error("Запрос вернул 0 строк — CSV НЕ перезаписан (защита от пустых данных).")
return 1
text = build_csv_text(rows, idx)
changed = write_csv_if_changed(text)
if changed:
commit_and_push(cfg)
log.info("Готово.")
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except Exception as e: # noqa: BLE001
log.exception("ОШИБКА: %s", e)
sys.exit(1)