Skip to content

feat(dfir_pipes): add push accum operators (fold, reduce, etc)#2962

Open
MingweiSamuel wants to merge 1 commit into
mainfrom
mingwei/push-pipes
Open

feat(dfir_pipes): add push accum operators (fold, reduce, etc)#2962
MingweiSamuel wants to merge 1 commit into
mainfrom
mingwei/push-pipes

Conversation

@MingweiSamuel

Copy link
Copy Markdown
Member

No description provided.

@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented Jun 19, 2026

Copy link
Copy Markdown

Deploying hydro with  Cloudflare Pages  Cloudflare Pages

Latest commit: 28fd95e
Status: ✅  Deploy successful!
Preview URL: https://0b87d585.hydroflow.pages.dev
Branch Preview URL: https://mingwei-push-pipes.hydroflow.pages.dev

View logs

@MingweiSamuel MingweiSamuel marked this pull request as ready for review June 19, 2026 20:51
@MingweiSamuel MingweiSamuel requested review from a team and Copilot June 19, 2026 20:51

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends dfir_pipes’ push-operator toolbox by introducing accumulator-style combinators (fold/reduce/sort) plus keyed accumulation operators, and updates compile-fail expectations in dfir_rs accordingly.

Changes:

  • Add a unified Accumulate push combinator driven by an AccumState trait, with provided FoldState, ReduceState, and SortState implementations.
  • Add new push combinators: Sort, FoldKeyed, and ReduceKeyed, and expose constructors/exports from push::mod.
  • Update dfir_rs compile-fail .stderr snapshots to reflect new Push implementors shown by the compiler.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
dfir_rs/tests/compile-fail/stable/surface_demuxenum_wrongfields_2.stderr Snapshot update to reflect additional Push implementors in error output.
dfir_rs/tests/compile-fail/stable/surface_demuxenum_wrongfields_1.stderr Snapshot update to reflect additional Push implementors in error output.
dfir_rs/tests/compile-fail/stable/surface_demuxenum_wrongenum.stderr Snapshot update to reflect additional Push implementors in error output.
dfir_pipes/src/push/accumulate.rs Introduces the Accumulate combinator and AccumState trait; includes unit tests.
dfir_pipes/src/push/accum_state.rs Adds concrete accumulator states for fold/reduce/sort behaviors.
dfir_pipes/src/push/fold_keyed.rs Adds keyed fold accumulation into a HashMap and emit-on-finalize behavior.
dfir_pipes/src/push/reduce_keyed.rs Adds keyed reduce accumulation into a HashMap and emit-on-finalize behavior.
dfir_pipes/src/push/sort.rs Adds a Sort push combinator emitting sorted items on finalize.
dfir_pipes/src/push/mod.rs Wires new modules/exports and adds constructors (fold, reduce, reduce_ref, accumulate, sort).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +137 to +146
if let AccumPhase::Draining(iter) = this.phase {
loop {
ready!(this.next.as_mut().poll_ready(ctx));
let Some(item) = iter.next() else {
break;
};
this.next.as_mut().start_send(item, ());
}
*this.phase = AccumPhase::Done;
}
Comment on lines +357 to +359
/// Creates a reduce push (owned mode) that reduces all items into a single value,
/// then emits it downstream on finalize. If no items were received, nothing is emitted.
pub const fn reduce<ReduceFn, Next, Item>(
Comment on lines +13 to +15
/// Push combinator that folds items by key into a hashmap, then emits all
/// (key, accumulator) pairs downstream on flush.
///
Comment on lines +13 to +15
/// Push combinator that reduces items by key into a hashmap, then emits all
/// (key, value) pairs downstream on flush. The first value for each key
/// becomes the initial accumulator.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants