commit 7c9487e0f9c48e82b785cbea97a581be39ae547a Author: Радик Date: Sun May 31 14:06:21 2026 +0300 Initial commit: gp-mcp server MCP stdio server for Greenplum 6.x query plan evaluation: - explain_sql / explain_dbt_model tools - read-only session enforcement + statement_timeout - dbt compile integration - all settings via env vars (no hardcoded defaults) diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..24b6c16 --- /dev/null +++ b/.env.example @@ -0,0 +1,26 @@ +# Greenplum connection (required) +GP_HOST= +GP_PORT= +GP_USER= +GP_PASSWORD= +GP_DATABASE= + +# Greenplum schema search_path (optional, comma-separated) +GP_SCHEMA= + +# dbt project (required for explain_dbt_model) +DBT_PROJECT_DIR= +DBT_PROFILES_DIR= +DBT_TARGET= + +# Path to dbt executable (optional, defaults to "dbt" on PATH) +DBT_EXECUTABLE= + +# Statement timeout in milliseconds. +# STATEMENT_TIMEOUT_MS = default applied to every EXPLAIN ANALYZE. +# MAX_STATEMENT_TIMEOUT_MS = upper bound; per-call override cannot exceed this. +STATEMENT_TIMEOUT_MS= +MAX_STATEMENT_TIMEOUT_MS= + +# Logging: DEBUG, INFO, WARNING, ERROR +LOG_LEVEL= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..909c0c8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +.env +.venv/ +venv/ +__pycache__/ +*.pyc +*.pyo +*.egg-info/ +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ +.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..13bbdee --- /dev/null +++ b/README.md @@ -0,0 +1,301 @@ +# gp-mcp + +MCP-сервер для оценки плана запросов dbt-моделей в Greenplum 6.x. + +Запускается локально по `stdio` рядом с AI-агентом, который рефакторит легаси PL/SQL +в dbt-модели. Сервер: + +1. компилирует выбранную dbt-модель (`dbt compile --select `); +2. подключается к Greenplum под read-only пользователем + (`SET default_transaction_read_only = on`, `statement_timeout`); +3. выполняет `EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON)`; +4. возвращает JSON-план + краткую сводку с GP-метриками (motion-узлы, + самый медленный узел, ошибка оценки строк). + +## Tools + +| Tool | Параметры | Что делает | +|------|-----------|------------| +| `explain_sql` | `sql: str`, `statement_timeout_ms?: int` | EXPLAIN ANALYZE для произвольного SQL | +| `explain_dbt_model` | `model_name: str`, `statement_timeout_ms?: int` | `dbt compile` + EXPLAIN ANALYZE для модели | + +Возвращаемый JSON: + +```json +{ + "summary": { + "total_cost": 12345.6, + "plan_rows": 100000, + "actual_rows": 98412, + "execution_time_ms": 842.3, + "planning_time_ms": 12.1, + "slowest_node": { "node_type": "Seq Scan", "actual_total_time_ms": 700.2, "...": "..." }, + "motion_nodes": [{ "node_type": "Redistribute Motion", "...": "..." }], + "rows_misestimation_factor": 1.02 + }, + "plan": [ /* raw EXPLAIN JSON */ ], + "statement_timeout_ms": 300000, + "compiled_sql": "select ...", + "model_name": "fct_orders" +} +``` + +## Установка + +```bash +cd /Users/admin/Projects/vpn +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` + +## Конфигурация + +Все настройки — через переменные окружения. Скопируй `.env.example` в `.env` +и заполни. + +| Переменная | Обязательная | Назначение | +|------------|:-:|---| +| `GP_HOST` | + | Хост Greenplum master | +| `GP_PORT` | + | Порт | +| `GP_USER` | + | Read-only пользователь (см. ниже) | +| `GP_PASSWORD` | + | Пароль | +| `GP_DATABASE` | + | Имя БД | +| `GP_SCHEMA` | | `search_path`, можно через запятую | +| `DBT_PROJECT_DIR` | + | Каталог dbt-проекта (содержит `dbt_project.yml`) | +| `DBT_PROFILES_DIR` | + | Каталог с `profiles.yml` | +| `DBT_TARGET` | + | Имя target из `profiles.yml` (напр. `dev`) | +| `DBT_EXECUTABLE` | | Путь к `dbt`, по умолчанию `dbt` из PATH | +| `STATEMENT_TIMEOUT_MS` | + | Дефолтный `statement_timeout` для EXPLAIN ANALYZE | +| `MAX_STATEMENT_TIMEOUT_MS` | + | Верхняя граница, агент не сможет превысить | +| `LOG_LEVEL` | | `DEBUG`/`INFO`/`WARNING`/`ERROR`, дефолт `INFO` | + +Если обязательная переменная не задана — сервер не стартует и пишет в stderr +имя недостающей переменной. + +## Read-only роль в Greenplum + +Сервер требует, чтобы доступ был ограничен на уровне БД. Минимум: + +```sql +CREATE ROLE dbt_explain LOGIN PASSWORD '...'; +GRANT CONNECT ON DATABASE TO dbt_explain; +GRANT USAGE ON SCHEMA TO dbt_explain; +GRANT SELECT ON ALL TABLES IN SCHEMA TO dbt_explain; +ALTER DEFAULT PRIVILEGES IN SCHEMA + GRANT SELECT ON TABLES TO dbt_explain; +``` + +Сервер дополнительно ставит сессионный `default_transaction_read_only = on`, +но GRANT-ы — единственная надёжная защита. + +## Запуск + +Локально (для отладки): + +```bash +python -m gp_mcp.server +``` + +Сервер ничего не печатает в stdout (это канал MCP) — все логи идут в stderr. + +## Подключение к клиенту + +Сервер общается по `stdio`, поэтому клиент должен сам его запускать. +Конфиг — стандартный MCP JSON: одинаковая форма для Claude Code и Cursor, +различаются только пути к файлам настроек. + +Общий блок, который пригодится ниже: + +```json +{ + "command": "/Users/admin/Projects/vpn/.venv/bin/python", + "args": ["-m", "gp_mcp.server"], + "cwd": "/Users/admin/Projects/vpn/src", + "env": { + "GP_HOST": "gp-master.internal", + "GP_PORT": "5432", + "GP_USER": "dbt_explain", + "GP_PASSWORD": "REPLACE_ME", + "GP_DATABASE": "analytics", + "GP_SCHEMA": "analytics,public", + "DBT_PROJECT_DIR": "/Users/admin/Projects/dbt-analytics", + "DBT_PROFILES_DIR": "/Users/admin/.dbt", + "DBT_TARGET": "dev", + "STATEMENT_TIMEOUT_MS": "300000", + "MAX_STATEMENT_TIMEOUT_MS": "900000", + "LOG_LEVEL": "INFO" + } +} +``` + +Важно: +- `command` — **абсолютный** путь к Python из venv проекта. Клиенты MCP + обычно стартуют без активированного окружения, поэтому полагаться на + `python` из PATH нельзя. +- `cwd` указан на `src/`, чтобы Python нашёл пакет `gp_mcp` без установки + (`pip install -e .` не делаем). +- Секреты держим в `env` соответствующего конфига клиента, **не** в коде + и **не** в репозитории. + +--- + +### Claude Code + +Есть три способа добавить сервер — выбери один. + +**1. Через CLI (быстрее всего)** + +```bash +claude mcp add gp-mcp \ + --scope user \ + --env GP_HOST=gp-master.internal \ + --env GP_PORT=5432 \ + --env GP_USER=dbt_explain \ + --env GP_PASSWORD=REPLACE_ME \ + --env GP_DATABASE=analytics \ + --env DBT_PROJECT_DIR=/Users/admin/Projects/dbt-analytics \ + --env DBT_PROFILES_DIR=/Users/admin/.dbt \ + --env DBT_TARGET=dev \ + --env STATEMENT_TIMEOUT_MS=300000 \ + --env MAX_STATEMENT_TIMEOUT_MS=900000 \ + -- /Users/admin/Projects/vpn/.venv/bin/python -m gp_mcp.server +``` + +Флаг `--scope`: +- `user` — для всех проектов (пишется в `~/.claude.json`); +- `project` — общий для команды, кладётся в `.mcp.json` в корне проекта, + его можно коммитить в git (секреты тогда задают через `${VAR}`-подстановку + из окружения, а не хардкодом); +- `local` — только в текущем проекте, только у тебя. + +**2. Вручную, user-scope: `~/.claude.json`** + +```json +{ + "mcpServers": { + "gp-mcp": { /* см. общий блок выше */ } + } +} +``` + +**3. Вручную, project-scope: `.mcp.json` в корне dbt-репозитория** + +```json +{ + "mcpServers": { + "gp-mcp": { + "command": "/Users/admin/Projects/vpn/.venv/bin/python", + "args": ["-m", "gp_mcp.server"], + "cwd": "/Users/admin/Projects/vpn/src", + "env": { + "GP_HOST": "${GP_HOST}", + "GP_PORT": "${GP_PORT}", + "GP_USER": "${GP_USER}", + "GP_PASSWORD": "${GP_PASSWORD}", + "GP_DATABASE": "${GP_DATABASE}", + "DBT_PROJECT_DIR": "${DBT_PROJECT_DIR}", + "DBT_PROFILES_DIR": "${DBT_PROFILES_DIR}", + "DBT_TARGET": "${DBT_TARGET}", + "STATEMENT_TIMEOUT_MS": "300000", + "MAX_STATEMENT_TIMEOUT_MS": "900000" + } + } + } +} +``` + +**Проверка:** + +```bash +claude mcp list # gp-mcp должен быть в списке +claude mcp get gp-mcp # детали конфига +``` + +В сессии `/mcp` покажет статус подключения и список tool'ов. Если статус +`failed`, посмотри `~/Library/Logs/Claude/` — сервер пишет ошибки запуска +(включая отсутствующие env-переменные) в stderr. + +--- + +### Cursor IDE + +Cursor использует тот же MCP-формат, но свой файл настроек. + +**1. Через UI** + +`Settings` → `Cursor Settings` → `MCP & Integrations` → `New MCP Server` → +откроется `mcp.json` для редактирования. + +**2. Вручную, глобально: `~/.cursor/mcp.json`** + +Доступно во всех проектах. + +```json +{ + "mcpServers": { + "gp-mcp": { + "command": "/Users/admin/Projects/vpn/.venv/bin/python", + "args": ["-m", "gp_mcp.server"], + "cwd": "/Users/admin/Projects/vpn/src", + "env": { + "GP_HOST": "gp-master.internal", + "GP_PORT": "5432", + "GP_USER": "dbt_explain", + "GP_PASSWORD": "REPLACE_ME", + "GP_DATABASE": "analytics", + "DBT_PROJECT_DIR": "/Users/admin/Projects/dbt-analytics", + "DBT_PROFILES_DIR": "/Users/admin/.dbt", + "DBT_TARGET": "dev", + "STATEMENT_TIMEOUT_MS": "300000", + "MAX_STATEMENT_TIMEOUT_MS": "900000" + } + } + } +} +``` + +**3. Вручную, для проекта: `.cursor/mcp.json` в корне dbt-репозитория** + +Видно только в этом проекте. Удобно, когда у разных dbt-проектов разные +`DBT_PROJECT_DIR`/`DBT_TARGET`. + +**Проверка:** + +`Settings` → `MCP & Integrations` — справа от `gp-mcp` должен загореться +зелёный индикатор и появиться список tool'ов (`explain_sql`, +`explain_dbt_model`). В чате tools будут доступны Agent-режиму. + +Если индикатор красный — раскрой сервер в этом же окне, там показывается +stderr запуска (включая `Configuration error: Required environment variable +'...' is not set`). + +--- + +### Общие проблемы при подключении + +| Симптом | Причина | +|---------|---------| +| `Configuration error: Required environment variable 'X' is not set` | Переменная `X` не задана в `env` конфига клиента | +| `ModuleNotFoundError: No module named 'gp_mcp'` | Неверный `cwd` — должен указывать на `src/`, или Python не из venv | +| `ModuleNotFoundError: No module named 'mcp'` | `command` указывает не на Python из venv, где установлены зависимости | +| Сервер стартует, но tools не появляются | Клиент не перезапущен / нет permissions в Cursor для MCP | +| `dbt: command not found` при вызове `explain_dbt_model` | Поставь `DBT_EXECUTABLE=/абсолютный/путь/к/dbt` в `env` | + +## Структура + +``` +vpn/ +├── .env.example +├── .gitignore +├── requirements.txt +├── README.md +└── src/ + └── gp_mcp/ + ├── __init__.py + ├── config.py # загрузка и валидация env + ├── db.py # psycopg2 + read-only + timeout + ├── dbt_runner.py # subprocess dbt compile + чтение compiled SQL + ├── explain.py # EXPLAIN ANALYZE + summary + └── server.py # FastMCP, регистрация tools, stdio +``` diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..73192e3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +mcp>=1.2.0 +psycopg2-binary>=2.9.9 +python-dotenv>=1.0.1 +PyYAML>=6.0.1 diff --git a/src/gp_mcp/__init__.py b/src/gp_mcp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/gp_mcp/config.py b/src/gp_mcp/config.py new file mode 100644 index 0000000..43a69c9 --- /dev/null +++ b/src/gp_mcp/config.py @@ -0,0 +1,125 @@ +"""Configuration loaded entirely from environment variables. + +No hard-coded defaults for connection or paths — required variables must be +set explicitly. A missing required variable raises ConfigError at startup with +the offending variable name. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + +from dotenv import load_dotenv + + +class ConfigError(RuntimeError): + pass + + +def _require(name: str) -> str: + value = os.environ.get(name) + if value is None or value.strip() == "": + raise ConfigError(f"Required environment variable {name!r} is not set") + return value.strip() + + +def _optional(name: str) -> str | None: + value = os.environ.get(name) + if value is None or value.strip() == "": + return None + return value.strip() + + +def _require_int(name: str) -> int: + raw = _require(name) + try: + return int(raw) + except ValueError as exc: + raise ConfigError(f"Environment variable {name!r} must be an integer, got {raw!r}") from exc + + +def _require_positive_int(name: str) -> int: + value = _require_int(name) + if value <= 0: + raise ConfigError(f"Environment variable {name!r} must be > 0, got {value}") + return value + + +def _require_dir(name: str) -> Path: + raw = _require(name) + path = Path(raw).expanduser() + if not path.is_dir(): + raise ConfigError(f"Environment variable {name!r} points to {raw!r}, which is not a directory") + return path + + +@dataclass(frozen=True) +class GreenplumConfig: + host: str + port: int + user: str + password: str + database: str + schema: str | None + + +@dataclass(frozen=True) +class DbtConfig: + project_dir: Path + profiles_dir: Path + target: str + executable: str + + +@dataclass(frozen=True) +class LimitsConfig: + statement_timeout_ms: int + max_statement_timeout_ms: int + + +@dataclass(frozen=True) +class AppConfig: + gp: GreenplumConfig + dbt: DbtConfig + limits: LimitsConfig + log_level: str + + +def load_config() -> AppConfig: + """Load and validate the entire configuration from environment.""" + + load_dotenv(override=False) + + gp = GreenplumConfig( + host=_require("GP_HOST"), + port=_require_positive_int("GP_PORT"), + user=_require("GP_USER"), + password=_require("GP_PASSWORD"), + database=_require("GP_DATABASE"), + schema=_optional("GP_SCHEMA"), + ) + + dbt = DbtConfig( + project_dir=_require_dir("DBT_PROJECT_DIR"), + profiles_dir=_require_dir("DBT_PROFILES_DIR"), + target=_require("DBT_TARGET"), + executable=_optional("DBT_EXECUTABLE") or "dbt", + ) + + limits = LimitsConfig( + statement_timeout_ms=_require_positive_int("STATEMENT_TIMEOUT_MS"), + max_statement_timeout_ms=_require_positive_int("MAX_STATEMENT_TIMEOUT_MS"), + ) + if limits.statement_timeout_ms > limits.max_statement_timeout_ms: + raise ConfigError( + "STATEMENT_TIMEOUT_MS must be <= MAX_STATEMENT_TIMEOUT_MS " + f"(got {limits.statement_timeout_ms} > {limits.max_statement_timeout_ms})" + ) + + log_level = (_optional("LOG_LEVEL") or "INFO").upper() + if log_level not in {"DEBUG", "INFO", "WARNING", "ERROR"}: + raise ConfigError(f"LOG_LEVEL must be one of DEBUG/INFO/WARNING/ERROR, got {log_level!r}") + + return AppConfig(gp=gp, dbt=dbt, limits=limits, log_level=log_level) diff --git a/src/gp_mcp/db.py b/src/gp_mcp/db.py new file mode 100644 index 0000000..b7a77c7 --- /dev/null +++ b/src/gp_mcp/db.py @@ -0,0 +1,51 @@ +"""Greenplum connections with enforced read-only mode and statement_timeout.""" + +from __future__ import annotations + +from contextlib import contextmanager +from typing import Iterator + +import psycopg2 +from psycopg2 import sql +from psycopg2.extensions import connection as PgConnection + +from .config import GreenplumConfig + + +def connect(gp: GreenplumConfig, statement_timeout_ms: int) -> PgConnection: + """Open a new connection with read-only and timeout enforced. + + Why session-level (not just transaction-level) read-only: a misbehaving query + that opens its own transaction inside the session still cannot write. + """ + + conn = psycopg2.connect( + host=gp.host, + port=gp.port, + user=gp.user, + password=gp.password, + dbname=gp.database, + application_name="gp-mcp", + ) + conn.autocommit = True + with conn.cursor() as cur: + cur.execute("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY") + cur.execute("SET default_transaction_read_only = on") + cur.execute("SET statement_timeout = %s", (statement_timeout_ms,)) + if gp.schema: + schemas = [s.strip() for s in gp.schema.split(",") if s.strip()] + if schemas: + stmt = sql.SQL("SET search_path TO {}").format( + sql.SQL(", ").join(sql.Identifier(s) for s in schemas) + ) + cur.execute(stmt) + return conn + + +@contextmanager +def open_connection(gp: GreenplumConfig, statement_timeout_ms: int) -> Iterator[PgConnection]: + conn = connect(gp, statement_timeout_ms) + try: + yield conn + finally: + conn.close() diff --git a/src/gp_mcp/dbt_runner.py b/src/gp_mcp/dbt_runner.py new file mode 100644 index 0000000..6dc0d0b --- /dev/null +++ b/src/gp_mcp/dbt_runner.py @@ -0,0 +1,96 @@ +"""Run `dbt compile` and read the compiled SQL for a selected model.""" + +from __future__ import annotations + +import re +import subprocess +from pathlib import Path + +import yaml + +from .config import DbtConfig + + +class DbtCompileError(RuntimeError): + pass + + +_MODEL_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + + +def _validate_model_name(model_name: str) -> str: + """Reject anything that isn't a bare dbt identifier. + + Why: model_name is appended to a `dbt --select` argument and used to locate + a file on disk. Restricting to identifier characters keeps both subprocess + and filesystem lookup safe. + """ + + if not _MODEL_NAME_RE.match(model_name): + raise DbtCompileError( + f"Invalid dbt model name {model_name!r}: must match {_MODEL_NAME_RE.pattern}" + ) + return model_name + + +def _project_name(project_dir: Path) -> str: + project_file = project_dir / "dbt_project.yml" + if not project_file.is_file(): + raise DbtCompileError(f"dbt_project.yml not found in {project_dir}") + with project_file.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + name = data.get("name") + if not isinstance(name, str) or not name: + raise DbtCompileError(f"`name` not found in {project_file}") + return name + + +def _find_compiled_sql(project_dir: Path, project_name: str, model_name: str) -> Path: + compiled_root = project_dir / "target" / "compiled" / project_name + if not compiled_root.is_dir(): + raise DbtCompileError(f"Compiled output dir does not exist: {compiled_root}") + matches = list(compiled_root.rglob(f"{model_name}.sql")) + if not matches: + raise DbtCompileError( + f"Compiled SQL for model {model_name!r} not found under {compiled_root}" + ) + if len(matches) > 1: + # Ambiguous (model name reused across paths). Surface the candidates. + rels = ", ".join(str(p.relative_to(project_dir)) for p in matches) + raise DbtCompileError( + f"Multiple compiled files match model {model_name!r}: {rels}" + ) + return matches[0] + + +def compile_model(cfg: DbtConfig, model_name: str) -> str: + """Compile a single dbt model and return the resulting SQL.""" + + model_name = _validate_model_name(model_name) + project_name = _project_name(cfg.project_dir) + + cmd = [ + cfg.executable, + "compile", + "--select", model_name, + "--project-dir", str(cfg.project_dir), + "--profiles-dir", str(cfg.profiles_dir), + "--target", cfg.target, + ] + + result = subprocess.run( + cmd, + cwd=cfg.project_dir, + capture_output=True, + text=True, + check=False, + ) + if result.returncode != 0: + raise DbtCompileError( + f"dbt compile failed (exit {result.returncode}):\n" + f"stdout:\n{result.stdout}\n" + f"stderr:\n{result.stderr}" + ) + + compiled_path = _find_compiled_sql(cfg.project_dir, project_name, model_name) + return compiled_path.read_text(encoding="utf-8") diff --git a/src/gp_mcp/explain.py b/src/gp_mcp/explain.py new file mode 100644 index 0000000..b775303 --- /dev/null +++ b/src/gp_mcp/explain.py @@ -0,0 +1,120 @@ +"""Run EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) and summarise the GP plan.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +from psycopg2.extensions import connection as PgConnection + + +class ExplainError(RuntimeError): + pass + + +@dataclass +class PlanSummary: + total_cost: float | None + plan_rows: float | None + actual_rows: float | None + execution_time_ms: float | None + planning_time_ms: float | None + slowest_node: dict[str, Any] | None + motion_nodes: list[dict[str, Any]] = field(default_factory=list) + rows_misestimation_factor: float | None = None + + def as_dict(self) -> dict[str, Any]: + return { + "total_cost": self.total_cost, + "plan_rows": self.plan_rows, + "actual_rows": self.actual_rows, + "execution_time_ms": self.execution_time_ms, + "planning_time_ms": self.planning_time_ms, + "slowest_node": self.slowest_node, + "motion_nodes": self.motion_nodes, + "rows_misestimation_factor": self.rows_misestimation_factor, + } + + +def explain_analyze_json(conn: PgConnection, sql_text: str) -> list[dict[str, Any]]: + """Run EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) and return the raw plan list.""" + + if not sql_text or not sql_text.strip(): + raise ExplainError("SQL is empty") + + # ANALYZE actually executes the statement. The session is set + # default_transaction_read_only=on (see db.connect), so writes are rejected + # by the server. statement_timeout caps runaway plans. + wrapped = f"EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) {sql_text}" + with conn.cursor() as cur: + cur.execute(wrapped) + row = cur.fetchone() + if not row: + raise ExplainError("EXPLAIN returned no rows") + payload = row[0] + if not isinstance(payload, list): + raise ExplainError(f"EXPLAIN returned unexpected payload type: {type(payload).__name__}") + return payload + + +def _walk(node: dict[str, Any]): + yield node + for child in node.get("Plans", []) or []: + yield from _walk(child) + + +def summarise(plan_payload: list[dict[str, Any]]) -> PlanSummary: + """Extract GP-relevant metrics from the JSON plan.""" + + if not plan_payload: + raise ExplainError("Plan payload is empty") + + root = plan_payload[0] + plan = root.get("Plan", {}) + + total_cost = plan.get("Total Cost") + plan_rows = plan.get("Plan Rows") + actual_rows = plan.get("Actual Rows") + execution_time = root.get("Execution Time") + planning_time = root.get("Planning Time") + + slowest: dict[str, Any] | None = None + motions: list[dict[str, Any]] = [] + + for node in _walk(plan): + node_type = node.get("Node Type", "") + if "Motion" in node_type or node_type.startswith("Gather"): + motions.append({ + "node_type": node_type, + "slice": node.get("Slice"), + "senders": node.get("Senders"), + "receivers": node.get("Receivers"), + "actual_rows": node.get("Actual Rows"), + "actual_total_time_ms": node.get("Actual Total Time"), + }) + actual_total = node.get("Actual Total Time") + if actual_total is not None: + if slowest is None or actual_total > slowest.get("actual_total_time_ms", -1): + slowest = { + "node_type": node_type, + "actual_total_time_ms": actual_total, + "actual_rows": node.get("Actual Rows"), + "plan_rows": node.get("Plan Rows"), + "relation": node.get("Relation Name"), + "alias": node.get("Alias"), + } + + misestimation: float | None = None + if plan_rows and actual_rows and plan_rows > 0 and actual_rows > 0: + misestimation = max(plan_rows / actual_rows, actual_rows / plan_rows) + + return PlanSummary( + total_cost=total_cost, + plan_rows=plan_rows, + actual_rows=actual_rows, + execution_time_ms=execution_time, + planning_time_ms=planning_time, + slowest_node=slowest, + motion_nodes=motions, + rows_misestimation_factor=misestimation, + ) diff --git a/src/gp_mcp/server.py b/src/gp_mcp/server.py new file mode 100644 index 0000000..0360378 --- /dev/null +++ b/src/gp_mcp/server.py @@ -0,0 +1,109 @@ +"""MCP stdio server exposing Greenplum EXPLAIN tools for dbt model review.""" + +from __future__ import annotations + +import json +import logging +import sys +from typing import Any + +from mcp.server.fastmcp import FastMCP + +from .config import AppConfig, ConfigError, load_config +from .db import open_connection +from .dbt_runner import DbtCompileError, compile_model +from .explain import ExplainError, explain_analyze_json, summarise + + +logger = logging.getLogger("gp_mcp") + + +def _resolve_timeout(cfg: AppConfig, override_ms: int | None) -> int: + if override_ms is None: + return cfg.limits.statement_timeout_ms + if override_ms <= 0: + raise ValueError("statement_timeout_ms must be > 0") + if override_ms > cfg.limits.max_statement_timeout_ms: + raise ValueError( + f"statement_timeout_ms {override_ms} exceeds MAX_STATEMENT_TIMEOUT_MS " + f"{cfg.limits.max_statement_timeout_ms}" + ) + return override_ms + + +def _explain_payload(cfg: AppConfig, sql_text: str, timeout_ms: int) -> dict[str, Any]: + with open_connection(cfg.gp, timeout_ms) as conn: + plan = explain_analyze_json(conn, sql_text) + summary = summarise(plan) + return { + "summary": summary.as_dict(), + "plan": plan, + "statement_timeout_ms": timeout_ms, + } + + +def build_server(cfg: AppConfig) -> FastMCP: + mcp = FastMCP("gp-mcp") + + @mcp.tool() + def explain_sql(sql: str, statement_timeout_ms: int | None = None) -> str: + """Run EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) on the given SQL. + + Returns a JSON string with: + - summary: GP-relevant metrics (total_cost, execution_time_ms, + motion_nodes, slowest_node, rows_misestimation_factor) + - plan: raw EXPLAIN JSON + - statement_timeout_ms: actual timeout applied + The session is read-only; writes are rejected by the server. + """ + timeout_ms = _resolve_timeout(cfg, statement_timeout_ms) + payload = _explain_payload(cfg, sql, timeout_ms) + return json.dumps(payload, ensure_ascii=False, default=str) + + @mcp.tool() + def explain_dbt_model(model_name: str, statement_timeout_ms: int | None = None) -> str: + """Compile a dbt model and run EXPLAIN (ANALYZE, FORMAT JSON) on it. + + Steps: + 1. `dbt compile --select ` in the configured project + 2. Read target/compiled//.../.sql + 3. EXPLAIN ANALYZE against Greenplum (read-only session) + + Returns the same JSON shape as explain_sql, plus `compiled_sql`. + """ + timeout_ms = _resolve_timeout(cfg, statement_timeout_ms) + compiled_sql = compile_model(cfg.dbt, model_name) + payload = _explain_payload(cfg, compiled_sql, timeout_ms) + payload["compiled_sql"] = compiled_sql + payload["model_name"] = model_name + return json.dumps(payload, ensure_ascii=False, default=str) + + return mcp + + +def main() -> int: + try: + cfg = load_config() + except ConfigError as exc: + # stderr — stdout is the MCP transport channel. + print(f"Configuration error: {exc}", file=sys.stderr) + return 2 + + logging.basicConfig( + level=cfg.log_level, + stream=sys.stderr, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + logger.info( + "gp-mcp starting (host=%s db=%s schema=%s timeout_ms=%d max_timeout_ms=%d)", + cfg.gp.host, cfg.gp.database, cfg.gp.schema, + cfg.limits.statement_timeout_ms, cfg.limits.max_statement_timeout_ms, + ) + + mcp = build_server(cfg) + mcp.run(transport="stdio") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())