Routing EXPLAIN ANALYZE Output to Centralized Logs
Routing EXPLAIN ANALYZE output to centralized logs represents a highly constrained pipeline stage within modern database observability architectures. This step operates strictly between raw plan capture and downstream analytical processing, functioning as a deterministic transport layer rather than a transformation engine. For platform teams managing multi-tenant database fleets, the routing stage must guarantee lossless delivery, enforce strict schema contracts, and maintain complete isolation from query execution paths. When implemented correctly, this workflow eliminates the operational friction of scattered diagnostic data while preserving the integrity required for automated regression detection.
Stage Boundaries and Contract Enforcement
Strict stage isolation is non-negotiable in production-grade telemetry pipelines. The routing layer accepts only fully materialized EXPLAIN ANALYZE payloads from upstream collectors and emits structured JSON events to centralized logging infrastructure. It does not parse execution trees, compute cost metrics, or trigger regression alerts; those responsibilities belong to dedicated normalization and evaluation stages. The routing component has a single upstream dependency: a low-overhead capture mechanism that collects plans without degrading the monitored database, as described in Capturing EXPLAIN Plans Without Impacting Production Performance. Once a payload reaches the router, it is treated as an immutable telemetry object.
Operational dependencies are explicitly defined through interface contracts. The router expects a standardized envelope containing query_hash, execution_timestamp, cluster_id, plan_text, actual_rows, and total_time_ms. Any deviation from this schema triggers immediate rejection to a dead-letter queue rather than silent mutation. Downstream consumers, including the Automated EXPLAIN Capture & Storage Workflows framework, subscribe exclusively to the routed output stream, ensuring that transport failures never cascade into storage corruption. Schema validation occurs synchronously at the ingress boundary, preventing malformed payloads from consuming downstream compute cycles.
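The ingress contract can be enforced without any silent coercion. The sketch below is a dependency-free illustration, assuming the six fields listed above are the complete required set and that the optional query_type tag (used by the router implementation later in this guide) is the only other permitted key. The names validate_envelope, TelemetryEnvelope, REQUIRED_FIELDS and OPTIONAL_FIELDS are hypothetical. A value of the wrong type, such as the string "42" for actual_rows, produces a rejection reason destined for the dead-letter queue instead of being converted, and the frozen dataclass keeps an accepted envelope immutable.

```python
import json
from dataclasses import dataclass
from typing import Optional, Union

# Hypothetical contract table: field name -> accepted JSON-decoded Python types.
REQUIRED_FIELDS = {
    "query_hash": (str,),
    "execution_timestamp": (int, float),
    "cluster_id": (str,),
    "plan_text": (str,),
    "actual_rows": (int,),
    "total_time_ms": (int, float),
}
OPTIONAL_FIELDS = {"query_type": (str,)}


@dataclass(frozen=True)
class TelemetryEnvelope:
    """Immutable once validated: downstream code cannot mutate it in place."""
    query_hash: str
    execution_timestamp: float
    cluster_id: str
    plan_text: str
    actual_rows: int
    total_time_ms: float
    query_type: Optional[str] = None


def validate_envelope(raw_json: str) -> Union[TelemetryEnvelope, str]:
    """Return a TelemetryEnvelope, or a rejection reason string for the DLQ."""
    try:
        data = json.loads(raw_json)
    except json.JSONDecodeError as exc:
        return f"invalid JSON: {exc.msg}"
    if not isinstance(data, dict):
        return "envelope must be a JSON object"
    unknown = set(data) - set(REQUIRED_FIELDS) - set(OPTIONAL_FIELDS)
    if unknown:
        return f"unknown fields: {sorted(unknown)}"
    for name, types in REQUIRED_FIELDS.items():
        if name not in data:
            return f"missing field: {name}"
        value = data[name]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, types):
            return f"wrong type for {name}: {type(value).__name__}"
    for name, types in OPTIONAL_FIELDS.items():
        value = data.get(name)
        if value is not None and not isinstance(value, types):
            return f"wrong type for {name}: {type(value).__name__}"
    if data["actual_rows"] < 0 or data["total_time_ms"] < 0:
        return "negative execution metrics"
    return TelemetryEnvelope(**data)
```

Rejecting unknown keys is a deliberate choice in this sketch: schema drift in a capture agent then surfaces in the DLQ rather than flowing downstream unnoticed.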
Deterministic Routing Logic and Thresholds
Routing decisions must be stateless, reproducible, and driven by explicit metadata tags. Dynamic routing based on runtime heuristics introduces non-determinism that complicates incident reconstruction. Instead, the pipeline employs a rule-based dispatcher that evaluates static attributes at ingestion time. Common routing dimensions include:
- Cluster Topology: Directs plans to region-specific log indices for data residency compliance.
- Query Classification: Routes DDL, DML, and analytical workloads to separate retention policies.
- Latency Thresholds: Flags queries exceeding baseline execution windows for expedited delivery to SRE alerting channels.
- Sampling Rate Control: Applies deterministic modulo hashing on query_hash to cap ingestion volume during traffic spikes.
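A rule-based dispatcher over these static attributes can be written as a pure function, so identical inputs always yield identical routes. The sketch below is illustrative only: the region-to-index table, the retention policy names, the assumption that a cluster_id begins with a region code (for example "eu-west-1-pg-07"), and the default latency multiplier of 3.0 are all hypothetical placeholders for values that would come from deployment configuration. Sampling is omitted here because it depends only on query_hash.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical static tables; real values would come from deployment config.
INDEX_PREFIX_BY_REGION = {"eu": "explain-eu", "us": "explain-us"}
RETENTION_POLICY_BY_CLASS = {
    "DDL": "ddl-retention",
    "DML": "dml-retention",
    "ANALYTICAL": "analytical-retention",
}


@dataclass(frozen=True)
class RouteDecision:
    index: str
    retention_policy: str
    expedite: bool  # True -> also forward to the SRE alerting channel


def dispatch(cluster_id: str, query_type: Optional[str],
             total_time_ms: float, baseline_ms: float,
             latency_multiplier: float = 3.0) -> RouteDecision:
    """Pure function of static attributes: same inputs always give the same route."""
    # Cluster topology: assumes cluster IDs start with a region code, e.g. "eu-...".
    region = cluster_id.split("-", 1)[0]
    prefix = INDEX_PREFIX_BY_REGION.get(region)
    if prefix is None:
        # Refuse rather than guess, so residency rules are never violated.
        raise ValueError(f"no residency mapping for region {region!r}")
    # Query classification: unclassified statements use the DML policy (assumption).
    query_class = (query_type or "DML").upper()
    if query_class not in RETENTION_POLICY_BY_CLASS:
        query_class = "DML"
    # Latency threshold: compare against a baseline supplied with the event.
    expedite = total_time_ms > baseline_ms * latency_multiplier
    return RouteDecision(
        index=f"{prefix}-{query_class.lower()}",
        retention_policy=RETENTION_POLICY_BY_CLASS[query_class],
        expedite=expedite,
    )
```

Because the function reads no clocks, counters, or external state, replaying a captured event during incident reconstruction reproduces the original routing decision exactly.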
Threshold evaluation follows a strict precedence chain. The dispatcher first checks for explicit routing tags injected by the capture agent. If none are present, it evaluates latency multipliers against historical baselines. Queries with total_time_ms > 5000 or actual_rows > 1_000_000 are tagged critical and routed to a high-priority index with a 7-day retention window; standard workloads route to baseline indices with 30-day retention. Sampling is enforced by deterministic hash-based selection: hash(query_hash) % 100 < sample_rate, where hash must be a stable digest such as SHA-256 rather than a per-process salted function like Python's built-in hash(). This guarantees identical routing outcomes for identical queries across restarts, a prerequisite for the reliable cross-engine comparison described in Normalizing Query Plans for Cross-Engine Comparison.
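The sampling rule can be checked in isolation. This minimal sketch uses the same SHA-256-then-modulo computation as the router implementation later in this guide; the function names sample_bucket and should_sample are hypothetical. A useful side effect of threshold-based selection is monotonicity: raising the sample rate only adds queries, so every query kept at 10% is also kept at 20%.

```python
import hashlib


def sample_bucket(query_hash: str) -> int:
    """Map a query_hash to a stable bucket in [0, 100)."""
    # SHA-256 yields the same digest in every process. Python's built-in hash()
    # is salted per process for str (PYTHONHASHSEED), so it would not.
    digest = hashlib.sha256(query_hash.encode("utf-8")).hexdigest()
    return int(digest, 16) % 100


def should_sample(query_hash: str, sample_rate_pct: int) -> bool:
    """Keep the event when its bucket falls below the configured percentage."""
    return sample_bucket(query_hash) < sample_rate_pct
```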
Production-Ready Implementation Blueprint
The following Python sketch shows an async, schema-validated router with explicit threshold evaluation, batch shipping, and a circuit-breaker fallback. It uses asyncio for non-blocking I/O, aiohttp for HTTP egress, and pydantic v2 for ingress validation. The emitted events are plain structured JSON; their field names can be mapped onto the OpenTelemetry Semantic Conventions for Logs if the downstream collector expects them.
```python
import asyncio
import hashlib
import json
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional

import aiohttp
from pydantic import BaseModel, Field, ValidationError

logger = logging.getLogger("explain_router")


class RouteTarget(str, Enum):
    CRITICAL = "critical-plans"
    BASELINE = "baseline-plans"
    DDL = "ddl-plans"


class ExplainEnvelope(BaseModel):
    """Ingress contract: the six required fields plus an optional classification tag."""
    query_hash: str
    execution_timestamp: float
    cluster_id: str
    plan_text: str
    actual_rows: int = Field(ge=0)
    total_time_ms: float = Field(ge=0)
    query_type: Optional[str] = None


@dataclass
class RoutingConfig:
    critical_time_ms: float = 5000.0
    critical_rows: int = 1_000_000
    sample_rate_pct: int = 100
    batch_size: int = 50
    flush_interval_s: float = 2.0
    max_queue_depth: int = 10_000
    failure_threshold: int = 3  # consecutive failures before the circuit opens
    cooldown_s: float = 30.0  # how long network egress stays halted once open


class ExplainRouter:
    def __init__(self, config: RoutingConfig, log_endpoint: str):
        self.config = config
        self.log_endpoint = log_endpoint
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=config.max_queue_depth)
        self._consecutive_failures = 0
        self._circuit_open_until = 0.0  # monotonic deadline; in the past means closed
        self._session: Optional[aiohttp.ClientSession] = None

    async def _init_session(self):
        # Must run inside the event loop that will use the session.
        self._session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=5.0),
            connector=aiohttp.TCPConnector(limit=20),
        )

    def _evaluate_route(self, payload: ExplainEnvelope) -> RouteTarget:
        # Precedence: explicit classification tag first, then static thresholds.
        if payload.query_type == "DDL":
            return RouteTarget.DDL
        if (payload.total_time_ms > self.config.critical_time_ms
                or payload.actual_rows > self.config.critical_rows):
            return RouteTarget.CRITICAL
        return RouteTarget.BASELINE

    def _apply_sampling(self, payload: ExplainEnvelope) -> bool:
        if self.config.sample_rate_pct >= 100:
            return True
        # SHA-256 is stable across processes, unlike Python's salted built-in hash().
        digest = int(hashlib.sha256(payload.query_hash.encode()).hexdigest(), 16)
        return (digest % 100) < self.config.sample_rate_pct

    async def ingest(self, raw_json: str) -> bool:
        try:
            envelope = ExplainEnvelope.model_validate_json(raw_json)
        except ValidationError as e:
            logger.warning("Schema violation, routing to DLQ: %s", e)
            await self._send_to_dlq(raw_json)
            return False
        if not self._apply_sampling(envelope):
            return True  # Dropped intentionally by deterministic sampling
        target_index = self._evaluate_route(envelope)
        routed_event = {
            **envelope.model_dump(),
            "route_target": target_index.value,
            "ingestion_timestamp": time.time(),  # wall-clock epoch seconds
        }
        if self._queue.full():
            # Drop-oldest instead of blocking the caller (the capture agent).
            dropped = self._queue.get_nowait()
            logger.warning("Queue full, dropped oldest event %s", dropped["query_hash"])
        self._queue.put_nowait(routed_event)
        return True

    async def _flush_batch(self) -> int:
        """Ship up to batch_size queued events; return how many were taken."""
        batch = []
        while len(batch) < self.config.batch_size and not self._queue.empty():
            batch.append(self._queue.get_nowait())
        if not batch:
            return 0
        if time.monotonic() < self._circuit_open_until:
            await self._spill_to_disk(batch)  # circuit open: no network egress
            return len(batch)
        try:
            async with self._session.post(
                self.log_endpoint,
                data=json.dumps(batch),
                headers={"Content-Type": "application/json"},
            ) as resp:
                resp.raise_for_status()
            self._consecutive_failures = 0
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            self._consecutive_failures += 1
            logger.error("Log endpoint failure (%d consecutive): %s",
                         self._consecutive_failures, e)
            if self._consecutive_failures >= self.config.failure_threshold:
                # Open (or re-open after a failed trial) for the cooldown period.
                self._circuit_open_until = time.monotonic() + self.config.cooldown_s
            await self._spill_to_disk(batch)
        return len(batch)

    async def _send_to_dlq(self, raw: str):
        # Implementation omitted for brevity: push to Kafka DLQ topic or S3
        pass

    async def _spill_to_disk(self, batch: list[dict]):
        # Fallback: persist the batch as local JSONL (write a temp file, rename atomically)
        pass

    async def run(self):
        await self._init_session()
        try:
            while True:
                # Drain everything currently queued, one batch at a time.
                while await self._flush_batch():
                    pass
                await asyncio.sleep(self.config.flush_interval_s)
        finally:
            await self._session.close()
```

This design follows the patterns described in Building Async Ingestion Pipelines for High-Throughput Queries, prioritizing backpressure management and graceful degradation over synchronous blocking calls.
Observability Hooks and Safe Fallback Protocols
Transport reliability is enforced through explicit observability hooks and deterministic fallback paths. Every routing decision emits structured metrics: explain_router_ingested_total, explain_router_dropped_total, and explain_router_queue_depth. Latency histograms track route_evaluation_duration_ms and batch_flush_duration_ms to detect dispatcher degradation before it impacts capture agents.
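These hooks can be prototyped without a metrics backend. The sketch below is a hypothetical in-process stand-in: the metric names come from the paragraph above, while the RouterMetrics and Histogram classes and the bucket bounds are assumptions. A production deployment would more likely use a metrics library such as prometheus_client and expose the values for scraping.

```python
import time
from bisect import bisect_left
from contextlib import contextmanager
from dataclasses import dataclass, field

# Hypothetical bucket upper bounds in milliseconds; tune to observed latencies.
DEFAULT_BUCKETS_MS = (0.1, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0, 500.0)


@dataclass
class Histogram:
    bounds: tuple = DEFAULT_BUCKETS_MS
    counts: list = field(default_factory=list)
    total_ms: float = 0.0
    n: int = 0

    def __post_init__(self):
        # One slot per upper bound, plus an overflow (+Inf) slot at the end.
        self.counts = [0] * (len(self.bounds) + 1)

    def observe(self, value_ms: float) -> None:
        # bisect_left puts a value equal to a bound into that bound's slot (<= semantics).
        self.counts[bisect_left(self.bounds, value_ms)] += 1
        self.total_ms += value_ms
        self.n += 1


@dataclass
class RouterMetrics:
    """Counters, a gauge, and latency histograms named after the router's metrics."""
    explain_router_ingested_total: int = 0
    explain_router_dropped_total: int = 0
    explain_router_queue_depth: int = 0
    route_evaluation_duration_ms: Histogram = field(default_factory=Histogram)
    batch_flush_duration_ms: Histogram = field(default_factory=Histogram)

    @contextmanager
    def timed(self, histogram: Histogram):
        """Record the wall time of the enclosed block, in ms, into histogram."""
        start = time.perf_counter()
        try:
            yield
        finally:
            histogram.observe((time.perf_counter() - start) * 1000.0)
```

In use, a router would wrap its route evaluation in `with metrics.timed(metrics.route_evaluation_duration_ms):`, increment the counters on ingest and drop, and set the gauge from the queue size after each flush.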
When the centralized log endpoint becomes unreachable, the router activates a tiered fallback protocol:
- Circuit Breaker Activation: After three consecutive 5xx or timeout responses, the dispatcher halts network egress for a configurable cooldown period.
- Local Disk Spillover: In-flight batches are serialized to an append-only JSONL file on ephemeral storage. A background watcher monitors disk utilization and triggers alerts at 70% capacity.
- Dead-Letter Queue (DLQ) Routing: Malformed payloads or expired spillover events are forwarded to a persistent message broker with infinite retention. A reconciliation job periodically replays DLQ entries once downstream health is restored.
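The circuit-breaker step can be modeled as a small state machine. This is a minimal sketch, assuming the conventional closed, open, and half-open states; the CircuitBreaker class and its method names are hypothetical, and the injectable clock exists only so the cooldown can be tested without sleeping. Once the cooldown elapses, requests are allowed again as trials: a success closes the circuit, and a failure re-opens it and restarts the cooldown.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after N consecutive failures; allows trial requests after a cooldown."""

    def __init__(self, failure_threshold: int = 3, cooldown_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.cooldown_s:
            return "half-open"  # cooldown elapsed: let trial traffic through
        return "open"

    def allow_request(self) -> bool:
        return self.state != "open"

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self.state == "half-open" or self._failures >= self.failure_threshold:
            self._opened_at = self._clock()  # (re)open and restart the cooldown
```

While `allow_request()` returns False, the caller would spill the batch to local disk instead of attempting network egress.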
Queue depth is capped at 10,000 events to prevent unbounded memory consumption. When the threshold is breached, the router applies a strict drop-oldest policy, logging the event hash for auditability. This ensures that the routing layer never becomes a bottleneck for the database host, preserving the strict isolation boundary between telemetry transport and query execution.
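The drop-oldest policy can be sketched with a plain deque. This minimal example assumes each event is a dict carrying query_hash; the DropOldestBuffer class is hypothetical. It deliberately avoids `deque(maxlen=...)`, which would evict silently, because the policy above requires the evicted event's hash to be logged for auditability.

```python
import logging
from collections import deque
from typing import Optional

logger = logging.getLogger("explain_router")


class DropOldestBuffer:
    """Bounded FIFO that evicts the oldest event instead of blocking the producer."""

    def __init__(self, max_depth: int = 10_000):
        self._events: deque = deque()
        self.max_depth = max_depth
        self.dropped_total = 0

    def offer(self, event: dict) -> Optional[str]:
        """Enqueue event; return the query_hash of any evicted event, else None."""
        evicted_hash = None
        if len(self._events) >= self.max_depth:
            evicted = self._events.popleft()
            evicted_hash = evicted.get("query_hash")
            self.dropped_total += 1
            logger.warning("queue full, dropped oldest event query_hash=%s", evicted_hash)
        self._events.append(event)
        return evicted_hash

    def drain(self, max_items: int) -> list:
        """Remove up to max_items events in FIFO order for the next batch."""
        batch = []
        while self._events and len(batch) < max_items:
            batch.append(self._events.popleft())
        return batch

    def __len__(self) -> int:
        return len(self._events)
```

Because offer() never waits, a slow or unreachable log endpoint costs the capture agent at most a dropped telemetry event, never a stalled call.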