risingwave_common/session_config/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod join_encoding_type;
16mod non_zero64;
17mod over_window;
18pub mod parallelism;
19mod query_mode;
20mod search_path;
21pub mod sink_decouple;
22mod transaction_isolation_level;
23mod visibility_mode;
24
25use chrono_tz::Tz;
26pub use over_window::OverWindowCachePolicy;
27pub use query_mode::QueryMode;
28use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
29pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
30use serde::{Deserialize, Serialize};
31use serde_with::{DisplayFromStr, serde_as};
32use thiserror::Error;
33
34use self::non_zero64::ConfigNonZeroU64;
35use crate::hash::VirtualNode;
36use crate::session_config::join_encoding_type::JoinEncodingType;
37use crate::session_config::parallelism::ConfigParallelism;
38use crate::session_config::sink_decouple::SinkDecouple;
39use crate::session_config::transaction_isolation_level::IsolationLevel;
40pub use crate::session_config::visibility_mode::VisibilityMode;
41use crate::{PG_VERSION, SERVER_ENCODING, SERVER_VERSION_NUM, STANDARD_CONFORMING_STRINGS};
42
/// Separator (`", "`) placed between the items of a list-valued session config entry.
// NOTE(review): presumably used when rendering/parsing list entries such as `SEARCH_PATH`;
// confirm against its users before relying on the exact spacing.
pub const SESSION_CONFIG_LIST_SEP: &str = ", ";
44
45#[derive(Error, Debug)]
46pub enum SessionConfigError {
47    #[error("Invalid value `{value}` for `{entry}`")]
48    InvalidValue {
49        entry: &'static str,
50        value: String,
51        source: anyhow::Error,
52    },
53
54    #[error("Unrecognized config entry `{0}`")]
55    UnrecognizedEntry(String),
56}
57
/// Result type of fallible session config operations, failing with [`SessionConfigError`].
type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;

// NOTE(kwannoel): We declare it separately as a constant,
// otherwise seems like it can't infer the type of -1 when written inline.
//
// Each constant below is the default of the matching `*_rate_limit` session parameter
// (`backfill_rate_limit`, `source_rate_limit`, `dml_rate_limit`, `sink_rate_limit`); as those
// parameters document, a value of -1 disables the rate limit.
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
const DISABLE_DML_RATE_LIMIT: i32 = -1;
const DISABLE_SINK_RATE_LIMIT: i32 = -1;

/// Default to bypass cluster limits iff in debug mode.
/// `cfg!(debug_assertions)` is evaluated at compile time, so release builds default to `false`.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
69
70#[serde_as]
71/// This is the Session Config of RisingWave.
72#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
73pub struct SessionConfig {
74    /// If `RW_IMPLICIT_FLUSH` is on, then every INSERT/UPDATE/DELETE statement will block
75    /// until the entire dataflow is refreshed. In other words, every related table & MV will
76    /// be able to see the write.
77    #[parameter(default = false, alias = "rw_implicit_flush")]
78    implicit_flush: bool,
79
80    /// If `CREATE_COMPACTION_GROUP_FOR_MV` is on, dedicated compaction groups will be created in
81    /// MV creation.
82    #[parameter(default = false)]
83    create_compaction_group_for_mv: bool,
84
85    /// A temporary config variable to force query running in either local or distributed mode.
86    /// The default value is auto which means let the system decide to run batch queries in local
87    /// or distributed mode automatically.
88    #[serde_as(as = "DisplayFromStr")]
89    #[parameter(default = QueryMode::default())]
90    query_mode: QueryMode,
91
92    /// Sets the number of digits displayed for floating-point values.
93    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#:~:text=for%20more%20information.-,extra_float_digits,-(integer)>
94    #[parameter(default = 1)]
95    extra_float_digits: i32,
96
97    /// Sets the application name to be reported in statistics and logs.
98    /// See <https://www.postgresql.org/docs/14/runtime-config-logging.html#:~:text=What%20to%20Log-,application_name,-(string)>
99    #[parameter(default = "", flags = "REPORT")]
100    application_name: String,
101
102    /// It is typically set by an application upon connection to the server.
103    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-DATESTYLE>
104    #[parameter(default = "", rename = "datestyle")]
105    date_style: String,
106
107    /// Force the use of lookup join instead of hash join when possible for local batch execution.
108    #[parameter(default = true, alias = "rw_batch_enable_lookup_join")]
109    batch_enable_lookup_join: bool,
110
111    /// Enable usage of sortAgg instead of hash agg when order property is satisfied in batch
112    /// execution
113    #[parameter(default = true, alias = "rw_batch_enable_sort_agg")]
114    batch_enable_sort_agg: bool,
115
116    /// Enable distributed DML, so an insert, delete, and update statement can be executed in a distributed way (e.g. running in multiple compute nodes).
117    /// No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion where users always can drop their table when failure happens.
118    #[parameter(default = false, rename = "batch_enable_distributed_dml")]
119    batch_enable_distributed_dml: bool,
120
121    /// Evaluate expression in strict mode for batch queries.
122    /// If set to false, an expression failure will not cause an error but leave a null value
123    /// on the result set.
124    #[parameter(default = true)]
125    batch_expr_strict_mode: bool,
126
127    /// The max gap allowed to transform small range scan into multi point lookup.
128    #[parameter(default = 8)]
129    max_split_range_gap: i32,
130
131    /// Sets the order in which schemas are searched when an object (table, data type, function, etc.)
132    /// is referenced by a simple name with no schema specified.
133    /// See <https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SEARCH-PATH>
134    #[serde_as(as = "DisplayFromStr")]
135    #[parameter(default = SearchPath::default())]
136    search_path: SearchPath,
137
138    /// If `VISIBILITY_MODE` is all, we will support querying data without checkpoint.
139    #[serde_as(as = "DisplayFromStr")]
140    #[parameter(default = VisibilityMode::default())]
141    visibility_mode: VisibilityMode,
142
143    /// See <https://www.postgresql.org/docs/current/transaction-iso.html>
144    #[serde_as(as = "DisplayFromStr")]
145    #[parameter(default = IsolationLevel::default())]
146    transaction_isolation: IsolationLevel,
147
148    /// Select as of specific epoch.
149    /// Sets the historical epoch for querying data. If 0, querying latest data.
150    #[serde_as(as = "DisplayFromStr")]
151    #[parameter(default = ConfigNonZeroU64::default())]
152    query_epoch: ConfigNonZeroU64,
153
154    /// Session timezone. Defaults to UTC.
155    #[parameter(default = "UTC", check_hook = check_timezone)]
156    timezone: String,
157
158    /// The execution parallelism for streaming queries, including tables, materialized views, indexes,
159    /// and sinks. Defaults to 0, which means they will be scheduled adaptively based on the cluster size.
160    ///
161    /// If a non-zero value is set, streaming queries will be scheduled to use a fixed number of parallelism.
162    /// Note that the value will be bounded at `STREAMING_MAX_PARALLELISM`.
163    #[serde_as(as = "DisplayFromStr")]
164    #[parameter(default = ConfigParallelism::default())]
165    streaming_parallelism: ConfigParallelism,
166
167    /// Specific parallelism for table. By default, it will fall back to `STREAMING_PARALLELISM`.
168    #[serde_as(as = "DisplayFromStr")]
169    #[parameter(default = ConfigParallelism::default())]
170    streaming_parallelism_for_table: ConfigParallelism,
171
172    /// Specific parallelism for sink. By default, it will fall back to `STREAMING_PARALLELISM`.
173    #[serde_as(as = "DisplayFromStr")]
174    #[parameter(default = ConfigParallelism::default())]
175    streaming_parallelism_for_sink: ConfigParallelism,
176
177    /// Specific parallelism for index. By default, it will fall back to `STREAMING_PARALLELISM`.
178    #[serde_as(as = "DisplayFromStr")]
179    #[parameter(default = ConfigParallelism::default())]
180    streaming_parallelism_for_index: ConfigParallelism,
181
182    /// Specific parallelism for source. By default, it will fall back to `STREAMING_PARALLELISM`.
183    #[serde_as(as = "DisplayFromStr")]
184    #[parameter(default = ConfigParallelism::default())]
185    streaming_parallelism_for_source: ConfigParallelism,
186
187    /// Specific parallelism for materialized view. By default, it will fall back to `STREAMING_PARALLELISM`.
188    #[serde_as(as = "DisplayFromStr")]
189    #[parameter(default = ConfigParallelism::default())]
190    streaming_parallelism_for_materialized_view: ConfigParallelism,
191
192    /// Enable delta join for streaming queries. Defaults to false.
193    #[parameter(default = false, alias = "rw_streaming_enable_delta_join")]
194    streaming_enable_delta_join: bool,
195
196    /// Enable bushy join for streaming queries. Defaults to true.
197    #[parameter(default = true, alias = "rw_streaming_enable_bushy_join")]
198    streaming_enable_bushy_join: bool,
199
200    /// Force filtering to be done inside the join whenever there's a choice between optimizations.
201    /// Defaults to false.
202    #[parameter(default = false, alias = "rw_streaming_force_filter_inside_join")]
203    streaming_force_filter_inside_join: bool,
204
205    /// Enable arrangement backfill for streaming queries. Defaults to true.
206    /// When set to true, the parallelism of the upstream fragment will be
207    /// decoupled from the parallelism of the downstream scan fragment.
208    /// Or more generally, the parallelism of the upstream table / index / mv
209    /// will be decoupled from the parallelism of the downstream table / index / mv / sink.
210    #[parameter(default = true)]
211    streaming_use_arrangement_backfill: bool,
212
213    #[parameter(default = false)]
214    streaming_use_snapshot_backfill: bool,
215
216    /// Allow `jsonb` in stream key
217    #[parameter(default = false, alias = "rw_streaming_allow_jsonb_in_stream_key")]
218    streaming_allow_jsonb_in_stream_key: bool,
219
220    /// Enable materialized expressions for impure functions (typically UDF).
221    #[parameter(default = true)]
222    streaming_enable_materialized_expressions: bool,
223
224    /// Separate consecutive `StreamHashJoin` by no-shuffle `StreamExchange`
225    #[parameter(default = false)]
226    streaming_separate_consecutive_join: bool,
227
228    /// Determine which encoding will be used to encode join rows in operator cache.
229    #[serde_as(as = "DisplayFromStr")]
230    #[parameter(default = JoinEncodingType::default())]
231    streaming_join_encoding: JoinEncodingType,
232
233    /// Enable join ordering for streaming and batch queries. Defaults to true.
234    #[parameter(default = true, alias = "rw_enable_join_ordering")]
235    enable_join_ordering: bool,
236
237    /// Enable two phase agg optimization. Defaults to true.
238    /// Setting this to true will always set `FORCE_TWO_PHASE_AGG` to false.
239    #[parameter(default = true, flags = "SETTER", alias = "rw_enable_two_phase_agg")]
240    enable_two_phase_agg: bool,
241
242    /// Force two phase agg optimization whenever there's a choice between
243    /// optimizations. Defaults to false.
244    /// Setting this to true will always set `ENABLE_TWO_PHASE_AGG` to false.
245    #[parameter(default = false, flags = "SETTER", alias = "rw_force_two_phase_agg")]
246    force_two_phase_agg: bool,
247
248    /// Enable sharing of common sub-plans.
249    /// This means that DAG structured query plans can be constructed,
250    #[parameter(default = true, alias = "rw_enable_share_plan")]
251    /// rather than only tree structured query plans.
252    enable_share_plan: bool,
253
254    /// Enable split distinct agg
255    #[parameter(default = false, alias = "rw_force_split_distinct_agg")]
256    force_split_distinct_agg: bool,
257
258    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE>
259    #[parameter(default = "", rename = "intervalstyle")]
260    interval_style: String,
261
262    /// If `BATCH_PARALLELISM` is non-zero, batch queries will use this parallelism.
263    #[serde_as(as = "DisplayFromStr")]
264    #[parameter(default = ConfigNonZeroU64::default())]
265    batch_parallelism: ConfigNonZeroU64,
266
267    /// The version of PostgreSQL that Risingwave claims to be.
268    #[parameter(default = PG_VERSION)]
269    server_version: String,
270
271    /// The version of PostgreSQL that Risingwave claims to be.
272    #[parameter(default = SERVER_VERSION_NUM)]
273    server_version_num: i32,
274
275    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-MIN-MESSAGES>
276    #[parameter(default = "notice")]
277    client_min_messages: String,
278
279    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-ENCODING>
280    #[parameter(default = SERVER_ENCODING, check_hook = check_client_encoding)]
281    client_encoding: String,
282
283    /// Enable decoupling sink and internal streaming graph or not
284    #[serde_as(as = "DisplayFromStr")]
285    #[parameter(default = SinkDecouple::default())]
286    sink_decouple: SinkDecouple,
287
288    /// See <https://www.postgresql.org/docs/current/runtime-config-compatible.html#RUNTIME-CONFIG-COMPATIBLE-VERSION>
289    /// Unused in RisingWave, support for compatibility.
290    #[parameter(default = false)]
291    synchronize_seqscans: bool,
292
293    /// Abort query statement that takes more than the specified amount of time in sec. If
294    /// `log_min_error_statement` is set to ERROR or lower, the statement that timed out will also be
295    /// logged. If this value is specified without units, it is taken as milliseconds. A value of
296    /// zero (the default) disables the timeout.
297    #[parameter(default = 0u32)]
298    statement_timeout: u32,
299
300    /// Terminate any session that has been idle (that is, waiting for a client query) within an open transaction for longer than the specified amount of time in milliseconds.
301    #[parameter(default = 60000u32)]
302    idle_in_transaction_session_timeout: u32,
303
304    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-LOCK-TIMEOUT>
305    /// Unused in RisingWave, support for compatibility.
306    #[parameter(default = 0)]
307    lock_timeout: i32,
308
309    /// For limiting the startup time of a shareable CDC streaming source when the source is being created. Unit: seconds.
310    #[parameter(default = 30)]
311    cdc_source_wait_streaming_start_timeout: i32,
312
313    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-ROW-SECURITY>.
314    /// Unused in RisingWave, support for compatibility.
315    #[parameter(default = true)]
316    row_security: bool,
317
318    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-STANDARD-CONFORMING-STRINGS>
319    #[parameter(default = STANDARD_CONFORMING_STRINGS)]
320    standard_conforming_strings: String,
321
322    /// Set streaming rate limit (rows per second) for each parallelism for mv / source / sink backfilling
323    /// If set to -1, disable rate limit.
324    /// If set to 0, this pauses the snapshot read / source read.
325    #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
326    backfill_rate_limit: i32,
327
328    /// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
329    /// If set to -1, disable rate limit.
330    /// If set to 0, this pauses the snapshot read / source read.
331    #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
332    source_rate_limit: i32,
333
334    /// Set streaming rate limit (rows per second) for each parallelism for table DML.
335    /// If set to -1, disable rate limit.
336    /// If set to 0, this pauses the DML.
337    #[parameter(default = DISABLE_DML_RATE_LIMIT)]
338    dml_rate_limit: i32,
339
340    /// Set sink rate limit (rows per second) for each parallelism for external sink.
341    /// If set to -1, disable rate limit.
342    /// If set to 0, this pauses the sink.
343    #[parameter(default = DISABLE_SINK_RATE_LIMIT)]
344    sink_rate_limit: i32,
345
346    /// Cache policy for partition cache in streaming over window.
347    /// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
348    #[serde_as(as = "DisplayFromStr")]
349    #[parameter(default = OverWindowCachePolicy::default(), alias = "rw_streaming_over_window_cache_policy")]
350    streaming_over_window_cache_policy: OverWindowCachePolicy,
351
352    /// Run DDL statements in background
353    #[parameter(default = false)]
354    background_ddl: bool,
355
356    /// Enable shared source. Currently only for Kafka.
357    ///
358    /// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEWS` from the source
359    /// will forward the data from the same source streaming job, and also backfill prior data from the external source.
360    #[parameter(default = true)]
361    streaming_use_shared_source: bool,
362
363    /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
364    #[parameter(default = SERVER_ENCODING)]
365    server_encoding: String,
366
367    #[parameter(default = "hex", check_hook = check_bytea_output)]
368    bytea_output: String,
369
370    /// Bypass checks on cluster limits
371    ///
372    /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
373    #[parameter(default = BYPASS_CLUSTER_LIMITS)]
374    bypass_cluster_limits: bool,
375
376    /// The maximum number of parallelism a streaming query can use. Defaults to 256.
377    ///
378    /// Compared to `STREAMING_PARALLELISM`, which configures the initial parallelism, this configures
379    /// the maximum parallelism a streaming query can use in the future, if the cluster size changes or
380    /// users manually change the parallelism with `ALTER .. SET PARALLELISM`.
381    ///
382    /// It's not always a good idea to set this to a very large number, as it may cause performance
383    /// degradation when performing range scans on the table or the materialized view.
384    // a.k.a. vnode count
385    #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_streaming_max_parallelism)]
386    streaming_max_parallelism: usize,
387
388    /// Used to provide the connection information for the iceberg engine.
389    /// Format: `iceberg_engine_connection` = `schema_name.connection_name`.
390    #[parameter(default = "", check_hook = check_iceberg_engine_connection)]
391    iceberg_engine_connection: String,
392
393    /// Whether the streaming join should be unaligned or not.
394    #[parameter(default = false)]
395    streaming_enable_unaligned_join: bool,
396
397    /// The timeout for reading from the buffer of the sync log store on barrier.
398    /// Every epoch we will attempt to read the full buffer of the sync log store.
399    /// If we hit the timeout, we will stop reading and continue.
400    #[parameter(default = 64_usize)]
401    streaming_sync_log_store_pause_duration_ms: usize,
402
403    /// The max buffer size for sync logstore, before we start flushing.
404    #[parameter(default = 2048_usize)]
405    streaming_sync_log_store_buffer_size: usize,
406
407    /// Whether to disable purifying the definition of the table or source upon retrieval.
408    /// Only set this if encountering issues with functionalities like `SHOW` or `ALTER TABLE/SOURCE`.
409    /// This config may be removed in the future.
410    #[parameter(default = false, flags = "NO_ALTER_SYS")]
411    disable_purify_definition: bool,
412}
413
/// Validates `ICEBERG_ENGINE_CONNECTION`.
///
/// An empty value is accepted (no connection configured). Any other value must have the form
/// `schema_name.connection_name`: exactly one `.` separating two non-empty parts.
fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
    if val.is_empty() {
        return Ok(());
    }

    // Split at the first `.`; the connection part must not contain another `.`, and neither
    // part may be empty (`.conn`, `schema.` and `.` are rejected).
    match val.split_once('.') {
        Some((schema, connection))
            if !schema.is_empty() && !connection.is_empty() && !connection.contains('.') =>
        {
            Ok(())
        }
        _ => Err("Invalid iceberg engine connection format, Should be set to this format: schema_name.connection_name.".to_owned()),
    }
}
426
427fn check_timezone(val: &str) -> Result<(), String> {
428    // Check if the provided string is a valid timezone.
429    Tz::from_str_insensitive(val).map_err(|_e| "Not a valid timezone")?;
430    Ok(())
431}
432
/// Validates `CLIENT_ENCODING`: only UTF-8 is supported.
///
/// The name is normalized the way PostgreSQL cleans encoding names before comparing, so
/// spellings such as `utf-8`, `UTF_8` or `Utf8` are all accepted.
fn check_client_encoding(val: &str) -> Result<(), String> {
    // https://github.com/postgres/postgres/blob/REL_15_3/src/common/encnames.c#L525
    // Keep only ASCII letters and digits; everything else (punctuation, spaces, non-ASCII)
    // is dropped before the case-insensitive comparison.
    let normalized: String = val.chars().filter(char::is_ascii_alphanumeric).collect();
    normalized
        .eq_ignore_ascii_case("UTF8")
        .then_some(())
        .ok_or_else(|| "Only support 'UTF8' for CLIENT_ENCODING".to_owned())
}
442
/// Validates `BYTEA_OUTPUT`: only the exact (case-sensitive) value `hex` is accepted.
fn check_bytea_output(val: &str) -> Result<(), String> {
    match val {
        "hex" => Ok(()),
        _ => Err("Only support 'hex' for BYTEA_OUTPUT".to_owned()),
    }
}
450
451/// Check if the provided value is a valid max parallelism.
452fn check_streaming_max_parallelism(val: &usize) -> Result<(), String> {
453    match val {
454        // TODO(var-vnode): this is to prevent confusion with singletons, after we distinguish
455        // them better, we may allow 1 as the max parallelism (though not much point).
456        0 | 1 => Err("STREAMING_MAX_PARALLELISM must be greater than 1".to_owned()),
457        2..=VirtualNode::MAX_COUNT => Ok(()),
458        _ => Err(format!(
459            "STREAMING_MAX_PARALLELISM must be less than or equal to {}",
460            VirtualNode::MAX_COUNT
461        )),
462    }
463}
464
465impl SessionConfig {
466    pub fn set_force_two_phase_agg(
467        &mut self,
468        val: bool,
469        reporter: &mut impl ConfigReporter,
470    ) -> SessionConfigResult<bool> {
471        let set_val = self.set_force_two_phase_agg_inner(val, reporter)?;
472        if self.force_two_phase_agg {
473            self.set_enable_two_phase_agg(true, reporter)
474        } else {
475            Ok(set_val)
476        }
477    }
478
479    pub fn set_enable_two_phase_agg(
480        &mut self,
481        val: bool,
482        reporter: &mut impl ConfigReporter,
483    ) -> SessionConfigResult<bool> {
484        let set_val = self.set_enable_two_phase_agg_inner(val, reporter)?;
485        if !self.force_two_phase_agg {
486            self.set_force_two_phase_agg(false, reporter)
487        } else {
488            Ok(set_val)
489        }
490    }
491}
492
/// A session config entry with its name, setting and description, all as plain strings.
#[derive(Clone, Debug)]
pub struct VariableInfo {
    /// Name of the config entry.
    pub name: String,
    /// The entry's value rendered as text (presumably the current setting — confirm with the
    /// producer of this struct).
    pub setting: String,
    /// Human-readable description of the entry.
    pub description: String,
}
498
/// Receives status reports or notices produced while session config entries are being set.
pub trait ConfigReporter {
    /// Reports that the entry named `key` now has the value `new_val` (rendered as a string).
    fn report_status(&mut self, key: &str, new_val: String);
}

/// The unit reporter discards every report, for callers that do not need them.
impl ConfigReporter for () {
    fn report_status(&mut self, _: &str, _: String) {}
}
508
#[cfg(test)]
mod test {
    use super::*;

    // A single parameter reachable through its own name and two aliases.
    #[derive(SessionConfig)]
    struct TestConfig {
        #[parameter(default = 1, flags = "NO_ALTER_SYS", alias = "test_param_alias" | "alias_param_test")]
        test_param: i32,
    }

    #[test]
    fn test_session_config_alias() {
        let mut config = TestConfig::default();

        // A write through the canonical name or either alias must be visible through another
        // alias.
        for (key, value) in [("test_param", "2"), ("alias_param_test", "3")] {
            config.set(key, value.to_owned(), &mut ()).unwrap();
            assert_eq!(config.get("test_param_alias").unwrap(), value);
        }

        assert!(TestConfig::check_no_alter_sys("test_param").unwrap());
    }
}