risingwave_common/session_config/mod.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod non_zero64;
16mod over_window;
17pub mod parallelism;
18mod query_mode;
19mod search_path;
20pub mod sink_decouple;
21mod transaction_isolation_level;
22mod visibility_mode;
23
24use chrono_tz::Tz;
25pub use over_window::OverWindowCachePolicy;
26pub use query_mode::QueryMode;
27use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
28pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
29use serde::{Deserialize, Serialize};
30use serde_with::{DisplayFromStr, serde_as};
31use thiserror::Error;
32
33use self::non_zero64::ConfigNonZeroU64;
34use crate::hash::VirtualNode;
35use crate::session_config::parallelism::ConfigParallelism;
36use crate::session_config::sink_decouple::SinkDecouple;
37use crate::session_config::transaction_isolation_level::IsolationLevel;
38pub use crate::session_config::visibility_mode::VisibilityMode;
39use crate::{PG_VERSION, SERVER_ENCODING, SERVER_VERSION_NUM, STANDARD_CONFORMING_STRINGS};
40
/// Separator string (a comma followed by one space) for list-like session config values.
// NOTE(review): presumably used when joining/splitting list-valued entries such as the
// search path — confirm at the use sites, none are visible in this file.
pub const SESSION_CONFIG_LIST_SEP: &str = ", ";
42
/// Errors for session config operations.
#[derive(Error, Debug)]
pub enum SessionConfigError {
    /// The supplied `value` was rejected for the config entry named `entry`.
    ///
    /// `source` carries the underlying cause; it is not included in the display message.
    #[error("Invalid value `{value}` for `{entry}`")]
    InvalidValue {
        entry: &'static str,
        value: String,
        source: anyhow::Error,
    },

    /// The given name does not match any config entry.
    #[error("Unrecognized config entry `{0}`")]
    UnrecognizedEntry(String),
}
55
/// Shorthand for a `Result` whose error type is [`SessionConfigError`].
type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
57
// NOTE(kwannoel): We declare it separately as a constant,
// otherwise seems like it can't infer the type of -1 when written inline.
//
// Each constant below is the default of the matching `*_rate_limit` field of `SessionConfig`,
// whose documentation states that `-1` disables the rate limit.
/// Default of `backfill_rate_limit`: rate limit disabled.
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
/// Default of `source_rate_limit`: rate limit disabled.
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
/// Default of `dml_rate_limit`: rate limit disabled.
const DISABLE_DML_RATE_LIMIT: i32 = -1;
/// Default of `sink_rate_limit`: rate limit disabled.
const DISABLE_SINK_RATE_LIMIT: i32 = -1;
64
/// Default to bypass cluster limits iff in debug mode.
///
/// `cfg!(debug_assertions)` is resolved at compile time, so the default of
/// `bypass_cluster_limits` is fixed by how the binary was built.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
67
#[serde_as]
/// This is the Session Config of RisingWave.
///
/// Every field is one session variable. Its `#[parameter(...)]` attribute carries the default
/// value and, optionally, an `alias`, a `rename`, `flags`, or a `check_hook` validation function.
// NOTE(review): the `///` comments on the fields are presumably picked up by the derive macros
// as user-visible parameter descriptions — keep them user-readable and put reviewer notes in
// plain `//` comments. Confirm in `risingwave_common_proc_macro`.
#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
pub struct SessionConfig {
    /// If `RW_IMPLICIT_FLUSH` is on, then every INSERT/UPDATE/DELETE statement will block
    /// until the entire dataflow is refreshed. In other words, every related table & MV will
    /// be able to see the write.
    #[parameter(default = false, alias = "rw_implicit_flush")]
    implicit_flush: bool,

    /// If `CREATE_COMPACTION_GROUP_FOR_MV` is on, dedicated compaction groups will be created in
    /// MV creation.
    #[parameter(default = false)]
    create_compaction_group_for_mv: bool,

    /// A temporary config variable to force query running in either local or distributed mode.
    /// The default value is auto which means let the system decide to run batch queries in local
    /// or distributed mode automatically.
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = QueryMode::default())]
    query_mode: QueryMode,

    /// Sets the number of digits displayed for floating-point values.
    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#:~:text=for%20more%20information.-,extra_float_digits,-(integer)>
    #[parameter(default = 1)]
    extra_float_digits: i32,

    /// Sets the application name to be reported in statistics and logs.
    /// See <https://www.postgresql.org/docs/14/runtime-config-logging.html#:~:text=What%20to%20Log-,application_name,-(string)>
    #[parameter(default = "", flags = "REPORT")]
    application_name: String,

    /// It is typically set by an application upon connection to the server.
    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-DATESTYLE>
    #[parameter(default = "", rename = "datestyle")]
    date_style: String,

    /// Force the use of lookup join instead of hash join when possible for local batch execution.
    #[parameter(default = true, alias = "rw_batch_enable_lookup_join")]
    batch_enable_lookup_join: bool,

    /// Enable usage of sortAgg instead of hash agg when order property is satisfied in batch
    /// execution
    #[parameter(default = true, alias = "rw_batch_enable_sort_agg")]
    batch_enable_sort_agg: bool,

    /// Enable distributed DML, so an insert, delete, and update statement can be executed in a distributed way (e.g. running in multiple compute nodes).
    /// No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion where users always can drop their table when failure happens.
    // NOTE(review): the `rename` value equals the field name, so it looks redundant — confirm
    // before removing.
    #[parameter(default = false, rename = "batch_enable_distributed_dml")]
    batch_enable_distributed_dml: bool,

    /// Evaluate expression in strict mode for batch queries.
    /// If set to false, an expression failure will not cause an error but leave a null value
    /// on the result set.
    #[parameter(default = true)]
    batch_expr_strict_mode: bool,

    /// The max gap allowed to transform small range scan into multi point lookup.
    #[parameter(default = 8)]
    max_split_range_gap: i32,

    /// Sets the order in which schemas are searched when an object (table, data type, function, etc.)
    /// is referenced by a simple name with no schema specified.
    /// See <https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SEARCH-PATH>
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = SearchPath::default())]
    search_path: SearchPath,

    /// If `VISIBILITY_MODE` is all, we will support querying data without checkpoint.
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = VisibilityMode::default())]
    visibility_mode: VisibilityMode,

    /// See <https://www.postgresql.org/docs/current/transaction-iso.html>
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = IsolationLevel::default())]
    transaction_isolation: IsolationLevel,

    /// Select as of specific epoch.
    /// Sets the historical epoch for querying data. If 0, querying latest data.
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = ConfigNonZeroU64::default())]
    query_epoch: ConfigNonZeroU64,

    /// Session timezone. Defaults to UTC.
    #[parameter(default = "UTC", check_hook = check_timezone)]
    timezone: String,

    /// The execution parallelism for streaming queries, including tables, materialized views, indexes,
    /// and sinks. Defaults to 0, which means they will be scheduled adaptively based on the cluster size.
    ///
    /// If a non-zero value is set, streaming queries will be scheduled to use a fixed number of parallelism.
    /// Note that the value will be bounded at `STREAMING_MAX_PARALLELISM`.
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism: ConfigParallelism,

    /// Specific parallelism for table. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_table: ConfigParallelism,

    /// Specific parallelism for sink. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_sink: ConfigParallelism,

    /// Specific parallelism for index. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_index: ConfigParallelism,

    /// Specific parallelism for source. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_source: ConfigParallelism,

    /// Specific parallelism for materialized view. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_materialized_view: ConfigParallelism,

    /// Enable delta join for streaming queries. Defaults to false.
    #[parameter(default = false, alias = "rw_streaming_enable_delta_join")]
    streaming_enable_delta_join: bool,

    /// Enable bushy join for streaming queries. Defaults to true.
    #[parameter(default = true, alias = "rw_streaming_enable_bushy_join")]
    streaming_enable_bushy_join: bool,

    /// Force filtering to be done inside the join whenever there's a choice between optimizations.
    /// Defaults to false.
    #[parameter(default = false, alias = "rw_streaming_force_filter_inside_join")]
    streaming_force_filter_inside_join: bool,

    /// Enable arrangement backfill for streaming queries. Defaults to true.
    /// When set to true, the parallelism of the upstream fragment will be
    /// decoupled from the parallelism of the downstream scan fragment.
    /// Or more generally, the parallelism of the upstream table / index / mv
    /// will be decoupled from the parallelism of the downstream table / index / mv / sink.
    #[parameter(default = true)]
    streaming_use_arrangement_backfill: bool,

    // NOTE(review): undocumented entry. Presumably toggles snapshot backfill for streaming
    // queries (off by default) — confirm and add a `///` description.
    #[parameter(default = false)]
    streaming_use_snapshot_backfill: bool,

    /// Allow `jsonb` in stream key
    #[parameter(default = false, alias = "rw_streaming_allow_jsonb_in_stream_key")]
    streaming_allow_jsonb_in_stream_key: bool,

    /// Enable materialized expressions for impure functions (typically UDF).
    #[parameter(default = true)]
    streaming_enable_materialized_expressions: bool,

    /// Enable join ordering for streaming and batch queries. Defaults to true.
    #[parameter(default = true, alias = "rw_enable_join_ordering")]
    enable_join_ordering: bool,

    /// Enable two phase agg optimization. Defaults to true.
    /// Setting `FORCE_TWO_PHASE_AGG` to true will always set this to true.
    // NOTE(review): the previous text claimed that enabling this resets `FORCE_TWO_PHASE_AGG`,
    // which the setter does not do. Presumably *disabling* this is meant to turn
    // `FORCE_TWO_PHASE_AGG` off — confirm against `set_enable_two_phase_agg`.
    #[parameter(default = true, flags = "SETTER", alias = "rw_enable_two_phase_agg")]
    enable_two_phase_agg: bool,

    /// Force two phase agg optimization whenever there's a choice between
    /// optimizations. Defaults to false.
    /// Setting this to true will always set `ENABLE_TWO_PHASE_AGG` to true.
    #[parameter(default = false, flags = "SETTER", alias = "rw_force_two_phase_agg")]
    force_two_phase_agg: bool,

    /// Enable sharing of common sub-plans.
    /// This means that DAG structured query plans can be constructed,
    /// rather than only tree structured query plans.
    #[parameter(default = true, alias = "rw_enable_share_plan")]
    enable_share_plan: bool,

    /// Enable split distinct agg
    #[parameter(default = false, alias = "rw_force_split_distinct_agg")]
    force_split_distinct_agg: bool,

    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE>
    #[parameter(default = "", rename = "intervalstyle")]
    interval_style: String,

    /// If `BATCH_PARALLELISM` is non-zero, batch queries will use this parallelism.
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = ConfigNonZeroU64::default())]
    batch_parallelism: ConfigNonZeroU64,

    /// The version of PostgreSQL that Risingwave claims to be.
    #[parameter(default = PG_VERSION)]
    server_version: String,

    /// The version of PostgreSQL that Risingwave claims to be.
    #[parameter(default = SERVER_VERSION_NUM)]
    server_version_num: i32,

    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-MIN-MESSAGES>
    #[parameter(default = "notice")]
    client_min_messages: String,

    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-ENCODING>
    #[parameter(default = SERVER_ENCODING, check_hook = check_client_encoding)]
    client_encoding: String,

    /// Enable decoupling sink and internal streaming graph or not
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = SinkDecouple::default())]
    sink_decouple: SinkDecouple,

    /// See <https://www.postgresql.org/docs/current/runtime-config-compatible.html#RUNTIME-CONFIG-COMPATIBLE-VERSION>
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = false)]
    synchronize_seqscans: bool,

    /// Abort query statement that takes more than the specified amount of time in sec. If
    /// `log_min_error_statement` is set to ERROR or lower, the statement that timed out will also be
    /// logged. If this value is specified without units, it is taken as milliseconds. A value of
    /// zero (the default) disables the timeout.
    // NOTE(review): the description mentions both seconds and milliseconds; confirm which unit
    // the consumer of this value actually applies and make the text consistent.
    #[parameter(default = 0u32)]
    statement_timeout: u32,

    /// Terminate any session that has been idle (that is, waiting for a client query) within an open transaction for longer than the specified amount of time in milliseconds.
    #[parameter(default = 60000u32)]
    idle_in_transaction_session_timeout: u32,

    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-LOCK-TIMEOUT>
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = 0)]
    lock_timeout: i32,

    /// For limiting the startup time of a shareable CDC streaming source when the source is being created. Unit: seconds.
    #[parameter(default = 30)]
    cdc_source_wait_streaming_start_timeout: i32,

    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-ROW-SECURITY>.
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = true)]
    row_security: bool,

    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-STANDARD-CONFORMING-STRINGS>
    #[parameter(default = STANDARD_CONFORMING_STRINGS)]
    standard_conforming_strings: String,

    /// Set streaming rate limit (rows per second) for each parallelism for mv / source / sink backfilling
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the snapshot read / source read.
    #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
    backfill_rate_limit: i32,

    /// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the snapshot read / source read.
    #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
    source_rate_limit: i32,

    /// Set streaming rate limit (rows per second) for each parallelism for table DML.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the DML.
    #[parameter(default = DISABLE_DML_RATE_LIMIT)]
    dml_rate_limit: i32,

    /// Set sink rate limit (rows per second) for each parallelism for external sink.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the sink.
    #[parameter(default = DISABLE_SINK_RATE_LIMIT)]
    sink_rate_limit: i32,

    /// Cache policy for partition cache in streaming over window.
    /// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
    #[serde_as(as = "DisplayFromStr")]
    #[parameter(default = OverWindowCachePolicy::default(), alias = "rw_streaming_over_window_cache_policy")]
    streaming_over_window_cache_policy: OverWindowCachePolicy,

    /// Run DDL statements in background
    #[parameter(default = false)]
    background_ddl: bool,

    /// Enable shared source. Currently only for Kafka.
    ///
    /// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEWS` from the source
    /// will forward the data from the same source streaming job, and also backfill prior data from the external source.
    #[parameter(default = true)]
    streaming_use_shared_source: bool,

    /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
    #[parameter(default = SERVER_ENCODING)]
    server_encoding: String,

    // NOTE(review): undocumented entry. `check_bytea_output` only accepts the value "hex";
    // consider adding a `///` description saying so.
    #[parameter(default = "hex", check_hook = check_bytea_output)]
    bytea_output: String,

    /// Bypass checks on cluster limits
    ///
    /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
    #[parameter(default = BYPASS_CLUSTER_LIMITS)]
    bypass_cluster_limits: bool,

    /// The maximum number of parallelism a streaming query can use. Defaults to 256.
    ///
    /// Compared to `STREAMING_PARALLELISM`, which configures the initial parallelism, this configures
    /// the maximum parallelism a streaming query can use in the future, if the cluster size changes or
    /// users manually change the parallelism with `ALTER .. SET PARALLELISM`.
    ///
    /// It's not always a good idea to set this to a very large number, as it may cause performance
    /// degradation when performing range scans on the table or the materialized view.
    // a.k.a. vnode count
    #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_streaming_max_parallelism)]
    streaming_max_parallelism: usize,

    /// Used to provide the connection information for the iceberg engine.
    /// Format: iceberg_engine_connection = schema_name.connection_name.
    #[parameter(default = "", check_hook = check_iceberg_engine_connection)]
    iceberg_engine_connection: String,

    /// Whether the streaming join should be unaligned or not.
    #[parameter(default = false)]
    streaming_enable_unaligned_join: bool,

    /// The timeout for reading from the buffer of the sync log store on barrier.
    /// Every epoch we will attempt to read the full buffer of the sync log store.
    /// If we hit the timeout, we will stop reading and continue.
    #[parameter(default = 64_usize)]
    streaming_sync_log_store_pause_duration_ms: usize,

    /// The max buffer size for sync logstore, before we start flushing.
    #[parameter(default = 2048_usize)]
    streaming_sync_log_store_buffer_size: usize,

    /// Whether to disable purifying the definition of the table or source upon retrieval.
    /// Only set this if encountering issues with functionalities like `SHOW` or `ALTER TABLE/SOURCE`.
    /// This config may be removed in the future.
    #[parameter(default = false, flags = "NO_ALTER_SYS")]
    disable_purify_definition: bool,
}
402
/// Validates the value of `iceberg_engine_connection`.
///
/// Accepted values are the empty string (nothing configured) or exactly two non-empty,
/// dot-separated components in the form `schema_name.connection_name`.
///
/// # Errors
///
/// Returns a message describing the expected format when the value has no dot, more than
/// one dot, or an empty schema / connection name (e.g. `"."`, `"schema."`, `".conn"`).
fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
    if val.is_empty() {
        return Ok(());
    }

    match val.split_once('.') {
        // Exactly one dot, with something on both sides of it.
        Some((schema_name, connection_name))
            if !schema_name.is_empty()
                && !connection_name.is_empty()
                && !connection_name.contains('.') =>
        {
            Ok(())
        }
        _ => Err("Invalid iceberg engine connection format, Should be set to this format: schema_name.connection_name.".to_owned()),
    }
}
415
416fn check_timezone(val: &str) -> Result<(), String> {
417 // Check if the provided string is a valid timezone.
418 Tz::from_str_insensitive(val).map_err(|_e| "Not a valid timezone")?;
419 Ok(())
420}
421
/// Validates the value of the `client_encoding` session parameter.
///
/// Only `UTF8` is supported. Before comparing, every character that is not an ASCII letter
/// or digit is ignored and ASCII case is disregarded, so spellings such as `utf-8` or
/// `Utf_8` are accepted as well.
fn check_client_encoding(val: &str) -> Result<(), String> {
    // https://github.com/postgres/postgres/blob/REL_15_3/src/common/encnames.c#L525
    let is_utf8 = val
        .chars()
        .filter(char::is_ascii_alphanumeric)
        .map(|c| c.to_ascii_uppercase())
        .eq("UTF8".chars());

    if is_utf8 {
        Ok(())
    } else {
        Err("Only support 'UTF8' for CLIENT_ENCODING".to_owned())
    }
}
431
/// Validates the value of the `bytea_output` session parameter.
///
/// The comparison is exact: only the lowercase string `hex` is accepted.
fn check_bytea_output(val: &str) -> Result<(), String> {
    match val {
        "hex" => Ok(()),
        _ => Err("Only support 'hex' for BYTEA_OUTPUT".to_owned()),
    }
}
439
440/// Check if the provided value is a valid max parallelism.
441fn check_streaming_max_parallelism(val: &usize) -> Result<(), String> {
442 match val {
443 // TODO(var-vnode): this is to prevent confusion with singletons, after we distinguish
444 // them better, we may allow 1 as the max parallelism (though not much point).
445 0 | 1 => Err("STREAMING_MAX_PARALLELISM must be greater than 1".to_owned()),
446 2..=VirtualNode::MAX_COUNT => Ok(()),
447 _ => Err(format!(
448 "STREAMING_MAX_PARALLELISM must be less than or equal to {}",
449 VirtualNode::MAX_COUNT
450 )),
451 }
452}
453
454impl SessionConfig {
455 pub fn set_force_two_phase_agg(
456 &mut self,
457 val: bool,
458 reporter: &mut impl ConfigReporter,
459 ) -> SessionConfigResult<bool> {
460 let set_val = self.set_force_two_phase_agg_inner(val, reporter)?;
461 if self.force_two_phase_agg {
462 self.set_enable_two_phase_agg(true, reporter)
463 } else {
464 Ok(set_val)
465 }
466 }
467
468 pub fn set_enable_two_phase_agg(
469 &mut self,
470 val: bool,
471 reporter: &mut impl ConfigReporter,
472 ) -> SessionConfigResult<bool> {
473 let set_val = self.set_enable_two_phase_agg_inner(val, reporter)?;
474 if !self.force_two_phase_agg {
475 self.set_force_two_phase_agg(false, reporter)
476 } else {
477 Ok(set_val)
478 }
479 }
480}
481
/// Name, setting and description of one config variable, all as plain strings.
// NOTE(review): presumably the row type returned when listing session variables — no
// producer is visible in this file, confirm in the derive macro.
pub struct VariableInfo {
    /// Name of the variable.
    pub name: String,
    /// Value of the variable rendered as a string.
    pub setting: String,
    /// Human-readable description of the variable.
    pub description: String,
}
487
/// Report status or notice to caller.
pub trait ConfigReporter {
    /// Receives the name of a config entry (`key`) together with its new value rendered as
    /// a string (`new_val`).
    // NOTE(review): presumably invoked by the generated setters of entries flagged `REPORT`
    // — confirm in the derive macro.
    fn report_status(&mut self, key: &str, new_val: String);
}
492
// Report nothing.
//
// The unit type is the reporter to pass when nobody needs to observe the change: both
// arguments are ignored.
impl ConfigReporter for () {
    fn report_status(&mut self, _key: &str, _new_val: String) {}
}
497
#[cfg(test)]
mod test {
    use super::*;

    // Minimal config with a single entry that has two aliases and the `NO_ALTER_SYS` flag.
    #[derive(SessionConfig)]
    struct TestConfig {
        #[parameter(default = 1, flags = "NO_ALTER_SYS", alias = "test_param_alias" | "alias_param_test")]
        test_param: i32,
    }

    /// An entry must be readable and writable through its own name and through each alias,
    /// and the `NO_ALTER_SYS` flag must be visible through `check_no_alter_sys`.
    #[test]
    fn test_session_config_alias() {
        let mut config = TestConfig::default();
        // Write through the canonical name, read back through the first alias.
        config.set("test_param", "2".to_owned(), &mut ()).unwrap();
        assert_eq!(config.get("test_param_alias").unwrap(), "2");
        // Write through the second alias, read back through the first alias.
        config
            .set("alias_param_test", "3".to_owned(), &mut ())
            .unwrap();
        assert_eq!(config.get("test_param_alias").unwrap(), "3");
        assert!(TestConfig::check_no_alter_sys("test_param").unwrap());
    }
}
519}