Add heuristic warnings to plan summary

Detects 7 common GP plan anti-patterns (broadcast of large table,
redistribute of large table, hot node, nested loop with large outer,
large sort, spill to disk, stale row estimates, planning-heavy queries)
and surfaces them as summary.warnings[] with severity + evidence.

Raw plan and raw summary metrics are preserved so the agent can verify
each warning against the underlying numbers.
This commit is contained in:
2026-05-31 14:17:52 +03:00
parent 7c9487e0f9
commit ddaf277703
2 changed files with 235 additions and 4 deletions

View File

@@ -31,15 +31,44 @@ MCP-сервер для оценки плана запросов dbt-модел
"planning_time_ms": 12.1,
"slowest_node": { "node_type": "Seq Scan", "actual_total_time_ms": 700.2, "...": "..." },
"motion_nodes": [{ "node_type": "Redistribute Motion", "...": "..." }],
"rows_misestimation_factor": 1.02
"rows_misestimation_factor": 1.02,
"warnings": [
{
"code": "broadcast_large_table",
"severity": "critical",
"message": "Broadcast Motion ships 5,400,000 rows to every segment ...",
"evidence": { "node_type": "Broadcast Motion", "actual_rows": 5400000, "...": "..." }
}
]
},
"plan": [ /* raw EXPLAIN JSON */ ],
"plan": [ /* raw EXPLAIN JSON пруф для каждого warning */ ],
"statement_timeout_ms": 300000,
"compiled_sql": "select ...",
"model_name": "fct_orders"
}
```
### Warnings
`summary.warnings[]` — это эвристический разбор плана для агента: какие узлы выглядят
неоптимально и что с этим делать. Сырые числа из плана **остаются** в `summary.*` и
`plan[]` — каждый warning несёт `evidence` со ссылкой на конкретные значения, чтобы
агент мог перепроверить рекомендацию, а не доверять ей слепо.
| `code` | `severity` | Триггер |
|--------|------------|---------|
| `rows_misestimation` | warning | plan vs actual rows расходятся в > 10× — статистика устарела |
| `broadcast_large_table` | critical | `Broadcast Motion` гонит > 1M строк на все сегменты |
| `redistribute_large_table` | warning | `Redistribute Motion` тасует > 10M строк — проверить distribution key |
| `hot_node` | warning | Один узел занимает > 60% `execution_time_ms` |
| `nested_loop_large` | critical | `Nested Loop` с > 10 000 итераций |
| `large_sort` | warning | `Sort` по > 10M строк |
| `spill_to_disk` | warning | Узел льётся на диск (`Disk Usage > 0`, `Sort Method: external merge Disk` и т.п.) |
| `planning_heavy` | info | `planning_time_ms / execution_time_ms > 20%` |
Пороги — константы в верхней части [src/gp_mcp/explain.py](src/gp_mcp/explain.py)
(`ROWS_MISESTIMATION_FACTOR`, `BROADCAST_LARGE_ROWS`, …), калибруются под кластер.
## Установка
```bash

View File

@@ -12,6 +12,34 @@ class ExplainError(RuntimeError):
pass
# Heuristic thresholds for the warning detector. Tuned for medium-sized GP 6
# clusters; adjust per cluster if false positives start appearing.
ROWS_MISESTIMATION_FACTOR = 10.0 # plan vs actual rows diverge by this much → planner stats stale
BROADCAST_LARGE_ROWS = 1_000_000 # broadcasting more than this many rows is almost always a mistake
REDISTRIBUTE_LARGE_ROWS = 10_000_000 # redistributes above this volume warrant a distribution-key review
HOT_NODE_TIME_SHARE = 0.6 # single node dominates if it owns > 60% of execution time
NESTED_LOOP_LARGE_LOOPS = 10_000 # nested loop iterating this often is usually a mis-plan
LARGE_SORT_ROWS = 10_000_000 # sorting more rows than this on every run is suspicious
PLANNING_TIME_SHARE = 0.2 # planning time eating > 20% of total → simplify or materialise
@dataclass
class Warning:
"""Single optimisation hint with evidence for the agent to verify."""
code: str # short machine-readable identifier
severity: str # "info" | "warning" | "critical"
message: str # human-readable explanation
evidence: dict[str, Any] # raw numbers backing the warning
def as_dict(self) -> dict[str, Any]:
return {
"code": self.code,
"severity": self.severity,
"message": self.message,
"evidence": self.evidence,
}
@dataclass
class PlanSummary:
total_cost: float | None
@@ -22,6 +50,7 @@ class PlanSummary:
slowest_node: dict[str, Any] | None
motion_nodes: list[dict[str, Any]] = field(default_factory=list)
rows_misestimation_factor: float | None = None
warnings: list[Warning] = field(default_factory=list)
def as_dict(self) -> dict[str, Any]:
return {
@@ -33,6 +62,7 @@ class PlanSummary:
"slowest_node": self.slowest_node,
"motion_nodes": self.motion_nodes,
"rows_misestimation_factor": self.rows_misestimation_factor,
"warnings": [w.as_dict() for w in self.warnings],
}
@@ -63,8 +93,178 @@ def _walk(node: dict[str, Any]):
yield from _walk(child)
def _node_has_spill(node: dict[str, Any]) -> tuple[bool, dict[str, Any]]:
"""Detect whether a node spilled to disk (work_mem exhausted)."""
evidence = {}
spilled = False
for key in ("Disk Usage", "Workfile: Spilling", "Sort Method"):
if key in node:
evidence[key] = node[key]
if isinstance(node.get("Disk Usage"), (int, float)) and node["Disk Usage"] > 0:
spilled = True
sort_method = node.get("Sort Method", "")
if isinstance(sort_method, str) and "Disk" in sort_method:
spilled = True
if node.get("Workfile: Spilling") in (True, "true", 1):
spilled = True
return spilled, evidence
def detect_warnings(plan_payload: list[dict[str, Any]], summary: PlanSummary) -> list[Warning]:
"""Apply heuristics to flag suspicious plan shapes.
Each warning carries the raw numbers (`evidence`) that triggered it so the
agent can cross-check against the full plan rather than trusting the rule.
"""
warnings: list[Warning] = []
if not plan_payload:
return warnings
plan = plan_payload[0].get("Plan", {})
if summary.rows_misestimation_factor and summary.rows_misestimation_factor > ROWS_MISESTIMATION_FACTOR:
warnings.append(Warning(
code="rows_misestimation",
severity="warning",
message=(
f"Plan rows vs actual rows diverge by {summary.rows_misestimation_factor:.1f}x "
f"(threshold {ROWS_MISESTIMATION_FACTOR:.0f}x). Planner statistics are likely stale — "
"run ANALYZE on the source tables, or rewrite filters that wrap columns in functions."
),
evidence={
"plan_rows": summary.plan_rows,
"actual_rows": summary.actual_rows,
"factor": summary.rows_misestimation_factor,
},
))
for m in summary.motion_nodes:
rows = m.get("actual_rows") or 0
nt = m.get("node_type", "")
if "Broadcast" in nt and rows > BROADCAST_LARGE_ROWS:
warnings.append(Warning(
code="broadcast_large_table",
severity="critical",
message=(
f"Broadcast Motion ships {rows:,} rows to every segment "
f"(threshold {BROADCAST_LARGE_ROWS:,}). Usually means a large table is on "
"the wrong side of a join — review join key vs distribution key."
),
evidence=m,
))
elif "Redistribute" in nt and rows > REDISTRIBUTE_LARGE_ROWS:
warnings.append(Warning(
code="redistribute_large_table",
severity="warning",
message=(
f"Redistribute Motion moves {rows:,} rows between segments "
f"(threshold {REDISTRIBUTE_LARGE_ROWS:,}). Consider aligning distribution keys "
"of the joined tables to avoid the reshuffle."
),
evidence=m,
))
if (
summary.slowest_node
and summary.execution_time_ms
and summary.slowest_node.get("actual_total_time_ms") is not None
and summary.execution_time_ms > 0
):
share = summary.slowest_node["actual_total_time_ms"] / summary.execution_time_ms
if share > HOT_NODE_TIME_SHARE:
warnings.append(Warning(
code="hot_node",
severity="warning",
message=(
f"One node ({summary.slowest_node['node_type']}) consumes {share:.0%} of execution time "
f"(threshold {HOT_NODE_TIME_SHARE:.0%}). Optimising anything else is noise — "
"focus on this node."
),
evidence={
"share": share,
"node": summary.slowest_node,
"execution_time_ms": summary.execution_time_ms,
},
))
for node in _walk(plan):
nt = node.get("Node Type", "")
if nt == "Nested Loop":
loops = node.get("Actual Loops") or 0
if loops > NESTED_LOOP_LARGE_LOOPS:
warnings.append(Warning(
code="nested_loop_large",
severity="critical",
message=(
f"Nested Loop iterates {loops:,} times (threshold {NESTED_LOOP_LARGE_LOOPS:,}). "
"Usually a symptom of bad row estimates — check that statistics are fresh; "
"a Hash Join would likely be orders of magnitude faster."
),
evidence={
"actual_loops": loops,
"actual_rows": node.get("Actual Rows"),
"actual_total_time_ms": node.get("Actual Total Time"),
},
))
if nt == "Sort":
rows = node.get("Actual Rows") or 0
if rows > LARGE_SORT_ROWS:
warnings.append(Warning(
code="large_sort",
severity="warning",
message=(
f"Sort processes {rows:,} rows (threshold {LARGE_SORT_ROWS:,}). "
"Consider whether the ORDER BY is required, or align distribution / "
"sort keys so the data arrives pre-sorted."
),
evidence={
"actual_rows": rows,
"sort_key": node.get("Sort Key"),
"sort_method": node.get("Sort Method"),
},
))
spilled, spill_evidence = _node_has_spill(node)
if spilled:
warnings.append(Warning(
code="spill_to_disk",
severity="warning",
message=(
f"{nt} spilled to disk (work_mem exhausted). Increase statement_mem for this model "
"(dbt pre-hook `SET statement_mem='...'`), or reduce intermediate volume by "
"filtering / aggregating earlier."
),
evidence={"node_type": nt, **spill_evidence},
))
if (
summary.planning_time_ms
and summary.execution_time_ms
and summary.execution_time_ms > 0
and summary.planning_time_ms / summary.execution_time_ms > PLANNING_TIME_SHARE
):
share = summary.planning_time_ms / summary.execution_time_ms
warnings.append(Warning(
code="planning_heavy",
severity="info",
message=(
f"Planning time is {share:.0%} of execution time (threshold {PLANNING_TIME_SHARE:.0%}). "
"For hot models consider materialisation or simplifying joins with many small tables."
),
evidence={
"planning_time_ms": summary.planning_time_ms,
"execution_time_ms": summary.execution_time_ms,
"share": share,
},
))
return warnings
def summarise(plan_payload: list[dict[str, Any]]) -> PlanSummary:
"""Extract GP-relevant metrics from the JSON plan."""
"""Extract GP-relevant metrics and optimisation hints from the JSON plan."""
if not plan_payload:
raise ExplainError("Plan payload is empty")
@@ -108,7 +308,7 @@ def summarise(plan_payload: list[dict[str, Any]]) -> PlanSummary:
if plan_rows and actual_rows and plan_rows > 0 and actual_rows > 0:
misestimation = max(plan_rows / actual_rows, actual_rows / plan_rows)
return PlanSummary(
summary = PlanSummary(
total_cost=total_cost,
plan_rows=plan_rows,
actual_rows=actual_rows,
@@ -118,3 +318,5 @@ def summarise(plan_payload: list[dict[str, Any]]) -> PlanSummary:
motion_nodes=motions,
rows_misestimation_factor=misestimation,
)
summary.warnings = detect_warnings(plan_payload, summary)
return summary