From ddaf2777039b0a008ff99e534e705b09cdada267 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A0=D0=B0=D0=B4=D0=B8=D0=BA?= Date: Sun, 31 May 2026 14:17:52 +0300 Subject: [PATCH] Add heuristic warnings to plan summary Detects 7 common GP plan anti-patterns (broadcast of large table, redistribute of large table, hot node, nested loop with large outer, large sort, spill to disk, stale row estimates, planning-heavy queries) and surfaces them as summary.warnings[] with severity + evidence. Raw plan and raw summary metrics are preserved so the agent can verify each warning against the underlying numbers. --- README.md | 33 ++++++- src/gp_mcp/explain.py | 206 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 235 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 13bbdee..dfeb10f 100644 --- a/README.md +++ b/README.md @@ -31,15 +31,44 @@ MCP-сервер для оценки плана запросов dbt-модел "planning_time_ms": 12.1, "slowest_node": { "node_type": "Seq Scan", "actual_total_time_ms": 700.2, "...": "..." }, "motion_nodes": [{ "node_type": "Redistribute Motion", "...": "..." }], - "rows_misestimation_factor": 1.02 + "rows_misestimation_factor": 1.02, + "warnings": [ + { + "code": "broadcast_large_table", + "severity": "critical", + "message": "Broadcast Motion ships 5,400,000 rows to every segment ...", + "evidence": { "node_type": "Broadcast Motion", "actual_rows": 5400000, "...": "..." } + } + ] }, - "plan": [ /* raw EXPLAIN JSON */ ], + "plan": [ /* raw EXPLAIN JSON — пруф для каждого warning */ ], "statement_timeout_ms": 300000, "compiled_sql": "select ...", "model_name": "fct_orders" } ``` +### Warnings + +`summary.warnings[]` — это эвристический разбор плана для агента: какие узлы выглядят +неоптимально и что с этим делать. Сырые числа из плана **остаются** в `summary.*` и +`plan[]` — каждый warning несёт `evidence` со ссылкой на конкретные значения, чтобы +агент мог перепроверить рекомендацию, а не доверять ей слепо. + +| `code` | `severity` | Триггер | +|--------|------------|---------| +| `rows_misestimation` | warning | plan vs actual rows расходятся в > 10× — статистика устарела | +| `broadcast_large_table` | critical | `Broadcast Motion` гонит > 1M строк на все сегменты | +| `redistribute_large_table` | warning | `Redistribute Motion` тасует > 10M строк — проверить distribution key | +| `hot_node` | warning | Один узел занимает > 60% `execution_time_ms` | +| `nested_loop_large` | critical | `Nested Loop` с > 10 000 итераций | +| `large_sort` | warning | `Sort` по > 10M строк | +| `spill_to_disk` | warning | Узел льётся на диск (`Disk Usage > 0`, `Sort Method: external merge Disk` и т.п.) | +| `planning_heavy` | info | `planning_time_ms / execution_time_ms > 20%` | + +Пороги — константы в верхней части [src/gp_mcp/explain.py](src/gp_mcp/explain.py) +(`ROWS_MISESTIMATION_FACTOR`, `BROADCAST_LARGE_ROWS`, …), калибруются под кластер. + ## Установка ```bash diff --git a/src/gp_mcp/explain.py b/src/gp_mcp/explain.py index b775303..343eed5 100644 --- a/src/gp_mcp/explain.py +++ b/src/gp_mcp/explain.py @@ -12,6 +12,34 @@ class ExplainError(RuntimeError): pass +# Heuristic thresholds for the warning detector. Tuned for medium-sized GP 6 +# clusters; adjust per cluster if false positives start appearing. +ROWS_MISESTIMATION_FACTOR = 10.0 # plan vs actual rows diverge by this much → planner stats stale +BROADCAST_LARGE_ROWS = 1_000_000 # broadcasting more than this many rows is almost always a mistake +REDISTRIBUTE_LARGE_ROWS = 10_000_000 # redistributes above this volume warrant a distribution-key review +HOT_NODE_TIME_SHARE = 0.6 # single node dominates if it owns > 60% of execution time +NESTED_LOOP_LARGE_LOOPS = 10_000 # nested loop iterating this often is usually a mis-plan +LARGE_SORT_ROWS = 10_000_000 # sorting more rows than this on every run is suspicious +PLANNING_TIME_SHARE = 0.2 # planning time eating > 20% of total → simplify or materialise + + +@dataclass +class Warning: + """Single optimisation hint with evidence for the agent to verify.""" + code: str # short machine-readable identifier + severity: str # "info" | "warning" | "critical" + message: str # human-readable explanation + evidence: dict[str, Any] # raw numbers backing the warning + + def as_dict(self) -> dict[str, Any]: + return { + "code": self.code, + "severity": self.severity, + "message": self.message, + "evidence": self.evidence, + } + + @dataclass class PlanSummary: total_cost: float | None @@ -22,6 +50,7 @@ class PlanSummary: slowest_node: dict[str, Any] | None motion_nodes: list[dict[str, Any]] = field(default_factory=list) rows_misestimation_factor: float | None = None + warnings: list[Warning] = field(default_factory=list) def as_dict(self) -> dict[str, Any]: return { @@ -33,6 +62,7 @@ class PlanSummary: "slowest_node": self.slowest_node, "motion_nodes": self.motion_nodes, "rows_misestimation_factor": self.rows_misestimation_factor, + "warnings": [w.as_dict() for w in self.warnings], } @@ -63,8 +93,178 @@ def _walk(node: dict[str, Any]): yield from _walk(child) +def _node_has_spill(node: dict[str, Any]) -> tuple[bool, dict[str, Any]]: + """Detect whether a node spilled to disk (work_mem exhausted).""" + evidence = {} + spilled = False + for key in ("Disk Usage", "Workfile: Spilling", "Sort Method"): + if key in node: + evidence[key] = node[key] + if isinstance(node.get("Disk Usage"), (int, float)) and node["Disk Usage"] > 0: + spilled = True + sort_method = node.get("Sort Method", "") + if isinstance(sort_method, str) and "Disk" in sort_method: + spilled = True + if node.get("Workfile: Spilling") in (True, "true", 1): + spilled = True + return spilled, evidence + + +def detect_warnings(plan_payload: list[dict[str, Any]], summary: PlanSummary) -> list[Warning]: + """Apply heuristics to flag suspicious plan shapes. + + Each warning carries the raw numbers (`evidence`) that triggered it so the + agent can cross-check against the full plan rather than trusting the rule. + """ + + warnings: list[Warning] = [] + if not plan_payload: + return warnings + plan = plan_payload[0].get("Plan", {}) + + if summary.rows_misestimation_factor and summary.rows_misestimation_factor > ROWS_MISESTIMATION_FACTOR: + warnings.append(Warning( + code="rows_misestimation", + severity="warning", + message=( + f"Plan rows vs actual rows diverge by {summary.rows_misestimation_factor:.1f}x " + f"(threshold {ROWS_MISESTIMATION_FACTOR:.0f}x). Planner statistics are likely stale — " + "run ANALYZE on the source tables, or rewrite filters that wrap columns in functions." + ), + evidence={ + "plan_rows": summary.plan_rows, + "actual_rows": summary.actual_rows, + "factor": summary.rows_misestimation_factor, + }, + )) + + for m in summary.motion_nodes: + rows = m.get("actual_rows") or 0 + nt = m.get("node_type", "") + if "Broadcast" in nt and rows > BROADCAST_LARGE_ROWS: + warnings.append(Warning( + code="broadcast_large_table", + severity="critical", + message=( + f"Broadcast Motion ships {rows:,} rows to every segment " + f"(threshold {BROADCAST_LARGE_ROWS:,}). Usually means a large table is on " + "the wrong side of a join — review join key vs distribution key." + ), + evidence=m, + )) + elif "Redistribute" in nt and rows > REDISTRIBUTE_LARGE_ROWS: + warnings.append(Warning( + code="redistribute_large_table", + severity="warning", + message=( + f"Redistribute Motion moves {rows:,} rows between segments " + f"(threshold {REDISTRIBUTE_LARGE_ROWS:,}). Consider aligning distribution keys " + "of the joined tables to avoid the reshuffle." + ), + evidence=m, + )) + + if ( + summary.slowest_node + and summary.execution_time_ms + and summary.slowest_node.get("actual_total_time_ms") is not None + and summary.execution_time_ms > 0 + ): + share = summary.slowest_node["actual_total_time_ms"] / summary.execution_time_ms + if share > HOT_NODE_TIME_SHARE: + warnings.append(Warning( + code="hot_node", + severity="warning", + message=( + f"One node ({summary.slowest_node['node_type']}) consumes {share:.0%} of execution time " + f"(threshold {HOT_NODE_TIME_SHARE:.0%}). Optimising anything else is noise — " + "focus on this node." + ), + evidence={ + "share": share, + "node": summary.slowest_node, + "execution_time_ms": summary.execution_time_ms, + }, + )) + + for node in _walk(plan): + nt = node.get("Node Type", "") + + if nt == "Nested Loop": + loops = node.get("Actual Loops") or 0 + if loops > NESTED_LOOP_LARGE_LOOPS: + warnings.append(Warning( + code="nested_loop_large", + severity="critical", + message=( + f"Nested Loop iterates {loops:,} times (threshold {NESTED_LOOP_LARGE_LOOPS:,}). " + "Usually a symptom of bad row estimates — check that statistics are fresh; " + "a Hash Join would likely be orders of magnitude faster." + ), + evidence={ + "actual_loops": loops, + "actual_rows": node.get("Actual Rows"), + "actual_total_time_ms": node.get("Actual Total Time"), + }, + )) + + if nt == "Sort": + rows = node.get("Actual Rows") or 0 + if rows > LARGE_SORT_ROWS: + warnings.append(Warning( + code="large_sort", + severity="warning", + message=( + f"Sort processes {rows:,} rows (threshold {LARGE_SORT_ROWS:,}). " + "Consider whether the ORDER BY is required, or align distribution / " + "sort keys so the data arrives pre-sorted." + ), + evidence={ + "actual_rows": rows, + "sort_key": node.get("Sort Key"), + "sort_method": node.get("Sort Method"), + }, + )) + + spilled, spill_evidence = _node_has_spill(node) + if spilled: + warnings.append(Warning( + code="spill_to_disk", + severity="warning", + message=( + f"{nt} spilled to disk (work_mem exhausted). Increase statement_mem for this model " + "(dbt pre-hook `SET statement_mem='...'`), or reduce intermediate volume by " + "filtering / aggregating earlier." + ), + evidence={"node_type": nt, **spill_evidence}, + )) + + if ( + summary.planning_time_ms + and summary.execution_time_ms + and summary.execution_time_ms > 0 + and summary.planning_time_ms / summary.execution_time_ms > PLANNING_TIME_SHARE + ): + share = summary.planning_time_ms / summary.execution_time_ms + warnings.append(Warning( + code="planning_heavy", + severity="info", + message=( + f"Planning time is {share:.0%} of execution time (threshold {PLANNING_TIME_SHARE:.0%}). " + "For hot models consider materialisation or simplifying joins with many small tables." + ), + evidence={ + "planning_time_ms": summary.planning_time_ms, + "execution_time_ms": summary.execution_time_ms, + "share": share, + }, + )) + + return warnings + + def summarise(plan_payload: list[dict[str, Any]]) -> PlanSummary: - """Extract GP-relevant metrics from the JSON plan.""" + """Extract GP-relevant metrics and optimisation hints from the JSON plan.""" if not plan_payload: raise ExplainError("Plan payload is empty") @@ -108,7 +308,7 @@ def summarise(plan_payload: list[dict[str, Any]]) -> PlanSummary: if plan_rows and actual_rows and plan_rows > 0 and actual_rows > 0: misestimation = max(plan_rows / actual_rows, actual_rows / plan_rows) - return PlanSummary( + summary = PlanSummary( total_cost=total_cost, plan_rows=plan_rows, actual_rows=actual_rows, @@ -118,3 +318,5 @@ def summarise(plan_payload: list[dict[str, Any]]) -> PlanSummary: motion_nodes=motions, rows_misestimation_factor=misestimation, ) + summary.warnings = detect_warnings(plan_payload, summary) + return summary