kqbatch

kqbatch helps Go Kafka consumers process messages in efficient batches instead of one message at a time.

It turns segmentio/kafka-go readers into size/time-based batch consumers with bounded concurrency, retries, error-handler hooks, graceful shutdown, and explicit offset commits. It is built for services that consume Kafka and write to downstream systems that are faster in bulk, such as databases, batch HTTP APIs, search indexes, or warehouses.

Use kqbatch when

  • You use github.com/segmentio/kafka-go.
  • Your downstream is more efficient in bulk, for example database bulk insert or batch HTTP APIs.
  • You want count/time-based batch flushing without hand-rolling buffers, timers, retries, shutdown, and offset commits.
  • You accept that a batch which still fails after all retries is passed to ErrorHandler and its offsets are then committed, so it is not redelivered.

Not for

  • Exactly-once processing semantics.
  • Kafka transactions.
  • Full message routing or workflow orchestration frameworks.
  • Per-message business workflows where batching is not the core problem.

How it works

Kafka topic
    │
    ├── Worker 1 ──► buffer ──► [ batch accumulator ] ──► processor pool ──► BatchProcessor
    ├── Worker 2 ──► buffer ──► [ batch accumulator ] ──► processor pool ──► BatchProcessor
    └── Worker N ──► buffer ──► [ batch accumulator ] ──► processor pool ──► BatchProcessor

Each worker owns an independent Kafka reader. Messages accumulate until either ChunkSize is reached or FlushInterval elapses, whichever comes first. A bounded goroutine pool then processes batches concurrently, retries failed batches, and passes each message of a batch that exhausts its retries to ErrorHandler.
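The count/time dual trigger described above can be sketched with only the standard library. This is a minimal illustration, not kqbatch's actual implementation: the accumulate function, its parameters, and the use of plain ints in place of Kafka messages are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"time"
)

// accumulate reads items from in and calls flush with a batch when either
// chunkSize items have been collected or flushInterval has elapsed since the
// last flush, whichever comes first. It returns once in is closed, after
// flushing any remaining items. (Hypothetical helper, not kqbatch API.)
func accumulate(in <-chan int, chunkSize int, flushInterval time.Duration, flush func([]int)) {
	batch := make([]int, 0, chunkSize)
	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	emit := func() {
		if len(batch) == 0 {
			return // nothing to flush on an idle tick
		}
		flush(batch)
		batch = make([]int, 0, chunkSize) // fresh slice so the handler may keep the old one
	}

	for {
		select {
		case item, ok := <-in:
			if !ok {
				emit() // input closed: flush the partial batch and stop
				return
			}
			batch = append(batch, item)
			if len(batch) >= chunkSize {
				emit()
				ticker.Reset(flushInterval) // restart the time trigger after a size flush
			}
		case <-ticker.C:
			emit() // time trigger: flush whatever has accumulated
		}
	}
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()
	// chunkSize 2 with a 1s interval: the size trigger fires for full
	// batches, and the leftover item is flushed when the channel closes.
	accumulate(in, 2, time.Second, func(b []int) { fmt.Println(b) })
}
```

Resetting the ticker after a size-triggered flush is one possible design choice; it keeps a partial batch from being flushed almost immediately after a full one.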

Why kqbatch

Most Go Kafka libraries deliver messages one at a time to the handler:

Library                     | Handler unit    | Batch accumulation
----------------------------|-----------------|--------------------------------
zeromicro/go-queue          | single message  | no
ThreeDotsLabs/watermill     | single message  | no
IBM/sarama (ConsumerGroup)  | single message  | no
segmentio/kafka-go (Reader) | single message  | no
kqbatch                     | []kafka.Message | yes (count + time dual trigger)

The difference matters when the downstream operation (database write, HTTP call, etc.) is far more efficient in bulk. With a per-message handler you either call the database once per message, or wire up your own accumulation state and flush logic. kqbatch does this for you: messages accumulate until ChunkSize is reached or FlushInterval elapses — whichever comes first — and your handler always receives a ready-to-use slice.

Install

go get github.com/vector233/kqbatch

Quick start

package main

import (
    "context"
    "encoding/json"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/vector233/kqbatch"
    "github.com/segmentio/kafka-go"
)

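// Order, db, and myLogger below are placeholders for your own application code;
// they are not provided by kqbatch.
type OrderProcessor struct{}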

func (p *OrderProcessor) ProcessBatch(ctx context.Context, msgs []kafka.Message) error {
    orders := make([]Order, 0, len(msgs))
    for _, msg := range msgs {
        var o Order
        if err := json.Unmarshal(msg.Value, &o); err != nil {
            return err
        }
        orders = append(orders, o)
    }
    return db.BulkInsert(ctx, orders)
}

func main() {
    k := kqbatch.NewKafka(kqbatch.KafkaConfig{
        Brokers: []string{"kafka:9092"},
    })

    consumer := kqbatch.New(k,
        kqbatch.Config{
            Name:          "orders",
            Topic:         "orders",
            Group:         "order-processor",
            Workers:       3,   // parallel Kafka readers (≤ partition count)
            ChunkSize:     200, // flush when batch hits 200 messages
            FlushInterval: 1,   // also flush every 1 second if batch isn't full
            // ManualCommit=true: sync commit after each success; on terminal failure
            // ErrorHandler runs per-message then the offset is committed.
            // Processors is forced to 1; scale throughput via Workers instead.
            ManualCommit:  true,
        },
        &OrderProcessor{},
        kqbatch.WithLogger(myLogger),
    )

    ctx, cancel := context.WithCancel(context.Background())
    consumer.Start(ctx)

    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    <-sig

    cancel() // stop fetching
    stopCtx, stopCancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer stopCancel()
    consumer.Stop(stopCtx) // wait up to 30s
}

Configuration

Field            | Default  | Description
-----------------|----------|------------------------------------------------------------
Name             | -        | Name shown in log lines
Topic            | -        | Kafka topic to consume
Group            | -        | Consumer group ID
Offset           | "last"   | Starting offset for new groups: "first" or "last"
Workers          | 3        | Parallel Kafka readers; should not exceed partition count
ChunkSize        | 200      | Max messages per batch
FlushInterval    | 1        | Seconds before a partial batch is flushed
Processors       | 10       | Max concurrent batch-processing goroutines per worker. Forced to 1 when ManualCommit=true.
MaxRetry         | 0        | Retry attempts on batch failure (0 = no retry)
RetryInterval    | 3        | Seconds between retries
ErrorConcurrency | 10       | Concurrent error-handler calls for a failed batch (0 = serial)
ManualCommit     | false    | true = commit synchronously after each success; on terminal failure, ErrorHandler runs, then the offset is committed
MinBytes         | 1024     | Min bytes to fetch per Kafka request
MaxBytes         | 10485760 | Max bytes to fetch per Kafka request

Options

// Plug in your own logger (zap, logrus, slog, zerolog, …)
kqbatch.WithLogger(logger)

// Custom dead-letter handler called per-message when all retries are exhausted
kqbatch.WithErrorHandler(func(ctx context.Context, msg kafka.Message, err error) {
    deadLetterQueue.Send(msg)
})

// Override the per-worker message buffer capacity (default: ChunkSize × Processors)
kqbatch.WithBufferLen(5000)

Offset commit modes

ManualCommit: false (default)
CommitMessages is called after every batch (success or failure), and kafka-go's background goroutine flushes the commits to Kafka asynchronously, approximately once per second. This gives higher throughput than ManualCommit=true, but a crash within the ~1s flush window may cause recent batches to be redelivered on restart.

ManualCommit: true
CommitMessages is called synchronously after each successful batch. On failure after all retries:

  1. ErrorHandler is called once per message in the failed batch.
  2. The batch offset is then committed so a restarted consumer does not replay it.

Use ErrorHandler to write to a dead-letter queue, emit alerts, or record the failure in an application-specific way. The library calls it once per message and moves on; it does not check whether ErrorHandler itself succeeded. If ErrorHandler panics, the panic is recovered and the commit still proceeds. Either make ErrorHandler reliable (for example by retrying internally), or accept that the failure is only logged.

Processors is automatically clamped to 1 when ManualCommit=true, which keeps offset commits sequential. With concurrent processors, batches would commit in completion order rather than offset order, so a crash could leave the committed offset ahead of a batch that never finished (see Design notes). To scale throughput with ManualCommit=true, increase Workers instead: each worker has its own Kafka reader and an independent, sequential processor.

Graceful shutdown

Cancel the context to stop fetching. Stop(ctx) then waits for:

  1. All fetch goroutines to exit
  2. Any messages already in the buffer to be batched and dispatched
  3. All in-flight processor goroutines to finish

Pass a timeout context to Stop to cap the wait time:

cancel() // signal shutdown
stopCtx, stopCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer stopCancel()
consumer.Stop(stopCtx) // blocks ≤ 30s

If the deadline expires before all processors finish, consumers are forcibly closed and Stop returns immediately. Some in-flight commits may be lost.

Reliability guarantees

Scenario                | ManualCommit=false                                      | ManualCommit=true
------------------------|---------------------------------------------------------|---------------------------------------------
Normal processing       | async commit after each batch (~1s)                     | sync commit after each batch
Process crash mid-batch | batches in last ~1s window may re-deliver               | batch re-delivered on restart
Transient fetch error   | retried automatically (1s backoff)                      | retried automatically (1s backoff)
All retries exhausted   | ErrorHandler per-message, then commit                   | ErrorHandler per-message, then commit
ProcessBatch panic      | recovered, logged; batch not explicitly committed       | recovered, logged; batch re-delivered
ErrorHandler panic      | recovered, logged; commit still proceeds                | recovered, logged; commit still proceeds

Producing messages

producer := k.NewProducer("orders")
defer producer.Close()

producer.Push(ctx, myOrder) // JSON-marshals and sends asynchronously

Tuning

  • Workers: match the number of topic partitions for maximum parallelism. With ManualCommit=true, this is the primary lever for scaling throughput.
  • ChunkSize + FlushInterval: larger batches improve database throughput; shorter intervals reduce processing latency.
  • Processors: bound by your downstream (database) concurrency capacity. Ignored (set to 1) when ManualCommit=true.
  • Buffer: default ChunkSize × Processors absorbs bursts. Increase via WithBufferLen if the fetch rate greatly exceeds processing rate.
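To make the Processors knob concrete, here is a minimal sketch of a bounded pool built on a buffered channel used as a semaphore. It illustrates the general technique only; runBounded and its peak-concurrency bookkeeping are assumptions for the example and do not reflect kqbatch internals.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded runs process for every batch with at most limit calls in flight
// and returns the highest concurrency actually observed.
// (Hypothetical helper, not kqbatch API.)
func runBounded(batches [][]int, limit int, process func([]int)) int32 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var inFlight, peak int32

	for _, b := range batches {
		sem <- struct{}{} // blocks while limit batches are already running
		wg.Add(1)
		go func(b []int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done

			n := atomic.AddInt32(&inFlight, 1)
			for { // record a new peak if this goroutine raised it
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			process(b)
			atomic.AddInt32(&inFlight, -1)
		}(b)
	}
	wg.Wait()
	return peak
}

func main() {
	batches := make([][]int, 8)
	peak := runBounded(batches, 3, func([]int) {
		time.Sleep(10 * time.Millisecond) // simulate a downstream write
	})
	fmt.Println("peak concurrency:", peak) // never exceeds the limit of 3
}
```

Because the dispatching loop blocks on the semaphore, a slow downstream naturally slows batch dispatch, which is why the limit should track what the downstream can absorb.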

Performance

See bench/BENCH_DESIGN.md for benchmark methodology, configuration matrix, and results.

License

kqbatch is released under the MIT License.

Design notes

Why Processors=1 with ManualCommit=true?

When Processors>1, multiple goroutines process batches concurrently and call CommitMessages in non-deterministic completion order. If batch B (higher offsets) commits before batch A (lower offsets), Kafka records the higher offset. On crash, the consumer restarts from batch B's offset — batch A is never re-delivered, breaking at-least-once semantics.

The current implementation takes the simplest correct approach: enforce Processors=1 so commits are always serial and monotonically increasing. A future enhancement could implement an ordered commit queue that buffers commit results and flushes them in dispatch order, restoring concurrent processing without offset regression.
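The ordered commit queue mentioned above is described only as a possible future enhancement. A rough sketch of the idea, under the assumption that each batch is tagged with a dispatch sequence number, might look like the following; orderedCommitter and its methods are hypothetical and do not exist in kqbatch.

```go
package main

import (
	"fmt"
	"sync"
)

// orderedCommitter releases batch sequence numbers for commit strictly in
// dispatch order, even when batches finish out of order.
// (Hypothetical type, not kqbatch API.)
type orderedCommitter struct {
	mu     sync.Mutex
	next   int          // lowest sequence number not yet committed
	done   map[int]bool // finished batches still waiting on earlier ones
	commit func(seq int)
}

func newOrderedCommitter(commit func(seq int)) *orderedCommitter {
	return &orderedCommitter{done: make(map[int]bool), commit: commit}
}

// Complete records that batch seq has finished, then commits every batch
// that is now contiguous with the last committed one.
func (c *orderedCommitter) Complete(seq int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.done[seq] = true
	for c.done[c.next] {
		delete(c.done, c.next)
		c.commit(c.next)
		c.next++
	}
}

func main() {
	var committed []int
	c := newOrderedCommitter(func(seq int) { committed = append(committed, seq) })

	c.Complete(1) // batch 1 finished first, but batch 0 is still running
	c.Complete(2)
	fmt.Println(committed) // prints "[]"

	c.Complete(0) // batch 0 finishes: 0, 1, and 2 are released in order
	fmt.Println(committed) // prints "[0 1 2]"
}
```

In a real consumer the commit callback would call CommitMessages for the batch with that sequence number, and sequence numbers would need to be tracked per partition.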

Fetch error retry

If FetchMessage returns a non-shutdown error (network blip, broker rebalance, etc.), the worker logs the error, sleeps 1 second, and retries. Without this, a transient error would permanently stop a worker — the service would appear alive but consume nothing.

Stop timeout

Stop(ctx) spawns a goroutine that waits for all workers and closes a channel when done. A select then races that channel against ctx.Done(). This ensures Stop always returns within the caller's deadline even if processors are slow to drain.