chore: add Impala->CSV daily updater

This commit is contained in:
Iliyas 2026-06-16 16:59:32 +05:00
parent 504a5280e8
commit 26390f6239
7 changed files with 658 additions and 0 deletions

6
.gitignore vendored
View File

@ -2,3 +2,9 @@
config.js config.js
*.env *.env
.env* .env*
# updater (скрипт обновления CSV из Impala)
updater/config.yaml
updater/venv/
updater/__pycache__/
updater/update.log

181
Impala_connection.md Normal file
View File

@ -0,0 +1,181 @@
# Подключение к Impala из Python
Инструкция описывает, как подключиться к корпоративному кластеру Impala через Python-библиотеку `impyla`. Используется в проекте `metrics_audit`.
---
## Параметры кластера
| Параметр | Значение |
|---|---|
| Основной хост | `bdas-worker-08.bdpak.telecom.kz` |
| Резервный хост | `bdas-utility-01.bdpak.telecom.kz` |
| Порт | `21050` |
| База данных | `drb` |
| Аутентификация | PLAIN (логин + пароль) |
| SSL | включён (`use_ssl=True`) |
| Доступ | только из внутренней сети или через VPN |
> **Username и password** хранятся в `config.yaml` и не коммитятся в git.
---
## Зависимости
`impyla` работает через протокол Thrift напрямую — Java-драйверы (JDBC `.zip`) **не нужны**, они только для DBeaver/Tableau.
```
# requirements.txt
impyla==0.19.0
thrift==0.16.0
thrift-sasl==0.4.3
pure-sasl>=0.6.2 # вместо sasl — не требует C++ компилятора на Windows
```
> **Важно:** пакет `sasl==0.3.1` на Windows не устанавливается без Microsoft C++ Build Tools.
> Используйте `pure-sasl` — он устанавливается автоматически как зависимость `thrift-sasl`.
Установка:
```bat
python -m venv venv
venv\Scripts\activate
pip install impyla==0.19.0 thrift==0.16.0 thrift-sasl==0.4.3 pandas
```
---
## Проблема с SSL и её решение
При подключении с `use_ssl=True` возникает ошибка:
```
ssl.SSLError: [SSL: SSLV3_ALERT_HANDSHAKE_FAILURE] sslv3 alert handshake failure
```
**Причина:** сервер Impala использует устаревшие cipher suites, которые Python 3.10+ отклоняет по умолчанию (security level 2).
**Решение:** monkey-patch SSL-контекста библиотеки `thrift` перед подключением:
```python
import ssl
import thrift.transport.TSSLSocket as _mod
def _patch_thrift_ssl():
_orig = _mod.TSSLSocket.__init__
def _patched(self, *a, **kw):
_orig(self, *a, **kw)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
ctx.set_ciphers("DEFAULT:@SECLEVEL=0")
self._context = ctx
_mod.TSSLSocket.__init__ = _patched
# Вызвать ДО impyla_connect:
_patch_thrift_ssl()
```
Это нужно вызвать один раз перед первым подключением. Патч снижает проверку сертификата и разрешает старые шифры — приемлемо для внутренней сети за VPN.
---
## Подключение
```python
from impala.dbapi import connect
conn = connect(
host="bdas-worker-08.bdpak.telecom.kz",
port=21050,
database="drb",
user="<username>", # из config.yaml
password="<password>", # из config.yaml
use_ssl=True,
auth_mechanism="PLAIN", # AuthMech=3 в JDBC-терминологии
timeout=60,
)
```
С резервным хостом — оберните в try/except и переключитесь на `bdas-utility-01.bdpak.telecom.kz` при ошибке.
---
## Выполнение запроса
```python
cursor = conn.cursor()
cursor.execute("SELECT event_type, entry_date, count(*) AS cnt FROM drb.drb_iliyas_amplitude_metrics_full GROUP BY 1, 2")
import pandas as pd
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
df = pd.DataFrame(rows, columns=columns)
```
---
## Схема таблицы метрик
**`drb.drb_iliyas_amplitude_metrics_full`**
| Колонка | Тип | Описание |
|---|---|---|
| `metrics` | string | Платформа: `TelecomKz` или `Aitu` |
| `event_type` | string | Название метрики (событие Amplitude) |
| `entry_date` | string | Дата события (`YYYY-MM-DD`) |
| `event_time` | string | Полная метка времени |
| `event_properties` | string | JSON с параметрами события |
| `platform` | string | `android` / `iOS` / `Web` |
| ... | | прочие поля пользователя и устройства |
> **Внимание:** колонка `metrics` содержит **платформу** (`TelecomKz`/`Aitu`), а не название метрики.
> Название метрики — в колонке `event_type`.
**`external_sources.amplitude_loyalty_program_logs`**
| Колонка | Тип | Описание |
|---|---|---|
| `event_type` | string | Название метрики |
| `event_time` | string | Полная метка времени (дата берётся через `left(event_time, 10)`) |
| ... | | прочие поля |
---
## Агрегирующий запрос проекта
```sql
-- TelecomKz (все метрики) + Aitu (только whitelist)
SELECT metrics AS platform, event_type AS metrics, entry_date, count(*) AS cnt
FROM drb.drb_iliyas_amplitude_metrics_full
WHERE event_type IS NOT NULL AND event_type != ''
AND (
metrics = 'TelecomKz'
OR (metrics = 'Aitu' AND event_type IN ('miniapp_opened', 'main_tab_selected', ...))
)
GROUP BY 1, 2, 3
UNION ALL
-- Loyalty (без временных акционных метрик)
SELECT 'Loyalty' AS platform, event_type AS metrics,
left(event_time, 10) AS entry_date, count(*) AS cnt
FROM external_sources.amplitude_loyalty_program_logs
WHERE event_type IS NOT NULL AND event_type != ''
AND event_type NOT LIKE 'detail_promo_%'
AND event_type NOT LIKE 'company_promo_%'
GROUP BY 1, 2, 3
```
---
## Типичные ошибки
| Ошибка | Причина | Решение |
|---|---|---|
| `SSLV3_ALERT_HANDSHAKE_FAILURE` | Старые cipher suites на сервере | Применить `_patch_thrift_ssl()` (см. выше) |
| `ConnectionRefusedError` / `TSocket read 0 bytes` | VPN не подключён | Подключить VPN и повторить |
| `AuthenticationError` | Неверный логин/пароль | Проверить `config.yaml` |
| `sasl` не устанавливается | Нет C++ Build Tools | Использовать `pure-sasl` (устанавливается автоматически) |
| `ssl.PROTOCOL_TLS is deprecated` | Предупреждение thrift 0.16 | Некритично, патч перезаписывает контекст после |

116
updater/README.md Normal file
View File

@ -0,0 +1,116 @@
# KPI Updater — ежедневное обновление CSV из Impala
Скрипт запускается на рабочем компьютере с доступом к корпоративной сети (VPN):
выгружает свежие KPI из Impala, перезаписывает `../drb_iliyas_kpi_2026.csv`
и пушит его в ветку `pages`. Сайт ([Gitea Pages]) при открытии делает
`fetch('drb_iliyas_kpi_2026.csv')`, поэтому из внешней сети видны свежие данные.
```
updater/
├─ update_kpi.py # основной скрипт
├─ requirements.txt # зависимости (impyla и пр.)
├─ config.example.yaml # шаблон конфига -> скопировать в config.yaml
├─ run_update.bat # обёртка для Планировщика задач
└─ update.log # лог (создаётся при запуске, в git не коммитится)
```
---
## 1. Установка (один раз)
> ⚠️ **Версия Python.** `impyla`/`thrift`/`thrift-sasl` стабильно ставятся на
> **Python 3.103.12**. На системном Python 3.14 колёс может не быть и сборка
> упадёт. Установите отдельный Python 3.12 и создавайте venv именно им.
В папке `updater/`:
```bat
REM создать виртуальное окружение (укажите путь к python 3.12, если нужно)
python -m venv venv
venv\Scripts\activate
pip install -r requirements.txt
```
Создать конфиг и заполнить логин/пароль Impala + git-токен:
```bat
copy config.example.yaml config.yaml
notepad config.yaml
```
`config.yaml` уже в `.gitignore` — он **не попадёт в git**.
---
## 2. Аутентификация для пуша
Пуш по расписанию должен идти без ручного ввода пароля. Два варианта:
**A. Токен (рекомендуется).**
В Gitea: *Settings → Applications → Generate New Token*, право
`write:repository`. Вставьте его в `config.yaml``git.token`.
Скрипт использует токен только в команде пуша, в git-конфиг он не сохраняется,
а в логах маскируется.
**B. Git Credential Manager.**
Оставьте `git.token` пустым и один раз выполните вручную
`git push origin pages`, сохранив логин/пароль в системном хранилище Windows.
---
## 3. Проверка вручную
Подключитесь к VPN, затем:
```bat
cd /d "путь\к\kpi-dashboard\updater"
run_update.bat
```
Ожидаемо: в `update.log` появятся строки подключения, число строк, и при
изменении данных — коммит и `Пуш выполнен успешно`. Если данные не менялись —
`Данные не изменились`, коммита не будет.
---
## 4. Расписание (раз в сутки)
Создать задачу в Планировщике Windows (запуск каждый день в 06:30).
Выполнить **в PowerShell от администратора**, поправив путь:
```powershell
$bat = "C:\Users\1\Desktop\QC\Vibecode\Project 1\kpi-dashboard\updater\run_update.bat"
schtasks /Create /TN "KPI Dashboard Update" /TR "`"$bat`"" /SC DAILY /ST 06:30 /RL LIMITED /F
```
Проверить / запустить вручную / удалить:
```powershell
schtasks /Query /TN "KPI Dashboard Update" /V /FO LIST
schtasks /Run /TN "KPI Dashboard Update"
schtasks /Delete /TN "KPI Dashboard Update" /F
```
> Задача выполнится, только когда компьютер включён и (для успешной выгрузки)
> поднят VPN. Если в момент запуска VPN не подключён, скрипт залогирует ошибку
> подключения и завершится с ненулевым кодом — данные останутся прежними.
---
## Логика скрипта (кратко)
1. Читает `config.yaml`.
2. Применяет SSL monkey-patch (старые cipher suites сервера, см. `../Impala_connection.md`).
3. Подключается к `host`, при ошибке — к `fallback_host`.
4. Выполняет SQL (зашит в `update_kpi.py`, константа `SQL_QUERY`).
5. Формирует CSV **в точном формате исходника**: разделитель `;`, строки в
кавычках, точка как десятичный разделитель, CRLF, сортировка по убыванию
`report_period_id`, затем `entry_date`.
6. Если содержимое не изменилось — выходит без коммита.
7. Иначе: `git add` только CSV → commit → `fetch + rebase + push` в `pages`
(с авто-rebase и повтором, т.к. веб-приложение тоже пушит `ai-cache.json`).
Пустой результат запроса (0 строк) НЕ перезаписывает CSV — защита от стирания данных.
[Gitea Pages]: https://git.vibe42.kz/kyrykbaev/kpi-dashboard/src/branch/pages

View File

@ -0,0 +1,21 @@
# Скопируйте этот файл в config.yaml и заполните реальными значениями.
# config.yaml в git НЕ коммитится (см. .gitignore).
impala:
host: bdas-worker-08.bdpak.telecom.kz # основной хост
fallback_host: bdas-utility-01.bdpak.telecom.kz # резервный хост (можно убрать)
port: 21050
database: drb
user: "ВАШ_ЛОГИН"
password: "ВАШ_ПАРОЛЬ"
timeout: 60
git:
remote: origin
branch: pages
# Personal Access Token из Gitea (Settings -> Applications -> Generate Token,
# право write:repository). Нужен для НЕинтерактивного пуша по расписанию.
# Если оставить пустым, пуш пойдёт через системный Git Credential Manager
# (нужно один раз вручную выполнить git push и сохранить логин/пароль).
username: "kyrykbaev"
token: ""

5
updater/requirements.txt Normal file
View File

@ -0,0 +1,5 @@
impyla==0.19.0
thrift==0.16.0
thrift-sasl==0.4.3
pure-sasl>=0.6.2
PyYAML>=6.0

20
updater/run_update.bat Normal file
View File

@ -0,0 +1,20 @@
@echo off
REM ─────────────────────────────────────────────────────────────
REM Запуск ежедневного обновления KPI.
REM Этот .bat вызывается Планировщиком задач Windows раз в сутки.
REM ─────────────────────────────────────────────────────────────
setlocal
cd /d "%~dp0"
REM Активируем venv, если он есть; иначе используем системный python.
if exist "venv\Scripts\python.exe" (
set "PYEXE=venv\Scripts\python.exe"
) else (
set "PYEXE=python"
)
"%PYEXE%" "%~dp0update_kpi.py"
set "RC=%ERRORLEVEL%"
echo Exit code: %RC%
exit /b %RC%

309
updater/update_kpi.py Normal file
View File

@ -0,0 +1,309 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Ежедневное обновление drb_iliyas_kpi_2026.csv из Impala и пуш в Gitea (ветка pages).
Что делает скрипт:
1. Подключается к корпоративному Impala (см. Impala_connection.md).
2. Выполняет SQL-запрос KPI.
3. Перезаписывает ../drb_iliyas_kpi_2026.csv в точно том же формате,
что и исходный файл (разделитель «;», строки в кавычках, CRLF, точка как
десятичный разделитель, сортировка по убыванию периода/даты).
4. Если данные изменились коммитит и пушит CSV в ветку pages.
Настройки подключения и git-токен берутся из config.yaml (не коммитится).
Запускается раз в сутки через Планировщик задач Windows (см. README.md).
"""
import sys
import io
import ssl
import logging
import subprocess
from pathlib import Path
from datetime import datetime
import yaml
# ─────────────────────────── Пути ───────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_DIR = SCRIPT_DIR.parent
CSV_PATH = REPO_DIR / "drb_iliyas_kpi_2026.csv"
CONFIG_PATH = SCRIPT_DIR / "config.yaml"
LOG_PATH = SCRIPT_DIR / "update.log"
# ──────────────────────── SQL-запрос ────────────────────────
SQL_QUERY = """
select report_period_id, entry_date, abons, registered_total2 as registered_total,
registered_pct2 as registered_pct, mau_daily3 as mau_daily,
mau_per_registered3 as mau_per_registered,
comms_pct as traditional_comms_pct,
comms_pct_prev as prev_yesr_traditional_comms_pct,
traditional_comms_pct as traditional_comms_decrease_pct,
cumulative_digital_rap_total, cumulative_rap_total, fd_rap_pct,
fd_orders, fd_orders_goal, fd_pct as fd_orders_pct,
cum_fd_orders, cum_fd_orders_goal, cum_fd_pct as cum_fd_orders_pct
from drb.drb_iliyas_kpi_2025
where report_period_id > 202600
""".strip()
# ─── Колонки CSV в нужном порядке и их тип для форматирования ───
# str → значение в двойных кавычках
# int → целое без кавычек
# float → число с точкой, полная точность (repr), без кавычек
COLUMNS = [
("report_period_id", "int"),
("entry_date", "str"),
("abons", "int"),
("registered_total", "int"),
("registered_pct", "float"),
("mau_daily", "int"),
("mau_per_registered", "float"),
("traditional_comms_pct", "float"),
("prev_yesr_traditional_comms_pct", "float"),
("traditional_comms_decrease_pct", "float"),
("cumulative_digital_rap_total", "float"),
("cumulative_rap_total", "float"),
("fd_rap_pct", "float"),
("fd_orders", "int"),
("fd_orders_goal", "int"),
("fd_orders_pct", "float"),
("cum_fd_orders", "int"),
("cum_fd_orders_goal", "int"),
("cum_fd_orders_pct", "float"),
]
# ─────────────────────── Логирование ────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-7s %(message)s",
handlers=[
logging.FileHandler(LOG_PATH, encoding="utf-8"),
logging.StreamHandler(sys.stdout),
],
)
log = logging.getLogger("kpi-updater")
# ═══════════════════════ Конфигурация ════════════════════════
def load_config() -> dict:
if not CONFIG_PATH.exists():
log.error("Не найден %s. Скопируйте config.example.yaml -> config.yaml и заполните.", CONFIG_PATH)
sys.exit(2)
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
# ═══════════════════════ SSL monkey-patch ════════════════════
def patch_thrift_ssl() -> None:
"""Разрешает устаревшие cipher suites сервера Impala (см. Impala_connection.md)."""
import thrift.transport.TSSLSocket as _mod
_orig = _mod.TSSLSocket.__init__
def _patched(self, *a, **kw):
_orig(self, *a, **kw)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
ctx.set_ciphers("DEFAULT:@SECLEVEL=0")
self._context = ctx
_mod.TSSLSocket.__init__ = _patched
# ═══════════════════════ Impala ══════════════════════════════
def connect_impala(cfg: dict):
from impala.dbapi import connect
imp = cfg["impala"]
hosts = [imp["host"]]
if imp.get("fallback_host"):
hosts.append(imp["fallback_host"])
last_err = None
for host in hosts:
try:
log.info("Подключение к Impala: %s:%s", host, imp.get("port", 21050))
conn = connect(
host=host,
port=int(imp.get("port", 21050)),
database=imp.get("database", "drb"),
user=imp["user"],
password=imp["password"],
use_ssl=True,
auth_mechanism="PLAIN",
timeout=int(imp.get("timeout", 60)),
)
log.info("Подключение установлено (%s)", host)
return conn
except Exception as e: # noqa: BLE001
last_err = e
log.warning("Не удалось подключиться к %s: %s", host, e)
log.error("Не удалось подключиться ни к одному хосту Impala.")
raise last_err
def fetch_rows(conn):
cur = conn.cursor()
log.info("Выполнение запроса...")
cur.execute(SQL_QUERY)
rows = cur.fetchall()
col_names = [d[0].lower() for d in cur.description]
cur.close()
log.info("Получено строк: %d", len(rows))
# name -> index, чтобы не зависеть от порядка колонок в ответе
idx = {name: i for i, name in enumerate(col_names)}
missing = [c for c, _ in COLUMNS if c.lower() not in idx]
if missing:
raise RuntimeError(f"В ответе Impala нет колонок: {missing}")
return rows, idx
# ═══════════════════════ Форматирование CSV ══════════════════
def _fmt(value, kind: str) -> str:
if value is None:
return "" # пустое поле без кавычек
if kind == "str":
return '"' + str(value).replace('"', '""') + '"'
if kind == "int":
return str(int(value))
if kind == "float":
return repr(float(value)) # полная точность round-trip, как в исходнике
raise ValueError(kind)
def build_csv_text(rows, idx) -> str:
# Сортировка: период по убыванию, затем дата по убыванию (как в исходном файле)
pid_i = idx["report_period_id"]
date_i = idx["entry_date"]
rows_sorted = sorted(
rows,
key=lambda r: (r[pid_i] if r[pid_i] is not None else -1,
str(r[date_i]) if r[date_i] is not None else ""),
reverse=True,
)
buf = io.StringIO()
header = ";".join('"' + name + '"' for name, _ in COLUMNS)
buf.write(header + "\r\n")
for r in rows_sorted:
cells = [_fmt(r[idx[name]], kind) for name, kind in COLUMNS]
buf.write(";".join(cells) + "\r\n")
return buf.getvalue()
def write_csv_if_changed(text: str) -> bool:
old = ""
if CSV_PATH.exists():
with open(CSV_PATH, "r", encoding="utf-8", newline="") as f:
old = f.read()
if old == text:
log.info("Данные не изменились — запись и коммит не требуются.")
return False
with open(CSV_PATH, "w", encoding="utf-8", newline="") as f:
f.write(text)
log.info("CSV перезаписан (%d байт).", len(text.encode("utf-8")))
return True
# ═══════════════════════ Git ═════════════════════════════════
def _run_git(args, check=True, mask=None):
cmd = ["git", "-C", str(REPO_DIR)] + args
printable = " ".join(cmd)
if mask:
printable = printable.replace(mask, "***")
log.info("$ %s", printable)
res = subprocess.run(cmd, capture_output=True, text=True)
out = (res.stdout or "") + (res.stderr or "")
if mask:
out = out.replace(mask, "***")
if out.strip():
log.info(out.strip())
if check and res.returncode != 0:
raise RuntimeError(f"git {' '.join(args)} -> код {res.returncode}")
return res.returncode, out
def push_url(cfg: dict, mask_token: str):
"""Возвращает URL для fetch/push с токеном (если задан), иначе имя remote 'origin'."""
git = cfg.get("git", {}) or {}
token = git.get("token")
rc, out = _run_git(["remote", "get-url", git.get("remote", "origin")], check=True)
url = out.strip().splitlines()[-1].strip()
if token:
user = git.get("username", "git")
# https://host/path -> https://user:token@host/path
if url.startswith("https://"):
rest = url[len("https://"):]
return f"https://{user}:{token}@{rest}"
return url
def commit_and_push(cfg: dict):
git = cfg.get("git", {}) or {}
branch = git.get("branch", "pages")
token = (git.get("token") or "")
url = push_url(cfg, token)
# Коммитим только CSV — daily-диффы остаются чистыми.
_run_git(["add", "--", CSV_PATH.name])
rc, out = _run_git(["status", "--porcelain", "--", CSV_PATH.name], check=True)
if not out.strip():
log.info("Нет изменений CSV для коммита.")
return
msg = f"data: update KPI {datetime.now():%Y-%m-%d}"
_run_git(["commit", "-m", msg])
# Пуш с автоматическим rebase при гонке (веб-приложение тоже пушит ai-cache.json через API)
for attempt in range(1, 4):
# подтянуть свежий remote и переставить наш коммит сверху
_run_git(["fetch", url, branch], check=True, mask=token or None)
# --autostash: не падать, если в дереве есть посторонние незакоммиченные правки
_run_git(["rebase", "--autostash", "FETCH_HEAD"], check=True)
rc, out = _run_git(["push", url, f"HEAD:{branch}"], check=False, mask=token or None)
if rc == 0:
log.info("Пуш выполнен успешно (попытка %d).", attempt)
return
log.warning("Пуш отклонён (попытка %d), повтор после rebase...", attempt)
raise RuntimeError("Не удалось запушить изменения после 3 попыток.")
# ═══════════════════════ main ════════════════════════════════
def main() -> int:
log.info("=" * 60)
log.info("Старт обновления KPI")
cfg = load_config()
patch_thrift_ssl()
conn = connect_impala(cfg)
try:
rows, idx = fetch_rows(conn)
finally:
try:
conn.close()
except Exception: # noqa: BLE001
pass
if not rows:
log.error("Запрос вернул 0 строк — CSV НЕ перезаписан (защита от пустых данных).")
return 1
text = build_csv_text(rows, idx)
changed = write_csv_if_changed(text)
if changed:
commit_and_push(cfg)
log.info("Готово.")
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except Exception as e: # noqa: BLE001
log.exception("ОШИБКА: %s", e)
sys.exit(1)