feat: add MapSort expression support for Spark 4.0 #4076

Open

andygrove wants to merge 10 commits into apache:main from andygrove:feat/map-sort-spark4

Conversation

@andygrove
Member

@andygrove commented Apr 24, 2026

Which issue does this PR close?

Closes #1941
Closes #3171

Rationale for this change

Spark 4.0 introduces MapSort, used for normalizing map values when they appear in shuffle hash partitioning keys, in try_element_at, and in other contexts where map ordering must be deterministic. Without native support, queries that touch maps in any of these positions fall back to Spark, which forces the entire enclosing operator off Comet (e.g. an entire shuffle exchange).

What changes are included in this PR?

  • New native scalar function map_sort in native/spark-expr/src/map_funcs/map_sort.rs that sorts map entries by key in ascending order, registered via comet_scalar_funcs.rs.
  • Wire MapSort into the Spark 4.0 CometExprShim so the expression is converted to the new scalar function during serde.
  • The 'columnar shuffle on map array element' test in CometColumnarShuffleSuite now expects a shuffle fallback on Spark 4.0+: the new shuffle-key normalization wraps mapsort inside transform(arr, x -> mapsort(x)), and Comet does not currently support ArrayTransform with a lambda body. Answer correctness is still verified via checkSparkAnswer.

How are these changes tested?

  • New unit tests in native/spark-expr/src/map_funcs/map_sort.rs cover sorting on each supported key type, null handling, and empty maps.
  • Existing CometColumnarShuffleSuite tests for map shuffle keys all pass under the Spark 4.0 profile (41/41).

andygrove and others added 2 commits April 24, 2026 17:44
Add native map_sort scalar function that sorts map entries by key in
ascending order, and wire it up via the Spark 4.0 CometExprShim so that
MapSort expressions are accelerated instead of falling back to Spark.
Re-enable all CometColumnarShuffleSuite map tests that were skipped for
Spark 4.0.

Closes apache#1941

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Spark 4.0 normalizes shuffle keys containing array<map> via
transform(arr, x -> mapsort(x)), which Comet does not yet support
because ArrayTransform with a lambda body has no serde. Mark the
columnar shuffle on map array element test as expecting the fallback
on Spark 4.0+ while still verifying answer correctness.

The MapSort serde for Spark 4.0 called scalarFunctionExprToProto without a
return type. The Rust planner then looked up "map_sort" in the session
UDF registry to infer the type, but map_sort is only handled via the
create_comet_physical_fun match dispatch, not registered as a UDF, causing
"There is no UDF named 'map_sort' in the registry" at execution time
(e.g., group-by on a map column in CollationSuite).

Pass ms.dataType explicitly via scalarFunctionExprToProtoWithReturnType,
matching the pattern used by ceil, floor, and other scalar functions.

Arrow's sort_to_indices does not support Struct (and other complex)
key types, so map_sort fails at runtime when the map key is a struct.
Check key type via supportedScalarSortElementType and fall back to
Spark when the key type is not natively sortable. This fixes 4
CollationSuite failures in spark-sql-auto-sql_core-1 for Spark 4.0:
'Group by on map containing structs with ...'.
# Conflicts:
#	spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala

Spark 4.0 wraps map shuffle keys in mapsort(...). Comet's map_sort relies
on Arrow's sort_to_indices, which only supports scalar key types, so maps
with array or struct keys fall back to Spark. Update the
'columnar shuffle on array/struct map key/value' test to expect 0 Comet
shuffles for the array-key and struct-key cases on Spark 4.0+, while
keeping the scalar-key cases at 1.
.repartition(numPartitions, $"_1", $"_2")
.sortWithinPartitions($"_2")

if (isSpark40Plus) {
Contributor


Nice, glad we're cleaning up this ugliness.

@mbutrovich
Contributor

Thanks @andygrove! Categorized suggestions below:

Performance

One global take instead of per-map sort_to_indices + take + concat?

spark_map_sort loops over every map in the batch, calling sort_to_indices and take on each slice, then a final concat over n struct slices. For n maps that's roughly 2n + 1 kernel calls and 2n + 1 allocations (n small index arrays, n full per-map struct copies from take, one full concat copy).

The version below keeps the n sort_to_indices calls but drops the rest: one Vec<u32> grow (pre-sized), one take over the full entries, no concat. That's n + 1 kernel calls and roughly n + 2 allocations, and crucially it eliminates all per-map struct copies plus the final concat copy, which are the expensive ones. Peak transient memory drops from roughly 3x entry volume to roughly 1x.
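
Concretely, at the benchmark batch size used later in this thread (n = 8192 maps), that counting works out to roughly 2 * 8192 + 1 = 16,385 kernel calls for the old path versus roughly 8,193 for the new one.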

Compare to DataFusion's sort_batch (datafusion/physical-plan/src/sorts/sort.rs:813), which is what ExternalSorter uses to sort an in-memory batch:

let indices = lexsort_to_indices(&sort_columns, fetch)?;
let columns = take_arrays(batch.columns(), &indices, None)?;

One global permutation, one take. No per-row work, no trailing concat.

map_sort can't call sort_to_indices once on the full key column. It needs to sort within each map boundary. But the same shape applies: build one global index buffer respecting group boundaries, then a single take on the full entries:

let total = maps_arg_entries.len();
// One slot per entry across every map in the batch.
let mut global: Vec<u32> = Vec::with_capacity(total);

for idx in 0..maps_arg.len() {
    let start = maps_arg_offsets[idx] as usize;
    let end = maps_arg_offsets[idx + 1] as usize;
    if end == start {
        continue; // empty map: nothing to sort
    }

    // Sort just this map's slice of the key column, then shift the local
    // indices by the map's start offset before appending them globally.
    let keys = maps_arg_entries.column(0).slice(start, end - start);
    let local = sort_to_indices(&keys, Some(sort_options), None)?;
    global.extend(local.values().iter().map(|i| start as u32 + *i));
}

// One global permutation, one take over the full entries struct:
// no per-map copies, no trailing concat.
let indices = UInt32Array::from(global);
let sorted = take(&maps_arg_entries, &indices, None)?;

Offsets and nulls stay untouched. Output is one struct array, matching sort_batch's pattern of global-permutation-plus-single-take.
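
For concreteness, here is a self-contained toy run of the same shape (the builder setup and variable names are illustrative, not Comet's actual code); it builds two string-keyed maps and ends with the single take:

use arrow::array::{Array, Int32Builder, MapBuilder, StringBuilder, UInt32Array};
use arrow::compute::{sort_to_indices, take};

fn main() -> Result<(), arrow::error::ArrowError> {
    // Build two string-keyed maps: {"b": 2, "a": 1} and {"d": 4, "c": 3}.
    let mut b = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
    b.keys().append_value("b");
    b.values().append_value(2);
    b.keys().append_value("a");
    b.values().append_value(1);
    b.append(true)?;
    b.keys().append_value("d");
    b.values().append_value(4);
    b.keys().append_value("c");
    b.values().append_value(3);
    b.append(true)?;
    let maps = b.finish();

    // The global-permutation loop from the sketch above.
    let entries = maps.entries(); // StructArray of (key, value) pairs
    let offsets = maps.offsets();
    let mut global: Vec<u32> = Vec::with_capacity(entries.len());
    for idx in 0..maps.len() {
        let (start, end) = (offsets[idx] as usize, offsets[idx + 1] as usize);
        if end == start {
            continue;
        }
        let keys = entries.column(0).slice(start, end - start);
        let local = sort_to_indices(&keys, None, None)?;
        global.extend(local.values().iter().map(|i| start as u32 + *i));
    }
    let sorted = take(entries, &UInt32Array::from(global), None)?;
    // `sorted` now holds the entries reordered a->1, b->2, c->3, d->4;
    // pairing it with the original offsets and null buffer rebuilds the map.
    println!("{sorted:?}");
    Ok(())
}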

Early outs?

Worth short-circuiting these up front?

if maps_arg.is_empty() { return Ok(ColumnarValue::Array(arr_arg)); }
if maps_arg.null_count() == maps_arg.len() { return Ok(ColumnarValue::Array(arr_arg)); }
if *is_sorted { return Ok(ColumnarValue::Array(arr_arg)); }

Microbenchmark numbers?

It would be nice to have before/after numbers in the PR description. Spark 4.0 map-shuffle now stays native, so there's a natural story to measure against the prior fallback path.

Spark compatibility

NaN / -0.0 on float-keyed maps

Spark's key ordering goes through Double.compare (NaN sorts largest, -0.0 == 0.0). Arrow's sort_to_indices uses IEEE total ordering (-0.0 < 0.0, NaN at extremes). Comet already tracks this class of incompatibility via spark.comet.exec.strictFloatingPoint.
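
As a standalone illustration of the Arrow side (not Comet code), Rust's f64::total_cmp implements the same IEEE 754 total ordering:

use std::cmp::Ordering;

fn main() {
    // IEEE 754 total ordering distinguishes the zeros...
    assert_eq!((-0.0f64).total_cmp(&0.0), Ordering::Less);
    // ...and places NaN at both extremes depending on its sign bit:
    assert_eq!(f64::NAN.total_cmp(&f64::INFINITY), Ordering::Greater);
    assert_eq!((-f64::NAN).total_cmp(&f64::NEG_INFINITY), Ordering::Less);
    // Spark's ordering instead ranks every NaN as the single largest value
    // and treats -0.0 and 0.0 as equal, so the two engines can disagree on
    // the sorted entry order of a float-keyed map.
}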

CometSortOrder (serde/CometSortOrder.scala:34) and CometSortArray (serde/arrays.scala:150) both consult COMET_EXEC_STRICT_FLOATING_POINT and return Incompatible when the child type contains Float/Double. The MapSort shim in this PR doesn't. MapSort should follow the same pattern:

case ms: MapSort =>
  val keyType = ms.dataType.asInstanceOf[MapType].keyType
  if (!supportedScalarSortElementType(keyType)) {
    withInfo(ms, s"MapSort on map with key type $keyType is not supported")
    None
  } else if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() &&
             SupportLevel.containsFloatingPoint(keyType)) {
    withInfo(ms, s"MapSort on floating-point key not compatible with Spark when " +
      s"${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true")
    None
  } else {
    // ... existing conversion
  }

Default behavior is unchanged (Comet converts the expression, accepting the float divergence). Users who set strictFloatingPoint=true get the same fallback behavior they already get for SortOrder and SortArray.

supportedRangePartitioningDataType in CometShuffleExchangeExec.scala:344 has the same gap but is out of scope for this PR. I will open an issue to follow up on that.

Style nits

  • native/spark-expr/src/map_funcs/map_sort.rs:75,128. Arc::<dyn Array>::clone(array) and Arc::<arrow::datatypes::Field>::clone(map_field) can be Arc::clone(array) / map_field.clone().
  • .unwrap() on the downcast_ref at :81 and :121. An .expect("invariant: ...") documents what's being assumed. (Both nits are illustrated in the sketch below.)
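
A minimal standalone sketch of both nits (the Int32 key type and the expect message are assumptions for illustration, not the PR's actual code):

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int32Array};

fn example(array: &ArrayRef) {
    // The turbofish form flagged above...
    let verbose = Arc::<dyn Array>::clone(array);
    // ...and the idiomatic equivalent:
    let idiomatic = Arc::clone(array);

    // .expect documents the invariant that a bare .unwrap leaves implicit:
    let keys = idiomatic
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("invariant: key type was validated during planning");
    let _ = (verbose, keys);
}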

Docs

Worth a mention in docs/source/user-guide/compatibility.md that Spark 4.0 map-shuffle / group-by-on-map now stays native (closing #1941, #3171)?

- Use single global sort_to_indices+take instead of per-map take+concat
- Add early-out fast paths (empty array, all-null, is_sorted=true)
- Fall back to Spark for floating-point map keys when strictFloatingPoint=true
- Clean up Arc::clone calls and replace .unwrap() on downcasts with .expect
- Document MapSort behavior in map expressions compatibility guide
@andygrove
Copy link
Copy Markdown
Member Author

andygrove commented Apr 28, 2026

Thanks @mbutrovich. Pushed 1620e33b7 addressing the feedback:

Performance

  • Replaced the per-map take + final concat with one global Vec<u32> index buffer plus a single take over the full entries struct, mirroring sort_batch.
  • Added the three early-outs (empty input, all-null, *is_sorted) at the top of spark_map_sort.

Spark compatibility

  • MapSort now falls back to Spark for floating-point keys when spark.comet.exec.strictFloatingPoint=true, matching CometSortOrder and CometSortArray.

Style

  • Switched to Arc::clone(...) and .expect("invariant: ...") on the two downcasts.

Docs

  • Added a MapSort section to docs/source/user-guide/latest/compatibility/expressions/map.md describing the Spark 4.0 behavior and the strict floating-point fallback.

I'll add benchmarks next.

Covers int and string keys at three map sizes (4, 16, 64 entries per map)
with a fixed batch of 8192 maps.
@andygrove
Member Author

Microbenchmark numbers from native/spark-expr/benches/map_sort.rs (6e832541d). Each case is a batch of 8192 maps; "old" is the per-map take + concat impl, "new" is the global-permutation + single take impl. Release build, M-series Mac.

Case            Entries/map  Old      New      Speedup
int_keys/4      4            3.99 ms  0.77 ms  5.2x
int_keys/16     16           4.44 ms  1.25 ms  3.6x
int_keys/64     64           5.02 ms  1.62 ms  3.1x
string_keys/4   4            4.52 ms  0.97 ms  4.7x
string_keys/16  16           7.16 ms  3.87 ms  1.85x
string_keys/64  64           8.95 ms  4.24 ms  2.1x

The biggest wins are the small-map cases where the per-map struct copies and trailing concat dominated. To run locally:

cd native && cargo bench --bench map_sort
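
For the shape of the harness without checking out the repo, here is a self-contained Criterion sketch of the int-key cases. It is a simplification, not the contents of native/spark-expr/benches/map_sort.rs: it sorts a synthetic flat key column with uniform map boundaries rather than building real Arrow MapArrays.

use arrow::array::Int32Array;
use arrow::compute::sort_to_indices;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};

fn bench_map_sort(c: &mut Criterion) {
    let num_maps = 8192;
    let mut group = c.benchmark_group("int_keys");
    for entries_per_map in [4usize, 16, 64] {
        // Keys descend within each map so every map needs a real sort.
        let keys = Int32Array::from_iter_values(
            (0..num_maps * entries_per_map)
                .map(|i| (entries_per_map - 1 - i % entries_per_map) as i32),
        );
        group.bench_with_input(
            BenchmarkId::from_parameter(entries_per_map),
            &keys,
            |b, keys| {
                b.iter(|| {
                    // Per-map sort over each map's slice of the key column.
                    for m in 0..num_maps {
                        let slice = keys.slice(m * entries_per_map, entries_per_map);
                        sort_to_indices(&slice, None, None).unwrap();
                    }
                })
            },
        );
    }
    group.finish();
}

criterion_group!(benches, bench_map_sort);
criterion_main!(benches);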

Contributor

@mbutrovich left a comment


Looks awesome! Thanks for the quick turnaround on the feedback, @andygrove!

case _ => None
}

case ms: MapSort =>
Contributor


Is this shim only for 4.0, or does it need to be in 4.x?

Member Author

@andygrove Apr 28, 2026


Yeah, the PR was started before 4.1 was added, and upmerging broke the tests. Added shims for 4.1 and 4.2 now.

Contributor


There is also a folder spark/src/main/spark-4.x that serves all Spark 4 subversions.

The MapSort handler was only added to the spark-4.0 shim, so under the
spark-4.1 and spark-4.2 profiles `mapsort(...)` partitioning expressions
fell through to `case _ => None` and the columnar shuffle reverted to
plain Spark, breaking CometColumnarShuffleSuite.

Successfully merging this pull request may close these issues.

  • [Feature] Support Spark expression: map_sort
  • Add support for MapSort expression in Spark 4.0.0
