Skip to content

feat(Broker): add PgmqBroker#399

Open
mducros-wm wants to merge 39 commits into
masterfrom
codex-pgmq-broker-j1
Open

feat(Broker): add PgmqBroker#399
mducros-wm wants to merge 39 commits into
masterfrom
codex-pgmq-broker-j1

Conversation

@mducros-wm

Copy link
Copy Markdown

No description provided.

@mducros-wm mducros-wm force-pushed the codex-pgmq-broker-j1 branch from e0018fe to e4bfcaf Compare May 20, 2026 08:39
Comment thread .github/workflows/tests.yml Outdated
- 5784:5672
postgres:
image: postgres:16
image: ghcr.io/pgmq/pg18-pgmq:latest

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "vanilla" pg image and pgmq installation as a plugin ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could but it seems more complex to me. Why don't you want to use the pgmq image ? I should remove the latest though

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to test the plugin installation and versionning

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the responsibility of remoulade to install the plugins ?

Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
@mducros-wm

Copy link
Copy Markdown
Author

Comment thread pyproject.toml Outdated
Comment thread remoulade/brokers/pgmq.py
Comment thread remoulade/brokers/pgmq.py
Comment thread remoulade/brokers/pgmq.py
self,
*,
url: str,
middleware: list["Middleware"] | None = None,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you check if all builtin middleware are compatible.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Middleware manipulate remoulade object which are unchanged. However, because the delay is handled natively, some middleware method will never be called (before_delay_message, after_declare_delay_queue)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But imo, it makes no sense to call them with a pgmq broker

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the Middleware are not set by the user directly. Maybe you should have a look on the default middleware setup during init.

Comment thread pyproject.toml Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread docs/source/global.rst
Comment thread docs/source/guide.rst Outdated
Comment thread docs/source/installation.rst Outdated
Comment thread docs/superpowers/plans/2026-06-05-postgres-broker-ms-timings.md Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/postgres.py Outdated
Comment thread remoulade/brokers/postgres.py Outdated
Comment thread remoulade/brokers/postgres.py Outdated
Comment thread remoulade/brokers/postgres.py Outdated
Comment thread remoulade/brokers/postgres.py Outdated
Comment thread remoulade/brokers/postgres.py
Comment thread remoulade/brokers/postgres.py
Comment thread remoulade/brokers/postgres.py Outdated
Comment thread remoulade/brokers/postgres.py Outdated
@tavallaie

Copy link
Copy Markdown

PGMQ maintainer here, is there anything I can help you with?

Also, it is good if you see our documentation.

@mducros-wm mducros-wm force-pushed the codex-pgmq-broker-j1 branch from aae381d to 457a4ad Compare June 19, 2026 08:12
mducros-wm and others added 16 commits June 19, 2026 10:57
Reconnect the shared listener with capped backoff instead of degrading
to polling permanently after the first connection failure. On any
connection error the listener drops the connection, wakes consumers so
they fall back to polling immediately, then reopens and re-LISTENs on
every registered channel; `available` flips back to True so consumers
resume LISTEN/NOTIFY automatically.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Remove all SuperBowl dashboard mentions from the docs (getting_started,
README). Also remove API artifacts left unused after the legacy state
backend removal and unused by the SuperBowl frontend:
- DeleteSchema, orphaned by the earlier clean_states route removal
- update_job (PUT /scheduled/jobs/<hash>), never called by SuperBowl
- GroupMessagesT, a defined-but-unused TypedDict

Document the removed DELETE /messages/states and PUT /scheduled/jobs/<hash>
routes in the 7.0.0 changelog breaking changes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Complete the 7.0.0 breaking changes with the Encoder and Message method
renames (encode/decode -> encode_in_bytes/decode_bytes, plus the new
JSON encoder hooks), alongside the already-listed state backend and
removed HTTP routes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The postgres extra now pulls the PGMQ broker dependencies instead of the
removed legacy state backend dependencies.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…gine

SQLAlchemy defaults `postgresql://` to the psycopg2 dialect, which the
project does not depend on. CI installs only psycopg v3, so the raw
engine built in the postgres_broker fixture failed at setup with
ModuleNotFoundError: No module named 'psycopg2'. Swap the scheme to
postgresql+psycopg://, mirroring what PGMQ does internally for the
broker. The plain url is kept for PostgresBroker, whose listener opens a
raw psycopg.connect() connection that does not understand +psycopg.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Revert the unjustified ^^^ -> --- underline change on the max_retries
retry option so it matches the other retry options (min_backoff,
max_backoff, retry_when, backoff_strategy).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Explain that queues are partitioned PGMQ queues backed by pg_partman,
what archive_partition_interval_in_days / archive_retention_interval_in_days
control, and that pg_partman maintenance must run periodically.

Also fix the broken sentence about the required PostgreSQL user and remove
the unused FUTURE_PARTITION_HORIZON constant.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
archive() runs in process_message's finally, outside the worker loop's
except Empty, so a transient DB error during ack/nack used to propagate
and kill the worker thread (which has no restart). Swallow and log it
like the RabbitMQ consumer does; the message is redelivered after its
visibility timeout, honoring at-least-once.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…cripts

encode_in_bytes routed through encode_in_json, whose throwaway json.dumps
validation pass ran on top of the real serialization: 2 passes for the
default JSONEncoder (RabbitMQ/Stub) and 3 for PydanticEncoder. Serialize
directly from the data / _encode_in_json output instead, bringing the
default path back to a single pass and Pydantic down to two. Output bytes
are unchanged. Also drop the misleading `# pragma: no cover` on the default
codec, which the Stub broker actually exercises.

Remove local_postgres_broker.py / local_postgres_consumer.py: local dev
scripts with hard-coded DSNs and prints that should never have been committed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The existing suite tests the broker's mechanics largely in isolation; the
middleware-driven behaviours were only exercised against the stub and
RabbitMQ brokers. Add real-Worker tests running through PGMQ:

- a flaky actor that fails then succeeds on retry,
- an always-failing actor whose exhausted retries end archived (proving the
  nack does not leave the message to be redelivered forever — no DLQ),
- result storage/retrieval via a Results middleware,
- a group aggregating results.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…table

join() counted messages by hand-building a Table("q_<name>", schema="pgmq")
and running count(*), coupling the broker to PGMQ's internal table layout.
Use the public metrics() API instead: queue_length covers visible, invisible
in-flight and native delayed messages alike, matching join()'s contract.
Prune the now-unused SQLAlchemy imports.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…e timeout

The polling fallback rounded timeout up to a one-second minimum, so a
consumer created with timeout=0 blocked ~1s per read instead of being
non-blocking as documented. Do a single immediate read when timeout is 0,
and validate in the consumer's __init__ that timeout is not negative
(matching the broker's other input validation) instead of silently
pretending to coerce it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The fallback_encoder swallowed every decode error and returned the raw,
un-rehydrated payload through the fallback encoder, silently handing actors
the wrong data shape and hiding the original failure. Remove the argument so
decoding raises explicitly (ActorNotFound, ValidationError, ...). Drop the
two tests that asserted the silent-fallback behaviour; the raising paths are
already covered. Document both this and the simplejson->Pydantic
serialization change (Decimal now encoded as a string) as breaking.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Fix copy-pasted Message.decode_json/encode_in_json docstrings that said
  "bytestring" while they handle JSON objects.
- Make convert_days_in_partman_syntax a staticmethod; it never used self.
- Reject prefetch < 1 in the consumer with a ValueError (matching the
  timeout validation) instead of claiming a coercion that never happened.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ener

The shared LISTEN/NOTIFY listener keyed a single wake event per queue, so a
second consumer on the same queue overwrote the first's event and a closing
consumer's unregister() silently stopped notifications for a still-active
sibling. Track a set of events per queue, wake them all, and only drop a
queue's channel routing once its last consumer leaves.

Also back `available` with a threading.Event instead of a bare bool so the
dispatch thread's writes and the consumer threads' reads are synchronized
rather than relying on CPython GIL atomicity; expose it as a read-only
bool property so callers and tests are unaffected.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@mducros-wm mducros-wm requested a review from fregogui June 19, 2026 14:56
Reintroduce the opt-in fallback_encoder argument so decoding can fall
back to another encoder instead of raising. Defaults to None, keeping
strict decoding as the default behaviour.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants