risingwave_common/session_config/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod non_zero64;
16mod over_window;
17mod query_mode;
18mod search_path;
19pub mod sink_decouple;
20mod transaction_isolation_level;
21mod visibility_mode;
22
23use chrono_tz::Tz;
24pub use over_window::OverWindowCachePolicy;
25pub use query_mode::QueryMode;
26use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
27pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
28use serde::{Deserialize, Serialize};
29use serde_with::{DisplayFromStr, serde_as};
30use thiserror::Error;
31
32use self::non_zero64::ConfigNonZeroU64;
33use crate::hash::VirtualNode;
34use crate::session_config::sink_decouple::SinkDecouple;
35use crate::session_config::transaction_isolation_level::IsolationLevel;
36pub use crate::session_config::visibility_mode::VisibilityMode;
37use crate::{PG_VERSION, SERVER_ENCODING, SERVER_VERSION_NUM, STANDARD_CONFORMING_STRINGS};
38
39pub const SESSION_CONFIG_LIST_SEP: &str = ", ";
40
41#[derive(Error, Debug)]
42pub enum SessionConfigError {
43    #[error("Invalid value `{value}` for `{entry}`")]
44    InvalidValue {
45        entry: &'static str,
46        value: String,
47        source: anyhow::Error,
48    },
49
50    #[error("Unrecognized config entry `{0}`")]
51    UnrecognizedEntry(String),
52}
53
54type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
55
// NOTE(kwannoel): These values are declared as typed constants instead of being written
// inline, because the type of an inline `-1` apparently cannot be inferred there.
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
const DISABLE_DML_RATE_LIMIT: i32 = -1;
const DISABLE_SINK_RATE_LIMIT: i32 = -1;

/// Cluster limits are bypassed by default iff this is a debug build.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
65
66#[serde_as]
67/// This is the Session Config of RisingWave.
68#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
69pub struct SessionConfig {
70    /// If `RW_IMPLICIT_FLUSH` is on, then every INSERT/UPDATE/DELETE statement will block
71    /// until the entire dataflow is refreshed. In other words, every related table & MV will
72    /// be able to see the write.
73    #[parameter(default = false, alias = "rw_implicit_flush")]
74    implicit_flush: bool,
75
76    /// If `CREATE_COMPACTION_GROUP_FOR_MV` is on, dedicated compaction groups will be created in
77    /// MV creation.
78    #[parameter(default = false)]
79    create_compaction_group_for_mv: bool,
80
81    /// A temporary config variable to force query running in either local or distributed mode.
82    /// The default value is auto which means let the system decide to run batch queries in local
83    /// or distributed mode automatically.
84    #[serde_as(as = "DisplayFromStr")]
85    #[parameter(default = QueryMode::default())]
86    query_mode: QueryMode,
87
88    /// Sets the number of digits displayed for floating-point values.
89    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#:~:text=for%20more%20information.-,extra_float_digits,-(integer)>
90    #[parameter(default = 1)]
91    extra_float_digits: i32,
92
93    /// Sets the application name to be reported in statistics and logs.
94    /// See <https://www.postgresql.org/docs/14/runtime-config-logging.html#:~:text=What%20to%20Log-,application_name,-(string)>
95    #[parameter(default = "", flags = "REPORT")]
96    application_name: String,
97
98    /// It is typically set by an application upon connection to the server.
99    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-DATESTYLE>
100    #[parameter(default = "", rename = "datestyle")]
101    date_style: String,
102
103    /// Force the use of lookup join instead of hash join when possible for local batch execution.
104    #[parameter(default = true, alias = "rw_batch_enable_lookup_join")]
105    batch_enable_lookup_join: bool,
106
107    /// Enable usage of sortAgg instead of hash agg when order property is satisfied in batch
108    /// execution
109    #[parameter(default = true, alias = "rw_batch_enable_sort_agg")]
110    batch_enable_sort_agg: bool,
111
112    /// Enable distributed DML, so an insert, delete, and update statement can be executed in a distributed way (e.g. running in multiple compute nodes).
113    /// No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion where users always can drop their table when failure happens.
114    #[parameter(default = false, rename = "batch_enable_distributed_dml")]
115    batch_enable_distributed_dml: bool,
116
117    /// Evaluate expression in strict mode for batch queries.
118    /// If set to false, an expression failure will not cause an error but leave a null value
119    /// on the result set.
120    #[parameter(default = true)]
121    batch_expr_strict_mode: bool,
122
123    /// The max gap allowed to transform small range scan into multi point lookup.
124    #[parameter(default = 8)]
125    max_split_range_gap: i32,
126
127    /// Sets the order in which schemas are searched when an object (table, data type, function, etc.)
128    /// is referenced by a simple name with no schema specified.
129    /// See <https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SEARCH-PATH>
130    #[serde_as(as = "DisplayFromStr")]
131    #[parameter(default = SearchPath::default())]
132    search_path: SearchPath,
133
134    /// If `VISIBILITY_MODE` is all, we will support querying data without checkpoint.
135    #[serde_as(as = "DisplayFromStr")]
136    #[parameter(default = VisibilityMode::default())]
137    visibility_mode: VisibilityMode,
138
139    /// See <https://www.postgresql.org/docs/current/transaction-iso.html>
140    #[serde_as(as = "DisplayFromStr")]
141    #[parameter(default = IsolationLevel::default())]
142    transaction_isolation: IsolationLevel,
143
144    /// Select as of specific epoch.
145    /// Sets the historical epoch for querying data. If 0, querying latest data.
146    #[serde_as(as = "DisplayFromStr")]
147    #[parameter(default = ConfigNonZeroU64::default())]
148    query_epoch: ConfigNonZeroU64,
149
150    /// Session timezone. Defaults to UTC.
151    #[parameter(default = "UTC", check_hook = check_timezone)]
152    timezone: String,
153
154    /// The execution parallelism for streaming queries, including tables, materialized views, indexes,
155    /// and sinks. Defaults to 0, which means they will be scheduled adaptively based on the cluster size.
156    ///
157    /// If a non-zero value is set, streaming queries will be scheduled to use a fixed number of parallelism.
158    /// Note that the value will be bounded at `STREAMING_MAX_PARALLELISM`.
159    #[serde_as(as = "DisplayFromStr")]
160    #[parameter(default = ConfigNonZeroU64::default())]
161    streaming_parallelism: ConfigNonZeroU64,
162
163    /// Enable delta join for streaming queries. Defaults to false.
164    #[parameter(default = false, alias = "rw_streaming_enable_delta_join")]
165    streaming_enable_delta_join: bool,
166
167    /// Enable bushy join for streaming queries. Defaults to true.
168    #[parameter(default = true, alias = "rw_streaming_enable_bushy_join")]
169    streaming_enable_bushy_join: bool,
170
171    /// Enable arrangement backfill for streaming queries. Defaults to true.
172    /// When set to true, the parallelism of the upstream fragment will be
173    /// decoupled from the parallelism of the downstream scan fragment.
174    /// Or more generally, the parallelism of the upstream table / index / mv
175    /// will be decoupled from the parallelism of the downstream table / index / mv / sink.
176    #[parameter(default = true)]
177    streaming_use_arrangement_backfill: bool,
178
179    #[parameter(default = false)]
180    streaming_use_snapshot_backfill: bool,
181
182    /// Allow `jsonb` in stream key
183    #[parameter(default = false, alias = "rw_streaming_allow_jsonb_in_stream_key")]
184    streaming_allow_jsonb_in_stream_key: bool,
185
186    /// Enable join ordering for streaming and batch queries. Defaults to true.
187    #[parameter(default = true, alias = "rw_enable_join_ordering")]
188    enable_join_ordering: bool,
189
190    /// Enable two phase agg optimization. Defaults to true.
191    /// Setting this to true will always set `FORCE_TWO_PHASE_AGG` to false.
192    #[parameter(default = true, flags = "SETTER", alias = "rw_enable_two_phase_agg")]
193    enable_two_phase_agg: bool,
194
195    /// Force two phase agg optimization whenever there's a choice between
196    /// optimizations. Defaults to false.
197    /// Setting this to true will always set `ENABLE_TWO_PHASE_AGG` to false.
198    #[parameter(default = false, flags = "SETTER", alias = "rw_force_two_phase_agg")]
199    force_two_phase_agg: bool,
200
201    /// Enable sharing of common sub-plans.
202    /// This means that DAG structured query plans can be constructed,
203    #[parameter(default = true, alias = "rw_enable_share_plan")]
204    /// rather than only tree structured query plans.
205    enable_share_plan: bool,
206
207    /// Enable split distinct agg
208    #[parameter(default = false, alias = "rw_force_split_distinct_agg")]
209    force_split_distinct_agg: bool,
210
211    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE>
212    #[parameter(default = "", rename = "intervalstyle")]
213    interval_style: String,
214
215    /// If `BATCH_PARALLELISM` is non-zero, batch queries will use this parallelism.
216    #[serde_as(as = "DisplayFromStr")]
217    #[parameter(default = ConfigNonZeroU64::default())]
218    batch_parallelism: ConfigNonZeroU64,
219
220    /// The version of PostgreSQL that Risingwave claims to be.
221    #[parameter(default = PG_VERSION)]
222    server_version: String,
223
224    /// The version of PostgreSQL that Risingwave claims to be.
225    #[parameter(default = SERVER_VERSION_NUM)]
226    server_version_num: i32,
227
228    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-MIN-MESSAGES>
229    #[parameter(default = "notice")]
230    client_min_messages: String,
231
232    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-ENCODING>
233    #[parameter(default = SERVER_ENCODING, check_hook = check_client_encoding)]
234    client_encoding: String,
235
236    /// Enable decoupling sink and internal streaming graph or not
237    #[serde_as(as = "DisplayFromStr")]
238    #[parameter(default = SinkDecouple::default())]
239    sink_decouple: SinkDecouple,
240
241    /// See <https://www.postgresql.org/docs/current/runtime-config-compatible.html#RUNTIME-CONFIG-COMPATIBLE-VERSION>
242    /// Unused in RisingWave, support for compatibility.
243    #[parameter(default = false)]
244    synchronize_seqscans: bool,
245
246    /// Abort query statement that takes more than the specified amount of time in sec. If
247    /// `log_min_error_statement` is set to ERROR or lower, the statement that timed out will also be
248    /// logged. If this value is specified without units, it is taken as milliseconds. A value of
249    /// zero (the default) disables the timeout.
250    #[parameter(default = 0u32)]
251    statement_timeout: u32,
252
253    /// Terminate any session that has been idle (that is, waiting for a client query) within an open transaction for longer than the specified amount of time in milliseconds.
254    #[parameter(default = 60000u32)]
255    idle_in_transaction_session_timeout: u32,
256
257    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-LOCK-TIMEOUT>
258    /// Unused in RisingWave, support for compatibility.
259    #[parameter(default = 0)]
260    lock_timeout: i32,
261
262    /// For limiting the startup time of a shareable CDC streaming source when the source is being created. Unit: seconds.
263    #[parameter(default = 30)]
264    cdc_source_wait_streaming_start_timeout: i32,
265
266    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-ROW-SECURITY>.
267    /// Unused in RisingWave, support for compatibility.
268    #[parameter(default = true)]
269    row_security: bool,
270
271    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-STANDARD-CONFORMING-STRINGS>
272    #[parameter(default = STANDARD_CONFORMING_STRINGS)]
273    standard_conforming_strings: String,
274
275    /// Set streaming rate limit (rows per second) for each parallelism for mv / source / sink backfilling
276    /// If set to -1, disable rate limit.
277    /// If set to 0, this pauses the snapshot read / source read.
278    #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
279    backfill_rate_limit: i32,
280
281    /// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
282    /// If set to -1, disable rate limit.
283    /// If set to 0, this pauses the snapshot read / source read.
284    #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
285    source_rate_limit: i32,
286
287    /// Set streaming rate limit (rows per second) for each parallelism for table DML.
288    /// If set to -1, disable rate limit.
289    /// If set to 0, this pauses the DML.
290    #[parameter(default = DISABLE_DML_RATE_LIMIT)]
291    dml_rate_limit: i32,
292
293    /// Set sink rate limit (rows per second) for each parallelism for external sink.
294    /// If set to -1, disable rate limit.
295    /// If set to 0, this pauses the sink.
296    #[parameter(default = DISABLE_SINK_RATE_LIMIT)]
297    sink_rate_limit: i32,
298
299    /// Cache policy for partition cache in streaming over window.
300    /// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
301    #[serde_as(as = "DisplayFromStr")]
302    #[parameter(default = OverWindowCachePolicy::default(), alias = "rw_streaming_over_window_cache_policy")]
303    streaming_over_window_cache_policy: OverWindowCachePolicy,
304
305    /// Run DDL statements in background
306    #[parameter(default = false)]
307    background_ddl: bool,
308
309    /// Enable shared source. Currently only for Kafka.
310    ///
311    /// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEWS` from the source
312    /// will forward the data from the same source streaming job, and also backfill prior data from the external source.
313    #[parameter(default = true)]
314    streaming_use_shared_source: bool,
315
316    /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
317    #[parameter(default = SERVER_ENCODING)]
318    server_encoding: String,
319
320    #[parameter(default = "hex", check_hook = check_bytea_output)]
321    bytea_output: String,
322
323    /// Bypass checks on cluster limits
324    ///
325    /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
326    #[parameter(default = BYPASS_CLUSTER_LIMITS)]
327    bypass_cluster_limits: bool,
328
329    /// The maximum number of parallelism a streaming query can use. Defaults to 256.
330    ///
331    /// Compared to `STREAMING_PARALLELISM`, which configures the initial parallelism, this configures
332    /// the maximum parallelism a streaming query can use in the future, if the cluster size changes or
333    /// users manually change the parallelism with `ALTER .. SET PARALLELISM`.
334    ///
335    /// It's not always a good idea to set this to a very large number, as it may cause performance
336    /// degradation when performing range scans on the table or the materialized view.
337    // a.k.a. vnode count
338    #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_streaming_max_parallelism)]
339    streaming_max_parallelism: usize,
340
341    /// Used to provide the connection information for the iceberg engine.
342    /// Format: iceberg_engine_connection = schema_name.connection_name.
343    #[parameter(default = "", check_hook = check_iceberg_engine_connection)]
344    iceberg_engine_connection: String,
345
346    /// Whether the streaming join should be unaligned or not.
347    #[parameter(default = false)]
348    streaming_enable_unaligned_join: bool,
349
350    /// The timeout for reading from the buffer of the sync log store on barrier.
351    /// Every epoch we will attempt to read the full buffer of the sync log store.
352    /// If we hit the timeout, we will stop reading and continue.
353    #[parameter(default = 64_usize)]
354    streaming_sync_log_store_pause_duration_ms: usize,
355
356    /// The max buffer size for sync logstore, before we start flushing.
357    #[parameter(default = 2048_usize)]
358    streaming_sync_log_store_buffer_size: usize,
359}
360
/// Check hook for `iceberg_engine_connection`.
///
/// An empty string is accepted (nothing configured). Any other value must have the form
/// `schema_name.connection_name`: exactly one `.` separating two non-empty parts.
///
/// # Errors
///
/// Returns a message describing the expected format when the value is malformed, including
/// values with a missing schema or connection name such as `"schema."` or `".conn"`.
fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
    if val.is_empty() {
        return Ok(());
    }

    match val.split_once('.') {
        // Exactly one separator, with something on both sides of it.
        Some((schema, connection))
            if !schema.is_empty() && !connection.is_empty() && !connection.contains('.') =>
        {
            Ok(())
        }
        _ => Err("Invalid iceberg engine connection format, Should be set to this format: schema_name.connection_name.".to_owned()),
    }
}
373
374fn check_timezone(val: &str) -> Result<(), String> {
375    // Check if the provided string is a valid timezone.
376    Tz::from_str_insensitive(val).map_err(|_e| "Not a valid timezone")?;
377    Ok(())
378}
379
/// Check hook for `client_encoding`: only spellings of `UTF8` are accepted.
///
/// Every character that is not ASCII alphanumeric is ignored and the comparison is
/// case-insensitive, so e.g. `utf-8` and `UTF_8` both pass.
fn check_client_encoding(val: &str) -> Result<(), String> {
    // https://github.com/postgres/postgres/blob/REL_15_3/src/common/encnames.c#L525
    let normalized: String = val
        .chars()
        .filter(|c| c.is_ascii_alphanumeric())
        .collect();

    if normalized.eq_ignore_ascii_case("UTF8") {
        Ok(())
    } else {
        Err("Only support 'UTF8' for CLIENT_ENCODING".to_owned())
    }
}
389
/// Check hook for `bytea_output`: the only accepted value is exactly `hex`.
fn check_bytea_output(val: &str) -> Result<(), String> {
    match val {
        "hex" => Ok(()),
        _ => Err("Only support 'hex' for BYTEA_OUTPUT".to_owned()),
    }
}
397
398/// Check if the provided value is a valid max parallelism.
399fn check_streaming_max_parallelism(val: &usize) -> Result<(), String> {
400    match val {
401        // TODO(var-vnode): this is to prevent confusion with singletons, after we distinguish
402        // them better, we may allow 1 as the max parallelism (though not much point).
403        0 | 1 => Err("STREAMING_MAX_PARALLELISM must be greater than 1".to_owned()),
404        2..=VirtualNode::MAX_COUNT => Ok(()),
405        _ => Err(format!(
406            "STREAMING_MAX_PARALLELISM must be less than or equal to {}",
407            VirtualNode::MAX_COUNT
408        )),
409    }
410}
411
412impl SessionConfig {
413    pub fn set_force_two_phase_agg(
414        &mut self,
415        val: bool,
416        reporter: &mut impl ConfigReporter,
417    ) -> SessionConfigResult<bool> {
418        let set_val = self.set_force_two_phase_agg_inner(val, reporter)?;
419        if self.force_two_phase_agg {
420            self.set_enable_two_phase_agg(true, reporter)
421        } else {
422            Ok(set_val)
423        }
424    }
425
426    pub fn set_enable_two_phase_agg(
427        &mut self,
428        val: bool,
429        reporter: &mut impl ConfigReporter,
430    ) -> SessionConfigResult<bool> {
431        let set_val = self.set_enable_two_phase_agg_inner(val, reporter)?;
432        if !self.force_two_phase_agg {
433            self.set_force_two_phase_agg(false, reporter)
434        } else {
435            Ok(set_val)
436        }
437    }
438}
439
/// Name, setting and description of a single variable, all in string form.
pub struct VariableInfo {
    /// Name of the variable.
    pub name: String,
    /// The variable's setting, rendered as a string.
    pub setting: String,
    /// Description text for the variable.
    pub description: String,
}
445
/// Sink through which status or notices are reported back to the caller.
pub trait ConfigReporter {
    /// Reports `new_val` as the new value associated with `key`.
    fn report_status(&mut self, key: &str, new_val: String);
}
450
451// Report nothing.
452impl ConfigReporter for () {
453    fn report_status(&mut self, _key: &str, _new_val: String) {}
454}
455
#[cfg(test)]
mod test {
    use super::*;

    // Minimal config: a single parameter carrying one flag and two aliases.
    #[derive(SessionConfig)]
    struct TestConfig {
        #[parameter(default = 1, flags = "NO_ALTER_SYS", alias = "test_param_alias" | "alias_param_test")]
        test_param: i32,
    }

    #[test]
    fn test_session_config_alias() {
        let mut cfg = TestConfig::default();

        // Write through the canonical name, read back through the first alias.
        cfg.set("test_param", "2".to_owned(), &mut ()).unwrap();
        assert_eq!(cfg.get("test_param_alias").unwrap(), "2");

        // Write through the second alias, read back through the first alias.
        cfg.set("alias_param_test", "3".to_owned(), &mut ()).unwrap();
        assert_eq!(cfg.get("test_param_alias").unwrap(), "3");

        // The flag lookup is done by the canonical name.
        assert!(TestConfig::check_no_alter_sys("test_param").unwrap());
    }
}