risingwave_common/session_config/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod iceberg_query_storage_mode;
16mod non_zero64;
17mod opt;
18pub mod parallelism;
19mod query_mode;
20mod search_path;
21pub mod sink_decouple;
22mod statement_timeout;
23mod transaction_isolation_level;
24mod visibility_mode;
25
26use chrono_tz::Tz;
27pub use iceberg_query_storage_mode::IcebergQueryStorageMode;
28use itertools::Itertools;
29pub use opt::OptionConfig;
30pub use query_mode::QueryMode;
31use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
32pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
33use serde::{Deserialize, Serialize};
34pub use statement_timeout::StatementTimeout;
35use thiserror::Error;
36
37use self::non_zero64::ConfigNonZeroU64;
38use crate::config::mutate::TomlTableMutateExt;
39use crate::config::streaming::{JoinEncodingType, OverWindowCachePolicy};
40use crate::config::{ConfigMergeError, StreamingConfig, merge_streaming_config_section};
41use crate::hash::VirtualNode;
42use crate::session_config::parallelism::{ConfigBackfillParallelism, ConfigParallelism};
43use crate::session_config::sink_decouple::SinkDecouple;
44use crate::session_config::transaction_isolation_level::IsolationLevel;
45pub use crate::session_config::visibility_mode::VisibilityMode;
46use crate::{PG_VERSION, SERVER_ENCODING, SERVER_VERSION_NUM, STANDARD_CONFORMING_STRINGS};
47
/// The literal separator `", "`.
///
/// NOTE(review): presumably used to join/split list-valued session config entries —
/// the use sites are not visible in this part of the file, confirm there.
pub const SESSION_CONFIG_LIST_SEP: &str = ", ";
49
/// Errors produced when reading or updating a session config entry.
#[derive(Error, Debug)]
pub enum SessionConfigError {
    /// The given `value` was rejected for the config `entry`.
    ///
    /// `source` holds the underlying cause; `thiserror` exposes a field named `source`
    /// through `std::error::Error::source`.
    #[error("Invalid value `{value}` for `{entry}`")]
    InvalidValue {
        entry: &'static str,
        value: String,
        source: anyhow::Error,
    },

    /// No config entry is known under the given name.
    #[error("Unrecognized config entry `{0}`")]
    UnrecognizedEntry(String),
}
62
/// Shorthand for results whose error type is [`SessionConfigError`].
type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
64
// NOTE(kwannoel): We declare it separately as a constant,
// otherwise seems like it can't infer the type of -1 when written inline.
//
// Each constant below is the default of the session config field with the matching name
// (`backfill_rate_limit`, `source_rate_limit`, `dml_rate_limit`, `sink_rate_limit`); per the
// docs on those fields, `-1` means the rate limit is disabled.
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
const DISABLE_DML_RATE_LIMIT: i32 = -1;
const DISABLE_SINK_RATE_LIMIT: i32 = -1;

/// Default to bypass cluster limits iff in debug mode.
///
/// `cfg!(debug_assertions)` is evaluated at compile time, so this is `true` in builds with
/// debug assertions enabled and `false` otherwise.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
74
/// This is the Session Config of RisingWave.
///
/// All config entries implement `Display` and `FromStr` for getter and setter, to be read and
/// altered within a session.
///
/// Users can change the default value of a configuration entry using `ALTER SYSTEM SET`. To
/// facilitate this, a `serde` implementation is used as the wire format for retrieving initial
/// configurations and updates from the meta service. It's important to note that the meta
/// service stores the overridden value of each configuration entry per row with `Display` in
/// the meta store, rather than using the `serde` format. However, we still delegate the `serde`
/// impl of all fields to `Display`/`FromStr` to make it consistent.
#[serde_with::apply(_ => #[serde_as(as = "serde_with::DisplayFromStr")] )]
#[serde_with::serde_as]
#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
pub struct SessionConfig {
    /// If `RW_IMPLICIT_FLUSH` is on, then every INSERT/UPDATE/DELETE statement will block
    /// until the entire dataflow is refreshed. In other words, every related table & MV will
    /// be able to see the write.
    #[parameter(default = false, alias = "rw_implicit_flush")]
    implicit_flush: bool,

    /// If `DML_WAIT_PERSISTENCE` is on, then every INSERT/UPDATE/DELETE statement waits until
    /// the transaction is included in a checkpoint. This is ignored when `IMPLICIT_FLUSH` is on.
    #[parameter(default = false)]
    dml_wait_persistence: bool,

    /// If `CREATE_COMPACTION_GROUP_FOR_MV` is on, dedicated compaction groups will be created in
    /// MV creation.
    #[parameter(default = false)]
    create_compaction_group_for_mv: bool,

    /// A temporary config variable to force query running in either local or distributed mode.
    /// The default value is auto which means let the system decide to run batch queries in local
    /// or distributed mode automatically.
    #[parameter(default = QueryMode::default())]
    query_mode: QueryMode,

    /// For Iceberg engine tables, which storage to use for batch SELECT: Iceberg (columnar) or
    /// Hummock (row). Only affects batch SELECT on tables with ENGINE = ICEBERG.
    #[parameter(default = IcebergQueryStorageMode::default())]
    iceberg_query_storage_mode: IcebergQueryStorageMode,

    /// Sets the number of digits displayed for floating-point values.
    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#:~:text=for%20more%20information.-,extra_float_digits,-(integer)>
    #[parameter(default = 1)]
    extra_float_digits: i32,

    /// Sets the application name to be reported in statistics and logs.
    /// See <https://www.postgresql.org/docs/14/runtime-config-logging.html#:~:text=What%20to%20Log-,application_name,-(string)>
    #[parameter(default = "", flags = "REPORT")]
    application_name: String,

    /// It is typically set by an application upon connection to the server.
    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-DATESTYLE>
    #[parameter(default = "", rename = "datestyle")]
    date_style: String,

    /// Force the use of lookup join instead of hash join when possible for local batch execution.
    #[parameter(default = true, alias = "rw_batch_enable_lookup_join")]
    batch_enable_lookup_join: bool,

    /// Enable usage of sortAgg instead of hash agg when order property is satisfied in batch
    /// execution
    #[parameter(default = true, alias = "rw_batch_enable_sort_agg")]
    batch_enable_sort_agg: bool,

    /// Enable distributed DML, so an insert, delete, and update statement can be executed in a distributed way (e.g. running in multiple compute nodes).
    /// No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion where users always can drop their table when failure happens.
    // NOTE(review): the `rename` below is identical to the field name, so it looks redundant —
    // confirm against the `SessionConfig` derive before removing it.
    #[parameter(default = false, rename = "batch_enable_distributed_dml")]
    batch_enable_distributed_dml: bool,

    /// Evaluate expression in strict mode for batch queries.
    /// If set to false, an expression failure will not cause an error but leave a null value
    /// on the result set.
    #[parameter(default = true)]
    batch_expr_strict_mode: bool,

    /// The max gap allowed to transform small range scan into multi point lookup.
    #[parameter(default = 8)]
    max_split_range_gap: i32,

    /// Sets the order in which schemas are searched when an object (table, data type, function, etc.)
    /// is referenced by a simple name with no schema specified.
    /// See <https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SEARCH-PATH>
    #[parameter(default = SearchPath::default())]
    search_path: SearchPath,

    /// If `VISIBILITY_MODE` is all, we will support querying data without checkpoint.
    #[parameter(default = VisibilityMode::default())]
    visibility_mode: VisibilityMode,

    /// See <https://www.postgresql.org/docs/current/transaction-iso.html>
    #[parameter(default = IsolationLevel::default())]
    transaction_isolation: IsolationLevel,

    /// Select as of specific epoch.
    /// Sets the historical epoch for querying data. If 0, querying latest data.
    #[parameter(default = ConfigNonZeroU64::default())]
    query_epoch: ConfigNonZeroU64,

    /// Session timezone. Defaults to UTC.
    #[parameter(default = "UTC", check_hook = check_timezone)]
    timezone: String,

    /// The execution parallelism for streaming queries, including tables, materialized views,
    /// indexes, and sinks. Defaults to `default`, which preserves the legacy adaptive
    /// scheduling behavior during effective resolution.
    #[parameter(default = ConfigParallelism::Default, flags = "SESSION_INIT")]
    streaming_parallelism: ConfigParallelism,

    /// Specific parallelism for backfill. Only `default` and a fixed positive integer are
    /// supported here. Adaptive backfill strategies are deferred to a later change.
    #[parameter(
        default = ConfigBackfillParallelism::Default,
        check_hook = check_streaming_parallelism_for_backfill,
        flags = "SESSION_INIT"
    )]
    streaming_parallelism_for_backfill: ConfigBackfillParallelism,

    /// Specific parallelism for table. Defaults to `default`, which preserves the legacy
    /// bounded adaptive behavior only when the global parallelism itself remains `default`.
    /// Otherwise it follows the explicit global parallelism.
    #[parameter(default = ConfigParallelism::Default, flags = "SESSION_INIT")]
    streaming_parallelism_for_table: ConfigParallelism,

    /// Specific parallelism for sink. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::Default, flags = "SESSION_INIT")]
    streaming_parallelism_for_sink: ConfigParallelism,

    /// Specific parallelism for index. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::Default, flags = "SESSION_INIT")]
    streaming_parallelism_for_index: ConfigParallelism,

    /// Specific parallelism for source. Defaults to `default`, which preserves the legacy
    /// bounded adaptive behavior only when the global parallelism itself remains `default`.
    /// Otherwise it follows the explicit global parallelism.
    #[parameter(default = ConfigParallelism::Default, flags = "SESSION_INIT")]
    streaming_parallelism_for_source: ConfigParallelism,

    /// Specific parallelism for materialized view. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::Default, flags = "SESSION_INIT")]
    streaming_parallelism_for_materialized_view: ConfigParallelism,

    /// Enable delta join for streaming queries. Defaults to false.
    #[parameter(default = false, alias = "rw_streaming_enable_delta_join")]
    streaming_enable_delta_join: bool,

    /// Enable bushy join for streaming queries. Defaults to true.
    #[parameter(default = true, alias = "rw_streaming_enable_bushy_join")]
    streaming_enable_bushy_join: bool,

    /// Force filtering to be done inside the join whenever there's a choice between optimizations.
    /// Defaults to false.
    #[parameter(default = false, alias = "rw_streaming_force_filter_inside_join")]
    streaming_force_filter_inside_join: bool,

    /// Enable arrangement backfill for streaming queries. Defaults to true.
    /// When set to true, the parallelism of the upstream fragment will be
    /// decoupled from the parallelism of the downstream scan fragment.
    /// Or more generally, the parallelism of the upstream table / index / mv
    /// will be decoupled from the parallelism of the downstream table / index / mv / sink.
    #[parameter(default = true)]
    streaming_use_arrangement_backfill: bool,

    /// Whether to use snapshot backfill for streaming queries. Defaults to true.
    #[parameter(default = true)]
    streaming_use_snapshot_backfill: bool,

    /// Enable serverless backfill for streaming queries. Defaults to false.
    #[parameter(default = false)]
    enable_serverless_backfill: bool,

    /// Allow `jsonb` in stream key
    #[parameter(default = false, alias = "rw_streaming_allow_jsonb_in_stream_key")]
    streaming_allow_jsonb_in_stream_key: bool,

    /// Unsafe: allow impure expressions on non-append-only streams without materialization.
    ///
    /// This may lead to inconsistent results or panics due to re-evaluation on updates/retracts.
    #[parameter(default = false)]
    streaming_unsafe_allow_unmaterialized_impure_expr: bool,

    /// Separate consecutive `StreamHashJoin` by no-shuffle `StreamExchange`
    #[parameter(default = false)]
    streaming_separate_consecutive_join: bool,

    /// Separate `StreamSink` by no-shuffle `StreamExchange`
    #[parameter(default = false)]
    streaming_separate_sink: bool,

    /// Determine which encoding will be used to encode join rows in operator cache.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None)]
    streaming_join_encoding: OptionConfig<JoinEncodingType>,

    /// Enable join ordering for streaming and batch queries. Defaults to true.
    #[parameter(default = true, alias = "rw_enable_join_ordering")]
    enable_join_ordering: bool,

    /// Enable two phase agg optimization. Defaults to true.
    /// Setting this to true will always set `FORCE_TWO_PHASE_AGG` to false.
    // NOTE(review): the sentence above should be checked against `set_enable_two_phase_agg`,
    // the custom setter that decides when `FORCE_TWO_PHASE_AGG` is reset.
    #[parameter(default = true, flags = "SETTER", alias = "rw_enable_two_phase_agg")]
    enable_two_phase_agg: bool,

    /// Force two phase agg optimization whenever there's a choice between
    /// optimizations. Defaults to false.
    /// Setting this to true will always set `ENABLE_TWO_PHASE_AGG` to true.
    #[parameter(default = false, flags = "SETTER", alias = "rw_force_two_phase_agg")]
    force_two_phase_agg: bool,

    /// Enable sharing of common sub-plans.
    /// This means that DAG structured query plans can be constructed,
    /// rather than only tree structured query plans.
    #[parameter(default = true, alias = "rw_enable_share_plan")]
    enable_share_plan: bool,

    /// Enable split distinct agg
    #[parameter(default = false, alias = "rw_force_split_distinct_agg")]
    force_split_distinct_agg: bool,

    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE>
    #[parameter(default = "", rename = "intervalstyle")]
    interval_style: String,

    /// If `BATCH_PARALLELISM` is non-zero, batch queries will use this parallelism.
    #[parameter(default = ConfigNonZeroU64::default())]
    batch_parallelism: ConfigNonZeroU64,

    /// The version of PostgreSQL that Risingwave claims to be.
    #[parameter(default = PG_VERSION)]
    server_version: String,

    /// The version of PostgreSQL that Risingwave claims to be.
    #[parameter(default = SERVER_VERSION_NUM)]
    server_version_num: i32,

    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-MIN-MESSAGES>
    #[parameter(default = "notice")]
    client_min_messages: String,

    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-ENCODING>
    #[parameter(default = SERVER_ENCODING, check_hook = check_client_encoding)]
    client_encoding: String,

    /// Enable decoupling sink and internal streaming graph or not
    #[parameter(default = SinkDecouple::default())]
    sink_decouple: SinkDecouple,

    /// See <https://www.postgresql.org/docs/current/runtime-config-compatible.html#RUNTIME-CONFIG-COMPATIBLE-VERSION>
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = false)]
    synchronize_seqscans: bool,

    /// Abort query statement that takes more than the specified amount of time in sec. If
    /// `log_min_error_statement` is set to ERROR or lower, the statement that timed out will also be
    /// logged. If this value is specified without units, it is taken as milliseconds. A value of
    /// zero (the default) disables the timeout.
    #[parameter(default = StatementTimeout::default())]
    statement_timeout: StatementTimeout,

    /// Terminate any session that has been idle (that is, waiting for a client query) within an open transaction for longer than the specified amount of time in milliseconds.
    #[parameter(default = 60000u32)]
    idle_in_transaction_session_timeout: u32,

    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-LOCK-TIMEOUT>
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = 0)]
    lock_timeout: i32,

    /// For limiting the startup time of a shareable CDC streaming source when the source is being created. Unit: seconds.
    #[parameter(default = 60)]
    cdc_source_wait_streaming_start_timeout: i32,

    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-ROW-SECURITY>.
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = true)]
    row_security: bool,

    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-STANDARD-CONFORMING-STRINGS>
    #[parameter(default = STANDARD_CONFORMING_STRINGS)]
    standard_conforming_strings: String,

    /// Set streaming rate limit (rows per second) for each parallelism for mv / source / sink backfilling
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the snapshot read / source read.
    #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
    backfill_rate_limit: i32,

    /// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the snapshot read / source read.
    #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
    source_rate_limit: i32,

    /// Set streaming rate limit (rows per second) for each parallelism for table DML.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the DML.
    #[parameter(default = DISABLE_DML_RATE_LIMIT)]
    dml_rate_limit: i32,

    /// Set sink rate limit (rows per second) for each parallelism for external sink.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the sink.
    #[parameter(default = DISABLE_SINK_RATE_LIMIT)]
    sink_rate_limit: i32,

    /// Cache policy for partition cache in streaming over window.
    /// Can be `full`, `recent`, `recent_first_n` or `recent_last_n`.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None, alias = "rw_streaming_over_window_cache_policy")]
    streaming_over_window_cache_policy: OptionConfig<OverWindowCachePolicy>,

    /// Run DDL statements in background
    #[parameter(default = false)]
    background_ddl: bool,

    /// Enable shared source. Currently only for Kafka.
    ///
    /// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEWS` from the source
    /// will forward the data from the same source streaming job, and also backfill prior data from the external source.
    #[parameter(default = true)]
    streaming_use_shared_source: bool,

    /// Enable in-memory cache for `AsOf` join executor.
    ///
    /// When enabled (default), `AsOf` join uses the cache-based implementation.
    ///
    /// When disabled, `AsOf` join uses a no-cache implementation that directly queries
    /// the state table on-demand, reducing unnecessary data fetches for cache.
    #[parameter(default = true)]
    streaming_asof_join_use_cache: bool,

    /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
    #[parameter(default = SERVER_ENCODING)]
    server_encoding: String,

    /// Sets the output format for values of type `bytea`. Only `hex` is supported.
    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-BYTEA-OUTPUT>
    #[parameter(default = "hex", check_hook = check_bytea_output)]
    bytea_output: String,

    /// Bypass checks on cluster limits
    ///
    /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
    #[parameter(default = BYPASS_CLUSTER_LIMITS)]
    bypass_cluster_limits: bool,

    /// The maximum number of parallelism a streaming query can use. Defaults to 256.
    ///
    /// Compared to `STREAMING_PARALLELISM`, which configures the initial parallelism, this configures
    /// the maximum parallelism a streaming query can use in the future, if the cluster size changes or
    /// users manually change the parallelism with `ALTER .. SET PARALLELISM`.
    ///
    /// It's not always a good idea to set this to a very large number, as it may cause performance
    /// degradation when performing range scans on the table or the materialized view.
    // a.k.a. vnode count
    #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_streaming_max_parallelism)]
    streaming_max_parallelism: usize,

    /// Used to provide the connection information for the iceberg engine.
    /// Format: `iceberg_engine_connection` = `schema_name.connection_name`.
    #[parameter(default = "", check_hook = check_iceberg_engine_connection)]
    iceberg_engine_connection: String,

    /// Whether the streaming join should be unaligned or not.
    #[parameter(default = false)]
    streaming_enable_unaligned_join: bool,

    /// The timeout for reading from the buffer of the sync log store on barrier.
    /// Every epoch we will attempt to read the full buffer of the sync log store.
    /// If we hit the timeout, we will stop reading and continue.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None)]
    streaming_sync_log_store_pause_duration_ms: OptionConfig<usize>,

    /// The max buffer size for sync logstore, before we start flushing.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None)]
    streaming_sync_log_store_buffer_size: OptionConfig<usize>,

    /// Whether to disable purifying the definition of the table or source upon retrieval.
    /// Only set this if encountering issues with functionalities like `SHOW` or `ALTER TABLE/SOURCE`.
    /// This config may be removed in the future.
    #[parameter(default = false, flags = "NO_ALTER_SYS")]
    disable_purify_definition: bool,

    /// The `ef_search` used in querying hnsw vector index
    #[parameter(default = 40_usize)] // default value borrowed from pg_vector
    batch_hnsw_ef_search: usize,

    /// Enable index selection for queries
    #[parameter(default = true)]
    enable_index_selection: bool,

    /// Enable mv selection for queries
    #[parameter(default = false)]
    enable_mv_selection: bool,

    /// Enable locality backfill for streaming queries. Defaults to false.
    #[parameter(default = false)]
    enable_locality_backfill: bool,

    /// Duration in seconds before notifying the user that a long-running DDL operation (e.g., DROP TABLE, CANCEL JOBS)
    /// is still running. Set to 0 to disable notifications. Defaults to 30 seconds.
    #[parameter(default = 30u32)]
    slow_ddl_notification_secs: u32,

    /// Unsafe: Enable storage retention for non-append-only tables.
    /// Enabling this can lead to streaming inconsistency and node panic
    /// if there is any row INSERT/UPDATE/DELETE operation corresponding to the ttled primary key.
    #[parameter(default = false)]
    unsafe_enable_storage_retention_for_non_append_only_tables: bool,

    /// Enable DataFusion Engine
    /// When enabled, queries involving Iceberg tables will be executed using the DataFusion engine.
    #[parameter(default = true)]
    enable_datafusion_engine: bool,

    /// Prefer hash join over sort merge join in DataFusion engine
    /// When enabled, the DataFusion engine will prioritize hash joins for query execution plans,
    /// potentially improving performance for certain workloads, but may cause OOM for large datasets.
    #[parameter(default = true)]
    datafusion_prefer_hash_join: bool,

    /// Emit chunks in upsert format for `UPDATE` and `DELETE` DMLs.
    /// May lead to undefined behavior if the table is created with `ON CONFLICT DO NOTHING`.
    ///
    /// When enabled:
    /// - `UPDATE` will only emit `Insert` records for new rows, instead of `Update` records.
    /// - `DELETE` will only include key columns and pad the rest with NULL, instead of emitting complete rows.
    #[parameter(default = false)]
    upsert_dml: bool,
}
513
/// Check hook for `iceberg_engine_connection`.
///
/// An empty string is accepted (the entry is unset). Otherwise the value must have the form
/// `schema_name.connection_name`: exactly one `.` separating two non-empty components.
///
/// # Errors
///
/// Returns a message describing the expected format when the value has the wrong number of
/// components or when either component is empty (e.g. `".conn"`, `"schema."`, `"."`).
fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
    if val.is_empty() {
        return Ok(());
    }

    // Pull at most three components: the third must be absent for the value to be valid.
    let mut components = val.split('.');
    match (components.next(), components.next(), components.next()) {
        (Some(schema_name), Some(connection_name), None)
            if !schema_name.is_empty() && !connection_name.is_empty() =>
        {
            Ok(())
        }
        _ => Err("Invalid iceberg engine connection format, Should be set to this format: schema_name.connection_name.".to_owned()),
    }
}
526
527fn check_timezone(val: &str) -> Result<(), String> {
528    // Check if the provided string is a valid timezone.
529    Tz::from_str_insensitive(val).map_err(|_e| "Not a valid timezone")?;
530    Ok(())
531}
532
/// Check hook for `client_encoding`: only spellings of `UTF8` are accepted.
///
/// Every character that is not an ASCII letter or digit is ignored and the comparison is
/// case-insensitive, so `utf-8`, `UTF_8` and `Utf8` all pass.
fn check_client_encoding(val: &str) -> Result<(), String> {
    // https://github.com/postgres/postgres/blob/REL_15_3/src/common/encnames.c#L525
    let is_utf8 = val
        .chars()
        .filter(|c| c.is_ascii_alphanumeric())
        .map(|c| c.to_ascii_lowercase())
        .eq("utf8".chars());

    if is_utf8 {
        return Ok(());
    }
    Err("Only support 'UTF8' for CLIENT_ENCODING".to_owned())
}
542
/// Check hook for `bytea_output`: the exact (case-sensitive) string `hex` is the only
/// accepted value.
fn check_bytea_output(val: &str) -> Result<(), String> {
    match val {
        "hex" => Ok(()),
        _ => Err("Only support 'hex' for BYTEA_OUTPUT".to_owned()),
    }
}
550
551/// Check if the provided value is a valid max parallelism.
552fn check_streaming_max_parallelism(val: &usize) -> Result<(), String> {
553    match val {
554        // TODO(var-vnode): this is to prevent confusion with singletons, after we distinguish
555        // them better, we may allow 1 as the max parallelism (though not much point).
556        0 | 1 => Err("STREAMING_MAX_PARALLELISM must be greater than 1".to_owned()),
557        2..=VirtualNode::MAX_COUNT => Ok(()),
558        _ => Err(format!(
559            "STREAMING_MAX_PARALLELISM must be less than or equal to {}",
560            VirtualNode::MAX_COUNT
561        )),
562    }
563}
564
565fn check_streaming_parallelism_for_backfill(val: &ConfigBackfillParallelism) -> Result<(), String> {
566    match val {
567        ConfigBackfillParallelism::Default | ConfigBackfillParallelism::Fixed(_) => Ok(()),
568        ConfigBackfillParallelism::Adaptive
569        | ConfigBackfillParallelism::Bounded(_)
570        | ConfigBackfillParallelism::Ratio(_) => Err(
571            "Only `default` or fixed backfill parallelism is supported here; adaptive backfill strategy is deferred to a later change.".to_owned(),
572        ),
573    }
574}
575
576impl SessionConfig {
577    pub fn set_force_two_phase_agg(
578        &mut self,
579        val: bool,
580        reporter: &mut impl ConfigReporter,
581    ) -> SessionConfigResult<bool> {
582        let set_val = self.set_force_two_phase_agg_inner(val, reporter)?;
583        if self.force_two_phase_agg {
584            self.set_enable_two_phase_agg(true, reporter)
585        } else {
586            Ok(set_val)
587        }
588    }
589
590    pub fn set_enable_two_phase_agg(
591        &mut self,
592        val: bool,
593        reporter: &mut impl ConfigReporter,
594    ) -> SessionConfigResult<bool> {
595        let set_val = self.set_enable_two_phase_agg_inner(val, reporter)?;
596        if !self.force_two_phase_agg {
597            self.set_force_two_phase_agg(false, reporter)
598        } else {
599            Ok(set_val)
600        }
601    }
602}
603
/// A single variable described by three strings: its name, its setting and a description.
///
/// NOTE(review): presumably one row of a "list all session variables" result — nothing in
/// this part of the file constructs it, so confirm at the producer.
pub struct VariableInfo {
    pub name: String,
    pub setting: String,
    pub description: String,
}
609
/// Report status or notice to caller.
///
/// The setters in this file take a `&mut impl ConfigReporter` and hand it on to the
/// underlying `*_inner` setters.
pub trait ConfigReporter {
    /// Called with the config `key` and its new value rendered as a string.
    fn report_status(&mut self, key: &str, new_val: String);
}
614
// Report nothing.
//
// Lets callers that do not care about status reports pass `&mut ()` as the reporter, as the
// tests in this file do.
impl ConfigReporter for () {
    fn report_status(&mut self, _key: &str, _new_val: String) {}
}
619
// Error type returned by `SessionConfig::to_initial_streaming_config_override`.
//
// Two source error types are listed: `toml::ser::Error`, paired with a context message, and
// `ConfigMergeError`, marked `transparent`.
// NOTE(review): the macro definition is not visible here; presumably it generates an
// `anyhow`-backed newtype with `From` impls for the listed sources — confirm.
def_anyhow_newtype! {
    pub SessionConfigToOverrideError,
    toml::ser::Error => "failed to serialize session config",
    ConfigMergeError => transparent,
}
625
626impl SessionConfig {
627    /// Generate an initial override for the streaming config from the session config.
628    pub fn to_initial_streaming_config_override(
629        &self,
630    ) -> Result<String, SessionConfigToOverrideError> {
631        let mut table = toml::Table::new();
632
633        // TODO: make this more type safe.
634        // We `unwrap` here to assert the hard-coded keys are correct.
635        if let Some(v) = self.streaming_join_encoding.as_ref() {
636            table
637                .upsert("streaming.developer.join_encoding_type", v)
638                .unwrap();
639        }
640        if let Some(v) = self.streaming_sync_log_store_pause_duration_ms.as_ref() {
641            table
642                .upsert("streaming.developer.sync_log_store_pause_duration_ms", v)
643                .unwrap();
644        }
645        if let Some(v) = self.streaming_sync_log_store_buffer_size.as_ref() {
646            table
647                .upsert("streaming.developer.sync_log_store_buffer_size", v)
648                .unwrap();
649        }
650        if let Some(v) = self.streaming_over_window_cache_policy.as_ref() {
651            table
652                .upsert("streaming.developer.over_window_cache_policy", v)
653                .unwrap();
654        }
655
656        let res = toml::to_string(&table)?;
657
658        // Validate all fields are valid by trying to merge it to the default config.
659        if !res.is_empty() {
660            let merged =
661                merge_streaming_config_section(&StreamingConfig::default(), res.as_str())?.unwrap();
662
663            let unrecognized_keys = merged.unrecognized_keys().collect_vec();
664            if !unrecognized_keys.is_empty() {
665                bail!("unrecognized configs: {:?}", unrecognized_keys);
666            }
667        }
668
669        Ok(res)
670    }
671}
672
673#[cfg(test)]
674mod test {
675    use expect_test::expect;
676
677    use super::*;
678
    // Minimal config used to exercise the `SessionConfig` derive: one `i32` entry with a
    // default of 1, the `NO_ALTER_SYS` flag and two aliases.
    #[derive(SessionConfig)]
    struct TestConfig {
        #[parameter(default = 1, flags = "NO_ALTER_SYS", alias = "test_param_alias" | "alias_param_test")]
        test_param: i32,
    }
684
685    #[test]
686    fn test_session_config_alias() {
687        let mut config = TestConfig::default();
688        config.set("test_param", "2".to_owned(), &mut ()).unwrap();
689        assert_eq!(config.get("test_param_alias").unwrap(), "2");
690        config
691            .set("alias_param_test", "3".to_owned(), &mut ())
692            .unwrap();
693        assert_eq!(config.get("test_param_alias").unwrap(), "3");
694        assert!(TestConfig::check_no_alter_sys("test_param").unwrap());
695    }
696
697    #[test]
698    fn test_initial_streaming_config_override() {
699        let mut config = SessionConfig::default();
700        config
701            .set_streaming_join_encoding(Some(JoinEncodingType::Cpu).into(), &mut ())
702            .unwrap();
703        config
704            .set_streaming_over_window_cache_policy(
705                Some(OverWindowCachePolicy::RecentFirstN).into(),
706                &mut (),
707            )
708            .unwrap();
709
710        // Check the converted config override string.
711        let override_str = config.to_initial_streaming_config_override().unwrap();
712        expect![[r#"
713            [streaming.developer]
714            join_encoding_type = "cpu_optimized"
715            over_window_cache_policy = "recent_first_n"
716        "#]]
717        .assert_eq(&override_str);
718
719        // Try merging it to the default streaming config.
720        let merged = merge_streaming_config_section(&StreamingConfig::default(), &override_str)
721            .unwrap()
722            .unwrap();
723        assert_eq!(merged.developer.join_encoding_type, JoinEncodingType::Cpu);
724        assert_eq!(
725            merged.developer.over_window_cache_policy,
726            OverWindowCachePolicy::RecentFirstN
727        );
728    }
729
730    #[test]
731    fn test_streaming_parallelism_defaults() {
732        let config = SessionConfig::default();
733
734        assert_eq!(config.streaming_parallelism(), ConfigParallelism::Default);
735        assert_eq!(
736            config.streaming_parallelism_for_table(),
737            ConfigParallelism::Default
738        );
739        assert_eq!(
740            config.streaming_parallelism_for_source(),
741            ConfigParallelism::Default
742        );
743        assert_eq!(
744            config.streaming_parallelism_for_sink(),
745            ConfigParallelism::Default
746        );
747        assert_eq!(
748            config.streaming_parallelism_for_index(),
749            ConfigParallelism::Default
750        );
751        assert_eq!(
752            config.streaming_parallelism_for_materialized_view(),
753            ConfigParallelism::Default
754        );
755    }
756
757    #[test]
758    fn test_streaming_parallelism_default_round_trip() {
759        let mut config = SessionConfig::default();
760
761        assert_eq!(config.get("streaming_parallelism").unwrap(), "default");
762        assert_eq!(
763            config.get("streaming_parallelism_for_table").unwrap(),
764            "default"
765        );
766        assert_eq!(
767            config.get("streaming_parallelism_for_source").unwrap(),
768            "default"
769        );
770
771        config
772            .set("streaming_parallelism", "default".to_owned(), &mut ())
773            .unwrap();
774        assert_eq!(config.get("streaming_parallelism").unwrap(), "default");
775
776        config
777            .set("streaming_parallelism", "bounded(16)".to_owned(), &mut ())
778            .unwrap();
779        config
780            .set(
781                "streaming_parallelism_for_table",
782                "bounded(8)".to_owned(),
783                &mut (),
784            )
785            .unwrap();
786        config
787            .set(
788                "streaming_parallelism_for_source",
789                "bounded(8)".to_owned(),
790                &mut (),
791            )
792            .unwrap();
793
794        assert_eq!(
795            config.reset("streaming_parallelism", &mut ()).unwrap(),
796            "default"
797        );
798        assert_eq!(
799            config
800                .reset("streaming_parallelism_for_table", &mut ())
801                .unwrap(),
802            "default"
803        );
804        assert_eq!(
805            config
806                .reset("streaming_parallelism_for_source", &mut ())
807                .unwrap(),
808            "default"
809        );
810    }
811    #[test]
812    fn test_streaming_parallelism_for_backfill_accepts_default_and_fixed() {
813        let mut config = SessionConfig::default();
814
815        config
816            .set(
817                "streaming_parallelism_for_backfill",
818                "default".to_owned(),
819                &mut (),
820            )
821            .unwrap();
822        assert_eq!(
823            config.get("streaming_parallelism_for_backfill").unwrap(),
824            "default"
825        );
826
827        config
828            .set(
829                "streaming_parallelism_for_backfill",
830                "2".to_owned(),
831                &mut (),
832            )
833            .unwrap();
834        assert_eq!(config.streaming_parallelism_for_backfill().to_string(), "2");
835    }
836
837    #[test]
838    fn test_streaming_parallelism_for_backfill_rejects_adaptive_modes() {
839        let mut config = SessionConfig::default();
840        let expected = "Only `default` or fixed backfill parallelism is supported here; adaptive backfill strategy is deferred to a later change.";
841
842        for value in ["adaptive", "bounded(2)", "ratio(0.5)"] {
843            let err = config
844                .set(
845                    "streaming_parallelism_for_backfill",
846                    value.to_owned(),
847                    &mut (),
848                )
849                .unwrap_err();
850
851            match err {
852                SessionConfigError::InvalidValue {
853                    entry,
854                    value: actual_value,
855                    source,
856                } => {
857                    assert_eq!(entry, "streaming_parallelism_for_backfill");
858                    assert_eq!(actual_value, value);
859                    assert_eq!(source.to_string(), expected);
860                }
861                other => panic!("unexpected error: {other:?}"),
862            }
863        }
864    }
865}