
fix(topk): call attempt_early_completion when filter rejects entire batch #22852

Merged
kosiew merged 3 commits into apache:main from ajegou:arnaud.jegou/topk-early-exit-filter-rejection on Jun 15, 2026


@ajegou ajegou commented Jun 9, 2026


Which issue does this PR close?

Closes #22849.

Rationale for this change

TopK::insert_batch short-circuits when the heap's dynamic filter rejects every row in a batch:

if !filter.has_true() {
    // nothing to filter, so no need to update
    return Ok(());
}

The early-exit check attempt_early_completion(&batch) lives later in the same function, gated on replacements > 0. So a batch that the filter rejects entirely bypasses the check.

The heap's dynamic filter is derived from the heap's worst row (via update_filter). A batch whose rows all come from a strictly worse sort prefix is exactly the batch the filter rejects entirely — i.e. the very signal attempt_early_completion is designed to detect ("the next batch is past the heap's boundary, we can stop") is what causes the function to short-circuit before the check runs.

This is a feature-interaction regression between two PRs that were both correct in isolation. The attempt_early_completion mechanism was added by #15563 (closing #15529). At the time, there was no heap-derived dynamic filter on TopK, so the only sensible call site was right after a successful heap insertion. Two months later, #15770 added the dynamic-filter pushdown for TopK sorts, introducing the !filter.has_true() short-circuit. The two features address different problems and the new short-circuit didn't connect to the existing prefix-completion check — which is how this gap opened up.

Consequence: on a TopK over an input ordered on the sort prefix, finished = true is never set once the heap stabilizes. Since finished is the signal SortExec uses to stop pulling from its input (via Poll::Ready(None) from the TopK stream, which cascades into dropping the source stream), the source keeps being polled long past the point where no further row can improve the heap. The LIMIT optimization effectively degrades to "heap saves memory but reads everything"; sources with cancellable streams (e.g. networked sources) never receive the cancellation signal.

What changes are included in this PR?

Single behavioral change in datafusion/physical-plan/src/topk/mod.rs: call attempt_early_completion(&batch) immediately before the return Ok(()) in the !filter.has_true() branch.
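To make the control flow easier to follow, here is a minimal toy model of the short-circuit and the fix. It is a sketch under stated assumptions, not the DataFusion implementation: `ToyTopK`, its `patched` flag, and the use of `(i32, i32)` rows with ascending order are all hypothetical stand-ins, and only the names `insert_batch`, `attempt_early_completion`, `replacements`, and `finished` are borrowed from the PR text.

```rust
/// Toy model of the control flow described above. Rows are `(a, b)` pairs,
/// smaller is better, and the input is assumed to be sorted on the prefix `a`.
/// Hypothetical illustration only: none of this is DataFusion code.
pub struct ToyTopK {
    k: usize,
    /// Kept rows in ascending order; the last one is the worst kept row.
    heap: Vec<(i32, i32)>,
    /// When true, run the prefix check on fully rejected batches (the fix).
    patched: bool,
    pub finished: bool,
}

impl ToyTopK {
    pub fn new(k: usize, patched: bool) -> Self {
        Self { k, heap: Vec::new(), patched, finished: false }
    }

    pub fn rows(&self) -> &[(i32, i32)] {
        &self.heap
    }

    pub fn insert_batch(&mut self, batch: &[(i32, i32)]) {
        // Stand-in for the dynamic filter: once the heap is full, only rows
        // strictly better than the worst kept row pass.
        let threshold = if self.heap.len() == self.k {
            self.heap.last().copied()
        } else {
            None
        };
        let accepted: Vec<(i32, i32)> = batch
            .iter()
            .copied()
            .filter(|row| threshold.map_or(true, |worst| *row < worst))
            .collect();

        if accepted.is_empty() {
            // The short-circuit path: the unpatched model returns without
            // ever looking at the batch prefix.
            if self.patched {
                self.attempt_early_completion(batch);
            }
            return;
        }

        let mut replacements = 0;
        for row in accepted {
            if self.heap.len() < self.k {
                self.heap.push(row);
            } else if self.heap.last().map_or(false, |worst| row < *worst) {
                self.heap.pop();
                self.heap.push(row);
            } else {
                continue;
            }
            self.heap.sort();
            replacements += 1;
        }
        if replacements > 0 {
            self.attempt_early_completion(batch);
        }
    }

    /// Marks the model finished when the heap is full and the last row of the
    /// batch has a prefix strictly past the worst kept row's prefix.
    fn attempt_early_completion(&mut self, batch: &[(i32, i32)]) {
        if self.heap.len() < self.k {
            return;
        }
        let (Some(last), Some(worst)) = (batch.last().copied(), self.heap.last().copied()) else {
            return;
        };
        if last.0 > worst.0 {
            self.finished = true;
        }
    }
}
```

With `k = 3`, inserting `[(1, 10), (2, 20), (2, 30)]` and then a batch whose rows all have `a = 3` leaves `finished` false in the unpatched model and sets it in the patched one. The kept rows are the same in both.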

Why this scope, not a broader restructuring:

  • The existing attempt_early_completion call inside if replacements > 0 is load-bearing for a related case: a batch containing a mix of "still valuable" rows and "past the boundary" rows. The existing test_try_finish_marks_finished_with_prefix test covers this case — Batch 2 with a=[2,3], b=[10,20] against a heap where heap.max.a = 2; the (2, 10) row must be inserted before the check on the (3, 20) last row triggers. Moving the call earlier would skip the insertion of valuable rows and break that test.
  • The bug is specifically that the short-circuit path doesn't call the check. The fix targets exactly that path.
  • A related but separate gap is not addressed here: when filter.has_true() == true but replacements == 0 (the filter accepts some rows but find_new_topk_items ends up inserting none of them), the existing call inside if replacements > 0 is also skipped. This requires a divergence between the heap's filter predicate and the row-byte comparison used inside find_new_topk_items, which shouldn't normally happen (the filter is derived from the heap's worst row using the same comparator). A deterministic synthetic repro would likely require concurrent heap updates from sibling partitions or boundary-value edge cases (NaN/NULL semantics, type coercion). Happy to send a follow-up if reviewers want it covered; the workload that motivated this fix was the filter-rejection case empirically.
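The ordering argument in the first bullet (insert first, then check) can be illustrated with a small sketch. `process_batch` and its `check_first` flag are hypothetical names, and the heap is modeled as an already full, ascending `Vec` of `(a, b)` rows. The batch values are the ones quoted for the mixed-prefix test, while the starting heap contents are assumed for illustration.

```rust
/// Hypothetical helper: processes one batch against an already full heap of
/// `(a, b)` rows (ascending, smaller is better) and returns `(heap, finished)`.
/// `check_first` models moving the prefix check ahead of the insertion.
pub fn process_batch(
    mut heap: Vec<(i32, i32)>,
    batch: &[(i32, i32)],
    check_first: bool,
) -> (Vec<(i32, i32)>, bool) {
    // True when the last row of the batch is strictly past the heap boundary
    // on the shared prefix `a`.
    let past_boundary = |heap: &[(i32, i32)]| match (batch.last(), heap.last()) {
        (Some(last), Some(worst)) => last.0 > worst.0,
        _ => false,
    };

    if check_first && past_boundary(heap.as_slice()) {
        // Bails out before the valuable rows of the batch are inserted.
        return (heap, true);
    }

    // Keep the k best rows out of the old heap plus the new batch.
    let k = heap.len();
    heap.extend_from_slice(batch);
    heap.sort();
    heap.truncate(k);

    let finished = past_boundary(heap.as_slice());
    (heap, finished)
}
```

Starting from a heap of `[(1, 10), (2, 20), (2, 30)]` and the batch `[(2, 10), (3, 20)]`, the insert-then-check order keeps `(2, 10)` and still finishes, whereas checking first finishes with `(2, 10)` missing from the result.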

Are these changes tested?

Yes. Added a regression test test_try_finish_fires_when_filter_rejects_entire_batch. The assertion target is topk.finished — the flag that signals "stop pulling from the source" to upstream consumers (read by TopKExec::poll_next to emit Poll::Ready(None)). Asserting that the flag transitions on the fully-filter-rejected batch is equivalent to asserting that the source-stopping mechanism activates.

  • Builds a TopK over a (a, b) sort with prefix a, k=3.
  • Inserts a batch that fills the heap with rows from a ∈ {1, 2}; update_filter tightens the filter to a < 2 OR (a = 2 AND b < 30).
  • Inserts a second batch with all rows at a = 3 — filter rejects every row.
  • Without the fix: insert_batch short-circuits, topk.finished stays false. Test fails.
  • With the fix: attempt_early_completion fires (last-row prefix a = 3 > heap.max prefix a = 2), topk.finished becomes true. Test passes.

The test also asserts the emitted top-K is unchanged from after batch 1, confirming no candidate row was incorrectly excluded by the early bail.
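As a rough illustration of why the second batch is rejected in full, the filter from the scenario can be written out as a plain predicate. This is a sketch with hypothetical function names, assuming integer columns and ascending order. The worst kept row `(2, 30)` is inferred from the filter expression quoted above, and the `b` values used for the `a = 3` batch are made up.

```rust
/// The filter from the test scenario, written out literally:
/// `a < 2 OR (a = 2 AND b < 30)`.
pub fn passes_filter(a: i32, b: i32) -> bool {
    a < 2 || (a == 2 && b < 30)
}

/// The same predicate expressed as a lexicographic comparison against the
/// worst kept row, assumed here to be `(2, 30)`.
pub fn better_than_worst(a: i32, b: i32) -> bool {
    (a, b) < (2, 30)
}

/// Toy equivalent of `filter.has_true()`: does any row of the batch pass?
pub fn any_row_passes(batch: &[(i32, i32)]) -> bool {
    batch.iter().any(|&(a, b)| passes_filter(a, b))
}
```

Every row with `a = 3` fails the predicate regardless of `b`, so `any_row_passes` is false for such a batch, which is the condition under which `insert_batch` takes the short-circuit branch.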

All 28 existing topk:: tests continue to pass (including test_try_finish_marks_finished_with_prefix, which exercises the mixed-prefix case).

Are there any user-facing changes?

No public API or output changes. The fix only changes when TopK marks itself finished = true — specifically, it now fires attempt_early_completion for batches that are entirely rejected by the heap's dynamic filter, where previously it would silently skip the check. Output of TopK is unchanged; only the early-exit behavior improves.

…atch

`TopK::insert_batch` short-circuits when the heap's dynamic filter
rejects every row in a batch:

    if !filter.has_true() {
        // nothing to filter, so no need to update
        return Ok(());
    }

The early-exit check `attempt_early_completion(&batch)` lives later in
the same function, gated on `replacements > 0`. So a batch that the
filter rejects entirely bypasses the check.

The heap's dynamic filter is derived from the heap's worst row (via
`update_filter`). A batch whose rows all come from a strictly worse
sort prefix is exactly the batch the filter rejects entirely -- i.e.
the very signal `attempt_early_completion` is designed to detect ("the
next batch is past the heap's boundary, we can stop") is what causes
the function to short-circuit *before* the check runs.

This is a feature-interaction regression between two PRs that were
both correct in isolation. The `attempt_early_completion` mechanism
was added by apache#15563 (closing apache#15529). At the time, there was no
heap-derived dynamic filter on TopK, so the only sensible call site
was right after a successful heap insertion. Two months later, apache#15770
added the dynamic-filter pushdown for TopK sorts, introducing the
`!filter.has_true()` short-circuit. The two features address different
problems and the new short-circuit didn't connect to the existing
prefix-completion check -- which is how this gap opened up.

Consequence: on a TopK over an input ordered on the sort prefix,
`finished = true` is never set once the heap stabilizes. Since
`finished` is the signal `SortExec` uses to stop pulling from its
input, the source keeps being polled long past the point where no
further row can improve the heap.

Fix: call `attempt_early_completion(&batch)` immediately before the
`return Ok(())` in the `!filter.has_true()` branch.

Adds a regression test `test_try_finish_fires_when_filter_rejects_entire_batch`
that fails on unpatched code and passes with the fix.

Closes apache#22849.

@kosiew kosiew left a comment


@ajegou
Thanks for the update. This looks good to me overall. I left a couple of small suggestions that might make the new TopK prefix handling a bit easier to read and maintain.

@@ -255,7 +255,13 @@ impl TopK {
let array = filtered.into_array(num_rows)?;
let mut filter = array.as_boolean().clone();
if !filter.has_true() {


This comment is accurate, but it feels a bit longer than the control flow it explains. I think we could keep just the key invariant here: the heap is unchanged, but the rejected batch can still prove prefix completion.

For example:

// The heap is unchanged, but a fully rejected batch can still prove that
// the shared sort prefix has passed the heap boundary.
self.attempt_early_completion(&batch)?;
return Ok(());

@ajegou ajegou Jun 10, 2026


I agree, I updated it, thanks

/// filter rejects entirely — i.e. the very signal the early-exit was designed to
/// detect was being silently dropped.
#[tokio::test]
async fn test_try_finish_fires_when_filter_rejects_entire_batch() -> Result<()> {


Nice regression coverage. One small maintainability thought: this repeats a lot of the setup from test_try_finish_marks_finished_with_prefix. A small helper for the (a, b) schema, sort expressions, and TopK construction could make future TopK prefix tests shorter and keep the main scenario easier to spot.

Contributor Author


Yes good idea, I just made the change

Address review feedback on apache#22852:

- Shorten the comment before `attempt_early_completion` in the
  `!filter.has_true()` branch to the key invariant ("the heap is
  unchanged, but a fully rejected batch can still prove that the shared
  sort prefix has passed the heap boundary").

- Extract `build_ab_prefix_topk()` covering the (a, b) schema, sort
  expressions, and `TopK` construction shared by
  `test_try_finish_marks_finished_with_prefix` and the new regression
  test, so each test opens directly on its per-scenario logic.

No behavior change.

ajegou commented Jun 10, 2026


While preparing this PR we noticed a second, related failure mode that this change doesn't address: on multi-partition workloads, when a sibling partition has already tightened the shared dynamic filter but a given partition's local heap is still empty, attempt_early_completion can't conclude completion (it reads the local heap's max row, which is None). That cross-partition starvation case is the rest of the regression on multi-partition workloads.

Because of this, the current fix doesn't show a significant impact on the sort-tpch benchmark, although it does improve performance on single-partition workloads.

Addressing it cleanly requires lifting the threshold check from the local heap to the shared TopKDynamicFilters, which is a larger design change. @geoffreyclaude has a follow-up PR ready that does this and will be opened against apache shortly. We're keeping the two changes separate to keep each PR narrowly scoped — this one is correctness-safe on its own and the follow-up rests cleanly on top. I will open a separate ticket for that follow-up (edit: #22874)

Pinging @kosiew just in case this has an impact on your review / approval

@ajegou ajegou changed the title fix(topk): call attempt_early_completion when filter rejects entire b… fix(topk): call attempt_early_completion when filter rejects entire batch Jun 10, 2026
@github-actions github-actions Bot added the auto detected api change label Jun 12, 2026
@github-actions github-actions Bot removed the auto detected api change label Jun 12, 2026
@gabotechs


🤔 I don't think the CI failure is this PR's fault; other PRs seem to be affected as well (e.g., #22926).

@kosiew kosiew added this pull request to the merge queue Jun 15, 2026
Merged via the queue into apache:main with commit 6520315 Jun 15, 2026
39 of 40 checks passed
geoffreyclaude added a commit to geoffreyclaude/datafusion that referenced this pull request Jun 17, 2026
Unlike the fully rejected batch path fixed by apache#22852, this path has already
passed the dynamic filter. `find_new_topk_items` can still produce zero heap
replacements, and that unchanged heap can still combine with the batch prefix
to prove that later rows cannot enter the TopK.
gh-worker-dd-mergequeue-cf854d Bot pushed a commit to DataDog/datafusion that referenced this pull request Jun 18, 2026
…atch (apache#22852)

## Which issue does this PR close?

- Closes apache#22849
- A related cross-partition starvation case is tracked separately in
apache#22874 and addressed by an upcoming follow-up PR — see
[discussion](apache#22852 (comment))
for details

---------

Co-authored-by: Gabriel <45515538+gabotechs@users.noreply.github.com>
(cherry picked from commit 6520315)
gh-worker-dd-mergequeue-cf854d Bot added a commit to DataDog/datafusion that referenced this pull request Jun 18, 2026
…r-22852-branch54-20260617

[branch-54] Cherry-pick apache#22852

Co-authored-by: ajegou <arnaud.jegou@gmail.com>
Co-authored-by: arnaud.jegou <arnaud.jegou@datadoghq.com>
geoffreyclaude added a commit to geoffreyclaude/datafusion that referenced this pull request Jun 19, 2026
Unlike the fully rejected batch path fixed by apache#22852, this path has already
passed the dynamic filter. `find_new_topk_items` can still produce zero heap
replacements, and that unchanged heap can still combine with the batch prefix
to prove that later rows cannot enter the TopK.
yonas pushed a commit to yonasBSD/datafusion that referenced this pull request Jun 22, 2026
## Which issue does this PR close?

Closes apache#22874.

Follow-up to [fix(topk): call attempt_early_completion when filter
rejects entire batch].

## Rationale for this change

[TopK dynamic filter pushdown attempt 2] lets `SortExec` tighten
scan-side predicates while a TopK heap finds better rows. Once the
current TopK threshold is known, scans can skip data that cannot enter
the final `ORDER BY ... LIMIT` result.

That works well for a single partition. Partitioned `SortExec` has one
extra case to handle:

- each output partition has its own local `TopK` heap
- those local heaps share one `TopKDynamicFilters` instance
- one partition can tighten the shared filter before another partition
has enough rows to fill its local heap

[fix(topk): call attempt_early_completion when filter rejects entire
batch] fixed the local case where a heap already has a max row and the
dynamic filter rejects a whole batch.

This PR fixes the remaining shared-filter case. A lagging partition can
now use the shared prefix threshold to stop early even when its local
heap is still empty. If there is no shared threshold yet, it falls back
to the existing local heap prefix check.

The shared prefix check is not treating another partition's threshold as
this partition's local heap boundary. It uses the same threshold that
already drives the shared dynamic filter. Once a partition's ordered
input has moved past that shared prefix threshold, later batches from
that partition cannot add rows that survive the shared filter. The local
heap still emits the candidates it has already kept; this only stops
pulling input that can no longer add candidates.

Single-partition behavior is unchanged.
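A minimal sketch of the decision described above, assuming prefixes are plain `i32` values in ascending order. `can_stop_early` and its parameters are hypothetical names, not the signature used in the PR. The sketch also assumes that the shared threshold, when present, takes precedence and that the local heap boundary is only consulted when no shared threshold exists.

```rust
/// Hypothetical decision function for the prefix early-exit check.
///
/// * `last_batch_prefix`: prefix value of the last row in the current batch.
/// * `shared_prefix`: prefix of the shared threshold, if some partition has
///   already published one.
/// * `local_heap_max_prefix`: prefix of the local heap's worst kept row, or
///   `None` while the local heap has no boundary to offer.
pub fn can_stop_early(
    last_batch_prefix: i32,
    shared_prefix: Option<i32>,
    local_heap_max_prefix: Option<i32>,
) -> bool {
    // Prefer the shared threshold; fall back to the local heap boundary.
    match shared_prefix.or(local_heap_max_prefix) {
        // Strictly greater: an equal prefix can still hide a better row.
        Some(boundary) => last_batch_prefix > boundary,
        None => false,
    }
}
```

A lagging partition with an empty local heap corresponds to `can_stop_early(3, Some(2), None)`, which returns true in this model, while the same call without a shared threshold returns false.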

## How the TopK optimizations fit together

There are two existing optimizations involved here:

- Dynamic filter pushdown: once a `TopK` heap has K rows, its worst kept
row becomes a threshold. That threshold tightens a scan-side filter so
later data that cannot enter the final `ORDER BY ... LIMIT` result can
be skipped.
- Prefix early exit: when the input is ordered by a prefix of the
requested sort, `TopK` can stop pulling once the last row in a batch is
past a known TopK boundary on that shared prefix.
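The first of these two optimizations can be sketched with a bounded max-heap from the standard library. This is an illustrative model only: `BoundedHeap` and `rows_reaching_heap` are hypothetical names, values are single `i32` keys in ascending order, and the scan-side filter is reduced to one comparison against the current threshold.

```rust
use std::collections::BinaryHeap;

/// Keeps the `k` smallest values seen so far (an ascending `ORDER BY ... LIMIT k`).
pub struct BoundedHeap {
    k: usize,
    /// Max-heap: the top element is the worst kept value.
    heap: BinaryHeap<i32>,
}

impl BoundedHeap {
    pub fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::new() }
    }

    /// Once the heap holds `k` values, the worst kept value is the threshold:
    /// a value that is not strictly smaller cannot enter the result.
    pub fn threshold(&self) -> Option<i32> {
        if self.k > 0 && self.heap.len() == self.k {
            self.heap.peek().copied()
        } else {
            None
        }
    }

    /// Returns true when the value was kept.
    pub fn insert(&mut self, value: i32) -> bool {
        if self.heap.len() < self.k {
            self.heap.push(value);
            return true;
        }
        let worst = self.heap.peek().copied();
        match worst {
            Some(worst) if value < worst => {
                self.heap.pop();
                self.heap.push(value);
                true
            }
            _ => false,
        }
    }
}

/// Counts how many input values survive the threshold filter and reach the heap.
pub fn rows_reaching_heap(input: &[i32], k: usize) -> usize {
    let mut topk = BoundedHeap::new(k);
    let mut reached = 0;
    for &value in input {
        if topk.threshold().map_or(true, |t| value < t) {
            reached += 1;
            topk.insert(value);
        }
    }
    reached
}
```

On already sorted input the threshold stops tightening once the heap is full, so no later value reaches the heap. The filter alone does not stop the scan, though, which is the job of the prefix early exit.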

For a partition-preserving `SortExec`, those optimizations meet in one
shared place. Each output partition has a local `TopK` heap, but all of
those local heaps publish into one shared dynamic filter. A partition
that fills first can tighten the shared filter for everyone else.
Lagging partitions then need to use that same shared prefix threshold to
stop pulling once their ordered input has moved past it.

This PR makes that composition explicit: the shared filter stays alive
until every local `TopK` has emitted, and the shared threshold carries
its common-prefix row so lagging partitions can apply the same prefix
early-exit check.

## What changes are included in this PR?

- Check early completion when a batch passes the dynamic filter but
produces zero heap replacements.
- Track local TopK emitters so a shared filter completes only after the
last emitter has produced output.
- Store the shared threshold and its common-prefix row together in
`TopKDynamicFilters`.
- Check the shared prefix in `attempt_early_completion` before falling
back to the local heap prefix.
- Add focused TopK and `SortExec` tests for the local zero-replacement
path, early completion before local heap fill, equal-prefix
non-completion, DESC/null prefix ordering, and shared-filter completion.
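One way to picture the emitter tracking in the second bullet is a shared countdown that reports completion only when the last local TopK has emitted. The following is a hypothetical sketch, not the structure used in the PR: `EmitterTracker` and `emitter_done` are invented names, and the real bookkeeping inside `TopKDynamicFilters` may look quite different.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical countdown shared by all local TopK emitters of one sort.
pub struct EmitterTracker {
    remaining: AtomicUsize,
}

impl EmitterTracker {
    pub fn new(emitters: usize) -> Self {
        Self { remaining: AtomicUsize::new(emitters) }
    }

    /// Records that one local TopK has emitted. Returns true only for the
    /// call that retires the last outstanding emitter; extra calls after
    /// that leave the counter at zero and return false.
    pub fn emitter_done(&self) -> bool {
        self.remaining
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| n.checked_sub(1))
            == Ok(1)
    }
}
```

With three emitters, the first two calls to `emitter_done` return false and the third returns true, which is the point at which a shared filter could be marked complete in this model.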

## How is this split for review?

The commits are ordered so each one has a narrow job:

1. `Check TopK early completion after zero-replacement batches`
Handles the local case where a batch passes the dynamic filter, produces
zero heap replacements, but still proves later rows cannot enter the
TopK.

2. `Complete shared TopK filters after all emitters`
Keeps a shared dynamic filter watchable until every local TopK emitter
has emitted. This includes the `SortExec` wiring for preserved
partitioning.

3. `Use shared TopK prefix thresholds for early exit`
Carries the shared threshold's common-prefix row so lagging partitions
can stop before their local heap is full.

## Are these changes tested?

Correctness is covered by targeted tests for:

- the local zero-replacement early-completion path
- early completion before local heap fill from a shared prefix threshold
- equal-prefix non-completion
- DESC and NULLS LAST prefix row ordering
- shared-filter completion after all TopK emitters, including preserved
`SortExec` partitioning

Relevant background:

- [perf: Add TopK benchmarks as variation over the `sort_tpch`
benchmarks] added the benchmark setup used here.
- [perf: Introduce sort prefix computation for early TopK exit
optimization on partially sorted input (10x speedup on top10 bench)]
added common-prefix TopK early termination.
- [TopK dynamic filter pushdown attempt 2] added scan-side dynamic
filter pushdown and exposed the shared-filter / local-heap interaction.
- [fix(topk): call attempt_early_completion when filter rejects entire
batch] fixed the local all-filtered-batch case.
- This PR fixes the remaining partitioned shared-filter case.

Benchmark command:

```bash
dfbench sort-tpch --sorted --limit 10 --iterations 5 \
  --path /tmp/df-topk-bench-data/tpch_sf1 \
  -o /tmp/topk-shared-prefix-followup.json
```

Both sides were rebuilt with fresh isolated `release-nonlto` target
directories before running the benchmark. This is not the regular
`topk_tpch` script: this case needs `--sorted --limit 10` to exercise
the prefix early-exit path.

Clean rerun against the PR base commit `7bb6e152b`. Times are
milliseconds, using the average of 5 iterations for each row.

| scope | PR base | this PR | change |
|---|---:|---:|---:|
| all sort-tpch queries | 760.59 | 348.84 | -54.1% |
| Q8 | 49.70 | 7.66 | -84.6% |
| Q9 | 92.13 | 10.27 | -88.8% |
| Q10 | 89.66 | 14.35 | -84.0% |

The `DataSourceExec` counters show the less noisy part of the result.
Q8/Q9/Q10 now emit only the first batch from each partition instead of
continuing to drain millions of rows.

| query | PR base `DataSourceExec output_rows` | this PR `DataSourceExec output_rows` | PR base `bytes_scanned` | this PR `bytes_scanned` |
|---|---:|---:|---:|---:|
| Q8 | 3.66M | 81.92K | 56.81M | 15.79M |
| Q9 | 3.66M | 81.92K | 75.19M | 20.89M |
| Q10 | 3.10M | 81.92K | 110.9M | 34.69M |

## Are there any user-facing changes?

No. This is an internal physical execution optimization fix.

[perf: Add TopK benchmarks as variation over the `sort_tpch`
benchmarks]: apache#15560
[perf: Introduce sort prefix computation for early TopK exit
optimization on partially sorted input (10x speedup on top10 bench)]:
apache#15563
[TopK dynamic filter pushdown attempt 2]:
apache#15770
[fix(topk): call attempt_early_completion when filter rejects entire
batch]: apache#22852

---------

Co-authored-by: kosiew <kosiew@gmail.com>

Labels

physical-plan Changes to the physical-plan crate


Development

Successfully merging this pull request may close these issues.

TopK early-exit doesn't fire when heap's dynamic filter rejects an entire batch

3 participants