risingwave_common/session_config/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod non_zero64;
16mod opt;
17mod over_window;
18pub mod parallelism;
19mod query_mode;
20mod search_path;
21pub mod sink_decouple;
22mod transaction_isolation_level;
23mod visibility_mode;
24
25use chrono_tz::Tz;
26use itertools::Itertools;
27pub use opt::OptionConfig;
28pub use over_window::OverWindowCachePolicy;
29pub use query_mode::QueryMode;
30use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
31pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
32use serde::{Deserialize, Serialize};
33use thiserror::Error;
34
35use self::non_zero64::ConfigNonZeroU64;
36use crate::config::streaming::JoinEncodingType;
37use crate::config::{ConfigMergeError, StreamingConfig, merge_streaming_config_section};
38use crate::hash::VirtualNode;
39use crate::session_config::parallelism::ConfigParallelism;
40use crate::session_config::sink_decouple::SinkDecouple;
41use crate::session_config::transaction_isolation_level::IsolationLevel;
42pub use crate::session_config::visibility_mode::VisibilityMode;
43use crate::{PG_VERSION, SERVER_ENCODING, SERVER_VERSION_NUM, STANDARD_CONFORMING_STRINGS};
44
/// Separator used when joining/splitting list-valued session config entries.
pub const SESSION_CONFIG_LIST_SEP: &str = ", ";
46
/// Errors that can occur when reading or writing a session config entry.
#[derive(Error, Debug)]
pub enum SessionConfigError {
    /// The entry name was recognized, but the provided value failed to parse
    /// or was rejected by the entry's check hook.
    #[error("Invalid value `{value}` for `{entry}`")]
    InvalidValue {
        /// Name of the config entry being set.
        entry: &'static str,
        /// The offending value as provided by the caller.
        value: String,
        /// The underlying parse/check error.
        source: anyhow::Error,
    },

    /// No config entry (or alias) with the given name exists.
    #[error("Unrecognized config entry `{0}`")]
    UnrecognizedEntry(String),
}
59
/// Convenience alias for results of session-config getters and setters.
type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;

// NOTE(kwannoel): We declare it separately as a constant,
// otherwise seems like it can't infer the type of -1 when written inline.
// A value of -1 disables the corresponding rate limit entirely.
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
const DISABLE_DML_RATE_LIMIT: i32 = -1;
const DISABLE_SINK_RATE_LIMIT: i32 = -1;

/// Default to bypass cluster limits iff in debug mode.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
71
/// This is the Session Config of RisingWave.
///
/// All config entries implement `Display` and `FromStr` for getter and setter, to be read and
/// altered within a session.
///
/// Users can change the default value of a configuration entry using `ALTER SYSTEM SET`. To
/// facilitate this, a `serde` implementation is used as the wire format for retrieving initial
/// configurations and updates from the meta service. It's important to note that the meta
/// service stores the overridden value of each configuration entry per row with `Display` in
/// the meta store, rather than using the `serde` format. However, we still delegate the `serde`
/// impl of all fields to `Display`/`FromStr` to make it consistent.
#[serde_with::apply(_ => #[serde_as(as = "serde_with::DisplayFromStr")] )]
#[serde_with::serde_as]
#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
pub struct SessionConfig {
    /// If `RW_IMPLICIT_FLUSH` is on, then every INSERT/UPDATE/DELETE statement will block
    /// until the entire dataflow is refreshed. In other words, every related table & MV will
    /// be able to see the write.
    #[parameter(default = false, alias = "rw_implicit_flush")]
    implicit_flush: bool,

    /// If `CREATE_COMPACTION_GROUP_FOR_MV` is on, dedicated compaction groups will be created in
    /// MV creation.
    #[parameter(default = false)]
    create_compaction_group_for_mv: bool,

    /// A temporary config variable to force query running in either local or distributed mode.
    /// The default value is auto which means let the system decide to run batch queries in local
    /// or distributed mode automatically.
    #[parameter(default = QueryMode::default())]
    query_mode: QueryMode,

    /// Sets the number of digits displayed for floating-point values.
    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#:~:text=for%20more%20information.-,extra_float_digits,-(integer)>
    #[parameter(default = 1)]
    extra_float_digits: i32,

    /// Sets the application name to be reported in statistics and logs.
    /// See <https://www.postgresql.org/docs/14/runtime-config-logging.html#:~:text=What%20to%20Log-,application_name,-(string)>
    #[parameter(default = "", flags = "REPORT")]
    application_name: String,

    /// It is typically set by an application upon connection to the server.
    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-DATESTYLE>
    #[parameter(default = "", rename = "datestyle")]
    date_style: String,

    /// Force the use of lookup join instead of hash join when possible for local batch execution.
    #[parameter(default = true, alias = "rw_batch_enable_lookup_join")]
    batch_enable_lookup_join: bool,

    /// Enable usage of sortAgg instead of hash agg when order property is satisfied in batch
    /// execution
    #[parameter(default = true, alias = "rw_batch_enable_sort_agg")]
    batch_enable_sort_agg: bool,

    /// Enable distributed DML, so an insert, delete, and update statement can be executed in a distributed way (e.g. running in multiple compute nodes).
    /// No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion where users always can drop their table when failure happens.
    #[parameter(default = false, rename = "batch_enable_distributed_dml")]
    batch_enable_distributed_dml: bool,

    /// Evaluate expression in strict mode for batch queries.
    /// If set to false, an expression failure will not cause an error but leave a null value
    /// on the result set.
    #[parameter(default = true)]
    batch_expr_strict_mode: bool,

    /// The max gap allowed to transform small range scan into multi point lookup.
    #[parameter(default = 8)]
    max_split_range_gap: i32,

    /// Sets the order in which schemas are searched when an object (table, data type, function, etc.)
    /// is referenced by a simple name with no schema specified.
    /// See <https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SEARCH-PATH>
    #[parameter(default = SearchPath::default())]
    search_path: SearchPath,

    /// If `VISIBILITY_MODE` is all, we will support querying data without checkpoint.
    #[parameter(default = VisibilityMode::default())]
    visibility_mode: VisibilityMode,

    /// See <https://www.postgresql.org/docs/current/transaction-iso.html>
    #[parameter(default = IsolationLevel::default())]
    transaction_isolation: IsolationLevel,

    /// Select as of specific epoch.
    /// Sets the historical epoch for querying data. If 0, querying latest data.
    #[parameter(default = ConfigNonZeroU64::default())]
    query_epoch: ConfigNonZeroU64,

    /// Session timezone. Defaults to UTC.
    #[parameter(default = "UTC", check_hook = check_timezone)]
    timezone: String,

    /// The execution parallelism for streaming queries, including tables, materialized views, indexes,
    /// and sinks. Defaults to 0, which means they will be scheduled adaptively based on the cluster size.
    ///
    /// If a non-zero value is set, streaming queries will be scheduled to use a fixed number of parallelism.
    /// Note that the value will be bounded at `STREAMING_MAX_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism: ConfigParallelism,

    /// Specific parallelism for table. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_table: ConfigParallelism,

    /// Specific parallelism for sink. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_sink: ConfigParallelism,

    /// Specific parallelism for index. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_index: ConfigParallelism,

    /// Specific parallelism for source. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_source: ConfigParallelism,

    /// Specific parallelism for materialized view. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_materialized_view: ConfigParallelism,

    /// Enable delta join for streaming queries. Defaults to false.
    #[parameter(default = false, alias = "rw_streaming_enable_delta_join")]
    streaming_enable_delta_join: bool,

    /// Enable bushy join for streaming queries. Defaults to true.
    #[parameter(default = true, alias = "rw_streaming_enable_bushy_join")]
    streaming_enable_bushy_join: bool,

    /// Force filtering to be done inside the join whenever there's a choice between optimizations.
    /// Defaults to false.
    #[parameter(default = false, alias = "rw_streaming_force_filter_inside_join")]
    streaming_force_filter_inside_join: bool,

    /// Enable arrangement backfill for streaming queries. Defaults to true.
    /// When set to true, the parallelism of the upstream fragment will be
    /// decoupled from the parallelism of the downstream scan fragment.
    /// Or more generally, the parallelism of the upstream table / index / mv
    /// will be decoupled from the parallelism of the downstream table / index / mv / sink.
    #[parameter(default = true)]
    streaming_use_arrangement_backfill: bool,

    /// Use snapshot backfill for streaming queries. Defaults to false.
    #[parameter(default = false)]
    streaming_use_snapshot_backfill: bool,

    /// Allow `jsonb` in stream key
    #[parameter(default = false, alias = "rw_streaming_allow_jsonb_in_stream_key")]
    streaming_allow_jsonb_in_stream_key: bool,

    /// Enable materialized expressions for impure functions (typically UDF).
    #[parameter(default = true)]
    streaming_enable_materialized_expressions: bool,

    /// Separate consecutive `StreamHashJoin` by no-shuffle `StreamExchange`
    #[parameter(default = false)]
    streaming_separate_consecutive_join: bool,

    /// Separate `StreamSink` by no-shuffle `StreamExchange`
    #[parameter(default = false)]
    streaming_separate_sink: bool,

    /// Determine which encoding will be used to encode join rows in operator cache.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None)]
    streaming_join_encoding: OptionConfig<JoinEncodingType>,

    /// Enable join ordering for streaming and batch queries. Defaults to true.
    #[parameter(default = true, alias = "rw_enable_join_ordering")]
    enable_join_ordering: bool,

    /// Enable two phase agg optimization. Defaults to true.
    /// Setting this to true will always set `FORCE_TWO_PHASE_AGG` to false.
    #[parameter(default = true, flags = "SETTER", alias = "rw_enable_two_phase_agg")]
    enable_two_phase_agg: bool,

    /// Force two phase agg optimization whenever there's a choice between
    /// optimizations. Defaults to false.
    /// Setting this to true will always set `ENABLE_TWO_PHASE_AGG` to false.
    // NOTE(review): the custom setter in `impl SessionConfig` below sets
    // `ENABLE_TWO_PHASE_AGG` to *true* when this flag becomes true — confirm
    // which of the two (this doc or the setter) reflects the intended behavior.
    #[parameter(default = false, flags = "SETTER", alias = "rw_force_two_phase_agg")]
    force_two_phase_agg: bool,

    /// Enable sharing of common sub-plans.
    /// This means that DAG structured query plans can be constructed,
    /// rather than only tree structured query plans.
    #[parameter(default = true, alias = "rw_enable_share_plan")]
    enable_share_plan: bool,

    /// Enable split distinct agg
    #[parameter(default = false, alias = "rw_force_split_distinct_agg")]
    force_split_distinct_agg: bool,

    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE>
    #[parameter(default = "", rename = "intervalstyle")]
    interval_style: String,

    /// If `BATCH_PARALLELISM` is non-zero, batch queries will use this parallelism.
    #[parameter(default = ConfigNonZeroU64::default())]
    batch_parallelism: ConfigNonZeroU64,

    /// The version of PostgreSQL that Risingwave claims to be.
    #[parameter(default = PG_VERSION)]
    server_version: String,

    /// The version number of PostgreSQL that Risingwave claims to be.
    #[parameter(default = SERVER_VERSION_NUM)]
    server_version_num: i32,

    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-MIN-MESSAGES>
    #[parameter(default = "notice")]
    client_min_messages: String,

    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-ENCODING>
    #[parameter(default = SERVER_ENCODING, check_hook = check_client_encoding)]
    client_encoding: String,

    /// Enable decoupling sink and internal streaming graph or not
    #[parameter(default = SinkDecouple::default())]
    sink_decouple: SinkDecouple,

    /// See <https://www.postgresql.org/docs/current/runtime-config-compatible.html#RUNTIME-CONFIG-COMPATIBLE-VERSION>
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = false)]
    synchronize_seqscans: bool,

    /// Abort query statement that takes more than the specified amount of time in sec. If
    /// `log_min_error_statement` is set to ERROR or lower, the statement that timed out will also be
    /// logged. If this value is specified without units, it is taken as milliseconds. A value of
    /// zero (the default) disables the timeout.
    #[parameter(default = 0u32)]
    statement_timeout: u32,

    /// Terminate any session that has been idle (that is, waiting for a client query) within an open transaction for longer than the specified amount of time in milliseconds.
    #[parameter(default = 60000u32)]
    idle_in_transaction_session_timeout: u32,

    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-LOCK-TIMEOUT>
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = 0)]
    lock_timeout: i32,

    /// For limiting the startup time of a shareable CDC streaming source when the source is being created. Unit: seconds.
    #[parameter(default = 60)]
    cdc_source_wait_streaming_start_timeout: i32,

    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-ROW-SECURITY>.
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = true)]
    row_security: bool,

    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-STANDARD-CONFORMING-STRINGS>
    #[parameter(default = STANDARD_CONFORMING_STRINGS)]
    standard_conforming_strings: String,

    /// Set streaming rate limit (rows per second) for each parallelism for mv / source / sink backfilling
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the snapshot read / source read.
    #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
    backfill_rate_limit: i32,

    /// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the snapshot read / source read.
    #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
    source_rate_limit: i32,

    /// Set streaming rate limit (rows per second) for each parallelism for table DML.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the DML.
    #[parameter(default = DISABLE_DML_RATE_LIMIT)]
    dml_rate_limit: i32,

    /// Set sink rate limit (rows per second) for each parallelism for external sink.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the sink.
    #[parameter(default = DISABLE_SINK_RATE_LIMIT)]
    sink_rate_limit: i32,

    /// Cache policy for partition cache in streaming over window.
    /// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
    #[parameter(default = OverWindowCachePolicy::default(), alias = "rw_streaming_over_window_cache_policy")]
    streaming_over_window_cache_policy: OverWindowCachePolicy,

    /// Run DDL statements in background
    #[parameter(default = false)]
    background_ddl: bool,

    /// Enable shared source. Currently only for Kafka.
    ///
    /// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEWS` from the source
    /// will forward the data from the same source streaming job, and also backfill prior data from the external source.
    #[parameter(default = true)]
    streaming_use_shared_source: bool,

    /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
    #[parameter(default = SERVER_ENCODING)]
    server_encoding: String,

    /// Output format for `bytea` values. Only `"hex"` is supported (enforced by `check_bytea_output`).
    #[parameter(default = "hex", check_hook = check_bytea_output)]
    bytea_output: String,

    /// Bypass checks on cluster limits
    ///
    /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
    #[parameter(default = BYPASS_CLUSTER_LIMITS)]
    bypass_cluster_limits: bool,

    /// The maximum number of parallelism a streaming query can use. Defaults to 256.
    ///
    /// Compared to `STREAMING_PARALLELISM`, which configures the initial parallelism, this configures
    /// the maximum parallelism a streaming query can use in the future, if the cluster size changes or
    /// users manually change the parallelism with `ALTER .. SET PARALLELISM`.
    ///
    /// It's not always a good idea to set this to a very large number, as it may cause performance
    /// degradation when performing range scans on the table or the materialized view.
    // a.k.a. vnode count
    #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_streaming_max_parallelism)]
    streaming_max_parallelism: usize,

    /// Used to provide the connection information for the iceberg engine.
    /// Format: `iceberg_engine_connection` = `schema_name.connection_name`.
    #[parameter(default = "", check_hook = check_iceberg_engine_connection)]
    iceberg_engine_connection: String,

    /// Whether the streaming join should be unaligned or not.
    #[parameter(default = false)]
    streaming_enable_unaligned_join: bool,

    /// The timeout for reading from the buffer of the sync log store on barrier.
    /// Every epoch we will attempt to read the full buffer of the sync log store.
    /// If we hit the timeout, we will stop reading and continue.
    #[parameter(default = 64_usize)]
    streaming_sync_log_store_pause_duration_ms: usize,

    /// The max buffer size for sync logstore, before we start flushing.
    #[parameter(default = 2048_usize)]
    streaming_sync_log_store_buffer_size: usize,

    /// Whether to disable purifying the definition of the table or source upon retrieval.
    /// Only set this if encountering issues with functionalities like `SHOW` or `ALTER TABLE/SOURCE`.
    /// This config may be removed in the future.
    #[parameter(default = false, flags = "NO_ALTER_SYS")]
    disable_purify_definition: bool,

    /// The `ef_search` used in querying hnsw vector index
    #[parameter(default = 40_usize)] // default value borrowed from pg_vector
    batch_hnsw_ef_search: usize,

    /// Enable index selection for queries
    #[parameter(default = true)]
    enable_index_selection: bool,

    /// Enable locality backfill for streaming queries. Defaults to false.
    #[parameter(default = false)]
    enable_locality_backfill: bool,

    /// Duration in seconds before notifying the user that a long-running DDL operation (e.g., DROP TABLE, CANCEL JOBS)
    /// is still running. Set to 0 to disable notifications. Defaults to 30 seconds.
    #[parameter(default = 30u32)]
    slow_ddl_notification_secs: u32,

    /// Unsafe: Enable storage retention for non-append-only tables.
    /// Enabling this can lead to streaming inconsistency and node panic
    /// if there is any row INSERT/UPDATE/DELETE operation corresponding to the ttled primary key.
    #[parameter(default = false)]
    unsafe_enable_storage_retention_for_non_append_only_tables: bool,
}
441
/// Validate the `iceberg_engine_connection` session variable.
///
/// An empty value (the default) means "unset" and is always accepted; otherwise
/// the value must have exactly two dot-separated segments:
/// `schema_name.connection_name`.
fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
    if val.is_empty() {
        return Ok(());
    }

    match val.split('.').count() {
        2 => Ok(()),
        _ => Err("Invalid iceberg engine connection format, Should be set to this format: schema_name.connection_name.".to_owned()),
    }
}
454
455fn check_timezone(val: &str) -> Result<(), String> {
456    // Check if the provided string is a valid timezone.
457    Tz::from_str_insensitive(val).map_err(|_e| "Not a valid timezone")?;
458    Ok(())
459}
460
/// Validate the `client_encoding` session variable.
///
/// Mirrors Postgres' encoding-name normalization: every character that is not
/// ASCII alphanumeric is dropped before comparing, so `UTF8`, `utf-8`, `UTF_8`
/// etc. are all accepted. Only UTF-8 is supported.
// https://github.com/postgres/postgres/blob/REL_15_3/src/common/encnames.c#L525
fn check_client_encoding(val: &str) -> Result<(), String> {
    let normalized: String = val.chars().filter(|c| c.is_ascii_alphanumeric()).collect();
    if normalized.eq_ignore_ascii_case("UTF8") {
        Ok(())
    } else {
        Err("Only support 'UTF8' for CLIENT_ENCODING".to_owned())
    }
}
470
/// Validate the `bytea_output` session variable. Only the `hex` format is
/// supported (Postgres additionally offers `escape`, which we reject).
fn check_bytea_output(val: &str) -> Result<(), String> {
    match val {
        "hex" => Ok(()),
        _ => Err("Only support 'hex' for BYTEA_OUTPUT".to_owned()),
    }
}
478
479/// Check if the provided value is a valid max parallelism.
480fn check_streaming_max_parallelism(val: &usize) -> Result<(), String> {
481    match val {
482        // TODO(var-vnode): this is to prevent confusion with singletons, after we distinguish
483        // them better, we may allow 1 as the max parallelism (though not much point).
484        0 | 1 => Err("STREAMING_MAX_PARALLELISM must be greater than 1".to_owned()),
485        2..=VirtualNode::MAX_COUNT => Ok(()),
486        _ => Err(format!(
487            "STREAMING_MAX_PARALLELISM must be less than or equal to {}",
488            VirtualNode::MAX_COUNT
489        )),
490    }
491}
492
impl SessionConfig {
    /// Set `force_two_phase_agg`, keeping the related flags consistent:
    /// when the flag ends up `true`, `enable_two_phase_agg` is also set to `true`,
    /// since forcing the optimization only makes sense when it is enabled.
    pub fn set_force_two_phase_agg(
        &mut self,
        val: bool,
        reporter: &mut impl ConfigReporter,
    ) -> SessionConfigResult<bool> {
        // `*_inner` is the plain macro-generated setter without coupling logic.
        let set_val = self.set_force_two_phase_agg_inner(val, reporter)?;
        if self.force_two_phase_agg {
            self.set_enable_two_phase_agg(true, reporter)
        } else {
            Ok(set_val)
        }
    }

    /// Set `enable_two_phase_agg`, keeping the related flags consistent.
    // NOTE(review): when `force_two_phase_agg` is false this re-sets it to false
    // (a no-op). The mutual recursion terminates because each setter only calls
    // the other on the branch the callee immediately short-circuits — confirm
    // this coupling is the intended invariant.
    pub fn set_enable_two_phase_agg(
        &mut self,
        val: bool,
        reporter: &mut impl ConfigReporter,
    ) -> SessionConfigResult<bool> {
        let set_val = self.set_enable_two_phase_agg_inner(val, reporter)?;
        if !self.force_two_phase_agg {
            self.set_force_two_phase_agg(false, reporter)
        } else {
            Ok(set_val)
        }
    }
}
520
/// Metadata describing a single session variable, rendered as strings.
pub struct VariableInfo {
    // Variable name.
    pub name: String,
    // Current value, rendered via `Display`.
    pub setting: String,
    // Human-readable description of the variable.
    pub description: String,
}
526
/// Report status or notice to caller.
pub trait ConfigReporter {
    /// Report that the config entry `key` has been set to `new_val`.
    fn report_status(&mut self, key: &str, new_val: String);
}
531
// Report nothing. The unit type serves as a no-op reporter for callers that
// don't care about status updates (e.g. tests).
impl ConfigReporter for () {
    fn report_status(&mut self, _key: &str, _new_val: String) {}
}
536
// Error type for `SessionConfig::to_initial_streaming_config_override`,
// wrapping TOML serialization failures and config-merge failures.
def_anyhow_newtype! {
    pub SessionConfigToOverrideError,
    toml::ser::Error => "failed to serialize session config",
    ConfigMergeError => transparent,
}
542
impl SessionConfig {
    /// Generate an initial override for the streaming config from the session config.
    ///
    /// Returns a TOML document (possibly empty) containing only the entries that
    /// are explicitly overridden in this session. The result is validated by
    /// merging it into the default [`StreamingConfig`] before being returned.
    pub fn to_initial_streaming_config_override(
        &self,
    ) -> Result<String, SessionConfigToOverrideError> {
        type Map = toml::map::Map<String, toml::Value>;
        let mut map = Map::new();

        // TODO: make this more type safe.
        // Insert `val` into the nested TOML table at the dot-separated `path`,
        // creating intermediate tables on demand.
        fn add(map: &mut Map, path: &str, val: impl Serialize) -> Result<(), toml::ser::Error> {
            let value = toml::Value::try_from(val)?;
            let segments = path.split('.').collect_vec();
            let (key, segments) = segments.split_last().expect("empty path");

            // Walk (and lazily create) the intermediate tables down to the leaf.
            let mut map = map;
            for segment in segments {
                map = map
                    .entry(segment.to_owned())
                    .or_insert_with(|| toml::Value::Table(Map::new()))
                    .as_table_mut()
                    .expect("expect table");
            }
            map.insert(key.to_string(), value);

            Ok(())
        }

        // Only session entries that are explicitly set produce an override.
        if let Some(v) = self.streaming_join_encoding.as_ref() {
            add(&mut map, "streaming.developer.join_encoding_type", v)?;
        }

        let res = toml::to_string(&map)?;

        // Validate all fields are valid by trying to merge it to the default config.
        if !res.is_empty() {
            let merged =
                merge_streaming_config_section(&StreamingConfig::default(), res.as_str())?.unwrap();

            // Leftover "unrecognized" keys indicate a typo'd path in `add` above.
            for u in [
                merged.unrecognized.into_inner(),
                merged.developer.unrecognized.into_inner(),
            ] {
                if !u.is_empty() {
                    bail!("unrecognized config entries: {:?}", u);
                }
            }
        }

        Ok(res)
    }
}
594
#[cfg(test)]
mod test {
    use expect_test::expect;

    use super::*;

    /// Minimal config struct exercising the `SessionConfig` derive macro's
    /// `alias` and `NO_ALTER_SYS` handling.
    #[derive(SessionConfig)]
    struct TestConfig {
        #[parameter(default = 1, flags = "NO_ALTER_SYS", alias = "test_param_alias" | "alias_param_test")]
        test_param: i32,
    }

    #[test]
    fn test_session_config_alias() {
        let mut config = TestConfig::default();
        // Setting via the canonical name is visible through an alias...
        config.set("test_param", "2".to_owned(), &mut ()).unwrap();
        assert_eq!(config.get("test_param_alias").unwrap(), "2");
        // ...and setting via one alias is visible through another alias.
        config
            .set("alias_param_test", "3".to_owned(), &mut ())
            .unwrap();
        assert_eq!(config.get("test_param_alias").unwrap(), "3");
        // The NO_ALTER_SYS flag is queryable by canonical name.
        assert!(TestConfig::check_no_alter_sys("test_param").unwrap());
    }

    #[test]
    fn test_initial_streaming_config_override() {
        let mut config = SessionConfig::default();
        config
            .set_streaming_join_encoding(Some(JoinEncodingType::Cpu).into(), &mut ())
            .unwrap();

        // Check the converted config override string.
        let override_str = config.to_initial_streaming_config_override().unwrap();
        expect![[r#"
            [streaming.developer]
            join_encoding_type = "cpu_optimized"
        "#]]
        .assert_eq(&override_str);

        // Try merging it to the default streaming config.
        let merged = merge_streaming_config_section(&StreamingConfig::default(), &override_str)
            .unwrap()
            .unwrap();
        assert_eq!(merged.developer.join_encoding_type, JoinEncodingType::Cpu);
    }
}