
[python] Implement partial-update merge engine in pypaimon #7745

Open
TheR1sing3un wants to merge 4 commits into apache:master from TheR1sing3un:py-merge-engine-partial-update

Conversation

@TheR1sing3un
Member

Purpose

pypaimon exposes MergeEngine.PARTIAL_UPDATE in core_options.py and accepts merge-engine: partial-update as a table option, but the read path never reads that option — pypaimon/read/reader/sort_merge_reader.py hardcodes DeduplicateMergeFunction(). So a user who:

  1. Creates a PK table with merge-engine: partial-update,
  2. Writes one batch with [{id: 1, a: 'A', b: null}], then another with [{id: 1, a: null, b: 'B'}],
  3. Reads back,

gets [{id: 1, a: null, b: 'B'}] (silently deduplicated to the latest row) instead of the expected [{id: 1, a: 'A', b: 'B'}] (per-field merge of non-null values). No error, no warning, just wrong data. The same is true today for merge-engine: aggregation and merge-engine: first-row — both are silently degraded to dedupe.
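A minimal repro sketch of that scenario (assumed API: a writer `w` and committer `c` for a PK table created with merge-engine: partial-update; table creation is elided, and the write calls follow the pattern shown in the verification thread below):

  import pyarrow as pa

  schema = pa.schema([('id', pa.int64()), ('a', pa.string()), ('b', pa.string())])

  # Batch 1: only column `a` is set for id=1.
  w.write_arrow(pa.Table.from_pylist([{'id': 1, 'a': 'A', 'b': None}], schema=schema))
  c.commit(w.prepare_commit())

  # Batch 2: only column `b` is set for the same key.
  w.write_arrow(pa.Table.from_pylist([{'id': 1, 'a': None, 'b': 'B'}], schema=schema))
  c.commit(w.prepare_commit())

  # Expected read-back:     [{'id': 1, 'a': 'A', 'b': 'B'}]   (per-field merge)
  # Actual before this PR:  [{'id': 1, 'a': None, 'b': 'B'}]  (silent dedupe)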

This PR ports the core PartialUpdateMergeFunction semantics from Java (paimon-core/.../mergetree/compact/PartialUpdateMergeFunction.java) and wires the Python read path to dispatch on merge-engine. End state:

  • merge-engine: partial-update works correctly for the common case (no DELETE rows, no sequence-group config, no per-field aggregator overrides).
  • merge-engine: deduplicate is unchanged.
  • merge-engine: aggregation / first-row raise an explicit NotImplementedError instead of silently behaving as dedupe — fail loud now that we have an obvious place for users to escalate from.

Changes

New: pypaimon/read/reader/partial_update_merge_function.py

class PartialUpdateMergeFunction:
    def __init__(self, key_arity, value_arity): ...
    def reset(self): ...  # clear the per-key accumulator
    def add(self, kv):
        if not RowKind.is_add_byte(kv.value_row_kind_byte):
            raise NotImplementedError(...)  # DELETE / UPDATE_BEFORE
        for i in range(self.value_arity):
            field = kv.value.get_field(i)
            if field is not None:
                self.accumulator[i] = field  # last non-null wins
    def get_result(self) -> Optional[KeyValue]:
        # Build a fresh KeyValue from a fresh tuple — never aliased to
        # upstream's reused input kv.
        ...
The reset / add / get_result protocol matches DeduplicateMergeFunction exactly so SortMergeReader doesn't change.
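For illustration, the per-key driving loop looks roughly like this (a sketch, not the actual reader code; `kvs_with_same_key` is a hypothetical iterable of KeyValues popped for one primary key):

  merge_function.reset()                # fresh accumulator for this key
  for kv in kvs_with_same_key:
      merge_function.add(kv)
  merged = merge_function.get_result()  # fresh KeyValue, or None if nothing was added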

Modified: pypaimon/read/reader/sort_merge_reader.py

SortMergeReaderWithMinHeap.__init__ gains an optional merge_function kwarg (defaults to DeduplicateMergeFunction() so any direct callers are unchanged).
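Sketched, with the other parameters elided behind *args/**kwargs for illustration:

  def __init__(self, *args, merge_function=None, **kwargs):
      # Default preserves the previously hardcoded behaviour for direct callers.
      self.merge_function = merge_function or DeduplicateMergeFunction()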

Modified: pypaimon/read/split_read.py

MergeFileSplitRead.section_reader_supplier now picks the merge function based on self.table.options.merge_engine():

DEDUPLICATE     -> DeduplicateMergeFunction()
PARTIAL_UPDATE  -> PartialUpdateMergeFunction(key_arity, value_arity)
AGGREGATE / FIRST_ROW -> NotImplementedError(...)
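A hedged sketch of that dispatch (the real code is MergeFileSplitRead._build_merge_function, per the Documentation section below; the arity plumbing is simplified and self.key_arity / self.value_arity are illustrative names):

  def _build_merge_function(self):
      engine = self.table.options.merge_engine()
      if engine == MergeEngine.DEDUPLICATE:
          return DeduplicateMergeFunction()
      if engine == MergeEngine.PARTIAL_UPDATE:
          return PartialUpdateMergeFunction(self.key_arity, self.value_arity)
      # AGGREGATE / FIRST_ROW: fail loud instead of silently deduplicating.
      raise NotImplementedError(
          f"merge-engine '{engine}' is not implemented in pypaimon yet")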

Out of scope (deliberate, called out in code comments)

  • Per-field aggregator overrides (fields.<name>.aggregate-function=...) — needs a FieldAggregator framework.
  • Sequence-group support (fields.<name>.sequence-group=...) — needs a UserDefinedSeqComparator-equivalent.
  • ignore-delete, partial-update.remove-record-on-delete, partial-update.remove-record-on-sequence-group — depend on the above.
  • AGGREGATE / FIRST_ROW merge engines — separate PRs each. This PR just upgrades them from "silent dedupe" to "explicit error".
  • DELETE / UPDATE_BEFORE row kinds — raise NotImplementedError so we never silently corrupt data with a half-implemented contract.

Linked issue

N/A — surfaced when verifying that merge-engine: partial-update actually does what the option name implies in pypaimon.

Tests

  • pypaimon/tests/test_partial_update_merge_function.py — 11 unit cases driving the merge function with synthetic KeyValue instances. Covers: single insert, two-way overlapping merge (overwrite vs fill-null), three-way merge composition, later-null-does-not-clobber, reset between keys, get_result before any add, UPDATE_AFTER acceptance, DELETE / UPDATE_BEFORE refusal, and result decoupling from input kv (proves the result is built into a fresh tuple so upstream's reused-KeyValue pattern doesn't corrupt us).
  • pypaimon/tests/test_partial_update_e2e.py — 8 end-to-end cases on real PK tables. Covers: two-write merge (A,_ + _,B → A,B), three-write left-to-right composition, disjoint keys unaffected, later-non-null wins over earlier non-null, later-null preserves earlier value, deduplicate-engine-unchanged (regression), aggregation / first-row raise NotImplementedError.
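One representative unit case, sketched (make_kv is a hypothetical helper building a synthetic KeyValue, and the result accessor is illustrative):

  def test_later_null_does_not_clobber(self):
      f = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
      f.reset()
      f.add(make_kv(key=(1,), value=('A', 'B')))
      f.add(make_kv(key=(1,), value=(None, 'C')))
      result = f.get_result()
      # 'A' survives the later null; 'C' overwrites 'B'.
      assert result.value == ('A', 'C')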

Master-vs-fix verification: the new e2e tests were run against origin/master's sort_merge_reader.py / split_read.py, then against this branch:

$ git show origin/master:.../sort_merge_reader.py > .../sort_merge_reader.py
$ git show origin/master:.../split_read.py > .../split_read.py
$ pytest pypaimon/tests/test_partial_update_e2e.py
... 6 failed, 2 passed
   FAILED test_partial_update_two_writes_merges_non_null
   FAILED test_partial_update_three_writes_merges_left_to_right
   FAILED test_partial_update_later_value_wins_over_earlier_non_null
   FAILED test_partial_update_later_null_does_not_clobber_earlier_value
   FAILED test_aggregation_engine_raises_not_implemented
   FAILED test_first_row_engine_raises_not_implemented
$ git checkout HEAD -- .../sort_merge_reader.py .../split_read.py
$ pytest pypaimon/tests/test_partial_update_e2e.py
... 8 passed

Regression: pytest pypaimon/tests/{reader_primary_key_test,reader_split_generator_test,reader_append_only_test,test_partial_update_*}.py → 50 passed, 7 failed. The 7 failures are all pre-existing lance / vortex environment issues unrelated to this PR; the dedupe path is unchanged.

flake8 --config=dev/cfg.ini clean.

API and format

Public API additions:

  • pypaimon.read.reader.partial_update_merge_function.PartialUpdateMergeFunction — new class.
  • SortMergeReaderWithMinHeap.__init__ gets an optional merge_function kwarg (back-compat default).

No public API removals or signature breaks. No file format change.

Behaviour change: tables with merge-engine: partial-update now produce per-field merged results (was: silently deduplicated). Tables with merge-engine: aggregation or first-row now raise NotImplementedError (was: silently deduplicated). Both are correctness fixes — the previous behaviour was producing wrong data with no signal to the user.

Documentation

The new module carries a docstring covering the algorithm, the out-of-scope list, and the link back to the Java reference. The dispatch in MergeFileSplitRead._build_merge_function carries an inline comment explaining why we now raise on AGGREGATE / FIRST_ROW.

Generative AI disclosure

Drafted with assistance from an AI coding tool; the algorithm follows org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction and the soundness contract is exercised end-to-end by the regression tests above (which fail on master and pass post-fix).

``MergeEngine.PARTIAL_UPDATE`` is exposed in ``core_options.py`` and
accepts ``merge-engine: partial-update`` as a table option, but the
read path never reads that option — ``sort_merge_reader.py`` hardcodes
``DeduplicateMergeFunction()``. So a user who creates a PK table with
``merge-engine: partial-update`` and writes overlapping rows whose
non-null columns differ gets silently deduplicated results instead of
the expected per-field merge: their data is wrong, with no error or
warning. The same is true for ``aggregation`` and ``first-row`` —
both are silently degraded to dedupe today.

This change ports the core ``PartialUpdateMergeFunction`` semantics
from Java
(paimon-core/.../mergetree/compact/PartialUpdateMergeFunction.java) and
wires the Python read path to dispatch on ``merge-engine``:

  * New ``pypaimon/read/reader/partial_update_merge_function.py``: on
    each ``add(kv)`` copy non-null fields of ``kv.value`` into an
    accumulator; ``get_result()`` returns a fresh KeyValue with the
    merged row. Result is built into a brand-new tuple so the merge
    output is decoupled from upstream's reused KeyValue instances.
  * ``SortMergeReaderWithMinHeap.__init__`` gains an optional
    ``merge_function`` kwarg; default still ``DeduplicateMergeFunction()``
    so any direct callers (none in-tree) are unchanged.
  * ``MergeFileSplitRead.section_reader_supplier`` selects the merge
    function based on ``self.table.options.merge_engine()``:
      DEDUPLICATE     -> DeduplicateMergeFunction (unchanged)
      PARTIAL_UPDATE  -> PartialUpdateMergeFunction
      AGGREGATE / FIRST_ROW -> NotImplementedError (was silent dedupe)

Out of scope, intentionally:
  * Per-field aggregator overrides (``fields.<name>.aggregate-function``)
  * Sequence-group support (``fields.<name>.sequence-group``)
  * ``ignore-delete`` / ``partial-update.remove-record-on-*`` options
  * AGGREGATE / FIRST_ROW merge engine implementations

DELETE / UPDATE_BEFORE rows raise ``NotImplementedError`` at ``add()``
time so we can't silently corrupt data with a half-implemented contract.

Tests:
  * ``test_partial_update_merge_function.py`` — 11 unit cases covering
    single insert, two-way overlapping merges, three-way merges, later-
    null-does-not-clobber, reset between keys, get_result-before-any-
    add, UPDATE_AFTER acceptance, DELETE / UPDATE_BEFORE refusal, and
    result decoupling from input kv (proves we're not aliasing
    upstream's reused KeyValue).
  * ``test_partial_update_e2e.py`` — 8 cases: two-write merge, three-
    write merge, disjoint keys unaffected, later-non-null wins, later-
    null preserves earlier value, deduplicate engine unchanged
    (regression), and aggregation / first-row raise NotImplementedError.

Verified by checking out ``origin/master``'s ``sort_merge_reader.py`` /
``split_read.py`` and rerunning ``test_partial_update_e2e.py``: master
fails the 4 partial-update merge cases (silent dedupe) and the 2
aggregation / first-row "raises" cases (silent dedupe instead of
raising); fix passes all 8.
Comment thread: paimon-python/pypaimon/read/split_read.py

…f-scope options

Address review on r3168491328: previously `_build_merge_function()`
dispatched on `merge-engine: partial-update` alone, so a table that ALSO
configured sequence-group / per-field aggregator / ignore-delete /
partial-update.remove-record-on-* would fall into the simple
PartialUpdateMergeFunction and silently drop those semantics --
exactly the same silent-corruption pattern this PR exists to close,
just reshaped from "silent dedupe" to "silent half-partial-update".

Now the PARTIAL_UPDATE branch first scans the table options for any
of the unsupported keys:
 * fields.<name>.sequence-group
 * fields.<name>.aggregate-function
 * fields.default-aggregate-function
 * ignore-delete (and the partial-update./first-row./deduplicate.
   prefixed aliases) when truthy
 * partial-update.remove-record-on-delete when truthy
 * partial-update.remove-record-on-sequence-group when truthy

If any are set, raise NotImplementedError naming every offending key
so the user can either drop them or escalate. Same shape as the
existing AGGREGATE / FIRST_ROW raise.
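Roughly (an illustrative sketch; the helper name and the option-reading details are hypothetical):

  def _check_unsupported_partial_update_options(options):
      offending = []
      for key, value in options.items():
          if key == 'fields.default-aggregate-function':
              offending.append(key)
          elif key.startswith('fields.') and key.endswith(
                  ('.sequence-group', '.aggregate-function')):
              offending.append(key)
          elif key.endswith('ignore-delete') and str(value).lower() == 'true':
              offending.append(key)  # bare key or an engine-prefixed alias
          elif (key.startswith('partial-update.remove-record-on')
                and str(value).lower() == 'true'):
              offending.append(key)
      if offending:
          raise NotImplementedError(
              'not yet supported with merge-engine=partial-update: '
              + ', '.join(sorted(offending)))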

Tests: 7 new e2e cases in test_partial_update_e2e.py, one per option
plus a regression case asserting `ignore-delete: false` (explicitly
disabled) still passes through to the merge function.
…eNonNullFields

Java PartialUpdateMergeFunction.updateNonNullFields (lines 177-188) raises
IllegalArgumentException when an input field is null and the schema marks
that field NOT NULL. The Python port previously absorbed such inputs
silently: a null written to a NOT NULL field was accepted, and if no later
non-null value arrived it landed as null in the merged result.

Changes:

 * PartialUpdateMergeFunction.__init__ takes an optional `nullables`
   list parallel to value indices. When given, every add() checks each
   null input against `nullables[i]` and raises ValueError on a NOT NULL
   field, matching Java semantics on every row (not just the first).
   When omitted, behaviour is unchanged (back-compat for direct callers).
 * MergeFileSplitRead snapshots the raw value-side schema as
   `value_fields` before _create_key_value_fields wraps it, then hands
   `[f.type.nullable for f in self.value_fields]` to the merge function.
 * Five new unit cases in test_partial_update_merge_function.py: first
   row null on NOT NULL raises, subsequent row null on NOT NULL raises,
   null on nullable field is absorbed, length-mismatch nullables raises,
   omitting nullables preserves the previous lenient behaviour.

Result: with the existing guard in _build_merge_function (which refuses
out-of-scope options) and the NOT NULL enforcement here, the simple
last-non-null path is now feature-equivalent to Java's
updateNonNullFields + getResult on the supported subset.
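The per-row check, sketched against the accumulator loop from the Changes section above (the accumulator name and surrounding code are illustrative; only the `nullables` parameter is new in this commit):

  def add(self, kv):
      ...
      for i in range(self.value_arity):
          field = kv.value.get_field(i)
          if field is None:
              if self.nullables is not None and not self.nullables[i]:
                  # Mirrors Java's IllegalArgumentException in updateNonNullFields.
                  raise ValueError(
                      f'field at value index {i} is NOT NULL but input is null')
              continue  # null on a nullable field: keep the earlier value
          self.accumulator[i] = field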
@XiaoHongbo-Hope
Contributor

Could you add coverage for partial-update rows that land in the same data file, e.g. two write_arrow()
calls before a single prepare_commit()? I think this currently still bypasses the new merge-engine dispatch.

@XiaoHongbo-Hope
Contributor

Nit: thanks for adding the coverage. I wonder if you can make it a bit more focused though — some unit and
e2e cases seem to overlap on the same last-non-null semantics, and the unsupported-option cases could probably be table-driven.

@TheR1sing3un
Member Author

Could you add coverage for partial-update rows that land in the same data file, e.g. two write_arrow() calls before a single prepare_commit()? I think this currently still bypasses the new merge-engine dispatch

Thank you for the suggestion. I will ping you again once it's done.

…tedFailure

Reviewer asked to cover rows that land in the same data file --
multiple write_arrow() calls before a single prepare_commit().
Adding the cases revealed the writer-side / read-side gap upstream
of this PR: KeyValueDataWriter._merge_data only does concat+sort
(no merge function applied), so the flushed file holds duplicate
primary keys; on read, _build_split_from_pack treats any single-file
group as raw_convertible and routes through the fast path, skipping
SortMergeReader and the merge-engine dispatch this PR adds.

Fixing it requires either a merge buffer in KeyValueDataWriter
(mirroring Java SortBufferWriteBuffer / MergeTreeWriter) or a
tighter raw_convertible check that proves intra-file PK uniqueness
-- both are write-path / scan-path restructuring outside this
read-side merge-engine port. The two new cases are kept as
unittest.expectedFailure so the gap stays visible and converts to
passing regressions when the writer-side fix lands.
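For reference, the marking pattern (test body elided; the class name is illustrative, the method name follows this commit):

  import unittest

  class PartialUpdateE2ETest(unittest.TestCase):
      @unittest.expectedFailure
      def test_partial_update_two_write_arrows_single_commit(self):
          # Two write_arrow() batches with the same PK, one prepare_commit();
          # fails until the writer-side merge-buffer fix lands upstream.
          ...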
@TheR1sing3un
Member Author

Could you add coverage for partial-update rows that land in the same data file, e.g. two write_arrow() calls before a single prepare_commit()? I think this currently still bypasses the new merge-engine dispatch.

Verified -- and the gap is independent of this PR.

I added two coverage cases per your suggestion (commit 0d58859):
test_partial_update_two_write_arrows_single_commit /
test_partial_update_three_write_arrows_single_commit. Both fail, so
they're marked unittest.expectedFailure for now.

To rule out anything this PR introduced, I ran the same workload on
plain origin/master (b4e54ad) using the default deduplicate
merge engine -- two write_arrow calls + one prepare_commit with
the same PK in both batches:

  w.write_arrow(pa.Table.from_pylist([{'id': 1, 'a': 'first',  'b': 'old'}], ...))
  w.write_arrow(pa.Table.from_pylist([{'id': 1, 'a': 'second', 'b': 'new'}], ...))
  c.commit(w.prepare_commit())
  # Expected: 1 row -- {'id': 1, 'a': 'second', 'b': 'new'} (dedupe to latest)
  # Actual on master: 2 rows -- {'id': 1, 'a': 'first', ...}, {'id': 1, 'a': 'second', ...}

The flushed data file contains 2 rows for the same PK; on read,
_build_split_from_pack (split_generator.py:99-100) marks any
single-file PK split as raw_convertible=True, which routes through
the raw-convertible fast path and skips SortMergeReader. Even on
master the dedupe path silently returns both rows -- so what this PR
exposes is a pre-existing user-facing data-quality issue that affects
the default PK-table configuration. It's not a regression introduced
by the partial-update dispatch.

Root cause split across the write and read paths:

  • KeyValueDataWriter._merge_data only does concat + sort -- no
    merge function is applied at flush time, so the file holds duplicate
    primary keys (violates the LSM "PK unique within a file" invariant
    that Java enforces in SortBufferWriteBuffer.forEach /
    MergeTreeWriter.flushWriteBuffer).
  • The read-side raw_convertible shortcut assumes that invariant
    holds, so it skips SortMergeReader -- where the merge-engine
    dispatch (this PR) and the existing dedupe both live.

Fixing it requires either (a) an in-memory merge buffer in
KeyValueDataWriter mirroring Java's SortBufferWriteBuffer (which
applies the table's MergeFunction during flush -- the proper fix,
also benefits dedupe/aggregation/first-row), or (b) a stricter
raw_convertible check that proves intra-file PK uniqueness. Both
are write-path / scan-path restructuring, well outside the read-side
scope of this PR. I'll open a separate PR for it -- the dedupe data-
quality issue is severe enough on its own to warrant a dedicated fix
rather than one buried in this merge-engine port.

The expectedFailure cases here will turn into passing regressions
once that follow-up lands.

@TheR1sing3un
Copy link
Copy Markdown
Member Author

@XiaoHongbo-Hope Hi, thank you for raising this. However, the issue is not strictly related to this PR; it is a general problem, so I would rather propose a separate PR to solve it. The two PRs are independent and can be advanced in parallel.
