risingwave_common/session_config/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod non_zero64;
16mod opt;
17pub mod parallelism;
18mod query_mode;
19mod search_path;
20pub mod sink_decouple;
21mod transaction_isolation_level;
22mod visibility_mode;
23
24use chrono_tz::Tz;
25use itertools::Itertools;
26pub use opt::OptionConfig;
27pub use query_mode::QueryMode;
28use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
29pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
30use serde::{Deserialize, Serialize};
31use thiserror::Error;
32
33use self::non_zero64::ConfigNonZeroU64;
34use crate::config::mutate::TomlTableMutateExt;
35use crate::config::streaming::{JoinEncodingType, OverWindowCachePolicy};
36use crate::config::{ConfigMergeError, StreamingConfig, merge_streaming_config_section};
37use crate::hash::VirtualNode;
38use crate::session_config::parallelism::ConfigParallelism;
39use crate::session_config::sink_decouple::SinkDecouple;
40use crate::session_config::transaction_isolation_level::IsolationLevel;
41pub use crate::session_config::visibility_mode::VisibilityMode;
42use crate::{PG_VERSION, SERVER_ENCODING, SERVER_VERSION_NUM, STANDARD_CONFORMING_STRINGS};
43
/// Separator string (`", "`) for list-valued session config entries.
// NOTE(review): the users of this constant are outside this file; presumably it is the
// delimiter used when rendering/parsing list values such as the search path — confirm.
pub const SESSION_CONFIG_LIST_SEP: &str = ", ";
45
/// Errors produced when reading or altering a session config entry.
#[derive(Error, Debug)]
pub enum SessionConfigError {
    /// `value` was rejected for the config entry named `entry`.
    #[error("Invalid value `{value}` for `{entry}`")]
    InvalidValue {
        /// Name of the config entry the value was provided for.
        entry: &'static str,
        /// The rejected value, as text.
        value: String,
        /// Underlying cause of the rejection.
        source: anyhow::Error,
    },

    /// The given name does not correspond to any known config entry.
    #[error("Unrecognized config entry `{0}`")]
    UnrecognizedEntry(String),
}
58
/// Shorthand for a `Result` whose error type is [`SessionConfigError`].
type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
60
// NOTE(kwannoel): We declare it separately as a constant,
// otherwise seems like it can't infer the type of -1 when written inline.
/// Default of `backfill_rate_limit`: `-1` disables the rate limit.
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
/// Default of `source_rate_limit`: `-1` disables the rate limit.
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
/// Default of `dml_rate_limit`: `-1` disables the rate limit.
const DISABLE_DML_RATE_LIMIT: i32 = -1;
/// Default of `sink_rate_limit`: `-1` disables the rate limit.
const DISABLE_SINK_RATE_LIMIT: i32 = -1;

/// Default to bypass cluster limits iff in debug mode.
///
/// Used as the default of `bypass_cluster_limits`; `cfg!(debug_assertions)` is `true` only when
/// the crate is compiled with debug assertions enabled.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
70
/// This is the Session Config of RisingWave.
///
/// All config entries implement `Display` and `FromStr` for getter and setter, to be read and
/// altered within a session.
///
/// Users can change the default value of a configuration entry using `ALTER SYSTEM SET`. To
/// facilitate this, a `serde` implementation is used as the wire format for retrieving initial
/// configurations and updates from the meta service. It's important to note that the meta
/// service stores the overridden value of each configuration entry per row with `Display` in
/// the meta store, rather than using the `serde` format. However, we still delegate the `serde`
/// impl of all fields to `Display`/`FromStr` to make it consistent.
#[serde_with::apply(_ => #[serde_as(as = "serde_with::DisplayFromStr")] )]
#[serde_with::serde_as]
#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
pub struct SessionConfig {
    /// If `RW_IMPLICIT_FLUSH` is on, then every INSERT/UPDATE/DELETE statement will block
    /// until the entire dataflow is refreshed. In other words, every related table & MV will
    /// be able to see the write.
    #[parameter(default = false, alias = "rw_implicit_flush")]
    implicit_flush: bool,

    /// If `CREATE_COMPACTION_GROUP_FOR_MV` is on, dedicated compaction groups will be created in
    /// MV creation.
    #[parameter(default = false)]
    create_compaction_group_for_mv: bool,

    /// A temporary config variable to force query running in either local or distributed mode.
    /// The default value is auto which means let the system decide to run batch queries in local
    /// or distributed mode automatically.
    #[parameter(default = QueryMode::default())]
    query_mode: QueryMode,

    /// Sets the number of digits displayed for floating-point values.
    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#:~:text=for%20more%20information.-,extra_float_digits,-(integer)>
    #[parameter(default = 1)]
    extra_float_digits: i32,

    /// Sets the application name to be reported in statistics and logs.
    /// See <https://www.postgresql.org/docs/14/runtime-config-logging.html#:~:text=What%20to%20Log-,application_name,-(string)>
    #[parameter(default = "", flags = "REPORT")]
    application_name: String,

    /// It is typically set by an application upon connection to the server.
    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-DATESTYLE>
    #[parameter(default = "", rename = "datestyle")]
    date_style: String,

    /// Force the use of lookup join instead of hash join when possible for local batch execution.
    #[parameter(default = true, alias = "rw_batch_enable_lookup_join")]
    batch_enable_lookup_join: bool,

    /// Enable usage of sortAgg instead of hash agg when order property is satisfied in batch
    /// execution
    #[parameter(default = true, alias = "rw_batch_enable_sort_agg")]
    batch_enable_sort_agg: bool,

    /// Enable distributed DML, so an insert, delete, and update statement can be executed in a distributed way (e.g. running in multiple compute nodes).
    /// No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion where users always can drop their table when failure happens.
    #[parameter(default = false, rename = "batch_enable_distributed_dml")]
    batch_enable_distributed_dml: bool,

    /// Evaluate expression in strict mode for batch queries.
    /// If set to false, an expression failure will not cause an error but leave a null value
    /// on the result set.
    #[parameter(default = true)]
    batch_expr_strict_mode: bool,

    /// The max gap allowed to transform small range scan into multi point lookup.
    #[parameter(default = 8)]
    max_split_range_gap: i32,

    /// Sets the order in which schemas are searched when an object (table, data type, function, etc.)
    /// is referenced by a simple name with no schema specified.
    /// See <https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SEARCH-PATH>
    #[parameter(default = SearchPath::default())]
    search_path: SearchPath,

    /// If `VISIBILITY_MODE` is all, we will support querying data without checkpoint.
    #[parameter(default = VisibilityMode::default())]
    visibility_mode: VisibilityMode,

    /// See <https://www.postgresql.org/docs/current/transaction-iso.html>
    #[parameter(default = IsolationLevel::default())]
    transaction_isolation: IsolationLevel,

    /// Select as of specific epoch.
    /// Sets the historical epoch for querying data. If 0, querying latest data.
    #[parameter(default = ConfigNonZeroU64::default())]
    query_epoch: ConfigNonZeroU64,

    /// Session timezone. Defaults to UTC.
    #[parameter(default = "UTC", check_hook = check_timezone)]
    timezone: String,

    /// The execution parallelism for streaming queries, including tables, materialized views, indexes,
    /// and sinks. Defaults to 0, which means they will be scheduled adaptively based on the cluster size.
    ///
    /// If a non-zero value is set, streaming queries will be scheduled to use a fixed number of parallelism.
    /// Note that the value will be bounded at `STREAMING_MAX_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism: ConfigParallelism,

    /// Specific parallelism for table. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_table: ConfigParallelism,

    /// Specific parallelism for sink. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_sink: ConfigParallelism,

    /// Specific parallelism for index. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_index: ConfigParallelism,

    /// Specific parallelism for source. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_source: ConfigParallelism,

    /// Specific parallelism for materialized view. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_materialized_view: ConfigParallelism,

    /// Enable delta join for streaming queries. Defaults to false.
    #[parameter(default = false, alias = "rw_streaming_enable_delta_join")]
    streaming_enable_delta_join: bool,

    /// Enable bushy join for streaming queries. Defaults to true.
    #[parameter(default = true, alias = "rw_streaming_enable_bushy_join")]
    streaming_enable_bushy_join: bool,

    /// Force filtering to be done inside the join whenever there's a choice between optimizations.
    /// Defaults to false.
    #[parameter(default = false, alias = "rw_streaming_force_filter_inside_join")]
    streaming_force_filter_inside_join: bool,

    /// Enable arrangement backfill for streaming queries. Defaults to true.
    /// When set to true, the parallelism of the upstream fragment will be
    /// decoupled from the parallelism of the downstream scan fragment.
    /// Or more generally, the parallelism of the upstream table / index / mv
    /// will be decoupled from the parallelism of the downstream table / index / mv / sink.
    #[parameter(default = true)]
    streaming_use_arrangement_backfill: bool,

    /// Whether to use snapshot backfill. Defaults to false.
    #[parameter(default = false)]
    streaming_use_snapshot_backfill: bool,

    /// Allow `jsonb` in stream key
    #[parameter(default = false, alias = "rw_streaming_allow_jsonb_in_stream_key")]
    streaming_allow_jsonb_in_stream_key: bool,

    /// Enable materialized expressions for impure functions (typically UDF).
    #[parameter(default = true)]
    streaming_enable_materialized_expressions: bool,

    /// Separate consecutive `StreamHashJoin` by no-shuffle `StreamExchange`
    #[parameter(default = false)]
    streaming_separate_consecutive_join: bool,

    /// Separate `StreamSink` by no-shuffle `StreamExchange`
    #[parameter(default = false)]
    streaming_separate_sink: bool,

    /// Determine which encoding will be used to encode join rows in operator cache.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None)]
    streaming_join_encoding: OptionConfig<JoinEncodingType>,

    /// Enable join ordering for streaming and batch queries. Defaults to true.
    #[parameter(default = true, alias = "rw_enable_join_ordering")]
    enable_join_ordering: bool,

    /// Enable two phase agg optimization. Defaults to true.
    /// Setting this to false is meant to also set `FORCE_TWO_PHASE_AGG` to false,
    /// see `set_enable_two_phase_agg`.
    #[parameter(default = true, flags = "SETTER", alias = "rw_enable_two_phase_agg")]
    enable_two_phase_agg: bool,

    /// Force two phase agg optimization whenever there's a choice between
    /// optimizations. Defaults to false.
    /// Setting this to true will always set `ENABLE_TWO_PHASE_AGG` to true.
    #[parameter(default = false, flags = "SETTER", alias = "rw_force_two_phase_agg")]
    force_two_phase_agg: bool,

    /// Enable sharing of common sub-plans.
    /// This means that DAG structured query plans can be constructed,
    /// rather than only tree structured query plans.
    #[parameter(default = true, alias = "rw_enable_share_plan")]
    enable_share_plan: bool,

    /// Enable split distinct agg
    #[parameter(default = false, alias = "rw_force_split_distinct_agg")]
    force_split_distinct_agg: bool,

    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE>
    #[parameter(default = "", rename = "intervalstyle")]
    interval_style: String,

    /// If `BATCH_PARALLELISM` is non-zero, batch queries will use this parallelism.
    #[parameter(default = ConfigNonZeroU64::default())]
    batch_parallelism: ConfigNonZeroU64,

    /// The version of PostgreSQL that Risingwave claims to be.
    #[parameter(default = PG_VERSION)]
    server_version: String,

    /// The version of PostgreSQL that Risingwave claims to be, as an integer.
    #[parameter(default = SERVER_VERSION_NUM)]
    server_version_num: i32,

    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-MIN-MESSAGES>
    #[parameter(default = "notice")]
    client_min_messages: String,

    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-ENCODING>
    #[parameter(default = SERVER_ENCODING, check_hook = check_client_encoding)]
    client_encoding: String,

    /// Enable decoupling sink and internal streaming graph or not
    #[parameter(default = SinkDecouple::default())]
    sink_decouple: SinkDecouple,

    /// See <https://www.postgresql.org/docs/current/runtime-config-compatible.html#RUNTIME-CONFIG-COMPATIBLE-VERSION>
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = false)]
    synchronize_seqscans: bool,

    /// Abort query statement that takes more than the specified amount of time in sec. If
    /// `log_min_error_statement` is set to ERROR or lower, the statement that timed out will also be
    /// logged. If this value is specified without units, it is taken as milliseconds. A value of
    /// zero (the default) disables the timeout.
    #[parameter(default = 0u32)]
    statement_timeout: u32,

    /// Terminate any session that has been idle (that is, waiting for a client query) within an open transaction for longer than the specified amount of time in milliseconds.
    #[parameter(default = 60000u32)]
    idle_in_transaction_session_timeout: u32,

    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-LOCK-TIMEOUT>
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = 0)]
    lock_timeout: i32,

    /// For limiting the startup time of a shareable CDC streaming source when the source is being created. Unit: seconds.
    #[parameter(default = 60)]
    cdc_source_wait_streaming_start_timeout: i32,

    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-ROW-SECURITY>.
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = true)]
    row_security: bool,

    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-STANDARD-CONFORMING-STRINGS>
    #[parameter(default = STANDARD_CONFORMING_STRINGS)]
    standard_conforming_strings: String,

    /// Set streaming rate limit (rows per second) for each parallelism for mv / source / sink backfilling
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the snapshot read / source read.
    #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
    backfill_rate_limit: i32,

    /// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the snapshot read / source read.
    #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
    source_rate_limit: i32,

    /// Set streaming rate limit (rows per second) for each parallelism for table DML.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the DML.
    #[parameter(default = DISABLE_DML_RATE_LIMIT)]
    dml_rate_limit: i32,

    /// Set sink rate limit (rows per second) for each parallelism for external sink.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the sink.
    #[parameter(default = DISABLE_SINK_RATE_LIMIT)]
    sink_rate_limit: i32,

    /// Cache policy for partition cache in streaming over window.
    /// Can be `full`, `recent`, `recent_first_n` or `recent_last_n`.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None, alias = "rw_streaming_over_window_cache_policy")]
    streaming_over_window_cache_policy: OptionConfig<OverWindowCachePolicy>,

    /// Run DDL statements in background
    #[parameter(default = false)]
    background_ddl: bool,

    /// Enable shared source. Currently only for Kafka.
    ///
    /// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEWS` from the source
    /// will forward the data from the same source streaming job, and also backfill prior data from the external source.
    #[parameter(default = true)]
    streaming_use_shared_source: bool,

    /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
    #[parameter(default = SERVER_ENCODING)]
    server_encoding: String,

    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-BYTEA-OUTPUT>
    /// Only `hex` is accepted.
    #[parameter(default = "hex", check_hook = check_bytea_output)]
    bytea_output: String,

    /// Bypass checks on cluster limits
    ///
    /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
    #[parameter(default = BYPASS_CLUSTER_LIMITS)]
    bypass_cluster_limits: bool,

    /// The maximum number of parallelism a streaming query can use. Defaults to 256.
    ///
    /// Compared to `STREAMING_PARALLELISM`, which configures the initial parallelism, this configures
    /// the maximum parallelism a streaming query can use in the future, if the cluster size changes or
    /// users manually change the parallelism with `ALTER .. SET PARALLELISM`.
    ///
    /// It's not always a good idea to set this to a very large number, as it may cause performance
    /// degradation when performing range scans on the table or the materialized view.
    // a.k.a. vnode count
    #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_streaming_max_parallelism)]
    streaming_max_parallelism: usize,

    /// Used to provide the connection information for the iceberg engine.
    /// Format: `iceberg_engine_connection` = `schema_name.connection_name`.
    #[parameter(default = "", check_hook = check_iceberg_engine_connection)]
    iceberg_engine_connection: String,

    /// Whether the streaming join should be unaligned or not.
    #[parameter(default = false)]
    streaming_enable_unaligned_join: bool,

    /// The timeout for reading from the buffer of the sync log store on barrier.
    /// Every epoch we will attempt to read the full buffer of the sync log store.
    /// If we hit the timeout, we will stop reading and continue.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None)]
    streaming_sync_log_store_pause_duration_ms: OptionConfig<usize>,

    /// The max buffer size for sync logstore, before we start flushing.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None)]
    streaming_sync_log_store_buffer_size: OptionConfig<usize>,

    /// Whether to disable purifying the definition of the table or source upon retrieval.
    /// Only set this if encountering issues with functionalities like `SHOW` or `ALTER TABLE/SOURCE`.
    /// This config may be removed in the future.
    #[parameter(default = false, flags = "NO_ALTER_SYS")]
    disable_purify_definition: bool,

    /// The `ef_search` used in querying hnsw vector index
    #[parameter(default = 40_usize)] // default value borrowed from pg_vector
    batch_hnsw_ef_search: usize,

    /// Enable index selection for queries
    #[parameter(default = true)]
    enable_index_selection: bool,

    /// Enable locality backfill for streaming queries. Defaults to false.
    #[parameter(default = false)]
    enable_locality_backfill: bool,

    /// Duration in seconds before notifying the user that a long-running DDL operation (e.g., DROP TABLE, CANCEL JOBS)
    /// is still running. Set to 0 to disable notifications. Defaults to 30 seconds.
    #[parameter(default = 30u32)]
    slow_ddl_notification_secs: u32,

    /// Unsafe: Enable storage retention for non-append-only tables.
    /// Enabling this can lead to streaming inconsistency and node panic
    /// if there is any row INSERT/UPDATE/DELETE operation corresponding to the ttled primary key.
    #[parameter(default = false)]
    unsafe_enable_storage_retention_for_non_append_only_tables: bool,

    /// Enable DataFusion Engine
    /// When enabled, queries involving Iceberg tables will be executed using the DataFusion engine.
    #[parameter(default = false)]
    enable_datafusion_engine: bool,
}
454
/// Check hook for `iceberg_engine_connection`.
///
/// An empty value is always accepted. Any other value must contain exactly one `.`, matching
/// the `schema_name.connection_name` format. Only the number of separators is checked; the
/// two components themselves are not validated here.
fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
    // Exactly one separator is the same as splitting on `.` into exactly two parts.
    let separator_count = val.matches('.').count();

    if val.is_empty() || separator_count == 1 {
        Ok(())
    } else {
        Err("Invalid iceberg engine connection format, Should be set to this format: schema_name.connection_name.".to_owned())
    }
}
467
468fn check_timezone(val: &str) -> Result<(), String> {
469    // Check if the provided string is a valid timezone.
470    Tz::from_str_insensitive(val).map_err(|_e| "Not a valid timezone")?;
471    Ok(())
472}
473
/// Check hook for `client_encoding`: only `UTF8` is accepted.
///
/// The comparison ignores ASCII case and every character that is not an ASCII letter or digit,
/// so spellings such as `utf-8` or `UTF_8` are accepted as well.
fn check_client_encoding(val: &str) -> Result<(), String> {
    // https://github.com/postgres/postgres/blob/REL_15_3/src/common/encnames.c#L525
    let normalized: String = val.chars().filter(char::is_ascii_alphanumeric).collect();

    if normalized.eq_ignore_ascii_case("UTF8") {
        Ok(())
    } else {
        Err("Only support 'UTF8' for CLIENT_ENCODING".to_owned())
    }
}
483
/// Check hook for `bytea_output`: the exact, case-sensitive string `hex` is the only accepted value.
fn check_bytea_output(val: &str) -> Result<(), String> {
    match val {
        "hex" => Ok(()),
        _ => Err("Only support 'hex' for BYTEA_OUTPUT".to_owned()),
    }
}
491
492/// Check if the provided value is a valid max parallelism.
493fn check_streaming_max_parallelism(val: &usize) -> Result<(), String> {
494    match val {
495        // TODO(var-vnode): this is to prevent confusion with singletons, after we distinguish
496        // them better, we may allow 1 as the max parallelism (though not much point).
497        0 | 1 => Err("STREAMING_MAX_PARALLELISM must be greater than 1".to_owned()),
498        2..=VirtualNode::MAX_COUNT => Ok(()),
499        _ => Err(format!(
500            "STREAMING_MAX_PARALLELISM must be less than or equal to {}",
501            VirtualNode::MAX_COUNT
502        )),
503    }
504}
505
506impl SessionConfig {
507    pub fn set_force_two_phase_agg(
508        &mut self,
509        val: bool,
510        reporter: &mut impl ConfigReporter,
511    ) -> SessionConfigResult<bool> {
512        let set_val = self.set_force_two_phase_agg_inner(val, reporter)?;
513        if self.force_two_phase_agg {
514            self.set_enable_two_phase_agg(true, reporter)
515        } else {
516            Ok(set_val)
517        }
518    }
519
520    pub fn set_enable_two_phase_agg(
521        &mut self,
522        val: bool,
523        reporter: &mut impl ConfigReporter,
524    ) -> SessionConfigResult<bool> {
525        let set_val = self.set_enable_two_phase_agg_inner(val, reporter)?;
526        if !self.force_two_phase_agg {
527            self.set_force_two_phase_agg(false, reporter)
528        } else {
529            Ok(set_val)
530        }
531    }
532}
533
/// Textual information about a single config variable.
// NOTE(review): the code that builds `VariableInfo` is not in this file (presumably generated
// by the `SessionConfig` derive for `SHOW`-style listings) — confirm before relying on it.
pub struct VariableInfo {
    /// Name of the variable.
    pub name: String,
    /// Value of the variable, rendered as text.
    pub setting: String,
    /// Human-readable description of the variable.
    pub description: String,
}
539
/// Report status or notice to caller.
///
/// The setters of [`SessionConfig`] take a `&mut impl ConfigReporter`, so the caller decides
/// what happens with the reports.
pub trait ConfigReporter {
    /// Receives the name of a config entry (`key`) and its new value rendered as a string.
    fn report_status(&mut self, key: &str, new_val: String);
}
544
// Report nothing: `()` is a no-op reporter, so callers that do not care about status
// reports can simply pass `&mut ()`.
impl ConfigReporter for () {
    fn report_status(&mut self, _key: &str, _new_val: String) {}
}
549
// Declares `SessionConfigToOverrideError`, the error type returned by
// `SessionConfig::to_initial_streaming_config_override`. The arms list the errors it can be
// created from: `toml::ser::Error` (with the given message) and `ConfigMergeError`.
// NOTE(review): the exact expansion is defined by `def_anyhow_newtype!`, which is not visible
// in this file.
def_anyhow_newtype! {
    pub SessionConfigToOverrideError,
    toml::ser::Error => "failed to serialize session config",
    ConfigMergeError => transparent,
}
555
556impl SessionConfig {
557    /// Generate an initial override for the streaming config from the session config.
558    pub fn to_initial_streaming_config_override(
559        &self,
560    ) -> Result<String, SessionConfigToOverrideError> {
561        let mut table = toml::Table::new();
562
563        // TODO: make this more type safe.
564        // We `unwrap` here to assert the hard-coded keys are correct.
565        if let Some(v) = self.streaming_join_encoding.as_ref() {
566            table
567                .upsert("streaming.developer.join_encoding_type", v)
568                .unwrap();
569        }
570        if let Some(v) = self.streaming_sync_log_store_pause_duration_ms.as_ref() {
571            table
572                .upsert("streaming.developer.sync_log_store_pause_duration_ms", v)
573                .unwrap();
574        }
575        if let Some(v) = self.streaming_sync_log_store_buffer_size.as_ref() {
576            table
577                .upsert("streaming.developer.sync_log_store_buffer_size", v)
578                .unwrap();
579        }
580        if let Some(v) = self.streaming_over_window_cache_policy.as_ref() {
581            table
582                .upsert("streaming.developer.over_window_cache_policy", v)
583                .unwrap();
584        }
585
586        let res = toml::to_string(&table)?;
587
588        // Validate all fields are valid by trying to merge it to the default config.
589        if !res.is_empty() {
590            let merged =
591                merge_streaming_config_section(&StreamingConfig::default(), res.as_str())?.unwrap();
592
593            let unrecognized_keys = merged.unrecognized_keys().collect_vec();
594            if !unrecognized_keys.is_empty() {
595                bail!("unrecognized configs: {:?}", unrecognized_keys);
596            }
597        }
598
599        Ok(res)
600    }
601}
602
#[cfg(test)]
mod test {
    use expect_test::expect;

    use super::*;

    // Minimal config with a single entry that carries two aliases and the `NO_ALTER_SYS` flag.
    #[derive(SessionConfig)]
    struct TestConfig {
        #[parameter(default = 1, flags = "NO_ALTER_SYS", alias = "test_param_alias" | "alias_param_test")]
        test_param: i32,
    }

    // An entry can be written and read through its canonical name as well as any alias.
    #[test]
    fn test_session_config_alias() {
        let mut config = TestConfig::default();
        // Set via the canonical name, read back via the first alias.
        config.set("test_param", "2".to_owned(), &mut ()).unwrap();
        assert_eq!(config.get("test_param_alias").unwrap(), "2");
        // Set via the second alias, read back via the first alias.
        config
            .set("alias_param_test", "3".to_owned(), &mut ())
            .unwrap();
        assert_eq!(config.get("test_param_alias").unwrap(), "3");
        // The `NO_ALTER_SYS` flag is reported for the entry.
        assert!(TestConfig::check_no_alter_sys("test_param").unwrap());
    }

    // Session-level streaming overrides are rendered as TOML and can be merged into the
    // default streaming config.
    #[test]
    fn test_initial_streaming_config_override() {
        let mut config = SessionConfig::default();
        config
            .set_streaming_join_encoding(Some(JoinEncodingType::Cpu).into(), &mut ())
            .unwrap();
        config
            .set_streaming_over_window_cache_policy(
                Some(OverWindowCachePolicy::RecentFirstN).into(),
                &mut (),
            )
            .unwrap();

        // Check the converted config override string.
        let override_str = config.to_initial_streaming_config_override().unwrap();
        expect![[r#"
            [streaming.developer]
            join_encoding_type = "cpu_optimized"
            over_window_cache_policy = "recent_first_n"
        "#]]
        .assert_eq(&override_str);

        // Try merging it to the default streaming config.
        let merged = merge_streaming_config_section(&StreamingConfig::default(), &override_str)
            .unwrap()
            .unwrap();
        assert_eq!(merged.developer.join_encoding_type, JoinEncodingType::Cpu);
        assert_eq!(
            merged.developer.over_window_cache_policy,
            OverWindowCachePolicy::RecentFirstN
        );
    }
}