[FLUSS][Spark] Add set_cluster_configs and reset_cluster_configs procedures#3204
[FLUSS][Spark] Add set_cluster_configs and reset_cluster_configs procedures#3204XuQianJin-Stars wants to merge 3 commits intoapache:mainfrom
Conversation
…edures for Spark connector
…igsProcedureTest - Replace DATALAKE_FORMAT=paimon with LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER=2 to avoid triggering Paimon LakeCatalog creation (which requires warehouse config). - Loosen the post-reset assertion in ResetClusterConfigsProcedureTest: describeClusterConfigs only returns entries present in initial/dynamic configs, so 0 rows after reset is valid when the key was not in the initial configs.
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds Spark SQL procedures to modify and reset Fluss cluster configurations at runtime, bringing Spark parity with existing Flink procedures.
Changes:
- Add
set_cluster_configsprocedure to set dynamic cluster config key/value pairs. - Add
reset_cluster_configsprocedure to delete (reset) dynamic cluster configs back to defaults. - Register new procedures in Spark and document usage/examples.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| website/docs/engine-spark/procedures.md | Documents the new Spark procedures, including syntax, parameters, and examples. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/SetClusterConfigsProcedure.scala | Implements sys.set_cluster_configs procedure and its output schema. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ResetClusterConfigsProcedure.scala | Implements sys.reset_cluster_configs procedure and its output schema. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala | Registers the two new procedures in the Spark connector. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/SetClusterConfigsProcedureTest.scala | Adds unit tests for setting configs and validating input. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/ResetClusterConfigsProcedureTest.scala | Adds unit tests for resetting configs and validating input. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| val configPairs = if (args.numFields > 0 && !args.isNullAt(0)) { | ||
| val pairsArray = args.getArray(0) | ||
| (0 until pairsArray.numElements()) | ||
| .map(i => pairsArray.getUTF8String(i).toString) |
There was a problem hiding this comment.
pairsArray.getUTF8String(i) will throw if any element in config_pairs is NULL. Since Spark SQL arrays can contain nulls, this can lead to a confusing runtime failure (Spark internal error) rather than a clear validation error. Consider checking pairsArray.isNullAt(i) and throwing an IllegalArgumentException with a user-facing message that identifies the null element (and ideally its position/key).
| .map(i => pairsArray.getUTF8String(i).toString) | |
| .map { i => | |
| if (pairsArray.isNullAt(i)) { | |
| throw new IllegalArgumentException( | |
| s"config_pairs contains a null element at position $i. " + | |
| "Please specify valid configuration key/value pairs.") | |
| } | |
| pairsArray.getUTF8String(i).toString | |
| } |
| val configKeys = if (args.numFields > 0 && !args.isNullAt(0)) { | ||
| val keysArray = args.getArray(0) | ||
| (0 until keysArray.numElements()) | ||
| .map(i => keysArray.getUTF8String(i).toString) |
There was a problem hiding this comment.
keysArray.getUTF8String(i) will throw if config_keys contains NULL. This should be validated explicitly so callers get a deterministic IllegalArgumentException (e.g., “config_keys must not contain nulls”) instead of a Spark internal error.
| .map(i => keysArray.getUTF8String(i).toString) | |
| .map { i => | |
| if (keysArray.isNullAt(i)) { | |
| throw new IllegalArgumentException("config_keys must not contain nulls") | |
| } | |
| keysArray.getUTF8String(i).toString | |
| } |
| import scala.collection.JavaConverters._ | ||
|
|
There was a problem hiding this comment.
This import is unused in the file (no Java/Scala collection conversions are performed). Removing it avoids warnings and keeps the new file minimal.
| import scala.collection.JavaConverters._ |
| import scala.collection.JavaConverters._ | ||
|
|
There was a problem hiding this comment.
This import is unused in the file. Please remove it to avoid warnings and keep the new file clean.
| import scala.collection.JavaConverters._ |
| **Syntax:** | ||
|
|
||
| ```sql | ||
| CALL [catalog_name.]sys.set_cluster_configs(config_pairs => ARRAY('key1', 'value1', 'key2', 'value2')) |
There was a problem hiding this comment.
The syntax examples in this doc section omit the trailing semicolon, while the surrounding examples (e.g., CALL sys.get_cluster_configs();) include semicolons. For consistency and copy/paste friendliness, consider adding ; to the end of the syntax lines.
| **Syntax:** | ||
|
|
||
| ```sql | ||
| CALL [catalog_name.]sys.reset_cluster_configs(config_keys => ARRAY('key1', 'key2')) |
There was a problem hiding this comment.
Same consistency issue as set_cluster_configs: consider adding a trailing semicolon in the syntax block to match other procedure examples in this page.
| CALL [catalog_name.]sys.reset_cluster_configs(config_keys => ARRAY('key1', 'key2')) | |
| CALL [catalog_name.]sys.reset_cluster_configs(config_keys => ARRAY('key1', 'key2')); |
| override def call(args: InternalRow): Array[InternalRow] = { | ||
| val configPairs = if (args.numFields > 0 && !args.isNullAt(0)) { | ||
| val pairsArray = args.getArray(0) | ||
| (0 until pairsArray.numElements()) | ||
| .map(i => pairsArray.getUTF8String(i).toString) | ||
| .toArray | ||
| } else { | ||
| Array.empty[String] | ||
| } |
There was a problem hiding this comment.
The new parsing logic should be exercised with arrays containing NULL elements (e.g., array('k1', null) or array(null, 'v1')) to ensure you return a clear validation error rather than a Spark internal exception. Adding one or two negative tests around NULL elements would lock in the intended behavior once validation is added.
Purpose
Linked issue: close #3203
This PR adds
set_cluster_configsandreset_cluster_configsprocedures to the Spark connector, aligning it with the Flink connector which already provides a complete set of cluster configuration management procedures (get_cluster_configs,set_cluster_configs,reset_cluster_configs).Currently, Spark users can only read cluster configurations via
get_cluster_configsbut cannot dynamically modify or reset them through SQL. This change closes that gap.Brief change log
SetClusterConfigsProcedureto dynamically set cluster configuration values viaCALL sys.set_cluster_configs(config_pairs => ARRAY('key1', 'value1', 'key2', 'value2')).ResetClusterConfigsProcedureto reset cluster configurations to their default values viaCALL sys.reset_cluster_configs(config_keys => ARRAY('key1', 'key2')).SparkProcedures.Admin.alterClusterConfigs()API withAlterConfigOpType.SETandAlterConfigOpType.DELETErespectively.Tests
SetClusterConfigsProcedureTest:set_cluster_configs: set a single configuration— sets a config and verifies it viaget_cluster_configsset_cluster_configs: set multiple configurations— sets multiple configs in one callset_cluster_configs: empty config_pairs should fail— validates error on empty inputset_cluster_configs: odd number of config_pairs should fail— validates error on malformed inputResetClusterConfigsProcedureTest:reset_cluster_configs: set and then reset a configuration— sets a config, resets it, and verifies it's no longer DYNAMICreset_cluster_configs: reset multiple configurations— resets multiple configs in one callreset_cluster_configs: empty config_keys should fail— validates error on empty inputAPI and Format
No. This change only adds new Spark SQL procedures. No existing API or storage format is affected.
Documentation
Yes. Updated
website/docs/engine-spark/procedures.mdwith documentation for bothset_cluster_configsandreset_cluster_configsprocedures, including syntax, parameters, return types, examples, and notes.