Initial commit: gp-mcp server
MCP stdio server for Greenplum 6.x query plan evaluation: - explain_sql / explain_dbt_model tools - read-only session enforcement + statement_timeout - dbt compile integration - all settings via env vars (no hardcoded defaults)
This commit is contained in:
26
.env.example
Normal file
26
.env.example
Normal file
@@ -0,0 +1,26 @@
|
||||
# Greenplum connection (required)
|
||||
GP_HOST=
|
||||
GP_PORT=
|
||||
GP_USER=
|
||||
GP_PASSWORD=
|
||||
GP_DATABASE=
|
||||
|
||||
# Greenplum schema search_path (optional, comma-separated)
|
||||
GP_SCHEMA=
|
||||
|
||||
# dbt project (required for explain_dbt_model)
|
||||
DBT_PROJECT_DIR=
|
||||
DBT_PROFILES_DIR=
|
||||
DBT_TARGET=
|
||||
|
||||
# Path to dbt executable (optional, defaults to "dbt" on PATH)
|
||||
DBT_EXECUTABLE=
|
||||
|
||||
# Statement timeout in milliseconds.
|
||||
# STATEMENT_TIMEOUT_MS = default applied to every EXPLAIN ANALYZE.
|
||||
# MAX_STATEMENT_TIMEOUT_MS = upper bound; per-call override cannot exceed this.
|
||||
STATEMENT_TIMEOUT_MS=
|
||||
MAX_STATEMENT_TIMEOUT_MS=
|
||||
|
||||
# Logging: DEBUG, INFO, WARNING, ERROR
|
||||
LOG_LEVEL=
|
||||
11
.gitignore
vendored
Normal file
11
.gitignore
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
.env
|
||||
.venv/
|
||||
venv/
|
||||
__pycache__/
|
||||
*.pyc
|
||||
*.pyo
|
||||
*.egg-info/
|
||||
.pytest_cache/
|
||||
.mypy_cache/
|
||||
.ruff_cache/
|
||||
.DS_Store
|
||||
301
README.md
Normal file
301
README.md
Normal file
@@ -0,0 +1,301 @@
|
||||
# gp-mcp
|
||||
|
||||
MCP-сервер для оценки плана запросов dbt-моделей в Greenplum 6.x.
|
||||
|
||||
Запускается локально по `stdio` рядом с AI-агентом, который рефакторит легаси PL/SQL
|
||||
в dbt-модели. Сервер:
|
||||
|
||||
1. компилирует выбранную dbt-модель (`dbt compile --select <model>`);
|
||||
2. подключается к Greenplum под read-only пользователем
|
||||
(`SET default_transaction_read_only = on`, `statement_timeout`);
|
||||
3. выполняет `EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON)`;
|
||||
4. возвращает JSON-план + краткую сводку с GP-метриками (motion-узлы,
|
||||
самый медленный узел, ошибка оценки строк).
|
||||
|
||||
## Tools
|
||||
|
||||
| Tool | Параметры | Что делает |
|
||||
|------|-----------|------------|
|
||||
| `explain_sql` | `sql: str`, `statement_timeout_ms?: int` | EXPLAIN ANALYZE для произвольного SQL |
|
||||
| `explain_dbt_model` | `model_name: str`, `statement_timeout_ms?: int` | `dbt compile` + EXPLAIN ANALYZE для модели |
|
||||
|
||||
Возвращаемый JSON:
|
||||
|
||||
```json
|
||||
{
|
||||
"summary": {
|
||||
"total_cost": 12345.6,
|
||||
"plan_rows": 100000,
|
||||
"actual_rows": 98412,
|
||||
"execution_time_ms": 842.3,
|
||||
"planning_time_ms": 12.1,
|
||||
"slowest_node": { "node_type": "Seq Scan", "actual_total_time_ms": 700.2, "...": "..." },
|
||||
"motion_nodes": [{ "node_type": "Redistribute Motion", "...": "..." }],
|
||||
"rows_misestimation_factor": 1.02
|
||||
},
|
||||
"plan": [ /* raw EXPLAIN JSON */ ],
|
||||
"statement_timeout_ms": 300000,
|
||||
"compiled_sql": "select ...",
|
||||
"model_name": "fct_orders"
|
||||
}
|
||||
```
|
||||
|
||||
## Установка
|
||||
|
||||
```bash
|
||||
cd /Users/admin/Projects/vpn
|
||||
python3 -m venv .venv
|
||||
source .venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
## Конфигурация
|
||||
|
||||
Все настройки — через переменные окружения. Скопируй `.env.example` в `.env`
|
||||
и заполни.
|
||||
|
||||
| Переменная | Обязательная | Назначение |
|
||||
|------------|:-:|---|
|
||||
| `GP_HOST` | + | Хост Greenplum master |
|
||||
| `GP_PORT` | + | Порт |
|
||||
| `GP_USER` | + | Read-only пользователь (см. ниже) |
|
||||
| `GP_PASSWORD` | + | Пароль |
|
||||
| `GP_DATABASE` | + | Имя БД |
|
||||
| `GP_SCHEMA` | | `search_path`, можно через запятую |
|
||||
| `DBT_PROJECT_DIR` | + | Каталог dbt-проекта (содержит `dbt_project.yml`) |
|
||||
| `DBT_PROFILES_DIR` | + | Каталог с `profiles.yml` |
|
||||
| `DBT_TARGET` | + | Имя target из `profiles.yml` (напр. `dev`) |
|
||||
| `DBT_EXECUTABLE` | | Путь к `dbt`, по умолчанию `dbt` из PATH |
|
||||
| `STATEMENT_TIMEOUT_MS` | + | Дефолтный `statement_timeout` для EXPLAIN ANALYZE |
|
||||
| `MAX_STATEMENT_TIMEOUT_MS` | + | Верхняя граница, агент не сможет превысить |
|
||||
| `LOG_LEVEL` | | `DEBUG`/`INFO`/`WARNING`/`ERROR`, дефолт `INFO` |
|
||||
|
||||
Если обязательная переменная не задана — сервер не стартует и пишет в stderr
|
||||
имя недостающей переменной.
|
||||
|
||||
## Read-only роль в Greenplum
|
||||
|
||||
Сервер требует, чтобы доступ был ограничен на уровне БД. Минимум:
|
||||
|
||||
```sql
|
||||
CREATE ROLE dbt_explain LOGIN PASSWORD '...';
|
||||
GRANT CONNECT ON DATABASE <db> TO dbt_explain;
|
||||
GRANT USAGE ON SCHEMA <schema> TO dbt_explain;
|
||||
GRANT SELECT ON ALL TABLES IN SCHEMA <schema> TO dbt_explain;
|
||||
ALTER DEFAULT PRIVILEGES IN SCHEMA <schema>
|
||||
GRANT SELECT ON TABLES TO dbt_explain;
|
||||
```
|
||||
|
||||
Сервер дополнительно ставит сессионный `default_transaction_read_only = on`,
|
||||
но GRANT-ы — единственная надёжная защита.
|
||||
|
||||
## Запуск
|
||||
|
||||
Локально (для отладки):
|
||||
|
||||
```bash
|
||||
python -m gp_mcp.server
|
||||
```
|
||||
|
||||
Сервер ничего не печатает в stdout (это канал MCP) — все логи идут в stderr.
|
||||
|
||||
## Подключение к клиенту
|
||||
|
||||
Сервер общается по `stdio`, поэтому клиент должен сам его запускать.
|
||||
Конфиг — стандартный MCP JSON: одинаковая форма для Claude Code и Cursor,
|
||||
различаются только пути к файлам настроек.
|
||||
|
||||
Общий блок, который пригодится ниже:
|
||||
|
||||
```json
|
||||
{
|
||||
"command": "/Users/admin/Projects/vpn/.venv/bin/python",
|
||||
"args": ["-m", "gp_mcp.server"],
|
||||
"cwd": "/Users/admin/Projects/vpn/src",
|
||||
"env": {
|
||||
"GP_HOST": "gp-master.internal",
|
||||
"GP_PORT": "5432",
|
||||
"GP_USER": "dbt_explain",
|
||||
"GP_PASSWORD": "REPLACE_ME",
|
||||
"GP_DATABASE": "analytics",
|
||||
"GP_SCHEMA": "analytics,public",
|
||||
"DBT_PROJECT_DIR": "/Users/admin/Projects/dbt-analytics",
|
||||
"DBT_PROFILES_DIR": "/Users/admin/.dbt",
|
||||
"DBT_TARGET": "dev",
|
||||
"STATEMENT_TIMEOUT_MS": "300000",
|
||||
"MAX_STATEMENT_TIMEOUT_MS": "900000",
|
||||
"LOG_LEVEL": "INFO"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Важно:
|
||||
- `command` — **абсолютный** путь к Python из venv проекта. Клиенты MCP
|
||||
обычно стартуют без активированного окружения, поэтому полагаться на
|
||||
`python` из PATH нельзя.
|
||||
- `cwd` указан на `src/`, чтобы Python нашёл пакет `gp_mcp` без установки
|
||||
(`pip install -e .` не делаем).
|
||||
- Секреты держим в `env` соответствующего конфига клиента, **не** в коде
|
||||
и **не** в репозитории.
|
||||
|
||||
---
|
||||
|
||||
### Claude Code
|
||||
|
||||
Есть три способа добавить сервер — выбери один.
|
||||
|
||||
**1. Через CLI (быстрее всего)**
|
||||
|
||||
```bash
|
||||
claude mcp add gp-mcp \
|
||||
--scope user \
|
||||
--env GP_HOST=gp-master.internal \
|
||||
--env GP_PORT=5432 \
|
||||
--env GP_USER=dbt_explain \
|
||||
--env GP_PASSWORD=REPLACE_ME \
|
||||
--env GP_DATABASE=analytics \
|
||||
--env DBT_PROJECT_DIR=/Users/admin/Projects/dbt-analytics \
|
||||
--env DBT_PROFILES_DIR=/Users/admin/.dbt \
|
||||
--env DBT_TARGET=dev \
|
||||
--env STATEMENT_TIMEOUT_MS=300000 \
|
||||
--env MAX_STATEMENT_TIMEOUT_MS=900000 \
|
||||
-- /Users/admin/Projects/vpn/.venv/bin/python -m gp_mcp.server
|
||||
```
|
||||
|
||||
Флаг `--scope`:
|
||||
- `user` — для всех проектов (пишется в `~/.claude.json`);
|
||||
- `project` — общий для команды, кладётся в `.mcp.json` в корне проекта,
|
||||
его можно коммитить в git (секреты тогда задают через `${VAR}`-подстановку
|
||||
из окружения, а не хардкодом);
|
||||
- `local` — только в текущем проекте, только у тебя.
|
||||
|
||||
**2. Вручную, user-scope: `~/.claude.json`**
|
||||
|
||||
```json
|
||||
{
|
||||
"mcpServers": {
|
||||
"gp-mcp": { /* см. общий блок выше */ }
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**3. Вручную, project-scope: `.mcp.json` в корне dbt-репозитория**
|
||||
|
||||
```json
|
||||
{
|
||||
"mcpServers": {
|
||||
"gp-mcp": {
|
||||
"command": "/Users/admin/Projects/vpn/.venv/bin/python",
|
||||
"args": ["-m", "gp_mcp.server"],
|
||||
"cwd": "/Users/admin/Projects/vpn/src",
|
||||
"env": {
|
||||
"GP_HOST": "${GP_HOST}",
|
||||
"GP_PORT": "${GP_PORT}",
|
||||
"GP_USER": "${GP_USER}",
|
||||
"GP_PASSWORD": "${GP_PASSWORD}",
|
||||
"GP_DATABASE": "${GP_DATABASE}",
|
||||
"DBT_PROJECT_DIR": "${DBT_PROJECT_DIR}",
|
||||
"DBT_PROFILES_DIR": "${DBT_PROFILES_DIR}",
|
||||
"DBT_TARGET": "${DBT_TARGET}",
|
||||
"STATEMENT_TIMEOUT_MS": "300000",
|
||||
"MAX_STATEMENT_TIMEOUT_MS": "900000"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Проверка:**
|
||||
|
||||
```bash
|
||||
claude mcp list # gp-mcp должен быть в списке
|
||||
claude mcp get gp-mcp # детали конфига
|
||||
```
|
||||
|
||||
В сессии `/mcp` покажет статус подключения и список tool'ов. Если статус
|
||||
`failed`, посмотри `~/Library/Logs/Claude/` — сервер пишет ошибки запуска
|
||||
(включая отсутствующие env-переменные) в stderr.
|
||||
|
||||
---
|
||||
|
||||
### Cursor IDE
|
||||
|
||||
Cursor использует тот же MCP-формат, но свой файл настроек.
|
||||
|
||||
**1. Через UI**
|
||||
|
||||
`Settings` → `Cursor Settings` → `MCP & Integrations` → `New MCP Server` →
|
||||
откроется `mcp.json` для редактирования.
|
||||
|
||||
**2. Вручную, глобально: `~/.cursor/mcp.json`**
|
||||
|
||||
Доступно во всех проектах.
|
||||
|
||||
```json
|
||||
{
|
||||
"mcpServers": {
|
||||
"gp-mcp": {
|
||||
"command": "/Users/admin/Projects/vpn/.venv/bin/python",
|
||||
"args": ["-m", "gp_mcp.server"],
|
||||
"cwd": "/Users/admin/Projects/vpn/src",
|
||||
"env": {
|
||||
"GP_HOST": "gp-master.internal",
|
||||
"GP_PORT": "5432",
|
||||
"GP_USER": "dbt_explain",
|
||||
"GP_PASSWORD": "REPLACE_ME",
|
||||
"GP_DATABASE": "analytics",
|
||||
"DBT_PROJECT_DIR": "/Users/admin/Projects/dbt-analytics",
|
||||
"DBT_PROFILES_DIR": "/Users/admin/.dbt",
|
||||
"DBT_TARGET": "dev",
|
||||
"STATEMENT_TIMEOUT_MS": "300000",
|
||||
"MAX_STATEMENT_TIMEOUT_MS": "900000"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**3. Вручную, для проекта: `.cursor/mcp.json` в корне dbt-репозитория**
|
||||
|
||||
Видно только в этом проекте. Удобно, когда у разных dbt-проектов разные
|
||||
`DBT_PROJECT_DIR`/`DBT_TARGET`.
|
||||
|
||||
**Проверка:**
|
||||
|
||||
`Settings` → `MCP & Integrations` — справа от `gp-mcp` должен загореться
|
||||
зелёный индикатор и появиться список tool'ов (`explain_sql`,
|
||||
`explain_dbt_model`). В чате tools будут доступны Agent-режиму.
|
||||
|
||||
Если индикатор красный — раскрой сервер в этом же окне, там показывается
|
||||
stderr запуска (включая `Configuration error: Required environment variable
|
||||
'...' is not set`).
|
||||
|
||||
---
|
||||
|
||||
### Общие проблемы при подключении
|
||||
|
||||
| Симптом | Причина |
|
||||
|---------|---------|
|
||||
| `Configuration error: Required environment variable 'X' is not set` | Переменная `X` не задана в `env` конфига клиента |
|
||||
| `ModuleNotFoundError: No module named 'gp_mcp'` | Неверный `cwd` — должен указывать на `src/`, или Python не из venv |
|
||||
| `ModuleNotFoundError: No module named 'mcp'` | `command` указывает не на Python из venv, где установлены зависимости |
|
||||
| Сервер стартует, но tools не появляются | Клиент не перезапущен / нет permissions в Cursor для MCP |
|
||||
| `dbt: command not found` при вызове `explain_dbt_model` | Поставь `DBT_EXECUTABLE=/абсолютный/путь/к/dbt` в `env` |
|
||||
|
||||
## Структура
|
||||
|
||||
```
|
||||
vpn/
|
||||
├── .env.example
|
||||
├── .gitignore
|
||||
├── requirements.txt
|
||||
├── README.md
|
||||
└── src/
|
||||
└── gp_mcp/
|
||||
├── __init__.py
|
||||
├── config.py # загрузка и валидация env
|
||||
├── db.py # psycopg2 + read-only + timeout
|
||||
├── dbt_runner.py # subprocess dbt compile + чтение compiled SQL
|
||||
├── explain.py # EXPLAIN ANALYZE + summary
|
||||
└── server.py # FastMCP, регистрация tools, stdio
|
||||
```
|
||||
4
requirements.txt
Normal file
4
requirements.txt
Normal file
@@ -0,0 +1,4 @@
|
||||
mcp>=1.2.0
|
||||
psycopg2-binary>=2.9.9
|
||||
python-dotenv>=1.0.1
|
||||
PyYAML>=6.0.1
|
||||
0
src/gp_mcp/__init__.py
Normal file
0
src/gp_mcp/__init__.py
Normal file
125
src/gp_mcp/config.py
Normal file
125
src/gp_mcp/config.py
Normal file
@@ -0,0 +1,125 @@
|
||||
"""Configuration loaded entirely from environment variables.
|
||||
|
||||
No hard-coded defaults for connection or paths — required variables must be
|
||||
set explicitly. A missing required variable raises ConfigError at startup with
|
||||
the offending variable name.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
class ConfigError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
def _require(name: str) -> str:
|
||||
value = os.environ.get(name)
|
||||
if value is None or value.strip() == "":
|
||||
raise ConfigError(f"Required environment variable {name!r} is not set")
|
||||
return value.strip()
|
||||
|
||||
|
||||
def _optional(name: str) -> str | None:
|
||||
value = os.environ.get(name)
|
||||
if value is None or value.strip() == "":
|
||||
return None
|
||||
return value.strip()
|
||||
|
||||
|
||||
def _require_int(name: str) -> int:
|
||||
raw = _require(name)
|
||||
try:
|
||||
return int(raw)
|
||||
except ValueError as exc:
|
||||
raise ConfigError(f"Environment variable {name!r} must be an integer, got {raw!r}") from exc
|
||||
|
||||
|
||||
def _require_positive_int(name: str) -> int:
|
||||
value = _require_int(name)
|
||||
if value <= 0:
|
||||
raise ConfigError(f"Environment variable {name!r} must be > 0, got {value}")
|
||||
return value
|
||||
|
||||
|
||||
def _require_dir(name: str) -> Path:
|
||||
raw = _require(name)
|
||||
path = Path(raw).expanduser()
|
||||
if not path.is_dir():
|
||||
raise ConfigError(f"Environment variable {name!r} points to {raw!r}, which is not a directory")
|
||||
return path
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GreenplumConfig:
|
||||
host: str
|
||||
port: int
|
||||
user: str
|
||||
password: str
|
||||
database: str
|
||||
schema: str | None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DbtConfig:
|
||||
project_dir: Path
|
||||
profiles_dir: Path
|
||||
target: str
|
||||
executable: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class LimitsConfig:
|
||||
statement_timeout_ms: int
|
||||
max_statement_timeout_ms: int
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class AppConfig:
|
||||
gp: GreenplumConfig
|
||||
dbt: DbtConfig
|
||||
limits: LimitsConfig
|
||||
log_level: str
|
||||
|
||||
|
||||
def load_config() -> AppConfig:
|
||||
"""Load and validate the entire configuration from environment."""
|
||||
|
||||
load_dotenv(override=False)
|
||||
|
||||
gp = GreenplumConfig(
|
||||
host=_require("GP_HOST"),
|
||||
port=_require_positive_int("GP_PORT"),
|
||||
user=_require("GP_USER"),
|
||||
password=_require("GP_PASSWORD"),
|
||||
database=_require("GP_DATABASE"),
|
||||
schema=_optional("GP_SCHEMA"),
|
||||
)
|
||||
|
||||
dbt = DbtConfig(
|
||||
project_dir=_require_dir("DBT_PROJECT_DIR"),
|
||||
profiles_dir=_require_dir("DBT_PROFILES_DIR"),
|
||||
target=_require("DBT_TARGET"),
|
||||
executable=_optional("DBT_EXECUTABLE") or "dbt",
|
||||
)
|
||||
|
||||
limits = LimitsConfig(
|
||||
statement_timeout_ms=_require_positive_int("STATEMENT_TIMEOUT_MS"),
|
||||
max_statement_timeout_ms=_require_positive_int("MAX_STATEMENT_TIMEOUT_MS"),
|
||||
)
|
||||
if limits.statement_timeout_ms > limits.max_statement_timeout_ms:
|
||||
raise ConfigError(
|
||||
"STATEMENT_TIMEOUT_MS must be <= MAX_STATEMENT_TIMEOUT_MS "
|
||||
f"(got {limits.statement_timeout_ms} > {limits.max_statement_timeout_ms})"
|
||||
)
|
||||
|
||||
log_level = (_optional("LOG_LEVEL") or "INFO").upper()
|
||||
if log_level not in {"DEBUG", "INFO", "WARNING", "ERROR"}:
|
||||
raise ConfigError(f"LOG_LEVEL must be one of DEBUG/INFO/WARNING/ERROR, got {log_level!r}")
|
||||
|
||||
return AppConfig(gp=gp, dbt=dbt, limits=limits, log_level=log_level)
|
||||
51
src/gp_mcp/db.py
Normal file
51
src/gp_mcp/db.py
Normal file
@@ -0,0 +1,51 @@
|
||||
"""Greenplum connections with enforced read-only mode and statement_timeout."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import contextmanager
|
||||
from typing import Iterator
|
||||
|
||||
import psycopg2
|
||||
from psycopg2 import sql
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
|
||||
from .config import GreenplumConfig
|
||||
|
||||
|
||||
def connect(gp: GreenplumConfig, statement_timeout_ms: int) -> PgConnection:
|
||||
"""Open a new connection with read-only and timeout enforced.
|
||||
|
||||
Why session-level (not just transaction-level) read-only: a misbehaving query
|
||||
that opens its own transaction inside the session still cannot write.
|
||||
"""
|
||||
|
||||
conn = psycopg2.connect(
|
||||
host=gp.host,
|
||||
port=gp.port,
|
||||
user=gp.user,
|
||||
password=gp.password,
|
||||
dbname=gp.database,
|
||||
application_name="gp-mcp",
|
||||
)
|
||||
conn.autocommit = True
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY")
|
||||
cur.execute("SET default_transaction_read_only = on")
|
||||
cur.execute("SET statement_timeout = %s", (statement_timeout_ms,))
|
||||
if gp.schema:
|
||||
schemas = [s.strip() for s in gp.schema.split(",") if s.strip()]
|
||||
if schemas:
|
||||
stmt = sql.SQL("SET search_path TO {}").format(
|
||||
sql.SQL(", ").join(sql.Identifier(s) for s in schemas)
|
||||
)
|
||||
cur.execute(stmt)
|
||||
return conn
|
||||
|
||||
|
||||
@contextmanager
|
||||
def open_connection(gp: GreenplumConfig, statement_timeout_ms: int) -> Iterator[PgConnection]:
|
||||
conn = connect(gp, statement_timeout_ms)
|
||||
try:
|
||||
yield conn
|
||||
finally:
|
||||
conn.close()
|
||||
96
src/gp_mcp/dbt_runner.py
Normal file
96
src/gp_mcp/dbt_runner.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""Run `dbt compile` and read the compiled SQL for a selected model."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
from .config import DbtConfig
|
||||
|
||||
|
||||
class DbtCompileError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
_MODEL_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
|
||||
|
||||
|
||||
def _validate_model_name(model_name: str) -> str:
|
||||
"""Reject anything that isn't a bare dbt identifier.
|
||||
|
||||
Why: model_name is appended to a `dbt --select` argument and used to locate
|
||||
a file on disk. Restricting to identifier characters keeps both subprocess
|
||||
and filesystem lookup safe.
|
||||
"""
|
||||
|
||||
if not _MODEL_NAME_RE.match(model_name):
|
||||
raise DbtCompileError(
|
||||
f"Invalid dbt model name {model_name!r}: must match {_MODEL_NAME_RE.pattern}"
|
||||
)
|
||||
return model_name
|
||||
|
||||
|
||||
def _project_name(project_dir: Path) -> str:
|
||||
project_file = project_dir / "dbt_project.yml"
|
||||
if not project_file.is_file():
|
||||
raise DbtCompileError(f"dbt_project.yml not found in {project_dir}")
|
||||
with project_file.open("r", encoding="utf-8") as f:
|
||||
data = yaml.safe_load(f) or {}
|
||||
name = data.get("name")
|
||||
if not isinstance(name, str) or not name:
|
||||
raise DbtCompileError(f"`name` not found in {project_file}")
|
||||
return name
|
||||
|
||||
|
||||
def _find_compiled_sql(project_dir: Path, project_name: str, model_name: str) -> Path:
|
||||
compiled_root = project_dir / "target" / "compiled" / project_name
|
||||
if not compiled_root.is_dir():
|
||||
raise DbtCompileError(f"Compiled output dir does not exist: {compiled_root}")
|
||||
matches = list(compiled_root.rglob(f"{model_name}.sql"))
|
||||
if not matches:
|
||||
raise DbtCompileError(
|
||||
f"Compiled SQL for model {model_name!r} not found under {compiled_root}"
|
||||
)
|
||||
if len(matches) > 1:
|
||||
# Ambiguous (model name reused across paths). Surface the candidates.
|
||||
rels = ", ".join(str(p.relative_to(project_dir)) for p in matches)
|
||||
raise DbtCompileError(
|
||||
f"Multiple compiled files match model {model_name!r}: {rels}"
|
||||
)
|
||||
return matches[0]
|
||||
|
||||
|
||||
def compile_model(cfg: DbtConfig, model_name: str) -> str:
|
||||
"""Compile a single dbt model and return the resulting SQL."""
|
||||
|
||||
model_name = _validate_model_name(model_name)
|
||||
project_name = _project_name(cfg.project_dir)
|
||||
|
||||
cmd = [
|
||||
cfg.executable,
|
||||
"compile",
|
||||
"--select", model_name,
|
||||
"--project-dir", str(cfg.project_dir),
|
||||
"--profiles-dir", str(cfg.profiles_dir),
|
||||
"--target", cfg.target,
|
||||
]
|
||||
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
cwd=cfg.project_dir,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
raise DbtCompileError(
|
||||
f"dbt compile failed (exit {result.returncode}):\n"
|
||||
f"stdout:\n{result.stdout}\n"
|
||||
f"stderr:\n{result.stderr}"
|
||||
)
|
||||
|
||||
compiled_path = _find_compiled_sql(cfg.project_dir, project_name, model_name)
|
||||
return compiled_path.read_text(encoding="utf-8")
|
||||
120
src/gp_mcp/explain.py
Normal file
120
src/gp_mcp/explain.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""Run EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) and summarise the GP plan."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
|
||||
|
||||
class ExplainError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class PlanSummary:
|
||||
total_cost: float | None
|
||||
plan_rows: float | None
|
||||
actual_rows: float | None
|
||||
execution_time_ms: float | None
|
||||
planning_time_ms: float | None
|
||||
slowest_node: dict[str, Any] | None
|
||||
motion_nodes: list[dict[str, Any]] = field(default_factory=list)
|
||||
rows_misestimation_factor: float | None = None
|
||||
|
||||
def as_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"total_cost": self.total_cost,
|
||||
"plan_rows": self.plan_rows,
|
||||
"actual_rows": self.actual_rows,
|
||||
"execution_time_ms": self.execution_time_ms,
|
||||
"planning_time_ms": self.planning_time_ms,
|
||||
"slowest_node": self.slowest_node,
|
||||
"motion_nodes": self.motion_nodes,
|
||||
"rows_misestimation_factor": self.rows_misestimation_factor,
|
||||
}
|
||||
|
||||
|
||||
def explain_analyze_json(conn: PgConnection, sql_text: str) -> list[dict[str, Any]]:
|
||||
"""Run EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) and return the raw plan list."""
|
||||
|
||||
if not sql_text or not sql_text.strip():
|
||||
raise ExplainError("SQL is empty")
|
||||
|
||||
# ANALYZE actually executes the statement. The session is set
|
||||
# default_transaction_read_only=on (see db.connect), so writes are rejected
|
||||
# by the server. statement_timeout caps runaway plans.
|
||||
wrapped = f"EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) {sql_text}"
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(wrapped)
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
raise ExplainError("EXPLAIN returned no rows")
|
||||
payload = row[0]
|
||||
if not isinstance(payload, list):
|
||||
raise ExplainError(f"EXPLAIN returned unexpected payload type: {type(payload).__name__}")
|
||||
return payload
|
||||
|
||||
|
||||
def _walk(node: dict[str, Any]):
|
||||
yield node
|
||||
for child in node.get("Plans", []) or []:
|
||||
yield from _walk(child)
|
||||
|
||||
|
||||
def summarise(plan_payload: list[dict[str, Any]]) -> PlanSummary:
|
||||
"""Extract GP-relevant metrics from the JSON plan."""
|
||||
|
||||
if not plan_payload:
|
||||
raise ExplainError("Plan payload is empty")
|
||||
|
||||
root = plan_payload[0]
|
||||
plan = root.get("Plan", {})
|
||||
|
||||
total_cost = plan.get("Total Cost")
|
||||
plan_rows = plan.get("Plan Rows")
|
||||
actual_rows = plan.get("Actual Rows")
|
||||
execution_time = root.get("Execution Time")
|
||||
planning_time = root.get("Planning Time")
|
||||
|
||||
slowest: dict[str, Any] | None = None
|
||||
motions: list[dict[str, Any]] = []
|
||||
|
||||
for node in _walk(plan):
|
||||
node_type = node.get("Node Type", "")
|
||||
if "Motion" in node_type or node_type.startswith("Gather"):
|
||||
motions.append({
|
||||
"node_type": node_type,
|
||||
"slice": node.get("Slice"),
|
||||
"senders": node.get("Senders"),
|
||||
"receivers": node.get("Receivers"),
|
||||
"actual_rows": node.get("Actual Rows"),
|
||||
"actual_total_time_ms": node.get("Actual Total Time"),
|
||||
})
|
||||
actual_total = node.get("Actual Total Time")
|
||||
if actual_total is not None:
|
||||
if slowest is None or actual_total > slowest.get("actual_total_time_ms", -1):
|
||||
slowest = {
|
||||
"node_type": node_type,
|
||||
"actual_total_time_ms": actual_total,
|
||||
"actual_rows": node.get("Actual Rows"),
|
||||
"plan_rows": node.get("Plan Rows"),
|
||||
"relation": node.get("Relation Name"),
|
||||
"alias": node.get("Alias"),
|
||||
}
|
||||
|
||||
misestimation: float | None = None
|
||||
if plan_rows and actual_rows and plan_rows > 0 and actual_rows > 0:
|
||||
misestimation = max(plan_rows / actual_rows, actual_rows / plan_rows)
|
||||
|
||||
return PlanSummary(
|
||||
total_cost=total_cost,
|
||||
plan_rows=plan_rows,
|
||||
actual_rows=actual_rows,
|
||||
execution_time_ms=execution_time,
|
||||
planning_time_ms=planning_time,
|
||||
slowest_node=slowest,
|
||||
motion_nodes=motions,
|
||||
rows_misestimation_factor=misestimation,
|
||||
)
|
||||
109
src/gp_mcp/server.py
Normal file
109
src/gp_mcp/server.py
Normal file
@@ -0,0 +1,109 @@
|
||||
"""MCP stdio server exposing Greenplum EXPLAIN tools for dbt model review."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
from typing import Any
|
||||
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
from .config import AppConfig, ConfigError, load_config
|
||||
from .db import open_connection
|
||||
from .dbt_runner import DbtCompileError, compile_model
|
||||
from .explain import ExplainError, explain_analyze_json, summarise
|
||||
|
||||
|
||||
logger = logging.getLogger("gp_mcp")
|
||||
|
||||
|
||||
def _resolve_timeout(cfg: AppConfig, override_ms: int | None) -> int:
|
||||
if override_ms is None:
|
||||
return cfg.limits.statement_timeout_ms
|
||||
if override_ms <= 0:
|
||||
raise ValueError("statement_timeout_ms must be > 0")
|
||||
if override_ms > cfg.limits.max_statement_timeout_ms:
|
||||
raise ValueError(
|
||||
f"statement_timeout_ms {override_ms} exceeds MAX_STATEMENT_TIMEOUT_MS "
|
||||
f"{cfg.limits.max_statement_timeout_ms}"
|
||||
)
|
||||
return override_ms
|
||||
|
||||
|
||||
def _explain_payload(cfg: AppConfig, sql_text: str, timeout_ms: int) -> dict[str, Any]:
|
||||
with open_connection(cfg.gp, timeout_ms) as conn:
|
||||
plan = explain_analyze_json(conn, sql_text)
|
||||
summary = summarise(plan)
|
||||
return {
|
||||
"summary": summary.as_dict(),
|
||||
"plan": plan,
|
||||
"statement_timeout_ms": timeout_ms,
|
||||
}
|
||||
|
||||
|
||||
def build_server(cfg: AppConfig) -> FastMCP:
|
||||
mcp = FastMCP("gp-mcp")
|
||||
|
||||
@mcp.tool()
|
||||
def explain_sql(sql: str, statement_timeout_ms: int | None = None) -> str:
|
||||
"""Run EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) on the given SQL.
|
||||
|
||||
Returns a JSON string with:
|
||||
- summary: GP-relevant metrics (total_cost, execution_time_ms,
|
||||
motion_nodes, slowest_node, rows_misestimation_factor)
|
||||
- plan: raw EXPLAIN JSON
|
||||
- statement_timeout_ms: actual timeout applied
|
||||
The session is read-only; writes are rejected by the server.
|
||||
"""
|
||||
timeout_ms = _resolve_timeout(cfg, statement_timeout_ms)
|
||||
payload = _explain_payload(cfg, sql, timeout_ms)
|
||||
return json.dumps(payload, ensure_ascii=False, default=str)
|
||||
|
||||
@mcp.tool()
|
||||
def explain_dbt_model(model_name: str, statement_timeout_ms: int | None = None) -> str:
|
||||
"""Compile a dbt model and run EXPLAIN (ANALYZE, FORMAT JSON) on it.
|
||||
|
||||
Steps:
|
||||
1. `dbt compile --select <model_name>` in the configured project
|
||||
2. Read target/compiled/<project>/.../<model>.sql
|
||||
3. EXPLAIN ANALYZE against Greenplum (read-only session)
|
||||
|
||||
Returns the same JSON shape as explain_sql, plus `compiled_sql`.
|
||||
"""
|
||||
timeout_ms = _resolve_timeout(cfg, statement_timeout_ms)
|
||||
compiled_sql = compile_model(cfg.dbt, model_name)
|
||||
payload = _explain_payload(cfg, compiled_sql, timeout_ms)
|
||||
payload["compiled_sql"] = compiled_sql
|
||||
payload["model_name"] = model_name
|
||||
return json.dumps(payload, ensure_ascii=False, default=str)
|
||||
|
||||
return mcp
|
||||
|
||||
|
||||
def main() -> int:
|
||||
try:
|
||||
cfg = load_config()
|
||||
except ConfigError as exc:
|
||||
# stderr — stdout is the MCP transport channel.
|
||||
print(f"Configuration error: {exc}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
logging.basicConfig(
|
||||
level=cfg.log_level,
|
||||
stream=sys.stderr,
|
||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
)
|
||||
logger.info(
|
||||
"gp-mcp starting (host=%s db=%s schema=%s timeout_ms=%d max_timeout_ms=%d)",
|
||||
cfg.gp.host, cfg.gp.database, cfg.gp.schema,
|
||||
cfg.limits.statement_timeout_ms, cfg.limits.max_statement_timeout_ms,
|
||||
)
|
||||
|
||||
mcp = build_server(cfg)
|
||||
mcp.run(transport="stdio")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user