Skip to content

feat: max_concurrency per run with semaphore enforcement #16

Description

@kination

Summary

Add max_concurrency field to DagSpec to cap the number of simultaneously running tasks within a single DAG run.

Without this, a wide DAG (many parallel branches) can spawn hundreds of tasks at once and overwhelm downstream systems like Lambda concurrency limits, EKS API server, or external HTTP endpoints.

Design

Field added to DagSpec:

max_concurrency: nil   # nil = unlimited (default)

YAML usage:

name: wide_pipeline
max_concurrency: 10

Implementation

In DAG.Engine, track per-run state:

  • running_task_count — live count of tasks in :running state
  • pending_queue — task_ids that are dependency-ready but waiting for a slot

On task ready (deps resolved):

  • If running_task_count < max_concurrency → dispatch immediately, increment counter
  • Otherwise → append to pending_queue

On :task_done or :task_failed:

  • Decrement running_task_count
  • If pending_queue is non-empty → pop next task, dispatch it, keep counter stable

This is a semaphore-style counter. No external library needed.

Distinction from Pools

max_concurrency is per-run scope. The planned Pool feature (issue: pools/concurrency-limits) is cross-run and cross-DAG. Both can apply simultaneously.

Files to Change

  • lib/levicon/schema/dag_spec.ex — add max_concurrency field
  • lib/levicon/schema/run.ex — add running_task_count, pending_queue fields
  • lib/levicon/dag/engine.ex — enforce semaphore in handle_info({:task_done}) and handle_info({:task_failed})
  • test/levicon/dag/engine_test.exs — add tests for concurrency limiting behavior

References

  • Design doc: docs/plans/2026-03-05-levicon-mvp.md (DAG.Engine section)
  • Improvement research: docs/improvement-research.md (section A)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions