Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
e4bfcaf
feat(Broker): add PgmqBroker
mducros-wm May 18, 2026
783802d
feat(Broker): partionned queue
mducros-wm May 20, 2026
d18e47f
feat(Pgmq): add consumer
mducros-wm May 21, 2026
86a9c86
feat(Pgmq): add listen notify
mducros-wm May 21, 2026
0ad1c3a
fix(Workflow): tests
mducros-wm May 21, 2026
3999582
fix(pgmq): support encoder
mducros-wm May 28, 2026
2221af2
feat(pgmq-broker): add join method
mducros-wm May 28, 2026
4c7df94
feat(api): remove superbowl route
mducros-wm May 28, 2026
39443d2
fix(pgmq-broker): url format
mducros-wm May 28, 2026
23fb4e1
fix(app): works without pgmq extra
mducros-wm May 29, 2026
2cf1621
fix(pgmq): simplify code
mducros-wm Jun 2, 2026
6983122
fix(StateBackend): remove postgres
mducros-wm Jun 2, 2026
308b27e
fix(Worker): remove useless code
mducros-wm Jun 3, 2026
1b94275
fix(postgres): code review
mducros-wm Jun 5, 2026
734649f
fix(postgres): partition creation
mducros-wm Jun 11, 2026
e6a0608
fix(api): restore main apis
mducros-wm Jun 15, 2026
da23afc
fix(pgmessage): compilation
mducros-wm Jun 15, 2026
c74adbf
fix(Encoder): less useless parsing
mducros-wm Jun 16, 2026
01b3511
fix(broker): interval unit
mducros-wm Jun 17, 2026
d85986e
fix(postgres-broker): close properly
mducros-wm Jun 17, 2026
cfc3f2a
fix(listen-notify): code review
mducros-wm Jun 17, 2026
457a4ad
fix(postgres): improve code
mducros-wm Jun 18, 2026
ce01e4c
fix(postgres): self-healing LISTEN/NOTIFY listener
mducros-wm Jun 19, 2026
cadd4a8
docs(api): drop SuperBowl references and prune dead API surface
mducros-wm Jun 19, 2026
09a2e2d
docs(changelog): list all removed/renamed public APIs for 7.0.0
mducros-wm Jun 19, 2026
0b577e5
docs(changelog): note the repurposed postgres extra as breaking
mducros-wm Jun 19, 2026
ec138ee
test(postgres): use psycopg v3 driver for the fixture's SQLAlchemy en…
mducros-wm Jun 19, 2026
f139890
docs(guide): keep max_retries heading consistent with its siblings
mducros-wm Jun 19, 2026
e873695
docs(guide): document PostgresBroker partitioning and retention
mducros-wm Jun 19, 2026
da0cc5a
fix(postgres): never let a failed ack/nack kill the worker thread
mducros-wm Jun 19, 2026
7d72884
perf(encoder): serialize once on the bytes path; drop committed dev s…
mducros-wm Jun 19, 2026
eac24c4
test(postgres): cover retries, results and groups end-to-end
mducros-wm Jun 19, 2026
9c75b12
refactor(postgres): count queue length via PGMQ metrics, not the raw …
mducros-wm Jun 19, 2026
125976e
fix(postgres): honor timeout=0 as a non-blocking read; reject negativ…
mducros-wm Jun 19, 2026
96dd9b3
fix(encoder): drop PydanticEncoder fallback, raise on decode failure
mducros-wm Jun 19, 2026
c0cb0ff
refactor(postgres): minor review cleanups
mducros-wm Jun 19, 2026
4d98f7b
fix(postgres): support several consumers per queue on the shared list…
mducros-wm Jun 19, 2026
8e5b656
fix(doc): reformulation
mducros-wm Jun 19, 2026
bc2bdc6
feat(encoder): restore PydanticEncoder fallback_encoder support
mducros-wm Jun 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
ports:
- 5784:5672
postgres:
image: postgres:16
image: ghcr.io/pgmq/pg18-pgmq:v1.10.0
ports:
- 5544:5432
env:
Expand Down
8 changes: 6 additions & 2 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ of those changes to CLEARTYPE SRL.
| [@thomasLeMeur](https://github.com/thomasLeMeur) | Thomas Le Meur |
| [@fregogui](https://github.com/fregogui) | Guillaume Fregosi |
| [@pgitips](https://github.com/pgitips) | Pierre Giraud |



| [@williampollet](https://github.com/williampollet) | William Pollet |
| [@mehdithez](https://github.com/mehdithez) | Zeroual Mehdi |
| [@julien-duponchelle](https://github.com/julien-duponchelle) | Julien Duponchelle |
| [@alisterd51](https://github.com/alisterd51) | Antoine Clarman |
| [@mducros-wm](https://github.com/mducros-wm) | Martin Ducros |
| [@julien-duponchelle](https://github.com/julien-duponchelle) | Julien Duponchelle |
| [@alisterd51](https://github.com/alisterd51) | Antoine Clarman |
19 changes: 11 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@ If you want to use it with [RabbitMQ]
uv pip install 'remoulade[rabbitmq]'
```

or if you want to use it with [Redis]
If you want to use it with [PostgreSQL] and [PGMQ]

```console
uv pip install 'remoulade[redis]'
uv pip install 'remoulade[postgres]'
```

If you want Redis-backed extras like results or cancellation, add [Redis] to the broker extra you use:

```console
uv pip install 'remoulade[rabbitmq, redis]'
uv pip install 'remoulade[postgres, redis]'
```

## Quickstart
Expand Down Expand Up @@ -84,12 +91,6 @@ If you want to contribute to the project. First make a Pull request and get appr

This will trigger a CI/CD pipeline that publish the package

## Dashboard

Check out [SuperBowl](https://github.com/wiremind/super-bowl) a dashboard for real-time monitoring and administrating all your Remoulade tasks.
***See the current progress, enqueue, requeue, cancel and more ...***
Super easy to use !.

## Kubernetes

Remoulade is tailored to run transparently in containers on [Kubernetes](https://kubernetes.io/) and to make the most of their features. This does not mean it cannot run outside of Kubernetes ;)
Expand All @@ -110,6 +111,8 @@ remoulade is licensed under the LGPL. Please see [COPYING] and

[COPYING.LESSER]: https://github.com/wiremind/remoulade/blob/master/COPYING.LESSER
[COPYING]: https://github.com/wiremind/remoulade/blob/master/COPYING
[PostgreSQL]: https://www.postgresql.org/
[PGMQ]: https://pgmq.github.io/pgmq/
[RabbitMQ]: https://www.rabbitmq.com/
[Redis]: https://redis.io
[user guide]: https://remoulade.readthedocs.io/en/latest/guide.html
21 changes: 21 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,27 @@ Changelog

All notable changes to this project will be documented in this file.

`7.0.0`_ -- 2026-06-15
------------
Breaking changes
^^^^^^^^^^^^^^^^
* Remove the legacy PostgreSQL state backend.
* Remove the ``DELETE /messages/states`` API route, which only worked with the removed PostgreSQL state backend.
* Remove the ``PUT /scheduled/jobs/<job_hash>`` API route (single-job update); use ``PUT /scheduled/jobs`` instead.
* Rework the broker API around the new PostgreSQL/PGMQ implementation.
* Rename ``Encoder.encode``/``Encoder.decode`` to ``Encoder.encode_in_bytes``/``Encoder.decode_bytes``; custom encoders must now also implement ``Encoder._encode_in_json`` and ``Encoder.decode_json``.
* Rename ``Message.encode``/``Message.decode`` to ``Message.encode_in_bytes``/``Message.decode_bytes``.
* ``PydanticEncoder`` no longer depends on ``simplejson``; serialization now goes through Pydantic, so ``Decimal`` values are encoded as JSON strings instead of numbers.
* Repurpose the ``postgres`` extra: it now installs the PGMQ broker dependencies (``sqlalchemy>=2``, ``psycopg>=3``, ``pgmq``) instead of the legacy state backend dependencies (``sqlalchemy<2``, ``psycopg2``).

Feat
^^^^
* Add a PostgreSQL/PGMQ broker with partitioned queues, native delayed messages, ``LISTEN/NOTIFY`` wakeups, and queue join support.
Changed
^^^^^^^
* Restore the main APIs after the broker refactor.
* Update the documentation, examples, CI, and test suite for the new stack.

=======
`6.2.0`_ -- 2026-05-18
-------------
Expand Down
50 changes: 0 additions & 50 deletions docs/source/getting_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ By the end of this tutorial, you will be able to do the following:
* :ref:`create a pipeline of tasks that will sequentially process data<getting_started:Creating a Pipeline of tasks>`
* :ref:`use the result middleware to wait and get actor results<getting_started:Using the Result Middleware>`
* :ref:`use the remoulade scheduler to periodically run tasks<getting_started:Scheduling Messages>`
* :ref:`use SuperBowl to monitor and manage your tasks<getting_started:Monitoring and Managing your tasks>`

Prerequisites
-------------
Expand Down Expand Up @@ -341,55 +340,6 @@ To set up the scheduler, we instantiate it, set it as the global scheduler, and

If you run this script and get back to the worker terminal, you will see ``get_weather`` being executed every 10 seconds.

Monitoring and Managing your tasks
----------------------------------

To monitor and manage your tasks, you can use the Superbowl_ dashboard.

.. _Superbowl: https://github.com/wiremind/super-bowl

First, you will need to install Node.js_. Then, clone Superbowl_ in another directory, install its dependencies and run it::

$ cd ..
$ git clone https://github.com/wiremind/super-bowl.git
$ npm install
$ npm run serve

.. _Node.js: https://nodejs.org/en/download/

Now, if you open ``localhost:8080`` in your browser, you will see the SuperBowl dashboard, but you will not see your messages yet. In order to see and manage them, you will have to modify the ``get_weather.py`` script to serve the remoulade api.

.. code-block:: python
:caption: get_weather.py
:emphasize-lines: 4, 22, 23

import requests
import remoulade
from remoulade.brokers.rabbitmq import RabbitmqBroker
from remoulade.api.main import app


@remoulade.actor
def get_weather(city):
url = f"https://goweather.herokuapp.com/weather/{city}"

response = requests.get(url).json()

url_endpoint = <url_endpoint>
text = f'{city}: {response["description"]}'
requests.post(url_endpoint, json=text)


rabbitmq_broker = RabbitmqBroker()
remoulade.set_broker(rabbitmq_broker)
remoulade.declare_actors([get_weather])

if __name__ == "__main__":
app.run(host="localhost", port=5005)

Now you can use the Enqueue tab to enqueue messages with custom arguments, and then see their progress in the messages tab.
Additionally, if you run groups or scheduled jobs in your script, you will be able to see them in their respective tabs.

Next Steps
----------

Expand Down
2 changes: 2 additions & 0 deletions docs/source/global.rst
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,7 @@

.. _gevent: http://www.gevent.org/
.. _RabbitMQ: https://www.rabbitmq.com
.. _PostgreSQL: https://www.postgresql.org/
.. _PGMQ: https://pgmq.github.io/pgmq/
.. _Redis: https://redis.io
Comment thread
mducros-wm marked this conversation as resolved.
.. _Dramatiq: https://dramatiq.io
57 changes: 53 additions & 4 deletions docs/source/guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ actor an invalid URL. Let's try it::


Message Retries
---------------
^^^^^^^^^^^^^^^

If an error occurs during message processing, it will be terminated with a failure message.
Alternatively, you can add the |Retries| Middleware to the broker and set the max_retries or retry_when option to automatically retry your message on failure.
Expand Down Expand Up @@ -208,7 +208,7 @@ max_retries
The maximum number of times a message should be retried. Default to ``0``.

min_backoff
^^^^^^^^^^^
^^^^^^^^^^^^^^^

The minimum number of milliseconds of backoff to apply between retries. Must be greater than 100 milliseconds. Defaults to 15 seconds.

Expand Down Expand Up @@ -331,6 +331,11 @@ milliseconds)::
Keep in mind that *your message broker is not a database*. Scheduled
messages should represent a small subset of all your messages.

On brokers that emulate delay in worker memory, the enqueued message
will carry an ``eta`` option. ``PostgresBroker`` stores delayed messages in
PostgreSQL natively instead, so the message it returns does not include
``eta``.


Prioritizing Messages
---------------------
Expand Down Expand Up @@ -381,7 +386,7 @@ Message Brokers
---------------

Remoulade abstracts over the notion of a message broker and currently
supports RabbitMQ out of the box.
supports RabbitMQ and PostgreSQL/PGMQ out of the box.

RabbitMQ Broker
^^^^^^^^^^^^^^^
Expand All @@ -398,6 +403,51 @@ execution::
remoulade.set_broker(rabbitmq_broker)


Postgres Broker
^^^^^^^^^^^^^^^

To configure PostgreSQL/PGMQ, install ``remoulade[postgres]`` and
instantiate a ``PostgresBroker`` with a PostgreSQL URL as early as possible
during your program's execution. This broker must be used with a PostgreSQL
user allowed to create and delete tables::

import remoulade

from remoulade.brokers.postgres import PostgresBroker

postgres_broker = PostgresBroker(url="postgresql://remoulade@localhost:5432/remoulade")
remoulade.set_broker(postgres_broker)

PGMQ handles delayed messages natively, so ``send_with_options(delay=...)``
does not create a worker-side delay queue or add an ``eta`` option to
the message.

Each queue is created as a **partitioned** PGMQ queue (through
``pgmq.create_partitioned``), which relies on the PostgreSQL ``pg_partman``
extension. Messages are stored in time-based partitions of the queue table,
and once a message is acked or nacked it is moved to the queue's archive
table, which is partitioned the same way. Two broker parameters control this:

``archive_partition_interval_in_days`` (default ``1``)
The time span covered by a single partition. Smaller values create more,
smaller partitions; larger values create fewer, larger ones.

``archive_retention_interval_in_days`` (default ``7``)
How long a partition is kept before ``pg_partman`` drops it. Archived
messages older than this window are removed together with their partition,
so set it comfortably above your longest expected processing and retry
window.

.. note::

Partitioning is maintained by ``pg_partman``: its maintenance routine
(``partman.run_maintenance_proc()``) must run periodically — through the
``pg_partman`` background worker or a ``pg_cron`` job — to create upcoming
partitions ahead of time and drop expired ones. The official PGMQ Docker
image (``ghcr.io/pgmq/pg18-pgmq``) ships ``pg_partman`` and schedules this
for you; on a self-managed or hosted PostgreSQL you must enable it yourself.


Local Broker
^^^^^^^^^^^^^^^

Expand Down Expand Up @@ -472,4 +522,3 @@ synchronously by calling them as you would normal functions.

.. _pytest fixtures: https://docs.pytest.org/en/latest/fixture.html
.. _priority documentation: https://www.rabbitmq.com/priority.html

9 changes: 7 additions & 2 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,14 @@ If you want to use it with RabbitMQ_::

$ pip install -U 'remoulade[rabbitmq]'

Or if you want to use it with Redis_::
Or if you want to use it with PostgreSQL_ and PGMQ_::

$ pip install -U 'remoulade[redis]'
$ pip install -U 'remoulade[postgres]'

Or if you want to use Redis_ for results and cancellation::

$ pip install -U 'remoulade[rabbitmq, redis]'
$ pip install -U 'remoulade[postgres, redis]'

Read the :doc:`guide` if you're ready to get started.

Expand Down
12 changes: 9 additions & 3 deletions docs/source/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@ Remoulade supports Python versions 3.12 and up and is installable via

Via pip
-------
remoulade can be used with a RabbbitMQ_ or a PostgreSQL_ broker.

To install remoulade, simply run the following command in a terminal::
If you want to use it with RabbitMQ_, simply run the following command in a terminal::

$ pip install -U 'remoulade[rabbitmq]'

Remoulade use RabbitMQ_ as message broker.
If you want to use PostgreSQL_ with PGMQ_ instead, install::

If you would like to use it with Redis_ to store the results then run:
$ pip install -U 'remoulade[postgres]'

If you would like to use Redis_-backed extras like results or
cancellation, add the ``redis`` extra to whichever broker you choose::

$ pip install -U 'remoulade[rabbitmq, redis]'
$ pip install -U 'remoulade[postgres, redis]'

If you don't have `pip`_ installed, check out `this guide`_.

Expand All @@ -32,6 +37,7 @@ extra requirements:
Name Description
============= =======================================================================================
``rabbitmq`` Installs the required dependencies for using Remoulade with RabbitMQ.
``postgres`` Installs the required dependencies for using Remoulade with PostgreSQL and PGMQ.
``redis`` Installs the required dependencies for using Remoulade with Redis.
============= =======================================================================================

Expand Down
3 changes: 3 additions & 0 deletions docs/source/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ Brokers
.. autoclass:: remoulade.brokers.rabbitmq.RabbitmqBroker
:members:
:inherited-members:
.. autoclass:: remoulade.brokers.postgres.PostgresBroker
:members:
Comment thread
mducros-wm marked this conversation as resolved.
:inherited-members:
.. autoclass:: remoulade.brokers.stub.StubBroker
:members:
:inherited-members:
Expand Down
4 changes: 2 additions & 2 deletions examples/composition/composition/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
from remoulade.results import Results
from remoulade.results.backends import RedisBackend
from remoulade.state import MessageState
from remoulade.state.backends import PostgresBackend
from remoulade.state.backends import RedisBackend as RedisStateBackend

encoder = PickleEncoder()
backend = RedisBackend(encoder=encoder)
broker = RabbitmqBroker()
broker.add_middleware(Results(backend=backend))
remoulade.set_broker(broker)
remoulade.set_encoder(encoder)
remoulade.get_broker().add_middleware(MessageState(backend=PostgresBackend()))
remoulade.get_broker().add_middleware(MessageState(backend=RedisStateBackend()))


@remoulade.actor(store_results=True)
Expand Down
17 changes: 6 additions & 11 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ classifiers = [
rabbitmq = ["amqpstorm>=2.6,<3"]
redis = ["redis>=7.0.0"]
server = ["flask~=2.3.3", "marshmallow>=3,<4", "flask-apispec"]
postgres = ["sqlalchemy>=1.4.29,<2", "psycopg2>=2.9.11"]
pydantic = ["pydantic>=2.12", "simplejson"]
postgres = ["sqlalchemy>=2.0,<3", "psycopg>=3.2", "pgmq[sqlalchemy]>=1.1.1,<2"]
pydantic = ["pydantic>=2.12"]
limits = ["limits~=5.3.0"]
tracing = ["opentelemetry-api>=1.20"]

Expand All @@ -34,10 +34,10 @@ dev = [
"flask~=2.3.3",
"marshmallow>=3,<4",
"flask-apispec",
"sqlalchemy>=1.4.29,<2",
"psycopg2>=2.9.11",
"sqlalchemy>=2.0,<3",
"pgmq[sqlalchemy]>=1.1.1",
"psycopg>=3.2",
"pydantic>=2.12",
"simplejson",
"limits~=5.3.0",
# Docs
"alabaster",
Expand All @@ -48,10 +48,9 @@ dev = [
# Linting
"ruff",
"mypy~=1.18.2",
"sqlalchemy[mypy]>=1.4.29,<2",
"sqlalchemy[mypy]>=2.0,<3",
"types-redis",
"types-python-dateutil",
"types-simplejson",
"types-requests",
# Misc
"pre-commit",
Expand Down Expand Up @@ -97,10 +96,6 @@ include = ["remoulade*"]
testpaths = ["tests"]
asyncio_mode = "auto"
markers = ["confirm_delivery", "group_transaction"]
filterwarnings = [
"error::sqlalchemy.exc.RemovedIn20Warning",
"error::sqlalchemy.exc.MovedIn20Warning",
]

[tool.ruff]
target-version = "py312"
Expand Down
Loading
Loading