risingwave_common/session_config/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod non_zero64;
16mod opt;
17pub mod parallelism;
18mod query_mode;
19mod search_path;
20pub mod sink_decouple;
21mod statement_timeout;
22mod transaction_isolation_level;
23mod visibility_mode;
24
25use chrono_tz::Tz;
26use itertools::Itertools;
27pub use opt::OptionConfig;
28pub use query_mode::QueryMode;
29use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
30pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
31use serde::{Deserialize, Serialize};
32pub use statement_timeout::StatementTimeout;
33use thiserror::Error;
34
35use self::non_zero64::ConfigNonZeroU64;
36use crate::config::mutate::TomlTableMutateExt;
37use crate::config::streaming::{JoinEncodingType, OverWindowCachePolicy};
38use crate::config::{ConfigMergeError, StreamingConfig, merge_streaming_config_section};
39use crate::hash::VirtualNode;
40use crate::session_config::parallelism::{ConfigAdaptiveParallelismStrategy, ConfigParallelism};
41use crate::session_config::sink_decouple::SinkDecouple;
42use crate::session_config::transaction_isolation_level::IsolationLevel;
43pub use crate::session_config::visibility_mode::VisibilityMode;
44use crate::{PG_VERSION, SERVER_ENCODING, SERVER_VERSION_NUM, STANDARD_CONFORMING_STRINGS};
45
/// Separator used when a list-valued session config entry is rendered as a single string.
pub const SESSION_CONFIG_LIST_SEP: &str = ", ";
47
/// Errors that can occur when getting or setting a session config entry.
#[derive(Error, Debug)]
pub enum SessionConfigError {
    /// The provided value failed to parse or failed a check hook for the entry.
    #[error("Invalid value `{value}` for `{entry}`")]
    InvalidValue {
        entry: &'static str,
        value: String,
        source: anyhow::Error,
    },

    /// The given name does not match any known config entry (or alias).
    #[error("Unrecognized config entry `{0}`")]
    UnrecognizedEntry(String),
}
60
/// Convenient alias for results of session config get/set operations.
type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;

// NOTE(kwannoel): We declare it separately as a constant,
// otherwise seems like it can't infer the type of -1 when written inline.
// A value of -1 disables the corresponding rate limit (see the field docs below).
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
const DISABLE_DML_RATE_LIMIT: i32 = -1;
const DISABLE_SINK_RATE_LIMIT: i32 = -1;

/// Default parallelism bound for tables
const DEFAULT_TABLE_PARALLELISM_BOUND: std::num::NonZeroU64 = std::num::NonZeroU64::new(4).unwrap();

/// Default parallelism bound for sources
const DEFAULT_SOURCE_PARALLELISM_BOUND: std::num::NonZeroU64 =
    std::num::NonZeroU64::new(4).unwrap();

/// Default to bypass cluster limits iff in debug mode.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
79
80/// This is the Session Config of RisingWave.
81///
82/// All config entries implement `Display` and `FromStr` for getter and setter, to be read and
83/// altered within a session.
84///
85/// Users can change the default value of a configuration entry using `ALTER SYSTEM SET`. To
86/// facilitate this, a `serde` implementation is used as the wire format for retrieving initial
87/// configurations and updates from the meta service. It's important to note that the meta
88/// service stores the overridden value of each configuration entry per row with `Display` in
89/// the meta store, rather than using the `serde` format. However, we still delegate the `serde`
90/// impl of all fields to `Display`/`FromStr` to make it consistent.
91#[serde_with::apply(_ => #[serde_as(as = "serde_with::DisplayFromStr")] )]
92#[serde_with::serde_as]
93#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
94pub struct SessionConfig {
95    /// If `RW_IMPLICIT_FLUSH` is on, then every INSERT/UPDATE/DELETE statement will block
96    /// until the entire dataflow is refreshed. In other words, every related table & MV will
97    /// be able to see the write.
98    #[parameter(default = false, alias = "rw_implicit_flush")]
99    implicit_flush: bool,
100
101    /// If `CREATE_COMPACTION_GROUP_FOR_MV` is on, dedicated compaction groups will be created in
102    /// MV creation.
103    #[parameter(default = false)]
104    create_compaction_group_for_mv: bool,
105
106    /// A temporary config variable to force query running in either local or distributed mode.
107    /// The default value is auto which means let the system decide to run batch queries in local
108    /// or distributed mode automatically.
109    #[parameter(default = QueryMode::default())]
110    query_mode: QueryMode,
111
112    /// Sets the number of digits displayed for floating-point values.
113    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#:~:text=for%20more%20information.-,extra_float_digits,-(integer)>
114    #[parameter(default = 1)]
115    extra_float_digits: i32,
116
117    /// Sets the application name to be reported in statistics and logs.
118    /// See <https://www.postgresql.org/docs/14/runtime-config-logging.html#:~:text=What%20to%20Log-,application_name,-(string)>
119    #[parameter(default = "", flags = "REPORT")]
120    application_name: String,
121
122    /// It is typically set by an application upon connection to the server.
123    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-DATESTYLE>
124    #[parameter(default = "", rename = "datestyle")]
125    date_style: String,
126
127    /// Force the use of lookup join instead of hash join when possible for local batch execution.
128    #[parameter(default = true, alias = "rw_batch_enable_lookup_join")]
129    batch_enable_lookup_join: bool,
130
131    /// Enable usage of sortAgg instead of hash agg when order property is satisfied in batch
132    /// execution
133    #[parameter(default = true, alias = "rw_batch_enable_sort_agg")]
134    batch_enable_sort_agg: bool,
135
136    /// Enable distributed DML, so an insert, delete, and update statement can be executed in a distributed way (e.g. running in multiple compute nodes).
137    /// No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion where users always can drop their table when failure happens.
138    #[parameter(default = false, rename = "batch_enable_distributed_dml")]
139    batch_enable_distributed_dml: bool,
140
141    /// Evaluate expression in strict mode for batch queries.
142    /// If set to false, an expression failure will not cause an error but leave a null value
143    /// on the result set.
144    #[parameter(default = true)]
145    batch_expr_strict_mode: bool,
146
147    /// The max gap allowed to transform small range scan into multi point lookup.
148    #[parameter(default = 8)]
149    max_split_range_gap: i32,
150
151    /// Sets the order in which schemas are searched when an object (table, data type, function, etc.)
152    /// is referenced by a simple name with no schema specified.
153    /// See <https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SEARCH-PATH>
154    #[parameter(default = SearchPath::default())]
155    search_path: SearchPath,
156
157    /// If `VISIBILITY_MODE` is all, we will support querying data without checkpoint.
158    #[parameter(default = VisibilityMode::default())]
159    visibility_mode: VisibilityMode,
160
161    /// See <https://www.postgresql.org/docs/current/transaction-iso.html>
162    #[parameter(default = IsolationLevel::default())]
163    transaction_isolation: IsolationLevel,
164
165    /// Select as of specific epoch.
166    /// Sets the historical epoch for querying data. If 0, querying latest data.
167    #[parameter(default = ConfigNonZeroU64::default())]
168    query_epoch: ConfigNonZeroU64,
169
170    /// Session timezone. Defaults to UTC.
171    #[parameter(default = "UTC", check_hook = check_timezone)]
172    timezone: String,
173
174    /// The execution parallelism for streaming queries, including tables, materialized views, indexes,
175    /// and sinks. Defaults to 0, which means they will be scheduled adaptively based on the cluster size.
176    ///
177    /// If a non-zero value is set, streaming queries will be scheduled to use a fixed number of parallelism.
178    /// Note that the value will be bounded at `STREAMING_MAX_PARALLELISM`.
179    #[parameter(default = ConfigParallelism::default())]
180    streaming_parallelism: ConfigParallelism,
181
182    /// Specific parallelism for backfill. By default, it will fall back to `STREAMING_PARALLELISM`.
183    #[parameter(default = ConfigParallelism::default())]
184    streaming_parallelism_for_backfill: ConfigParallelism,
185
186    /// The adaptive parallelism strategy for streaming jobs. Defaults to `default`, which follows the system setting.
187    #[parameter(default = ConfigAdaptiveParallelismStrategy::default())]
188    streaming_parallelism_strategy: ConfigAdaptiveParallelismStrategy,
189
190    /// Specific adaptive parallelism strategy for table. Defaults to `BOUNDED(4)`.
191    #[parameter(default = ConfigAdaptiveParallelismStrategy::Bounded(DEFAULT_TABLE_PARALLELISM_BOUND))]
192    streaming_parallelism_strategy_for_table: ConfigAdaptiveParallelismStrategy,
193
194    /// Specific parallelism for table. By default, it will fall back to `STREAMING_PARALLELISM`.
195    #[parameter(default = ConfigParallelism::default())]
196    streaming_parallelism_for_table: ConfigParallelism,
197
198    /// Specific adaptive parallelism strategy for sink. Falls back to `STREAMING_PARALLELISM_STRATEGY`.
199    #[parameter(default = ConfigAdaptiveParallelismStrategy::default())]
200    streaming_parallelism_strategy_for_sink: ConfigAdaptiveParallelismStrategy,
201
202    /// Specific parallelism for sink. By default, it will fall back to `STREAMING_PARALLELISM`.
203    #[parameter(default = ConfigParallelism::default())]
204    streaming_parallelism_for_sink: ConfigParallelism,
205
206    /// Specific adaptive parallelism strategy for index. Falls back to `STREAMING_PARALLELISM_STRATEGY`.
207    #[parameter(default = ConfigAdaptiveParallelismStrategy::default())]
208    streaming_parallelism_strategy_for_index: ConfigAdaptiveParallelismStrategy,
209
210    /// Specific parallelism for index. By default, it will fall back to `STREAMING_PARALLELISM`.
211    #[parameter(default = ConfigParallelism::default())]
212    streaming_parallelism_for_index: ConfigParallelism,
213
214    /// Specific adaptive parallelism strategy for source. Defaults to `BOUNDED(4)`.
215    #[parameter(default = ConfigAdaptiveParallelismStrategy::Bounded(DEFAULT_SOURCE_PARALLELISM_BOUND))]
216    streaming_parallelism_strategy_for_source: ConfigAdaptiveParallelismStrategy,
217
218    /// Specific parallelism for source. By default, it will fall back to `STREAMING_PARALLELISM`.
219    #[parameter(default = ConfigParallelism::default())]
220    streaming_parallelism_for_source: ConfigParallelism,
221
222    /// Specific adaptive parallelism strategy for materialized view. Falls back to `STREAMING_PARALLELISM_STRATEGY`.
223    #[parameter(default = ConfigAdaptiveParallelismStrategy::default())]
224    streaming_parallelism_strategy_for_materialized_view: ConfigAdaptiveParallelismStrategy,
225
226    /// Specific parallelism for materialized view. By default, it will fall back to `STREAMING_PARALLELISM`.
227    #[parameter(default = ConfigParallelism::default())]
228    streaming_parallelism_for_materialized_view: ConfigParallelism,
229
230    /// Enable delta join for streaming queries. Defaults to false.
231    #[parameter(default = false, alias = "rw_streaming_enable_delta_join")]
232    streaming_enable_delta_join: bool,
233
234    /// Enable bushy join for streaming queries. Defaults to true.
235    #[parameter(default = true, alias = "rw_streaming_enable_bushy_join")]
236    streaming_enable_bushy_join: bool,
237
238    /// Force filtering to be done inside the join whenever there's a choice between optimizations.
239    /// Defaults to false.
240    #[parameter(default = false, alias = "rw_streaming_force_filter_inside_join")]
241    streaming_force_filter_inside_join: bool,
242
243    /// Enable arrangement backfill for streaming queries. Defaults to true.
244    /// When set to true, the parallelism of the upstream fragment will be
245    /// decoupled from the parallelism of the downstream scan fragment.
246    /// Or more generally, the parallelism of the upstream table / index / mv
247    /// will be decoupled from the parallelism of the downstream table / index / mv / sink.
248    #[parameter(default = true)]
249    streaming_use_arrangement_backfill: bool,
250
251    #[parameter(default = true)]
252    streaming_use_snapshot_backfill: bool,
253
254    /// Enable serverless backfill for streaming queries. Defaults to false.
255    #[parameter(default = false)]
256    enable_serverless_backfill: bool,
257
258    /// Allow `jsonb` in stream key
259    #[parameter(default = false, alias = "rw_streaming_allow_jsonb_in_stream_key")]
260    streaming_allow_jsonb_in_stream_key: bool,
261
262    /// Unsafe: allow impure expressions on non-append-only streams without materialization.
263    ///
264    /// This may lead to inconsistent results or panics due to re-evaluation on updates/retracts.
265    #[parameter(default = false)]
266    streaming_unsafe_allow_unmaterialized_impure_expr: bool,
267
268    /// Separate consecutive `StreamHashJoin` by no-shuffle `StreamExchange`
269    #[parameter(default = false)]
270    streaming_separate_consecutive_join: bool,
271
272    /// Separate `StreamSink` by no-shuffle `StreamExchange`
273    #[parameter(default = false)]
274    streaming_separate_sink: bool,
275
276    /// Determine which encoding will be used to encode join rows in operator cache.
277    ///
278    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
279    /// taking effect for new streaming jobs created in the current session.
280    #[parameter(default = None)]
281    streaming_join_encoding: OptionConfig<JoinEncodingType>,
282
283    /// Enable join ordering for streaming and batch queries. Defaults to true.
284    #[parameter(default = true, alias = "rw_enable_join_ordering")]
285    enable_join_ordering: bool,
286
287    /// Enable two phase agg optimization. Defaults to true.
288    /// Setting this to true will always set `FORCE_TWO_PHASE_AGG` to false.
289    #[parameter(default = true, flags = "SETTER", alias = "rw_enable_two_phase_agg")]
290    enable_two_phase_agg: bool,
291
292    /// Force two phase agg optimization whenever there's a choice between
293    /// optimizations. Defaults to false.
294    /// Setting this to true will always set `ENABLE_TWO_PHASE_AGG` to false.
295    #[parameter(default = false, flags = "SETTER", alias = "rw_force_two_phase_agg")]
296    force_two_phase_agg: bool,
297
298    /// Enable sharing of common sub-plans.
299    /// This means that DAG structured query plans can be constructed,
300    #[parameter(default = true, alias = "rw_enable_share_plan")]
301    /// rather than only tree structured query plans.
302    enable_share_plan: bool,
303
304    /// Enable split distinct agg
305    #[parameter(default = false, alias = "rw_force_split_distinct_agg")]
306    force_split_distinct_agg: bool,
307
308    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE>
309    #[parameter(default = "", rename = "intervalstyle")]
310    interval_style: String,
311
312    /// If `BATCH_PARALLELISM` is non-zero, batch queries will use this parallelism.
313    #[parameter(default = ConfigNonZeroU64::default())]
314    batch_parallelism: ConfigNonZeroU64,
315
316    /// The version of PostgreSQL that Risingwave claims to be.
317    #[parameter(default = PG_VERSION)]
318    server_version: String,
319
320    /// The version of PostgreSQL that Risingwave claims to be.
321    #[parameter(default = SERVER_VERSION_NUM)]
322    server_version_num: i32,
323
324    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-MIN-MESSAGES>
325    #[parameter(default = "notice")]
326    client_min_messages: String,
327
328    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-ENCODING>
329    #[parameter(default = SERVER_ENCODING, check_hook = check_client_encoding)]
330    client_encoding: String,
331
332    /// Enable decoupling sink and internal streaming graph or not
333    #[parameter(default = SinkDecouple::default())]
334    sink_decouple: SinkDecouple,
335
336    /// See <https://www.postgresql.org/docs/current/runtime-config-compatible.html#RUNTIME-CONFIG-COMPATIBLE-VERSION>
337    /// Unused in RisingWave, support for compatibility.
338    #[parameter(default = false)]
339    synchronize_seqscans: bool,
340
341    /// Abort query statement that takes more than the specified amount of time in sec. If
342    /// `log_min_error_statement` is set to ERROR or lower, the statement that timed out will also be
343    /// logged. If this value is specified without units, it is taken as milliseconds. A value of
344    /// zero (the default) disables the timeout.
345    #[parameter(default = StatementTimeout::default())]
346    statement_timeout: StatementTimeout,
347
348    /// Terminate any session that has been idle (that is, waiting for a client query) within an open transaction for longer than the specified amount of time in milliseconds.
349    #[parameter(default = 60000u32)]
350    idle_in_transaction_session_timeout: u32,
351
352    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-LOCK-TIMEOUT>
353    /// Unused in RisingWave, support for compatibility.
354    #[parameter(default = 0)]
355    lock_timeout: i32,
356
357    /// For limiting the startup time of a shareable CDC streaming source when the source is being created. Unit: seconds.
358    #[parameter(default = 60)]
359    cdc_source_wait_streaming_start_timeout: i32,
360
361    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-ROW-SECURITY>.
362    /// Unused in RisingWave, support for compatibility.
363    #[parameter(default = true)]
364    row_security: bool,
365
366    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-STANDARD-CONFORMING-STRINGS>
367    #[parameter(default = STANDARD_CONFORMING_STRINGS)]
368    standard_conforming_strings: String,
369
370    /// Set streaming rate limit (rows per second) for each parallelism for mv / source / sink backfilling
371    /// If set to -1, disable rate limit.
372    /// If set to 0, this pauses the snapshot read / source read.
373    #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
374    backfill_rate_limit: i32,
375
376    /// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
377    /// If set to -1, disable rate limit.
378    /// If set to 0, this pauses the snapshot read / source read.
379    #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
380    source_rate_limit: i32,
381
382    /// Set streaming rate limit (rows per second) for each parallelism for table DML.
383    /// If set to -1, disable rate limit.
384    /// If set to 0, this pauses the DML.
385    #[parameter(default = DISABLE_DML_RATE_LIMIT)]
386    dml_rate_limit: i32,
387
388    /// Set sink rate limit (rows per second) for each parallelism for external sink.
389    /// If set to -1, disable rate limit.
390    /// If set to 0, this pauses the sink.
391    #[parameter(default = DISABLE_SINK_RATE_LIMIT)]
392    sink_rate_limit: i32,
393
394    /// Cache policy for partition cache in streaming over window.
395    /// Can be `full`, `recent`, `recent_first_n` or `recent_last_n`.
396    ///
397    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
398    /// taking effect for new streaming jobs created in the current session.
399    #[parameter(default = None, alias = "rw_streaming_over_window_cache_policy")]
400    streaming_over_window_cache_policy: OptionConfig<OverWindowCachePolicy>,
401
402    /// Run DDL statements in background
403    #[parameter(default = false)]
404    background_ddl: bool,
405
406    /// Enable shared source. Currently only for Kafka.
407    ///
408    /// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEWS` from the source
409    /// will forward the data from the same source streaming job, and also backfill prior data from the external source.
410    #[parameter(default = true)]
411    streaming_use_shared_source: bool,
412
413    /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
414    #[parameter(default = SERVER_ENCODING)]
415    server_encoding: String,
416
417    #[parameter(default = "hex", check_hook = check_bytea_output)]
418    bytea_output: String,
419
420    /// Bypass checks on cluster limits
421    ///
422    /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
423    #[parameter(default = BYPASS_CLUSTER_LIMITS)]
424    bypass_cluster_limits: bool,
425
426    /// The maximum number of parallelism a streaming query can use. Defaults to 256.
427    ///
428    /// Compared to `STREAMING_PARALLELISM`, which configures the initial parallelism, this configures
429    /// the maximum parallelism a streaming query can use in the future, if the cluster size changes or
430    /// users manually change the parallelism with `ALTER .. SET PARALLELISM`.
431    ///
432    /// It's not always a good idea to set this to a very large number, as it may cause performance
433    /// degradation when performing range scans on the table or the materialized view.
434    // a.k.a. vnode count
435    #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_streaming_max_parallelism)]
436    streaming_max_parallelism: usize,
437
438    /// Used to provide the connection information for the iceberg engine.
439    /// Format: `iceberg_engine_connection` = `schema_name.connection_name`.
440    #[parameter(default = "", check_hook = check_iceberg_engine_connection)]
441    iceberg_engine_connection: String,
442
443    /// Whether the streaming join should be unaligned or not.
444    #[parameter(default = false)]
445    streaming_enable_unaligned_join: bool,
446
447    /// The timeout for reading from the buffer of the sync log store on barrier.
448    /// Every epoch we will attempt to read the full buffer of the sync log store.
449    /// If we hit the timeout, we will stop reading and continue.
450    ///
451    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
452    /// taking effect for new streaming jobs created in the current session.
453    #[parameter(default = None)]
454    streaming_sync_log_store_pause_duration_ms: OptionConfig<usize>,
455
456    /// The max buffer size for sync logstore, before we start flushing.
457    ///
458    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
459    /// taking effect for new streaming jobs created in the current session.
460    #[parameter(default = None)]
461    streaming_sync_log_store_buffer_size: OptionConfig<usize>,
462
463    /// Whether to disable purifying the definition of the table or source upon retrieval.
464    /// Only set this if encountering issues with functionalities like `SHOW` or `ALTER TABLE/SOURCE`.
465    /// This config may be removed in the future.
466    #[parameter(default = false, flags = "NO_ALTER_SYS")]
467    disable_purify_definition: bool,
468
469    /// The `ef_search` used in querying hnsw vector index
470    #[parameter(default = 40_usize)] // default value borrowed from pg_vector
471    batch_hnsw_ef_search: usize,
472
473    /// Enable index selection for queries
474    #[parameter(default = true)]
475    enable_index_selection: bool,
476
477    /// Enable mv selection for queries
478    #[parameter(default = false)]
479    enable_mv_selection: bool,
480
481    /// Enable locality backfill for streaming queries. Defaults to false.
482    #[parameter(default = false)]
483    enable_locality_backfill: bool,
484
485    /// Duration in seconds before notifying the user that a long-running DDL operation (e.g., DROP TABLE, CANCEL JOBS)
486    /// is still running. Set to 0 to disable notifications. Defaults to 30 seconds.
487    #[parameter(default = 30u32)]
488    slow_ddl_notification_secs: u32,
489
490    /// Unsafe: Enable storage retention for non-append-only tables.
491    /// Enabling this can lead to streaming inconsistency and node panic
492    /// if there is any row INSERT/UPDATE/DELETE operation corresponding to the ttled primary key.
493    #[parameter(default = false)]
494    unsafe_enable_storage_retention_for_non_append_only_tables: bool,
495
496    /// Enable DataFusion Engine
497    /// When enabled, queries involving Iceberg tables will be executed using the DataFusion engine.
498    #[parameter(default = false)]
499    enable_datafusion_engine: bool,
500
501    /// Prefer hash join over sort merge join in DataFusion engine
502    /// When enabled, the DataFusion engine will prioritize hash joins for query execution plans,
503    /// potentially improving performance for certain workloads, but may cause OOM for large datasets.
504    #[parameter(default = true)]
505    datafusion_prefer_hash_join: bool,
506
507    /// Emit chunks in upsert format for `UPDATE` and `DELETE` DMLs.
508    /// May lead to undefined behavior if the table is created with `ON CONFLICT DO NOTHING`.
509    ///
510    /// When enabled:
511    /// - `UPDATE` will only emit `Insert` records for new rows, instead of `Update` records.
512    /// - `DELETE` will only include key columns and pad the rest with NULL, instead of emitting complete rows.
513    #[parameter(default = false)]
514    upsert_dml: bool,
515}
516
/// Check hook for `ICEBERG_ENGINE_CONNECTION`.
///
/// Accepts either an empty string (meaning "unset") or a value of the form
/// `schema_name.connection_name` — i.e. exactly two segments separated by a
/// single `.`.
fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
    // Empty means the connection is not configured; always valid.
    if val.is_empty() {
        return Ok(());
    }

    // Count the `.`-separated segments directly instead of collecting them
    // into an intermediate `Vec`, which avoids an unnecessary allocation.
    if val.split('.').count() != 2 {
        return Err("Invalid iceberg engine connection format, Should be set to this format: schema_name.connection_name.".to_owned());
    }

    Ok(())
}
529
530fn check_timezone(val: &str) -> Result<(), String> {
531    // Check if the provided string is a valid timezone.
532    Tz::from_str_insensitive(val).map_err(|_e| "Not a valid timezone")?;
533    Ok(())
534}
535
/// Check hook for `CLIENT_ENCODING`: only UTF-8 is supported.
///
/// Mirrors PostgreSQL's encoding-name normalization by dropping every
/// non-alphanumeric character before comparing, so spellings like
/// `utf-8` or `UTF_8` are accepted.
// https://github.com/postgres/postgres/blob/REL_15_3/src/common/encnames.c#L525
fn check_client_encoding(val: &str) -> Result<(), String> {
    let normalized: String = val
        .chars()
        .filter(|c| c.is_ascii_alphanumeric())
        .collect();
    if normalized.eq_ignore_ascii_case("UTF8") {
        Ok(())
    } else {
        Err("Only support 'UTF8' for CLIENT_ENCODING".to_owned())
    }
}
545
/// Check hook for `BYTEA_OUTPUT`: only the `hex` format is supported.
fn check_bytea_output(val: &str) -> Result<(), String> {
    match val {
        "hex" => Ok(()),
        _ => Err("Only support 'hex' for BYTEA_OUTPUT".to_owned()),
    }
}
553
554/// Check if the provided value is a valid max parallelism.
555fn check_streaming_max_parallelism(val: &usize) -> Result<(), String> {
556    match val {
557        // TODO(var-vnode): this is to prevent confusion with singletons, after we distinguish
558        // them better, we may allow 1 as the max parallelism (though not much point).
559        0 | 1 => Err("STREAMING_MAX_PARALLELISM must be greater than 1".to_owned()),
560        2..=VirtualNode::MAX_COUNT => Ok(()),
561        _ => Err(format!(
562            "STREAMING_MAX_PARALLELISM must be less than or equal to {}",
563            VirtualNode::MAX_COUNT
564        )),
565    }
566}
567
568impl SessionConfig {
569    pub fn set_force_two_phase_agg(
570        &mut self,
571        val: bool,
572        reporter: &mut impl ConfigReporter,
573    ) -> SessionConfigResult<bool> {
574        let set_val = self.set_force_two_phase_agg_inner(val, reporter)?;
575        if self.force_two_phase_agg {
576            self.set_enable_two_phase_agg(true, reporter)
577        } else {
578            Ok(set_val)
579        }
580    }
581
582    pub fn set_enable_two_phase_agg(
583        &mut self,
584        val: bool,
585        reporter: &mut impl ConfigReporter,
586    ) -> SessionConfigResult<bool> {
587        let set_val = self.set_enable_two_phase_agg_inner(val, reporter)?;
588        if !self.force_two_phase_agg {
589            self.set_force_two_phase_agg(false, reporter)
590        } else {
591            Ok(set_val)
592        }
593    }
594}
595
/// Metadata describing a single session variable, e.g. for listing variables to a client.
pub struct VariableInfo {
    /// Variable name.
    pub name: String,
    /// Current value rendered as a string.
    pub setting: String,
    /// Human-readable description of the variable.
    pub description: String,
}
601
/// Report status or notice to caller.
///
/// NOTE(review): `report_status` receives the entry name and its new value;
/// presumably it is invoked by the derive-generated setters for entries
/// flagged with `REPORT` (e.g. `application_name`) — confirm in the
/// `SessionConfig` proc macro.
pub trait ConfigReporter {
    fn report_status(&mut self, key: &str, new_val: String);
}
606
// Report nothing.
// The unit type serves as a no-op reporter for callers that don't care
// about status updates (e.g. tests and internal setters).
impl ConfigReporter for () {
    fn report_status(&mut self, _key: &str, _new_val: String) {}
}
611
// Error type for `SessionConfig::to_initial_streaming_config_override`,
// defined as an `anyhow::Error` newtype via the project-local
// `def_anyhow_newtype!` macro, with conversions from the listed source errors.
def_anyhow_newtype! {
    pub SessionConfigToOverrideError,
    toml::ser::Error => "failed to serialize session config",
    ConfigMergeError => transparent,
}
617
impl SessionConfig {
    /// Generate an initial override for the streaming config from the session config.
    ///
    /// Collects the session-level streaming overrides that are set (join encoding,
    /// sync-log-store tuning, over-window cache policy) into a TOML document under
    /// `streaming.developer.*`, serializes it, and validates the result by merging
    /// it into a default `StreamingConfig`. Returns an empty string when no
    /// override is set.
    pub fn to_initial_streaming_config_override(
        &self,
    ) -> Result<String, SessionConfigToOverrideError> {
        let mut table = toml::Table::new();

        // TODO: make this more type safe.
        // We `unwrap` here to assert the hard-coded keys are correct.
        if let Some(v) = self.streaming_join_encoding.as_ref() {
            table
                .upsert("streaming.developer.join_encoding_type", v)
                .unwrap();
        }
        if let Some(v) = self.streaming_sync_log_store_pause_duration_ms.as_ref() {
            table
                .upsert("streaming.developer.sync_log_store_pause_duration_ms", v)
                .unwrap();
        }
        if let Some(v) = self.streaming_sync_log_store_buffer_size.as_ref() {
            table
                .upsert("streaming.developer.sync_log_store_buffer_size", v)
                .unwrap();
        }
        if let Some(v) = self.streaming_over_window_cache_policy.as_ref() {
            table
                .upsert("streaming.developer.over_window_cache_policy", v)
                .unwrap();
        }

        let res = toml::to_string(&table)?;

        // Validate all fields are valid by trying to merge it to the default config.
        if !res.is_empty() {
            let merged =
                merge_streaming_config_section(&StreamingConfig::default(), res.as_str())?.unwrap();

            // A key that survives the merge but isn't recognized by `StreamingConfig`
            // indicates a typo in the hard-coded keys above.
            let unrecognized_keys = merged.unrecognized_keys().collect_vec();
            if !unrecognized_keys.is_empty() {
                bail!("unrecognized configs: {:?}", unrecognized_keys);
            }
        }

        Ok(res)
    }
}
664
#[cfg(test)]
mod test {
    use expect_test::expect;

    use super::*;

    /// Minimal config struct to exercise the `SessionConfig` derive macro's
    /// alias and `NO_ALTER_SYS` handling in isolation.
    #[derive(SessionConfig)]
    struct TestConfig {
        #[parameter(default = 1, flags = "NO_ALTER_SYS", alias = "test_param_alias" | "alias_param_test")]
        test_param: i32,
    }

    #[test]
    fn test_session_config_alias() {
        let mut config = TestConfig::default();
        // A value set via the canonical name is readable through an alias...
        config.set("test_param", "2".to_owned(), &mut ()).unwrap();
        assert_eq!(config.get("test_param_alias").unwrap(), "2");
        // ...and a value set via one alias is readable through another.
        config
            .set("alias_param_test", "3".to_owned(), &mut ())
            .unwrap();
        assert_eq!(config.get("test_param_alias").unwrap(), "3");
        assert!(TestConfig::check_no_alter_sys("test_param").unwrap());
    }

    #[test]
    fn test_initial_streaming_config_override() {
        let mut config = SessionConfig::default();
        config
            .set_streaming_join_encoding(Some(JoinEncodingType::Cpu).into(), &mut ())
            .unwrap();
        config
            .set_streaming_over_window_cache_policy(
                Some(OverWindowCachePolicy::RecentFirstN).into(),
                &mut (),
            )
            .unwrap();

        // Check the converted config override string.
        let override_str = config.to_initial_streaming_config_override().unwrap();
        expect![[r#"
            [streaming.developer]
            join_encoding_type = "cpu_optimized"
            over_window_cache_policy = "recent_first_n"
        "#]]
        .assert_eq(&override_str);

        // Try merging it to the default streaming config.
        let merged = merge_streaming_config_section(&StreamingConfig::default(), &override_str)
            .unwrap()
            .unwrap();
        assert_eq!(merged.developer.join_encoding_type, JoinEncodingType::Cpu);
        assert_eq!(
            merged.developer.over_window_cache_policy,
            OverWindowCachePolicy::RecentFirstN
        );
    }
}
721}