[python] Fix limit push-down discarding non-raw_convertible splits#7742
TheR1sing3un wants to merge 8 commits into apache:master
Conversation
|
Could you please double-check the PR description and the change? I ran both newly added tests on master, and they passed there as well, so they don't seem to reproduce the bug they describe. |
Force-pushed 9b4c40a to 3b7c748
|
@XiaoHongbo-Hope you're right — the two earlier tests didn't actually distinguish the buggy and fixed implementations. Both used inputs (same-key-twice on a single bucket) where every split ended up non-raw_convertible, which means the pre-fix loop body never ran and the fallback ``return splits`` was taken either way. I've replaced them with a single, deterministic reproducer that does exercise the bug:
End-to-end check: Force-pushed 3b7c7484b with the new reproducer and an updated commit message / PR description that walks through why the bug requires a non-raw split ordered before a raw split that meets the limit. |
…row_count for budget
Two divergences from Java's DataTableBatchScan.applyPushDownLimit():
1) Non-raw_convertible splits were skipped entirely by the loop body
— they never entered ``limited_splits``. As a consequence, when a
non-raw split appeared BEFORE a raw split that meets the limit, the
early-return omitted the non-raw split from the plan altogether.
Java unconditionally adds every visited split.
2) The accumulator used ``split.row_count`` (file-level pre-DV upper
bound) where Java uses ``split.partialMergedRowCount()`` — file
row_count *minus* any deletion-vector cardinality already recorded
in the manifest. Python has the same value via
``DataSplit.merged_row_count()``, but ``_apply_push_down_limit``
wasn't using it, so on DV-aware raw splits the accumulator
over-counted and the early-return fired before the reader could
actually produce ``limit`` rows.
The two divergences interact. With ``[non-raw, raw]`` and a tight
limit, (1) silently drops the non-raw partition's data. With
``[raw_with_DV, non-raw, ...]`` and a limit between the post-DV and
pre-DV row counts, (2) makes the loop early-return on the DV split
alone, leaving the reader with fewer rows than it could otherwise
produce by also draining the trailing non-raw splits.
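For contrast, a sketch of the pre-fix (master) loop that produces both
failure modes; reconstructed from the hunk quoted in the review below,
not copied verbatim:

for split in splits:
    if split.raw_convertible:
        limited_splits.append(split)           # (1) non-raw splits never appended
        scanned_row_count += split.row_count   # (2) pre-DV upper bound
        if scanned_row_count >= self.limit:
            return limited_splits
return splits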
Fix:
for split in splits:
limited_splits.append(split) # add unconditionally
if split.raw_convertible:
merged = split.merged_row_count()
scanned_row_count += merged if merged is not None else split.row_count
if scanned_row_count >= self.limit:
return limited_splits
return splits
The ``merged is not None`` fallback to ``split.row_count`` keeps the
previous behaviour for layouts where the merged count cannot be
derived from the manifest (older snapshots, some data-evolution
shapes); using the pre-DV upper bound there is still strictly better
than the alternative of skipping that split's contribution to the
budget.
Tests:
test_limit_drops_non_raw_split_after_raw_budget_is_met (new):
deterministic ``[non-raw (p1), raw (p2)]`` plan. Pre-fix (master)
fails with ``1 != 2``: ``limited_splits=[raw]``, p1's data is
silently dropped. Post-fix returns both splits.
ApplyPushDownLimitUnitTest (new; see the sketch after this list):
synthetic-split unit tests for the
accumulator, since pypaimon's writer doesn't compact L0 → L1+ and
the DV-enabled PK read path skips L0, so a true DV-aware
raw_convertible split is hard to produce from a pure-Python end-
to-end fixture. Cases:
* test_dv_aware_accumulator_uses_merged_row_count —
``[raw(row_count=10, merged=4), non-raw, non-raw]`` + limit=5.
Pre-fix: early-returns after the raw split → 1 split.
Post-fix: 4 < 5 keeps walking → 3 splits.
* test_accumulator_falls_back_to_row_count_when_merged_unavailable
— guards the ``merged is None`` fallback path.
* test_no_raw_splits_falls_through_to_full_list — all-non-raw
falls through to the loop's terminal ``return splits``.
* test_empty_splits_returns_empty / test_no_limit_returns_input_unchanged
— boundary conditions.
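A minimal, self-contained sketch of the synthetic-split setup (the real
test file may differ; ``FakeSplit`` / ``FakeScan`` are illustrative
stand-ins, and the loop under test is the fixed version above):

import unittest
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class FakeSplit:
    raw_convertible: bool
    row_count: int
    merged: Optional[int]

    def merged_row_count(self) -> Optional[int]:
        return self.merged

class FakeScan:
    def __init__(self, limit: int):
        self.limit = limit

    def _apply_push_down_limit(self, splits: List[FakeSplit]) -> List[FakeSplit]:
        scanned_row_count = 0
        limited_splits = []
        for split in splits:
            limited_splits.append(split)  # add unconditionally (this commit)
            if split.raw_convertible:
                merged = split.merged_row_count()
                scanned_row_count += merged if merged is not None else split.row_count
                if scanned_row_count >= self.limit:
                    return limited_splits
        return splits

class ApplyPushDownLimitUnitTest(unittest.TestCase):
    def test_dv_aware_accumulator_uses_merged_row_count(self):
        # Raw split: 10 file rows, 4 after DV subtraction. With limit=5 the
        # DV-aware accumulator (4 < 5) keeps walking, so all 3 splits survive;
        # the DV-blind master loop stops at 10 >= 5 with just [raw].
        splits = [
            FakeSplit(raw_convertible=True, row_count=10, merged=4),
            FakeSplit(raw_convertible=False, row_count=3, merged=None),
            FakeSplit(raw_convertible=False, row_count=3, merged=None),
        ]
        self.assertEqual(len(FakeScan(limit=5)._apply_push_down_limit(splits)), 3)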
Force-pushed 3b7c748 to a57200e
|
Update — there was a second divergence from Java's implementation that the previous version of this PR didn't address. Java's accumulator counts post-deletion-vector rows (file row count minus any DV cardinality recorded in the manifest), not the raw file ``row_count``.
Force-pushed a57200ebb:
PTAL. |
if split.raw_convertible:
    limited_splits.append(split)
    scanned_row_count += split.row_count
merged = split.merged_row_count()
Could you double-check this against current Java semantics? Or is the PR description not the latest? It says this mirrors Java line-for-line, but the gating and append logic here don't match ``DataTableBatchScan.applyPushDownLimit``.
Hi, how about now? Thanks for the reminder!
Review feedback (XiaoHongbo-Hope on PR apache#7742): the previous fix
accumulated ``merged_row_count() if not None else split.row_count``
under a ``raw_convertible`` gate while unconditionally adding every
split to ``limited_splits``. That was a Python-only behavior and
diverged from Java's ``DataTableBatchScan.applyPushDownLimit`` despite
the PR description claiming "mirrors Java line-for-line". Three
concrete divergences:
1. Gate: Java uses ``mergedRowCount.isPresent()``, we used
   ``raw_convertible``.
2. Append timing: Java only adds splits whose merged count is known;
   we added every split regardless.
3. Fallback: Java has none; we fell back to ``split.row_count`` when
   ``merged_row_count()`` returned None.
The single behavioral fix this PR needs to deliver is the accumulator
source — replacing ``split.row_count`` (DV-blind, over-counts when DV
is on) with ``merged_row_count()`` (DV-aware). Java already does
exactly this. Drop the extra divergences so the loop reads as a direct
port of Java:
for split in splits:
    merged = split.merged_row_count()
    if merged is not None:
        limited_splits.append(split)
        scanned_row_count += merged
        if scanned_row_count >= self.limit:
            return limited_splits
return splits
Test adjustments:
- Removed the integration test ``test_limit_drops_non_raw_split_after_
  raw_budget_is_met``. Its expectation ("non-raw split survives the
  limit pushdown") was based on the now-reverted unconditional-append
  behavior. Java drops non-raw splits after the budget is met —
  matching this is now correct, so the test is no longer a regression
  reproducer.
- Renamed ``test_accumulator_falls_back_to_row_count_when_merged_
  unavailable`` to ``test_accumulator_skips_splits_with_unknown_
  merged_count`` and rewrote the docstring to describe the actual
  Java-aligned behavior (skip + fall through).
- Kept ``test_dv_aware_accumulator_uses_merged_row_count`` as the
  master-vs-fix reproducer: master accumulates row_count=10 ≥ limit=5
  and early-returns ``[raw]`` (1 split); fix accumulates merged=4 < 5,
  skips the two non-raw splits, falls through to ``return splits``
  with all 3. Verified by swapping the file body to master's version —
  this test fails (1 != 3) on master and passes after the fix.
Lint: flake8 clean. Tests: 10/10 in reader_split_generator_test.py.
…limit
Line-by-line audit against Java
``DataTableBatchScan.applyPushDownLimit`` (paimon-core/.../source/
DataTableBatchScan.java:128-165) caught one missing branch:
Java L129: if (pushDownLimit == null || hasNonPartitionFilter())
               return Optional.empty();
Java skips limit pushdown entirely when the predicate references any
non-partition column, because per-split row counts (the accumulator
input below) are pre-filter and would over-count against the actual
filtered output — pushing the early-return budget too low and giving
the reader fewer rows than the user asked for.
Add the equivalent short-circuit to Python: a new private helper
``_has_non_partition_filter()`` mirrors Java's
``SnapshotReaderImpl.hasNonPartitionFilter`` (lines 235-248) using
the existing ``_get_all_fields`` predicate-leaf walker. When the
predicate references any column outside ``partition_keys`` the
limit-pushdown loop is skipped and the splits are returned untouched.
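A minimal sketch of the helper's shape, assuming ``self.predicate`` and
``self.table.partition_keys`` as the attribute names (illustrative
only; the real attributes may differ) and reusing the existing
``_get_all_fields`` leaf walker:

def _has_non_partition_filter(self) -> bool:
    # Mirrors SnapshotReaderImpl.hasNonPartitionFilter: any predicate leaf
    # on a column outside the partition keys disables limit pushdown.
    if self.predicate is None:
        return False
    partition_keys = set(self.table.partition_keys)
    return any(
        field not in partition_keys
        for field in self._get_all_fields(self.predicate)
    )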
Tests:
- New ``test_non_partition_filter_short_circuits_pushdown`` in
ApplyPushDownLimitUnitTest covers the new branch.
- Existing 4 unit tests carry through unchanged (the new short-circuit
doesn't trip when ``has_non_partition_filter=False``).
Inline comments now annotate every Java line we mirror (L129, L138,
L146, L147-163, L164) so a reviewer can verify the port at a glance.
|
Thanks for the update - the latest code looks good to me. Two small follow-ups, just suggestions:
|
You're right. My opus is a bit wordy, lol. Let me update it |
Address review on PR apache#7742: shrink the multi-paragraph rationale on ``_apply_push_down_limit`` / ``_has_non_partition_filter`` to a single line each pointing at the Java counterpart. The full reasoning lives in the PR description; the file just needs to say what it mirrors.
|
ready for the final review! @XiaoHongbo-Hope |
… description drift

Address review comment r3173561771: tighten the new unit-test
docstrings and correct the parts that no longer match the
implementation.
* Class-level rationale dropped — the cases speak for themselves.
* test_dv_aware_accumulator_uses_merged_row_count: previous wording
  said the post-fix loop "adds the two non-raw splits without changing
  the accumulator". That's wrong: ``merged is None`` splits are NOT
  appended to ``limited_splits``; the three-split result comes from the
  fall-through ``return splits`` after the loop completes. Updated to
  say so.
* Other docstrings shrunk to one or two lines each.
* _apply / _split helpers: dropped the inline narration on the fake
  scanner / fake split — they're trivially obvious from the bodies.
|
Thanks for the updates — this looks much cleaner now. One tiny wording nit: the current Java code seems to call ``split.mergedRowCount()``, not ``partialMergedRowCount()`` as the docstrings say. |
Reviewer pointed out the docstring named the Java method ``partialMergedRowCount`` while the actual API is ``Split.mergedRowCount()`` (DataTableBatchScan.applyPushDownLimit calls ``split.mergedRowCount()``). Pick the real name so future readers cross-referencing Java don't get tripped up.
done~ |
|
+1 |
How
``_apply_push_down_limit`` now mirrors Java: short-circuit when the
predicate references any non-partition column, otherwise accumulate
``split.merged_row_count()`` and stop at the limit. Splits with unknown
merged count fall through to the reader unchanged.
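Putting the pieces together, a sketch of the final shape described
above (reconstructed from the snippets in this thread, not a verbatim
copy of the merged code):

def _apply_push_down_limit(self, splits):
    # Java L129: skip pushdown when there is no limit or the predicate
    # references a non-partition column (per-split counts are pre-filter).
    if self.limit is None or self._has_non_partition_filter():
        return splits
    scanned_row_count = 0
    limited_splits = []
    for split in splits:
        merged = split.merged_row_count()
        if merged is not None:  # Java: mergedRowCount.isPresent()
            limited_splits.append(split)
            scanned_row_count += merged
            if scanned_row_count >= self.limit:
                return limited_splits
    # Splits with unknown merged counts fall through unchanged.
    return splits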
Tests
``ApplyPushDownLimitUnitTest`` drives ``_apply_push_down_limit`` with
synthetic splits.
Compatibility
No public API change. No file format change.