Skip to content

[serve] Add async task processing API: task_consumer, task_handler, TaskProcessorAdapter (issue #54652)#1

Draft
Myasuka with Copilot wants to merge 3 commits into
masterfrom
copilot/add-issue-54652-analysis
Draft

[serve] Add async task processing API: task_consumer, task_handler, TaskProcessorAdapter (issue #54652)#1
Myasuka with Copilot wants to merge 3 commits into
masterfrom
copilot/add-issue-54652-analysis

Conversation

Copilot AI commented Feb 28, 2026

Copy link
Copy Markdown

Ray Serve has no native support for long-running background tasks (batch inference, fine-tuning, ETL), forcing users to block replicas and risk timeouts. This adds a producer-consumer architecture decoupling task submission from execution via an external message broker.

New: python/ray/serve/task_processor.py

  • TaskProcessorConfig — Pydantic config: queue name, adapter backend, retry limit, DLQ names
  • TaskResult — Standardized envelope: id, status, backend_task_id, created_at, result
  • TaskProcessorAdapter — Abstract base class for pluggable backends (initialize, register_task_handler, enqueue_task, get_task_status, start_consumer + optional lifecycle hooks)
  • @task_handler(name) — Tags a deployment method as the entry-point for a named task type
  • @task_consumer(config) — Class decorator: scans for @task_handler methods, wires the adapter, starts the consumer loop in __init__, shuts down in __del__
  • _InMemoryAdapter / _InMemorySettings — Built-in in-process adapter (queue.Queue + daemon thread) for local dev/testing; supports retries and DLQ logging; no external broker required

API changes

task_consumer, task_handler, TaskProcessorConfig, TaskProcessorAdapter, TaskResult exported from ray.serve.

Chinese documentation

All docstrings in task_processor.py are bilingual (Chinese + English). TASK_PROCESSOR_zh.md provides a full Chinese API reference, architecture overview, usage examples, upgrade/rollback guidance, and glossary.

Usage

from ray import serve
from ray.serve.task_processor import TaskProcessorConfig, _InMemorySettings

config = TaskProcessorConfig(
    queue_name="doc_index_queue",
    adapter_config=_InMemorySettings(),
    max_retry=3,
    unprocessable_task_queue_name="dlq",
)

@serve.deployment
@serve.task_consumer(config)
class DocumentIndexingConsumer:
    def __init__(self):
        self.indexer = DocumentIndexingEngine()

    @serve.task_handler(name="index_document")
    def index_document(self, doc_id: str, url: str):
        return self.indexer.process(self.indexer.download(url))

@serve.deployment
@serve.ingress(app)
class APIProducer:
    def __init__(self, config, consumer_app):
        self.adapter = serve.task_processor._build_adapter(config)
        self.adapter.initialize()

    @app.post("/index")
    async def submit(self, request: Request):
        task = self.adapter.enqueue_task("index_document", kwargs=await request.json())
        return {"task_id": task.id}

serve.run(APIProducer.bind(config, DocumentIndexingConsumer.bind()))

✨ Let Copilot coding agent set things up for you — coding agent works faster and does higher quality work when set up for your repo.

Co-authored-by: Myasuka <1709104+Myasuka@users.noreply.github.com>
Copilot AI changed the title [WIP] Analyze purpose and implementation for issue 54652 [serve] Add async task processing API: task_consumer, task_handler, and TaskProcessorAdapter Feb 28, 2026
Co-authored-by: Myasuka <1709104+Myasuka@users.noreply.github.com>
Copilot AI changed the title [serve] Add async task processing API: task_consumer, task_handler, and TaskProcessorAdapter [serve] Add async task processing API: task_consumer, task_handler, TaskProcessorAdapter (issue #54652) Feb 28, 2026
@github-actions

Copy link
Copy Markdown

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions Bot added the stale label Jun 23, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants