Runbook

Using Kafka for Async Query Plan Ingestion at Scale

Moving from synchronous EXPLAIN execution to a decoupled message bus is a foundational requirement for query plan baseline tracking at high volume. When database workloads exceed 50,000 queries per second, synchronous capture introduces unacceptable latency spikes and connection pool exhaustion. Running Kafka-based async query plan ingestion at that scale requires precise threshold management, deterministic consumer logic, and fail-safe override mechanisms. This runbook covers symptom identification, root cause analysis, and production-ready automation patterns for database SREs and platform engineering teams.

Symptom Identification & Production Thresholds

Performance regression detection relies on timely plan ingestion. When the async pipeline degrades, baseline drift occurs silently until production queries regress. Monitor these exact thresholds to trigger automated alerts:

  • Consumer Group Lag: Sustained lag > 15,000 messages per partition for > 3 minutes indicates backpressure or deserialization bottlenecks.
  • p95 Ingestion Latency: End-to-end latency (producer publish to baseline store commit) exceeding 1,200ms degrades regression alerting SLAs.
  • Deserialization Failure Rate: > 0.15% of messages routed to the dead-letter queue (DLQ) signals schema drift or malformed JSON/Protobuf payloads.
  • Kafka Broker Disk I/O Wait: Sustained iowait > 65% on broker partitions hosting the query-plans topic causes producer RequestTimeout errors.
  • Baseline Store Write Contention: PostgreSQL/ClickHouse row lock wait times > 800ms during upsert operations indicate partition skew or missing composite indexes on (query_hash, plan_version).

When any threshold breaches, the pipeline must automatically throttle non-critical captures while preserving high-priority regression candidates. This behavior is typically governed by the configuration patterns outlined in Automated EXPLAIN Capture & Storage Workflows, which define the routing logic for baseline persistence.
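
As a rough illustration of that throttling rule, the sketch below evaluates the five thresholds listed above and decides whether a capture should be admitted. The class, field, and function names are hypothetical, and the priority labels are the ones used in the fallback chain later in this runbook; treat it as a sketch under those assumptions rather than the pipeline's actual routing logic.

```python
from dataclasses import dataclass

# Threshold values come from the list above. All names here are
# illustrative assumptions, not part of any real library.
@dataclass
class PipelineMetrics:
    max_partition_lag: int         # messages behind on the worst partition
    lag_breach_seconds: float      # how long lag has stayed above 15,000
    p95_ingest_latency_ms: float   # producer publish -> baseline store commit
    dlq_ratio: float               # DLQ messages / consumed messages
    broker_iowait_pct: float
    store_lock_wait_ms: float

HIGH_PRIORITY = {"critical", "regression_candidate"}

def breached_thresholds(m: PipelineMetrics) -> list[str]:
    """Return the names of every threshold currently in breach."""
    breaches = []
    if m.max_partition_lag > 15_000 and m.lag_breach_seconds > 180:
        breaches.append("consumer_lag")
    if m.p95_ingest_latency_ms > 1_200:
        breaches.append("p95_latency")
    if m.dlq_ratio > 0.0015:
        breaches.append("dlq_rate")
    if m.broker_iowait_pct > 65:
        breaches.append("broker_iowait")
    if m.store_lock_wait_ms > 800:
        breaches.append("store_lock_wait")
    return breaches

def should_capture(priority: str, m: PipelineMetrics) -> bool:
    """High-priority captures always pass; others only while healthy."""
    if priority in HIGH_PRIORITY:
        return True
    return not breached_thresholds(m)
```

Returning the list of breached thresholds, rather than a single boolean, keeps the alert annotation and the throttle decision driven by the same evaluation.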

Observability Integration

Deploy Prometheus alerting rules mapped directly to these thresholds. Use OpenTelemetry to propagate trace_id from the database proxy through the producer, broker, consumer, and baseline store. Correlate kafka_consumer_group_lag with db_query_duration to distinguish ingestion bottlenecks from actual workload regressions.

YAML
# prometheus/alerts/kafka_ingestion.yml
groups:
  - name: kafka_query_plan_ingestion
    rules:
      - alert: HighConsumerLag
        expr: sum(kafka_consumer_group_lag{group="query-plan-consumers", topic="query-plans"}) by (partition) > 15000
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "Query plan consumer lag exceeds safe threshold"
      - alert: DLQFailureSpike
        expr: rate(kafka_consumer_dlq_total[5m]) / rate(kafka_consumer_records_consumed_total[5m]) > 0.0015
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Deserialization failure rate breaching 0.15% SLA"
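
The correlation between consumer lag and query duration described above can be sketched as a small triage helper. The function name, the 1.5x regression factor, and the idea of comparing against a stored p95 baseline are assumptions made for illustration; only the 15,000-message lag threshold comes from this runbook.

```python
def classify_incident(max_partition_lag: int,
                      query_p95_ms: float,
                      baseline_query_p95_ms: float,
                      lag_threshold: int = 15_000,
                      regression_factor: float = 1.5) -> str:
    """Separate ingestion bottlenecks from genuine workload regressions.

    regression_factor is an assumed tolerance: queries count as regressed
    when their p95 exceeds the baseline p95 by this multiple.
    """
    lag_high = max_partition_lag > lag_threshold
    queries_slow = query_p95_ms > baseline_query_p95_ms * regression_factor
    if lag_high and queries_slow:
        return "both"
    if lag_high:
        return "ingestion_bottleneck"
    if queries_slow:
        return "workload_regression"
    return "healthy"
```

High lag with normal query duration points at the pipeline itself, while slow queries with normal lag point at the workload.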

Root Cause Analysis & Debugging Scenarios

Async ingestion failures rarely stem from a single component. Production runbooks should prioritize these failure domains during triage:

1. Python GIL Contention & JSON Parsing Overhead

Synchronous json.loads() on multi-megabyte execution plans blocks the consumer loop. When CPU utilization on consumer pods exceeds 85% with low network I/O, the bottleneck is typically unbatched deserialization. Mitigation: Replace the standard library parser with orjson or msgspec. Both parse considerably faster, so each message holds the GIL for less time, but neither removes GIL contention; scale CPU-bound parsing across consumer processes rather than threads. Process records in micro-batches rather than one-by-one.
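
A minimal sketch of micro-batched deserialization follows. It uses the standard library json module as a stand-in so it runs without extra dependencies; passing orjson.loads as the loads argument is the intended swap. The function name and the 500-message batch size are assumptions.

```python
import json
from typing import Callable, Iterable, Iterator

def decode_in_batches(
    raw_messages: Iterable[bytes],
    batch_size: int = 500,
    loads: Callable[[bytes], object] = json.loads,
) -> Iterator[tuple[list, list]]:
    """Yield (decoded, failed) pairs, one pair per micro-batch.

    failed holds (raw_bytes, error_text) tuples destined for the DLQ.
    """
    decoded, failed = [], []
    for raw in raw_messages:
        try:
            decoded.append(loads(raw))
        except ValueError as exc:
            # json.JSONDecodeError and UnicodeDecodeError both subclass
            # ValueError, so malformed and mis-encoded payloads land here.
            failed.append((raw, str(exc)))
        if len(decoded) + len(failed) >= batch_size:
            yield decoded, failed
            decoded, failed = [], []
    if decoded or failed:
        yield decoded, failed
```

Each yielded batch can then be written to the baseline store in a single bulk upsert, which amortizes per-message overhead on the store side as well.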

2. Kafka Consumer Rebalancing Storms

Aggressive session.timeout.ms (< 10,000) combined with heavy GC pauses triggers unnecessary partition reassignment. Each rebalance pauses ingestion for 2–5 seconds, creating lag spikes. Mitigation: Align heartbeat and poll intervals with expected processing time. Set session.timeout.ms=45000, heartbeat.interval.ms=15000, and max.poll.interval.ms=300000. Use cooperative-sticky partition assignment to minimize partition revocation during scale events. Refer to Apache Kafka Consumer Configuration for exact parameter semantics.
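
The timing relationships above can be checked before deployment with a small validation helper. This is a sketch: the function name and the worst_case_batch_ms input are hypothetical, and the one-third heartbeat ratio reflects the guidance in the Kafka consumer documentation rather than a hard broker constraint.

```python
def check_consumer_timeouts(cfg: dict, worst_case_batch_ms: int) -> list[str]:
    """Return human-readable problems with consumer timing settings."""
    problems = []
    session = cfg["session.timeout.ms"]
    heartbeat = cfg["heartbeat.interval.ms"]
    max_poll = cfg["max.poll.interval.ms"]

    if session < 10_000:
        problems.append("session.timeout.ms below 10000 risks rebalance storms")
    if heartbeat * 3 > session:
        problems.append("heartbeat.interval.ms should be at most 1/3 of session.timeout.ms")
    if max_poll <= worst_case_batch_ms:
        problems.append("max.poll.interval.ms must exceed worst-case batch processing time")
    return problems

recommended = {
    "session.timeout.ms": 45000,
    "heartbeat.interval.ms": 15000,
    "max.poll.interval.ms": 300000,
}
```

Running the check in CI against the deployed consumer configuration catches a regressed timeout before it causes a rebalance storm in production.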

3. Schema Registry Version Mismatch

Producers publishing Avro/Protobuf payloads with backward-incompatible field removals cause SchemaRegistryError exceptions. Consumers fail to deserialize and drop messages silently if error handling is absent. Mitigation: Enforce BACKWARD_TRANSITIVE compatibility at the registry level. Implement a validation middleware that rejects non-conforming payloads at the producer edge before they enter the topic.
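
A producer-edge validation step might look like the sketch below. It is a plain-Python stand-in that does not call Schema Registry; the required field set is an assumption (query_hash and plan_version appear in this runbook's index guidance, plan is hypothetical), and a real deployment would validate against the registered Avro/Protobuf schema instead.

```python
# Assumed minimal contract for a query plan payload (illustrative only).
REQUIRED_FIELDS = {
    "query_hash": str,
    "plan_version": int,
    "plan": dict,
}

def validate_plan_payload(payload: dict) -> list[str]:
    """Return validation errors; an empty list means the payload may be published."""
    errors = []
    for name, expected_type in REQUIRED_FIELDS.items():
        if name not in payload:
            errors.append(f"missing field: {name}")
        elif not isinstance(payload[name], expected_type):
            errors.append(
                f"wrong type for {name}: expected {expected_type.__name__}, "
                f"got {type(payload[name]).__name__}"
            )
    return errors
```

Rejecting a payload here, before produce() is called, keeps malformed messages out of the topic and therefore out of the DLQ failure-rate budget.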

4. Broker Disk Saturation & ISR Shrinkage

High log.segment.bytes or aggressive flush intervals cause disk queue buildup. When UnderReplicatedPartitions > 0, producers using acks=all wait on the slowest remaining in-sync replica, and if the ISR shrinks below min.insync.replicas their writes fail with NotEnoughReplicas errors. Mitigation: Monitor kafka_server_ReplicaManager_UnderReplicatedPartitions. Adjust log.flush.interval.messages=100000 and log.flush.interval.ms=10000 to batch disk writes. Size num.io.threads for the cores and NVMe throughput available on each broker.

Production Configuration & Code Patterns

Topic Configuration

PROPERTIES
# topic-config.properties
query-plans.partitions=48
query-plans.replication.factor=3
# 100 GiB per partition
query-plans.retention.bytes=107374182400
# 7 days
query-plans.retention.ms=604800000
query-plans.compression.type=zstd
query-plans.min.insync.replicas=2
query-plans.cleanup.policy=delete
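
Because retention.bytes applies per partition, it helps to work out the worst-case disk footprint these settings imply. The sketch below does that arithmetic; the function name is illustrative, and the result ignores active-segment overshoot and index files.

```python
GIB = 1024 ** 3

def worst_case_topic_bytes(partitions: int, replication_factor: int,
                           retention_bytes_per_partition: int) -> int:
    """Upper bound on cluster-wide disk usage for one topic's log data."""
    return partitions * replication_factor * retention_bytes_per_partition

total = worst_case_topic_bytes(
    partitions=48,
    replication_factor=3,
    retention_bytes_per_partition=107_374_182_400,  # 100 GiB
)
print(f"{total // GIB} GiB across the cluster")  # 48 * 3 * 100 GiB
```

With the values above, the bound is 14,400 GiB (about 14 TiB) of compressed log data if every partition reaches its size limit before the 7-day time limit applies.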

High-Throughput Producer (Python)

PYTHON
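from confluent_kafka import Producer
import hashlib
import logging

import orjson

log = logging.getLogger(__name__)

def delivery_callback(err, msg):
    # Invoked from poll()/flush() once the broker acks or the send fails.
    if err is not None:
        # Placeholder: route to a local fallback queue or DLQ here.
        log.error("Delivery failed for key %r: %s", msg.key(), err)

producer = Producer({
    'bootstrap.servers': 'kafka-broker-01:9092,kafka-broker-02:9092',
    'compression.type': 'zstd',
    'batch.size': 1048576,  # 1 MiB batches
    'linger.ms': 15,
    'acks': 'all',
    'enable.idempotence': True,
    'max.in.flight.requests.per.connection': 5
})

def publish_query_plan(plan_payload: dict, priority: str = "standard"):
    # OPT_SORT_KEYS makes the serialized bytes, and therefore the hash,
    # independent of dict insertion order.
    plan_json = orjson.dumps(
        plan_payload,
        option=orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_SORT_KEYS,
    )
    query_hash = hashlib.sha256(plan_json).hexdigest()
    key = f"{priority}:{query_hash}".encode('utf-8')

    try:
        producer.produce(topic='query-plans', key=key, value=plan_json,
                         callback=delivery_callback)
    except BufferError:
        # Local queue is full: serve callbacks to free space, then retry once.
        producer.poll(1.0)
        producer.produce(topic='query-plans', key=key, value=plan_json,
                         callback=delivery_callback)
    # Serve pending delivery callbacks without blocking.
    producer.poll(0)

# Call producer.flush() before the process exits so queued messages are sent.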

Deterministic Consumer with DLQ Routing

PYTHON
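from confluent_kafka import Consumer, KafkaError
import orjson

consumer = Consumer({
    'bootstrap.servers': 'kafka-broker-01:9092',
    'group.id': 'query-plan-consumers',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'partition.assignment.strategy': 'cooperative-sticky',
    'session.timeout.ms': 45000,
    'heartbeat.interval.ms': 15000,
    'max.poll.interval.ms': 300000
})

consumer.subscribe(['query-plans'])

def process_batch(records):
    # baseline_store_upsert() and route_to_dlq() are application-specific
    # helpers that must be defined elsewhere.
    handled = 0
    for msg in records:
        err = msg.error()
        if err is not None:
            # Error events carry no payload; partition EOF is informational.
            if err.code() != KafkaError._PARTITION_EOF:
                print(f"Consumer error: {err}")
            continue
        try:
            payload = orjson.loads(msg.value())
            # Route to baseline store via upsert
            baseline_store_upsert(payload)
        except Exception as e:
            # Park the failing message in the DLQ instead of blocking the partition.
            route_to_dlq(msg, str(e))
        handled += 1
    if handled:
        # Commit synchronously, and only after the whole batch is processed.
        consumer.commit(asynchronous=False)

try:
    while True:
        records = consumer.consume(num_messages=500, timeout=1.0)
        if records:
            process_batch(records)
finally:
    # Leave the group cleanly so partitions are reassigned promptly.
    consumer.close()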
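
The consumer above calls a DLQ routing helper without showing what a DLQ record contains. One possible shape is sketched below: the failing payload is forwarded unchanged, and its origin is recorded in headers so the message can be traced and replayed. The function name and header names are assumptions; the query-plans.dlq topic name is the one used in the fallback chain in this runbook.

```python
import time
from typing import Optional

def build_dlq_record(source_topic: str, partition: int, offset: int,
                     value: Optional[bytes], error: str,
                     now: Optional[float] = None) -> dict:
    """Build the topic/value/headers for a dead-letter message."""
    failed_at_ms = int((time.time() if now is None else now) * 1000)
    return {
        "topic": "query-plans.dlq",
        "value": value or b"",
        "headers": [
            ("source.topic", source_topic.encode("utf-8")),
            ("source.partition", str(partition).encode("utf-8")),
            ("source.offset", str(offset).encode("utf-8")),
            # Truncate so a huge traceback cannot bloat the record.
            ("error", error[:512].encode("utf-8", "replace")),
            ("failed_at_ms", str(failed_at_ms).encode("utf-8")),
        ],
    }
```

The dictionary keys are chosen to line up with the topic, value, and headers keyword arguments of a Kafka producer's produce() call, so a routing helper could pass the record straight through.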

Safe Fallback Chains & Mitigation Paths

Production systems must degrade gracefully under extreme load. Implement a tiered fallback chain:

  1. Circuit Breaker Activation: When p95 ingestion latency stays above 1,200ms for 60 seconds, trigger a circuit breaker. Drop priority=low messages immediately. Continue processing priority=critical and priority=regression_candidate.
  2. Local Buffering & Backpressure: If the baseline store rejects writes, buffer messages to a local SQLite/WAL file. Resume ingestion only when store lock contention drops below 400ms.
  3. Synchronous Fallback (Last Resort): If Kafka cluster health degrades (UnderReplicatedPartitions > 5 for > 2m), temporarily route EXPLAIN output directly to a lightweight Redis queue with TTL-based eviction. This preserves capture continuity while accepting higher DB connection overhead.
  4. DLQ Replay & Schema Correction: Messages routed to query-plans.dlq must be inspected via a schema validation worker. Correct malformed payloads, republish to the main topic, and advance consumer offsets only after successful baseline commit.
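
The first tier of this chain can be sketched as a small latency circuit breaker. The class name and injectable clock are assumptions made for testability. The text above does not say how the producer's default standard priority is treated while the breaker is open, so this sketch sheds only priority=low.

```python
import time

class LatencyCircuitBreaker:
    """Opens once p95 latency has stayed above the threshold long enough."""

    def __init__(self, threshold_ms: float = 1200.0, sustain_s: float = 60.0,
                 clock=time.monotonic):
        self.threshold_ms = threshold_ms
        self.sustain_s = sustain_s
        self._clock = clock
        self._breach_started = None  # time the current breach began

    def observe(self, p95_latency_ms: float) -> None:
        """Feed the latest p95 ingestion latency sample."""
        if p95_latency_ms > self.threshold_ms:
            if self._breach_started is None:
                self._breach_started = self._clock()
        else:
            # Any healthy sample ends the breach and closes the breaker.
            self._breach_started = None

    @property
    def open(self) -> bool:
        return (self._breach_started is not None
                and self._clock() - self._breach_started >= self.sustain_s)

    def admit(self, priority: str) -> bool:
        """While open, shed only low-priority messages."""
        return not (self.open and priority == "low")
```

Resetting on the first healthy sample keeps the breaker simple; a production version would probably add hysteresis so it does not flap around the threshold.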

This multi-tiered approach ensures that query optimization engineers retain visibility into regression candidates even during infrastructure degradation, maintaining alignment with the ingestion pipeline standards defined in Building Async Ingestion Pipelines for High-Throughput Queries.
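
The local buffering tier (step 2 of the chain) could be prototyped with the standard library sqlite3 module, as in the sketch below. The class and method names are hypothetical, and the caller is assumed to supply the current store lock wait from its own metrics; only the 400ms resume threshold comes from this runbook.

```python
import sqlite3
from typing import Callable

class LocalPlanBuffer:
    """Durable FIFO buffer for payloads the baseline store rejected."""

    def __init__(self, path: str = ":memory:"):
        self._db = sqlite3.connect(path)
        # WAL mode takes effect for file-backed databases.
        self._db.execute("PRAGMA journal_mode=WAL")
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS buffered ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, payload BLOB NOT NULL)"
        )
        self._db.commit()

    def append(self, payload: bytes) -> None:
        with self._db:  # commits on success, rolls back on error
            self._db.execute("INSERT INTO buffered (payload) VALUES (?)", (payload,))

    def drain(self, store_lock_wait_ms: float, write: Callable[[bytes], None],
              resume_below_ms: float = 400.0, limit: int = 500) -> int:
        """Replay buffered payloads in order once contention has dropped."""
        if store_lock_wait_ms >= resume_below_ms:
            return 0
        rows = self._db.execute(
            "SELECT id, payload FROM buffered ORDER BY id LIMIT ?", (limit,)
        ).fetchall()
        for row_id, payload in rows:
            write(payload)  # if this raises, the row stays buffered
            with self._db:
                self._db.execute("DELETE FROM buffered WHERE id = ?", (row_id,))
        return len(rows)
```

Deleting each row only after its write succeeds gives at-least-once replay, which is safe when the baseline store write is an idempotent upsert.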

Observability Integration & Validation

Deploy a continuous validation pipeline alongside ingestion:

  • Schema Drift Detection: Run a nightly diff between the latest Avro/Protobuf schema and the baseline store DDL. Alert on type mismatches or dropped required fields.
  • Trace Sampling: Sample 1% of EXPLAIN captures with full distributed tracing. Verify that producer.publish_ts to baseline_store.commit_ts remains within SLA.
  • Lag Reconciliation: Cross-reference kafka_consumer_group_lag with query_plan_capture_rate from the database proxy. A divergence indicates silent drops at the producer layer.
  • Automated Replay Testing: Use a staging environment to inject synthetic plan payloads at 2x production throughput. Validate that consumer scaling policies trigger correctly and that fallback chains activate without data loss.
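
The lag reconciliation check in the list above reduces to simple accounting over a time window: every captured plan should be either consumed or still sitting in the topic as added lag. The sketch below expresses that; the function names and the 0.1% tolerance are assumptions, and it ignores messages that expire through retention during the window.

```python
def unaccounted_messages(captured: int, consumed: int,
                         lag_start: int, lag_end: int) -> int:
    """Captures the proxy reported that never reached the topic.

    Messages produced in the window = consumed + growth in consumer lag.
    """
    produced = consumed + (lag_end - lag_start)
    return captured - produced

def producer_drop_suspected(captured: int, consumed: int,
                            lag_start: int, lag_end: int,
                            tolerance_ratio: float = 0.001) -> bool:
    """True when the unaccounted share exceeds the assumed tolerance."""
    if captured == 0:
        return False
    missing = unaccounted_messages(captured, consumed, lag_start, lag_end)
    return missing / captured > tolerance_ratio
```

A small tolerance absorbs timing skew between the proxy's counters and the consumer group's offsets, so only sustained divergence raises an alert.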

By enforcing strict thresholds, deterministic consumer configurations, and explicit degradation paths, platform teams can sustain high-throughput query plan ingestion without compromising baseline accuracy or regression detection latency.