diff --git a/.gitignore b/.gitignore index 4a306fe..ec60715 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,9 @@ config.js *.env .env* + +# updater (скрипт обновления CSV из Impala) +updater/config.yaml +updater/venv/ +updater/__pycache__/ +updater/update.log diff --git a/Impala_connection.md b/Impala_connection.md new file mode 100644 index 0000000..4d7890a --- /dev/null +++ b/Impala_connection.md @@ -0,0 +1,181 @@ +# Подключение к Impala из Python + +Инструкция описывает, как подключиться к корпоративному кластеру Impala через Python-библиотеку `impyla`. Используется в проекте `metrics_audit`. + +--- + +## Параметры кластера + +| Параметр | Значение | +|---|---| +| Основной хост | `bdas-worker-08.bdpak.telecom.kz` | +| Резервный хост | `bdas-utility-01.bdpak.telecom.kz` | +| Порт | `21050` | +| База данных | `drb` | +| Аутентификация | PLAIN (логин + пароль) | +| SSL | включён (`use_ssl=True`) | +| Доступ | только из внутренней сети или через VPN | + +> **Username и password** хранятся в `config.yaml` и не коммитятся в git. + +--- + +## Зависимости + +`impyla` работает через протокол Thrift напрямую — Java-драйверы (JDBC `.zip`) **не нужны**, они только для DBeaver/Tableau. + +``` +# requirements.txt +impyla==0.19.0 +thrift==0.16.0 +thrift-sasl==0.4.3 +pure-sasl>=0.6.2 # вместо sasl — не требует C++ компилятора на Windows +``` + +> **Важно:** пакет `sasl==0.3.1` на Windows не устанавливается без Microsoft C++ Build Tools. +> Используйте `pure-sasl` — он устанавливается автоматически как зависимость `thrift-sasl`. + +Установка: +```bat +python -m venv venv +venv\Scripts\activate +pip install impyla==0.19.0 thrift==0.16.0 thrift-sasl==0.4.3 pandas +``` + +--- + +## Проблема с SSL и её решение + +При подключении с `use_ssl=True` возникает ошибка: + +``` +ssl.SSLError: [SSL: SSLV3_ALERT_HANDSHAKE_FAILURE] sslv3 alert handshake failure +``` + +**Причина:** сервер Impala использует устаревшие cipher suites, которые Python 3.10+ отклоняет по умолчанию (security level 2). + +**Решение:** monkey-patch SSL-контекста библиотеки `thrift` перед подключением: + +```python +import ssl +import thrift.transport.TSSLSocket as _mod + +def _patch_thrift_ssl(): + _orig = _mod.TSSLSocket.__init__ + + def _patched(self, *a, **kw): + _orig(self, *a, **kw) + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + ctx.set_ciphers("DEFAULT:@SECLEVEL=0") + self._context = ctx + + _mod.TSSLSocket.__init__ = _patched + +# Вызвать ДО impyla_connect: +_patch_thrift_ssl() +``` + +Это нужно вызвать один раз перед первым подключением. Патч снижает проверку сертификата и разрешает старые шифры — приемлемо для внутренней сети за VPN. + +--- + +## Подключение + +```python +from impala.dbapi import connect + +conn = connect( + host="bdas-worker-08.bdpak.telecom.kz", + port=21050, + database="drb", + user="", # из config.yaml + password="", # из config.yaml + use_ssl=True, + auth_mechanism="PLAIN", # AuthMech=3 в JDBC-терминологии + timeout=60, +) +``` + +С резервным хостом — оберните в try/except и переключитесь на `bdas-utility-01.bdpak.telecom.kz` при ошибке. + +--- + +## Выполнение запроса + +```python +cursor = conn.cursor() +cursor.execute("SELECT event_type, entry_date, count(*) AS cnt FROM drb.drb_iliyas_amplitude_metrics_full GROUP BY 1, 2") + +import pandas as pd +rows = cursor.fetchall() +columns = [desc[0] for desc in cursor.description] +df = pd.DataFrame(rows, columns=columns) +``` + +--- + +## Схема таблицы метрик + +**`drb.drb_iliyas_amplitude_metrics_full`** + +| Колонка | Тип | Описание | +|---|---|---| +| `metrics` | string | Платформа: `TelecomKz` или `Aitu` | +| `event_type` | string | Название метрики (событие Amplitude) | +| `entry_date` | string | Дата события (`YYYY-MM-DD`) | +| `event_time` | string | Полная метка времени | +| `event_properties` | string | JSON с параметрами события | +| `platform` | string | `android` / `iOS` / `Web` | +| ... | | прочие поля пользователя и устройства | + +> **Внимание:** колонка `metrics` содержит **платформу** (`TelecomKz`/`Aitu`), а не название метрики. +> Название метрики — в колонке `event_type`. + +**`external_sources.amplitude_loyalty_program_logs`** + +| Колонка | Тип | Описание | +|---|---|---| +| `event_type` | string | Название метрики | +| `event_time` | string | Полная метка времени (дата берётся через `left(event_time, 10)`) | +| ... | | прочие поля | + +--- + +## Агрегирующий запрос проекта + +```sql +-- TelecomKz (все метрики) + Aitu (только whitelist) +SELECT metrics AS platform, event_type AS metrics, entry_date, count(*) AS cnt +FROM drb.drb_iliyas_amplitude_metrics_full +WHERE event_type IS NOT NULL AND event_type != '' + AND ( + metrics = 'TelecomKz' + OR (metrics = 'Aitu' AND event_type IN ('miniapp_opened', 'main_tab_selected', ...)) + ) +GROUP BY 1, 2, 3 + +UNION ALL + +-- Loyalty (без временных акционных метрик) +SELECT 'Loyalty' AS platform, event_type AS metrics, + left(event_time, 10) AS entry_date, count(*) AS cnt +FROM external_sources.amplitude_loyalty_program_logs +WHERE event_type IS NOT NULL AND event_type != '' + AND event_type NOT LIKE 'detail_promo_%' + AND event_type NOT LIKE 'company_promo_%' +GROUP BY 1, 2, 3 +``` + +--- + +## Типичные ошибки + +| Ошибка | Причина | Решение | +|---|---|---| +| `SSLV3_ALERT_HANDSHAKE_FAILURE` | Старые cipher suites на сервере | Применить `_patch_thrift_ssl()` (см. выше) | +| `ConnectionRefusedError` / `TSocket read 0 bytes` | VPN не подключён | Подключить VPN и повторить | +| `AuthenticationError` | Неверный логин/пароль | Проверить `config.yaml` | +| `sasl` не устанавливается | Нет C++ Build Tools | Использовать `pure-sasl` (устанавливается автоматически) | +| `ssl.PROTOCOL_TLS is deprecated` | Предупреждение thrift 0.16 | Некритично, патч перезаписывает контекст после | diff --git a/updater/README.md b/updater/README.md new file mode 100644 index 0000000..070f70f --- /dev/null +++ b/updater/README.md @@ -0,0 +1,116 @@ +# KPI Updater — ежедневное обновление CSV из Impala + +Скрипт запускается на рабочем компьютере с доступом к корпоративной сети (VPN): +выгружает свежие KPI из Impala, перезаписывает `../drb_iliyas_kpi_2026.csv` +и пушит его в ветку `pages`. Сайт ([Gitea Pages]) при открытии делает +`fetch('drb_iliyas_kpi_2026.csv')`, поэтому из внешней сети видны свежие данные. + +``` +updater/ +├─ update_kpi.py # основной скрипт +├─ requirements.txt # зависимости (impyla и пр.) +├─ config.example.yaml # шаблон конфига -> скопировать в config.yaml +├─ run_update.bat # обёртка для Планировщика задач +└─ update.log # лог (создаётся при запуске, в git не коммитится) +``` + +--- + +## 1. Установка (один раз) + +> ⚠️ **Версия Python.** `impyla`/`thrift`/`thrift-sasl` стабильно ставятся на +> **Python 3.10–3.12**. На системном Python 3.14 колёс может не быть и сборка +> упадёт. Установите отдельный Python 3.12 и создавайте venv именно им. + +В папке `updater/`: + +```bat +REM создать виртуальное окружение (укажите путь к python 3.12, если нужно) +python -m venv venv +venv\Scripts\activate + +pip install -r requirements.txt +``` + +Создать конфиг и заполнить логин/пароль Impala + git-токен: + +```bat +copy config.example.yaml config.yaml +notepad config.yaml +``` + +`config.yaml` уже в `.gitignore` — он **не попадёт в git**. + +--- + +## 2. Аутентификация для пуша + +Пуш по расписанию должен идти без ручного ввода пароля. Два варианта: + +**A. Токен (рекомендуется).** +В Gitea: *Settings → Applications → Generate New Token*, право +`write:repository`. Вставьте его в `config.yaml` → `git.token`. +Скрипт использует токен только в команде пуша, в git-конфиг он не сохраняется, +а в логах маскируется. + +**B. Git Credential Manager.** +Оставьте `git.token` пустым и один раз выполните вручную +`git push origin pages`, сохранив логин/пароль в системном хранилище Windows. + +--- + +## 3. Проверка вручную + +Подключитесь к VPN, затем: + +```bat +cd /d "путь\к\kpi-dashboard\updater" +run_update.bat +``` + +Ожидаемо: в `update.log` появятся строки подключения, число строк, и при +изменении данных — коммит и `Пуш выполнен успешно`. Если данные не менялись — +`Данные не изменились`, коммита не будет. + +--- + +## 4. Расписание (раз в сутки) + +Создать задачу в Планировщике Windows (запуск каждый день в 06:30). +Выполнить **в PowerShell от администратора**, поправив путь: + +```powershell +$bat = "C:\Users\1\Desktop\QC\Vibecode\Project 1\kpi-dashboard\updater\run_update.bat" +schtasks /Create /TN "KPI Dashboard Update" /TR "`"$bat`"" /SC DAILY /ST 06:30 /RL LIMITED /F +``` + +Проверить / запустить вручную / удалить: + +```powershell +schtasks /Query /TN "KPI Dashboard Update" /V /FO LIST +schtasks /Run /TN "KPI Dashboard Update" +schtasks /Delete /TN "KPI Dashboard Update" /F +``` + +> Задача выполнится, только когда компьютер включён и (для успешной выгрузки) +> поднят VPN. Если в момент запуска VPN не подключён, скрипт залогирует ошибку +> подключения и завершится с ненулевым кодом — данные останутся прежними. + +--- + +## Логика скрипта (кратко) + +1. Читает `config.yaml`. +2. Применяет SSL monkey-patch (старые cipher suites сервера, см. `../Impala_connection.md`). +3. Подключается к `host`, при ошибке — к `fallback_host`. +4. Выполняет SQL (зашит в `update_kpi.py`, константа `SQL_QUERY`). +5. Формирует CSV **в точном формате исходника**: разделитель `;`, строки в + кавычках, точка как десятичный разделитель, CRLF, сортировка по убыванию + `report_period_id`, затем `entry_date`. +6. Если содержимое не изменилось — выходит без коммита. +7. Иначе: `git add` только CSV → commit → `fetch + rebase + push` в `pages` + (с авто-rebase и повтором, т.к. веб-приложение тоже пушит `ai-cache.json`). + +Пустой результат запроса (0 строк) НЕ перезаписывает CSV — защита от стирания данных. + +[Gitea Pages]: https://git.vibe42.kz/kyrykbaev/kpi-dashboard/src/branch/pages diff --git a/updater/config.example.yaml b/updater/config.example.yaml new file mode 100644 index 0000000..c3c3f97 --- /dev/null +++ b/updater/config.example.yaml @@ -0,0 +1,21 @@ +# Скопируйте этот файл в config.yaml и заполните реальными значениями. +# config.yaml в git НЕ коммитится (см. .gitignore). + +impala: + host: bdas-worker-08.bdpak.telecom.kz # основной хост + fallback_host: bdas-utility-01.bdpak.telecom.kz # резервный хост (можно убрать) + port: 21050 + database: drb + user: "ВАШ_ЛОГИН" + password: "ВАШ_ПАРОЛЬ" + timeout: 60 + +git: + remote: origin + branch: pages + # Personal Access Token из Gitea (Settings -> Applications -> Generate Token, + # право write:repository). Нужен для НЕинтерактивного пуша по расписанию. + # Если оставить пустым, пуш пойдёт через системный Git Credential Manager + # (нужно один раз вручную выполнить git push и сохранить логин/пароль). + username: "kyrykbaev" + token: "" diff --git a/updater/requirements.txt b/updater/requirements.txt new file mode 100644 index 0000000..07b974c --- /dev/null +++ b/updater/requirements.txt @@ -0,0 +1,5 @@ +impyla==0.19.0 +thrift==0.16.0 +thrift-sasl==0.4.3 +pure-sasl>=0.6.2 +PyYAML>=6.0 diff --git a/updater/run_update.bat b/updater/run_update.bat new file mode 100644 index 0000000..2ac40f1 --- /dev/null +++ b/updater/run_update.bat @@ -0,0 +1,20 @@ +@echo off +REM ───────────────────────────────────────────────────────────── +REM Запуск ежедневного обновления KPI. +REM Этот .bat вызывается Планировщиком задач Windows раз в сутки. +REM ───────────────────────────────────────────────────────────── +setlocal +cd /d "%~dp0" + +REM Активируем venv, если он есть; иначе используем системный python. +if exist "venv\Scripts\python.exe" ( + set "PYEXE=venv\Scripts\python.exe" +) else ( + set "PYEXE=python" +) + +"%PYEXE%" "%~dp0update_kpi.py" +set "RC=%ERRORLEVEL%" + +echo Exit code: %RC% +exit /b %RC% diff --git a/updater/update_kpi.py b/updater/update_kpi.py new file mode 100644 index 0000000..ab468ed --- /dev/null +++ b/updater/update_kpi.py @@ -0,0 +1,309 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Ежедневное обновление drb_iliyas_kpi_2026.csv из Impala и пуш в Gitea (ветка pages). + +Что делает скрипт: + 1. Подключается к корпоративному Impala (см. Impala_connection.md). + 2. Выполняет SQL-запрос KPI. + 3. Перезаписывает ../drb_iliyas_kpi_2026.csv в точно том же формате, + что и исходный файл (разделитель «;», строки в кавычках, CRLF, точка как + десятичный разделитель, сортировка по убыванию периода/даты). + 4. Если данные изменились — коммитит и пушит CSV в ветку pages. + +Настройки подключения и git-токен берутся из config.yaml (не коммитится). +Запускается раз в сутки через Планировщик задач Windows (см. README.md). +""" + +import sys +import io +import ssl +import logging +import subprocess +from pathlib import Path +from datetime import datetime + +import yaml + +# ─────────────────────────── Пути ─────────────────────────── +SCRIPT_DIR = Path(__file__).resolve().parent +REPO_DIR = SCRIPT_DIR.parent +CSV_PATH = REPO_DIR / "drb_iliyas_kpi_2026.csv" +CONFIG_PATH = SCRIPT_DIR / "config.yaml" +LOG_PATH = SCRIPT_DIR / "update.log" + +# ──────────────────────── SQL-запрос ──────────────────────── +SQL_QUERY = """ +select report_period_id, entry_date, abons, registered_total2 as registered_total, + registered_pct2 as registered_pct, mau_daily3 as mau_daily, + mau_per_registered3 as mau_per_registered, + comms_pct as traditional_comms_pct, + comms_pct_prev as prev_yesr_traditional_comms_pct, + traditional_comms_pct as traditional_comms_decrease_pct, + cumulative_digital_rap_total, cumulative_rap_total, fd_rap_pct, + fd_orders, fd_orders_goal, fd_pct as fd_orders_pct, + cum_fd_orders, cum_fd_orders_goal, cum_fd_pct as cum_fd_orders_pct +from drb.drb_iliyas_kpi_2025 +where report_period_id > 202600 +""".strip() + +# ─── Колонки CSV в нужном порядке и их тип для форматирования ─── +# str → значение в двойных кавычках +# int → целое без кавычек +# float → число с точкой, полная точность (repr), без кавычек +COLUMNS = [ + ("report_period_id", "int"), + ("entry_date", "str"), + ("abons", "int"), + ("registered_total", "int"), + ("registered_pct", "float"), + ("mau_daily", "int"), + ("mau_per_registered", "float"), + ("traditional_comms_pct", "float"), + ("prev_yesr_traditional_comms_pct", "float"), + ("traditional_comms_decrease_pct", "float"), + ("cumulative_digital_rap_total", "float"), + ("cumulative_rap_total", "float"), + ("fd_rap_pct", "float"), + ("fd_orders", "int"), + ("fd_orders_goal", "int"), + ("fd_orders_pct", "float"), + ("cum_fd_orders", "int"), + ("cum_fd_orders_goal", "int"), + ("cum_fd_orders_pct", "float"), +] + +# ─────────────────────── Логирование ──────────────────────── +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-7s %(message)s", + handlers=[ + logging.FileHandler(LOG_PATH, encoding="utf-8"), + logging.StreamHandler(sys.stdout), + ], +) +log = logging.getLogger("kpi-updater") + + +# ═══════════════════════ Конфигурация ════════════════════════ +def load_config() -> dict: + if not CONFIG_PATH.exists(): + log.error("Не найден %s. Скопируйте config.example.yaml -> config.yaml и заполните.", CONFIG_PATH) + sys.exit(2) + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + return yaml.safe_load(f) + + +# ═══════════════════════ SSL monkey-patch ════════════════════ +def patch_thrift_ssl() -> None: + """Разрешает устаревшие cipher suites сервера Impala (см. Impala_connection.md).""" + import thrift.transport.TSSLSocket as _mod + + _orig = _mod.TSSLSocket.__init__ + + def _patched(self, *a, **kw): + _orig(self, *a, **kw) + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + ctx.set_ciphers("DEFAULT:@SECLEVEL=0") + self._context = ctx + + _mod.TSSLSocket.__init__ = _patched + + +# ═══════════════════════ Impala ══════════════════════════════ +def connect_impala(cfg: dict): + from impala.dbapi import connect + + imp = cfg["impala"] + hosts = [imp["host"]] + if imp.get("fallback_host"): + hosts.append(imp["fallback_host"]) + + last_err = None + for host in hosts: + try: + log.info("Подключение к Impala: %s:%s", host, imp.get("port", 21050)) + conn = connect( + host=host, + port=int(imp.get("port", 21050)), + database=imp.get("database", "drb"), + user=imp["user"], + password=imp["password"], + use_ssl=True, + auth_mechanism="PLAIN", + timeout=int(imp.get("timeout", 60)), + ) + log.info("Подключение установлено (%s)", host) + return conn + except Exception as e: # noqa: BLE001 + last_err = e + log.warning("Не удалось подключиться к %s: %s", host, e) + + log.error("Не удалось подключиться ни к одному хосту Impala.") + raise last_err + + +def fetch_rows(conn): + cur = conn.cursor() + log.info("Выполнение запроса...") + cur.execute(SQL_QUERY) + rows = cur.fetchall() + col_names = [d[0].lower() for d in cur.description] + cur.close() + log.info("Получено строк: %d", len(rows)) + # name -> index, чтобы не зависеть от порядка колонок в ответе + idx = {name: i for i, name in enumerate(col_names)} + missing = [c for c, _ in COLUMNS if c.lower() not in idx] + if missing: + raise RuntimeError(f"В ответе Impala нет колонок: {missing}") + return rows, idx + + +# ═══════════════════════ Форматирование CSV ══════════════════ +def _fmt(value, kind: str) -> str: + if value is None: + return "" # пустое поле без кавычек + if kind == "str": + return '"' + str(value).replace('"', '""') + '"' + if kind == "int": + return str(int(value)) + if kind == "float": + return repr(float(value)) # полная точность round-trip, как в исходнике + raise ValueError(kind) + + +def build_csv_text(rows, idx) -> str: + # Сортировка: период по убыванию, затем дата по убыванию (как в исходном файле) + pid_i = idx["report_period_id"] + date_i = idx["entry_date"] + rows_sorted = sorted( + rows, + key=lambda r: (r[pid_i] if r[pid_i] is not None else -1, + str(r[date_i]) if r[date_i] is not None else ""), + reverse=True, + ) + + buf = io.StringIO() + header = ";".join('"' + name + '"' for name, _ in COLUMNS) + buf.write(header + "\r\n") + for r in rows_sorted: + cells = [_fmt(r[idx[name]], kind) for name, kind in COLUMNS] + buf.write(";".join(cells) + "\r\n") + return buf.getvalue() + + +def write_csv_if_changed(text: str) -> bool: + old = "" + if CSV_PATH.exists(): + with open(CSV_PATH, "r", encoding="utf-8", newline="") as f: + old = f.read() + if old == text: + log.info("Данные не изменились — запись и коммит не требуются.") + return False + with open(CSV_PATH, "w", encoding="utf-8", newline="") as f: + f.write(text) + log.info("CSV перезаписан (%d байт).", len(text.encode("utf-8"))) + return True + + +# ═══════════════════════ Git ═════════════════════════════════ +def _run_git(args, check=True, mask=None): + cmd = ["git", "-C", str(REPO_DIR)] + args + printable = " ".join(cmd) + if mask: + printable = printable.replace(mask, "***") + log.info("$ %s", printable) + res = subprocess.run(cmd, capture_output=True, text=True) + out = (res.stdout or "") + (res.stderr or "") + if mask: + out = out.replace(mask, "***") + if out.strip(): + log.info(out.strip()) + if check and res.returncode != 0: + raise RuntimeError(f"git {' '.join(args)} -> код {res.returncode}") + return res.returncode, out + + +def push_url(cfg: dict, mask_token: str): + """Возвращает URL для fetch/push с токеном (если задан), иначе имя remote 'origin'.""" + git = cfg.get("git", {}) or {} + token = git.get("token") + rc, out = _run_git(["remote", "get-url", git.get("remote", "origin")], check=True) + url = out.strip().splitlines()[-1].strip() + if token: + user = git.get("username", "git") + # https://host/path -> https://user:token@host/path + if url.startswith("https://"): + rest = url[len("https://"):] + return f"https://{user}:{token}@{rest}" + return url + + +def commit_and_push(cfg: dict): + git = cfg.get("git", {}) or {} + branch = git.get("branch", "pages") + token = (git.get("token") or "") + url = push_url(cfg, token) + + # Коммитим только CSV — daily-диффы остаются чистыми. + _run_git(["add", "--", CSV_PATH.name]) + + rc, out = _run_git(["status", "--porcelain", "--", CSV_PATH.name], check=True) + if not out.strip(): + log.info("Нет изменений CSV для коммита.") + return + + msg = f"data: update KPI {datetime.now():%Y-%m-%d}" + _run_git(["commit", "-m", msg]) + + # Пуш с автоматическим rebase при гонке (веб-приложение тоже пушит ai-cache.json через API) + for attempt in range(1, 4): + # подтянуть свежий remote и переставить наш коммит сверху + _run_git(["fetch", url, branch], check=True, mask=token or None) + # --autostash: не падать, если в дереве есть посторонние незакоммиченные правки + _run_git(["rebase", "--autostash", "FETCH_HEAD"], check=True) + rc, out = _run_git(["push", url, f"HEAD:{branch}"], check=False, mask=token or None) + if rc == 0: + log.info("Пуш выполнен успешно (попытка %d).", attempt) + return + log.warning("Пуш отклонён (попытка %d), повтор после rebase...", attempt) + raise RuntimeError("Не удалось запушить изменения после 3 попыток.") + + +# ═══════════════════════ main ════════════════════════════════ +def main() -> int: + log.info("=" * 60) + log.info("Старт обновления KPI") + cfg = load_config() + + patch_thrift_ssl() + conn = connect_impala(cfg) + try: + rows, idx = fetch_rows(conn) + finally: + try: + conn.close() + except Exception: # noqa: BLE001 + pass + + if not rows: + log.error("Запрос вернул 0 строк — CSV НЕ перезаписан (защита от пустых данных).") + return 1 + + text = build_csv_text(rows, idx) + changed = write_csv_if_changed(text) + + if changed: + commit_and_push(cfg) + log.info("Готово.") + return 0 + + +if __name__ == "__main__": + try: + sys.exit(main()) + except Exception as e: # noqa: BLE001 + log.exception("ОШИБКА: %s", e) + sys.exit(1)