risingwave_common/session_config/mod.rs
1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod iceberg_query_storage_mode;
16mod non_zero64;
17mod opt;
18pub mod parallelism;
19mod query_mode;
20mod search_path;
21pub mod sink_decouple;
22mod statement_timeout;
23mod transaction_isolation_level;
24mod visibility_mode;
25
26use chrono_tz::Tz;
27pub use iceberg_query_storage_mode::IcebergQueryStorageMode;
28use itertools::Itertools;
29pub use opt::OptionConfig;
30pub use query_mode::QueryMode;
31use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
32pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
33use serde::{Deserialize, Serialize};
34pub use statement_timeout::StatementTimeout;
35use thiserror::Error;
36
37use self::non_zero64::ConfigNonZeroU64;
38use crate::config::mutate::TomlTableMutateExt;
39use crate::config::streaming::{JoinEncodingType, OverWindowCachePolicy};
40use crate::config::{ConfigMergeError, StreamingConfig, merge_streaming_config_section};
41use crate::hash::VirtualNode;
42use crate::session_config::parallelism::{ConfigAdaptiveParallelismStrategy, ConfigParallelism};
43use crate::session_config::sink_decouple::SinkDecouple;
44use crate::session_config::transaction_isolation_level::IsolationLevel;
45pub use crate::session_config::visibility_mode::VisibilityMode;
46use crate::{PG_VERSION, SERVER_ENCODING, SERVER_VERSION_NUM, STANDARD_CONFORMING_STRINGS};
47
/// Separator used when rendering list-valued session config entries as a string.
pub const SESSION_CONFIG_LIST_SEP: &str = ", ";

// NOTE(kwannoel): We declare it separately as a constant,
// otherwise seems like it can't infer the type of -1 when written inline.
// A value of `-1` disables the corresponding rate limit (see the field docs on
// `SessionConfig` for the meaning of 0 and positive values).
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
const DISABLE_DML_RATE_LIMIT: i32 = -1;
const DISABLE_SINK_RATE_LIMIT: i32 = -1;

/// Default parallelism bound for tables
const DEFAULT_TABLE_PARALLELISM_BOUND: std::num::NonZeroU64 = std::num::NonZeroU64::new(4).unwrap();

/// Default parallelism bound for sources
const DEFAULT_SOURCE_PARALLELISM_BOUND: std::num::NonZeroU64 =
    std::num::NonZeroU64::new(4).unwrap();

/// Default to bypass cluster limits iff in debug mode.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
81
/// This is the Session Config of RisingWave.
///
/// All config entries implement `Display` and `FromStr` for getter and setter, to be read and
/// altered within a session.
///
/// Users can change the default value of a configuration entry using `ALTER SYSTEM SET`. To
/// facilitate this, a `serde` implementation is used as the wire format for retrieving initial
/// configurations and updates from the meta service. It's important to note that the meta
/// service stores the overridden value of each configuration entry per row with `Display` in
/// the meta store, rather than using the `serde` format. However, we still delegate the `serde`
/// impl of all fields to `Display`/`FromStr` to make it consistent.
#[serde_with::apply(_ => #[serde_as(as = "serde_with::DisplayFromStr")] )]
#[serde_with::serde_as]
#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
pub struct SessionConfig {
    /// If `RW_IMPLICIT_FLUSH` is on, then every INSERT/UPDATE/DELETE statement will block
    /// until the entire dataflow is refreshed. In other words, every related table & MV will
    /// be able to see the write.
    #[parameter(default = false, alias = "rw_implicit_flush")]
    implicit_flush: bool,

    /// If `CREATE_COMPACTION_GROUP_FOR_MV` is on, dedicated compaction groups will be created in
    /// MV creation.
    #[parameter(default = false)]
    create_compaction_group_for_mv: bool,

    /// A temporary config variable to force query running in either local or distributed mode.
    /// The default value is auto which means let the system decide to run batch queries in local
    /// or distributed mode automatically.
    #[parameter(default = QueryMode::default())]
    query_mode: QueryMode,

    /// For Iceberg engine tables, which storage to use for batch SELECT: Iceberg (columnar) or
    /// Hummock (row). Only affects batch SELECT on tables with ENGINE = ICEBERG.
    #[parameter(default = IcebergQueryStorageMode::default())]
    iceberg_query_storage_mode: IcebergQueryStorageMode,

    /// Sets the number of digits displayed for floating-point values.
    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#:~:text=for%20more%20information.-,extra_float_digits,-(integer)>
    #[parameter(default = 1)]
    extra_float_digits: i32,

    /// Sets the application name to be reported in statistics and logs.
    /// See <https://www.postgresql.org/docs/14/runtime-config-logging.html#:~:text=What%20to%20Log-,application_name,-(string)>
    #[parameter(default = "", flags = "REPORT")]
    application_name: String,

    /// It is typically set by an application upon connection to the server.
    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-DATESTYLE>
    #[parameter(default = "", rename = "datestyle")]
    date_style: String,

    /// Force the use of lookup join instead of hash join when possible for local batch execution.
    #[parameter(default = true, alias = "rw_batch_enable_lookup_join")]
    batch_enable_lookup_join: bool,

    /// Enable usage of sortAgg instead of hash agg when order property is satisfied in batch
    /// execution
    #[parameter(default = true, alias = "rw_batch_enable_sort_agg")]
    batch_enable_sort_agg: bool,

    /// Enable distributed DML, so an insert, delete, and update statement can be executed in a distributed way (e.g. running in multiple compute nodes).
    /// No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion where users always can drop their table when failure happens.
    #[parameter(default = false, rename = "batch_enable_distributed_dml")]
    batch_enable_distributed_dml: bool,

    /// Evaluate expression in strict mode for batch queries.
    /// If set to false, an expression failure will not cause an error but leave a null value
    /// on the result set.
    #[parameter(default = true)]
    batch_expr_strict_mode: bool,

    /// The max gap allowed to transform small range scan into multi point lookup.
    #[parameter(default = 8)]
    max_split_range_gap: i32,

    /// Sets the order in which schemas are searched when an object (table, data type, function, etc.)
    /// is referenced by a simple name with no schema specified.
    /// See <https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SEARCH-PATH>
    #[parameter(default = SearchPath::default())]
    search_path: SearchPath,

    /// If `VISIBILITY_MODE` is all, we will support querying data without checkpoint.
    #[parameter(default = VisibilityMode::default())]
    visibility_mode: VisibilityMode,

    /// See <https://www.postgresql.org/docs/current/transaction-iso.html>
    #[parameter(default = IsolationLevel::default())]
    transaction_isolation: IsolationLevel,

    /// Select as of specific epoch.
    /// Sets the historical epoch for querying data. If 0, querying latest data.
    #[parameter(default = ConfigNonZeroU64::default())]
    query_epoch: ConfigNonZeroU64,

    /// Session timezone. Defaults to UTC.
    #[parameter(default = "UTC", check_hook = check_timezone)]
    timezone: String,

    /// The execution parallelism for streaming queries, including tables, materialized views, indexes,
    /// and sinks. Defaults to 0, which means they will be scheduled adaptively based on the cluster size.
    ///
    /// If a non-zero value is set, streaming queries will be scheduled to use a fixed number of parallelism.
    /// Note that the value will be bounded at `STREAMING_MAX_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism: ConfigParallelism,

    /// Specific parallelism for backfill. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_backfill: ConfigParallelism,

    /// The adaptive parallelism strategy for streaming jobs. Defaults to `default`, which follows the system setting.
    #[parameter(default = ConfigAdaptiveParallelismStrategy::default())]
    streaming_parallelism_strategy: ConfigAdaptiveParallelismStrategy,

    /// Specific adaptive parallelism strategy for table. Defaults to `BOUNDED(4)`.
    #[parameter(default = ConfigAdaptiveParallelismStrategy::Bounded(DEFAULT_TABLE_PARALLELISM_BOUND))]
    streaming_parallelism_strategy_for_table: ConfigAdaptiveParallelismStrategy,

    /// Specific parallelism for table. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_table: ConfigParallelism,

    /// Specific adaptive parallelism strategy for sink. Falls back to `STREAMING_PARALLELISM_STRATEGY`.
    #[parameter(default = ConfigAdaptiveParallelismStrategy::default())]
    streaming_parallelism_strategy_for_sink: ConfigAdaptiveParallelismStrategy,

    /// Specific parallelism for sink. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_sink: ConfigParallelism,

    /// Specific adaptive parallelism strategy for index. Falls back to `STREAMING_PARALLELISM_STRATEGY`.
    #[parameter(default = ConfigAdaptiveParallelismStrategy::default())]
    streaming_parallelism_strategy_for_index: ConfigAdaptiveParallelismStrategy,

    /// Specific parallelism for index. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_index: ConfigParallelism,

    /// Specific adaptive parallelism strategy for source. Defaults to `BOUNDED(4)`.
    #[parameter(default = ConfigAdaptiveParallelismStrategy::Bounded(DEFAULT_SOURCE_PARALLELISM_BOUND))]
    streaming_parallelism_strategy_for_source: ConfigAdaptiveParallelismStrategy,

    /// Specific parallelism for source. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_source: ConfigParallelism,

    /// Specific adaptive parallelism strategy for materialized view. Falls back to `STREAMING_PARALLELISM_STRATEGY`.
    #[parameter(default = ConfigAdaptiveParallelismStrategy::default())]
    streaming_parallelism_strategy_for_materialized_view: ConfigAdaptiveParallelismStrategy,

    /// Specific parallelism for materialized view. By default, it will fall back to `STREAMING_PARALLELISM`.
    #[parameter(default = ConfigParallelism::default())]
    streaming_parallelism_for_materialized_view: ConfigParallelism,

    /// Enable delta join for streaming queries. Defaults to false.
    #[parameter(default = false, alias = "rw_streaming_enable_delta_join")]
    streaming_enable_delta_join: bool,

    /// Enable bushy join for streaming queries. Defaults to true.
    #[parameter(default = true, alias = "rw_streaming_enable_bushy_join")]
    streaming_enable_bushy_join: bool,

    /// Force filtering to be done inside the join whenever there's a choice between optimizations.
    /// Defaults to false.
    #[parameter(default = false, alias = "rw_streaming_force_filter_inside_join")]
    streaming_force_filter_inside_join: bool,

    /// Enable arrangement backfill for streaming queries. Defaults to true.
    /// When set to true, the parallelism of the upstream fragment will be
    /// decoupled from the parallelism of the downstream scan fragment.
    /// Or more generally, the parallelism of the upstream table / index / mv
    /// will be decoupled from the parallelism of the downstream table / index / mv / sink.
    #[parameter(default = true)]
    streaming_use_arrangement_backfill: bool,

    /// Enable snapshot backfill for streaming queries. Defaults to true.
    // NOTE(review): this field had no documentation; semantics presumed analogous to
    // `streaming_use_arrangement_backfill` above — confirm before relying on this doc.
    #[parameter(default = true)]
    streaming_use_snapshot_backfill: bool,

    /// Enable serverless backfill for streaming queries. Defaults to false.
    #[parameter(default = false)]
    enable_serverless_backfill: bool,

    /// Allow `jsonb` in stream key
    #[parameter(default = false, alias = "rw_streaming_allow_jsonb_in_stream_key")]
    streaming_allow_jsonb_in_stream_key: bool,

    /// Unsafe: allow impure expressions on non-append-only streams without materialization.
    ///
    /// This may lead to inconsistent results or panics due to re-evaluation on updates/retracts.
    #[parameter(default = false)]
    streaming_unsafe_allow_unmaterialized_impure_expr: bool,

    /// Separate consecutive `StreamHashJoin` by no-shuffle `StreamExchange`
    #[parameter(default = false)]
    streaming_separate_consecutive_join: bool,

    /// Separate `StreamSink` by no-shuffle `StreamExchange`
    #[parameter(default = false)]
    streaming_separate_sink: bool,

    /// Determine which encoding will be used to encode join rows in operator cache.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None)]
    streaming_join_encoding: OptionConfig<JoinEncodingType>,

    /// Enable join ordering for streaming and batch queries. Defaults to true.
    #[parameter(default = true, alias = "rw_enable_join_ordering")]
    enable_join_ordering: bool,

    /// Enable two phase agg optimization. Defaults to true.
    /// Setting this to true will always set `FORCE_TWO_PHASE_AGG` to false.
    #[parameter(default = true, flags = "SETTER", alias = "rw_enable_two_phase_agg")]
    enable_two_phase_agg: bool,

    /// Force two phase agg optimization whenever there's a choice between
    /// optimizations. Defaults to false.
    /// Setting this to true will always set `ENABLE_TWO_PHASE_AGG` to true.
    #[parameter(default = false, flags = "SETTER", alias = "rw_force_two_phase_agg")]
    force_two_phase_agg: bool,

    /// Enable sharing of common sub-plans.
    /// This means that DAG structured query plans can be constructed,
    /// rather than only tree structured query plans.
    #[parameter(default = true, alias = "rw_enable_share_plan")]
    enable_share_plan: bool,

    /// Enable split distinct agg
    #[parameter(default = false, alias = "rw_force_split_distinct_agg")]
    force_split_distinct_agg: bool,

    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE>
    #[parameter(default = "", rename = "intervalstyle")]
    interval_style: String,

    /// If `BATCH_PARALLELISM` is non-zero, batch queries will use this parallelism.
    #[parameter(default = ConfigNonZeroU64::default())]
    batch_parallelism: ConfigNonZeroU64,

    /// The version of PostgreSQL that Risingwave claims to be.
    #[parameter(default = PG_VERSION)]
    server_version: String,

    /// The version of PostgreSQL that Risingwave claims to be, in numeric form.
    #[parameter(default = SERVER_VERSION_NUM)]
    server_version_num: i32,

    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-MIN-MESSAGES>
    #[parameter(default = "notice")]
    client_min_messages: String,

    /// see <https://www.postgresql.org/docs/15/runtime-config-client.html#GUC-CLIENT-ENCODING>
    #[parameter(default = SERVER_ENCODING, check_hook = check_client_encoding)]
    client_encoding: String,

    /// Enable decoupling sink and internal streaming graph or not
    #[parameter(default = SinkDecouple::default())]
    sink_decouple: SinkDecouple,

    /// See <https://www.postgresql.org/docs/current/runtime-config-compatible.html#RUNTIME-CONFIG-COMPATIBLE-VERSION>
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = false)]
    synchronize_seqscans: bool,

    /// Abort query statement that takes more than the specified amount of time in sec. If
    /// `log_min_error_statement` is set to ERROR or lower, the statement that timed out will also be
    /// logged. If this value is specified without units, it is taken as milliseconds. A value of
    /// zero (the default) disables the timeout.
    #[parameter(default = StatementTimeout::default())]
    statement_timeout: StatementTimeout,

    /// Terminate any session that has been idle (that is, waiting for a client query) within an open transaction for longer than the specified amount of time in milliseconds.
    #[parameter(default = 60000u32)]
    idle_in_transaction_session_timeout: u32,

    /// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-LOCK-TIMEOUT>
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = 0)]
    lock_timeout: i32,

    /// For limiting the startup time of a shareable CDC streaming source when the source is being created. Unit: seconds.
    #[parameter(default = 60)]
    cdc_source_wait_streaming_start_timeout: i32,

    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-ROW-SECURITY>.
    /// Unused in RisingWave, support for compatibility.
    #[parameter(default = true)]
    row_security: bool,

    /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-STANDARD-CONFORMING-STRINGS>
    #[parameter(default = STANDARD_CONFORMING_STRINGS)]
    standard_conforming_strings: String,

    /// Set streaming rate limit (rows per second) for each parallelism for mv / source / sink backfilling
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the snapshot read / source read.
    #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
    backfill_rate_limit: i32,

    /// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the snapshot read / source read.
    #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
    source_rate_limit: i32,

    /// Set streaming rate limit (rows per second) for each parallelism for table DML.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the DML.
    #[parameter(default = DISABLE_DML_RATE_LIMIT)]
    dml_rate_limit: i32,

    /// Set sink rate limit (rows per second) for each parallelism for external sink.
    /// If set to -1, disable rate limit.
    /// If set to 0, this pauses the sink.
    #[parameter(default = DISABLE_SINK_RATE_LIMIT)]
    sink_rate_limit: i32,

    /// Cache policy for partition cache in streaming over window.
    /// Can be `full`, `recent`, `recent_first_n` or `recent_last_n`.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None, alias = "rw_streaming_over_window_cache_policy")]
    streaming_over_window_cache_policy: OptionConfig<OverWindowCachePolicy>,

    /// Run DDL statements in background
    #[parameter(default = false)]
    background_ddl: bool,

    /// Enable shared source. Currently only for Kafka.
    ///
    /// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEWS` from the source
    /// will forward the data from the same source streaming job, and also backfill prior data from the external source.
    #[parameter(default = true)]
    streaming_use_shared_source: bool,

    /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
    #[parameter(default = SERVER_ENCODING)]
    server_encoding: String,

    /// Output format for `bytea` values. Only `hex` is accepted, as enforced by
    /// the `check_bytea_output` check hook.
    #[parameter(default = "hex", check_hook = check_bytea_output)]
    bytea_output: String,

    /// Bypass checks on cluster limits
    ///
    /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
    #[parameter(default = BYPASS_CLUSTER_LIMITS)]
    bypass_cluster_limits: bool,

    /// The maximum number of parallelism a streaming query can use. Defaults to 256.
    ///
    /// Compared to `STREAMING_PARALLELISM`, which configures the initial parallelism, this configures
    /// the maximum parallelism a streaming query can use in the future, if the cluster size changes or
    /// users manually change the parallelism with `ALTER .. SET PARALLELISM`.
    ///
    /// It's not always a good idea to set this to a very large number, as it may cause performance
    /// degradation when performing range scans on the table or the materialized view.
    // a.k.a. vnode count
    #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_streaming_max_parallelism)]
    streaming_max_parallelism: usize,

    /// Used to provide the connection information for the iceberg engine.
    /// Format: `iceberg_engine_connection` = `schema_name.connection_name`.
    #[parameter(default = "", check_hook = check_iceberg_engine_connection)]
    iceberg_engine_connection: String,

    /// Whether the streaming join should be unaligned or not.
    #[parameter(default = false)]
    streaming_enable_unaligned_join: bool,

    /// The timeout for reading from the buffer of the sync log store on barrier.
    /// Every epoch we will attempt to read the full buffer of the sync log store.
    /// If we hit the timeout, we will stop reading and continue.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None)]
    streaming_sync_log_store_pause_duration_ms: OptionConfig<usize>,

    /// The max buffer size for sync logstore, before we start flushing.
    ///
    /// This overrides the corresponding entry from the `[streaming.developer]` section in the config file,
    /// taking effect for new streaming jobs created in the current session.
    #[parameter(default = None)]
    streaming_sync_log_store_buffer_size: OptionConfig<usize>,

    /// Whether to disable purifying the definition of the table or source upon retrieval.
    /// Only set this if encountering issues with functionalities like `SHOW` or `ALTER TABLE/SOURCE`.
    /// This config may be removed in the future.
    #[parameter(default = false, flags = "NO_ALTER_SYS")]
    disable_purify_definition: bool,

    /// The `ef_search` used in querying hnsw vector index
    #[parameter(default = 40_usize)] // default value borrowed from pg_vector
    batch_hnsw_ef_search: usize,

    /// Enable index selection for queries
    #[parameter(default = true)]
    enable_index_selection: bool,

    /// Enable mv selection for queries
    #[parameter(default = false)]
    enable_mv_selection: bool,

    /// Enable locality backfill for streaming queries. Defaults to false.
    #[parameter(default = false)]
    enable_locality_backfill: bool,

    /// Duration in seconds before notifying the user that a long-running DDL operation (e.g., DROP TABLE, CANCEL JOBS)
    /// is still running. Set to 0 to disable notifications. Defaults to 30 seconds.
    #[parameter(default = 30u32)]
    slow_ddl_notification_secs: u32,

    /// Unsafe: Enable storage retention for non-append-only tables.
    /// Enabling this can lead to streaming inconsistency and node panic
    /// if there is any row INSERT/UPDATE/DELETE operation corresponding to the ttled primary key.
    #[parameter(default = false)]
    unsafe_enable_storage_retention_for_non_append_only_tables: bool,

    /// Enable DataFusion Engine
    /// When enabled, queries involving Iceberg tables will be executed using the DataFusion engine.
    #[parameter(default = false)]
    enable_datafusion_engine: bool,

    /// Prefer hash join over sort merge join in DataFusion engine
    /// When enabled, the DataFusion engine will prioritize hash joins for query execution plans,
    /// potentially improving performance for certain workloads, but may cause OOM for large datasets.
    #[parameter(default = true)]
    datafusion_prefer_hash_join: bool,

    /// Emit chunks in upsert format for `UPDATE` and `DELETE` DMLs.
    /// May lead to undefined behavior if the table is created with `ON CONFLICT DO NOTHING`.
    ///
    /// When enabled:
    /// - `UPDATE` will only emit `Insert` records for new rows, instead of `Update` records.
    /// - `DELETE` will only include key columns and pad the rest with NULL, instead of emitting complete rows.
    #[parameter(default = false)]
    upsert_dml: bool,
}
523
/// Validate an `ICEBERG_ENGINE_CONNECTION` value: either empty (unset) or
/// exactly of the form `schema_name.connection_name`.
fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
    // An empty string means the connection is not configured, which is allowed.
    if val.is_empty() {
        return Ok(());
    }

    // A well-formed value contains exactly one `.`, i.e. splits into two parts.
    match val.split('.').count() {
        2 => Ok(()),
        _ => Err("Invalid iceberg engine connection format, Should be set to this format: schema_name.connection_name.".to_owned()),
    }
}
536
537fn check_timezone(val: &str) -> Result<(), String> {
538 // Check if the provided string is a valid timezone.
539 Tz::from_str_insensitive(val).map_err(|_e| "Not a valid timezone")?;
540 Ok(())
541}
542
/// Check that `val` denotes the UTF-8 encoding, the only one we support.
fn check_client_encoding(val: &str) -> Result<(), String> {
    // Postgres compares encoding names ignoring case and non-alphanumerics, so
    // "utf-8", "UTF_8" etc. are all accepted as UTF8.
    // https://github.com/postgres/postgres/blob/REL_15_3/src/common/encnames.c#L525
    let normalized: String = val.chars().filter(|c| c.is_ascii_alphanumeric()).collect();
    if normalized.eq_ignore_ascii_case("UTF8") {
        Ok(())
    } else {
        Err("Only support 'UTF8' for CLIENT_ENCODING".to_owned())
    }
}
552
/// Check that `val` is a supported `BYTEA_OUTPUT` format; only `hex` is supported.
fn check_bytea_output(val: &str) -> Result<(), String> {
    match val {
        "hex" => Ok(()),
        _ => Err("Only support 'hex' for BYTEA_OUTPUT".to_owned()),
    }
}
560
561/// Check if the provided value is a valid max parallelism.
562fn check_streaming_max_parallelism(val: &usize) -> Result<(), String> {
563 match val {
564 // TODO(var-vnode): this is to prevent confusion with singletons, after we distinguish
565 // them better, we may allow 1 as the max parallelism (though not much point).
566 0 | 1 => Err("STREAMING_MAX_PARALLELISM must be greater than 1".to_owned()),
567 2..=VirtualNode::MAX_COUNT => Ok(()),
568 _ => Err(format!(
569 "STREAMING_MAX_PARALLELISM must be less than or equal to {}",
570 VirtualNode::MAX_COUNT
571 )),
572 }
573}
574
575impl SessionConfig {
576 pub fn set_force_two_phase_agg(
577 &mut self,
578 val: bool,
579 reporter: &mut impl ConfigReporter,
580 ) -> SessionConfigResult<bool> {
581 let set_val = self.set_force_two_phase_agg_inner(val, reporter)?;
582 if self.force_two_phase_agg {
583 self.set_enable_two_phase_agg(true, reporter)
584 } else {
585 Ok(set_val)
586 }
587 }
588
589 pub fn set_enable_two_phase_agg(
590 &mut self,
591 val: bool,
592 reporter: &mut impl ConfigReporter,
593 ) -> SessionConfigResult<bool> {
594 let set_val = self.set_enable_two_phase_agg_inner(val, reporter)?;
595 if !self.force_two_phase_agg {
596 self.set_force_two_phase_agg(false, reporter)
597 } else {
598 Ok(set_val)
599 }
600 }
601}
602
/// Describes a single session config entry: its name, its current value rendered
/// as a string, and a human-readable description.
pub struct VariableInfo {
    pub name: String,
    pub setting: String,
    pub description: String,
}
608
/// Report status or notice to caller.
pub trait ConfigReporter {
    // `key` is the config entry name, `new_val` its value rendered as a string.
    fn report_status(&mut self, key: &str, new_val: String);
}

// Report nothing. Useful when the caller does not care about status updates,
// e.g. `&mut ()` in tests.
impl ConfigReporter for () {
    fn report_status(&mut self, _key: &str, _new_val: String) {}
}
618
// Error type returned by `SessionConfig::to_initial_streaming_config_override`,
// wrapping TOML serialization failures and config-merge failures.
def_anyhow_newtype! {
    pub SessionConfigToOverrideError,
    toml::ser::Error => "failed to serialize session config",
    ConfigMergeError => transparent,
}
624
625impl SessionConfig {
626 /// Generate an initial override for the streaming config from the session config.
627 pub fn to_initial_streaming_config_override(
628 &self,
629 ) -> Result<String, SessionConfigToOverrideError> {
630 let mut table = toml::Table::new();
631
632 // TODO: make this more type safe.
633 // We `unwrap` here to assert the hard-coded keys are correct.
634 if let Some(v) = self.streaming_join_encoding.as_ref() {
635 table
636 .upsert("streaming.developer.join_encoding_type", v)
637 .unwrap();
638 }
639 if let Some(v) = self.streaming_sync_log_store_pause_duration_ms.as_ref() {
640 table
641 .upsert("streaming.developer.sync_log_store_pause_duration_ms", v)
642 .unwrap();
643 }
644 if let Some(v) = self.streaming_sync_log_store_buffer_size.as_ref() {
645 table
646 .upsert("streaming.developer.sync_log_store_buffer_size", v)
647 .unwrap();
648 }
649 if let Some(v) = self.streaming_over_window_cache_policy.as_ref() {
650 table
651 .upsert("streaming.developer.over_window_cache_policy", v)
652 .unwrap();
653 }
654
655 let res = toml::to_string(&table)?;
656
657 // Validate all fields are valid by trying to merge it to the default config.
658 if !res.is_empty() {
659 let merged =
660 merge_streaming_config_section(&StreamingConfig::default(), res.as_str())?.unwrap();
661
662 let unrecognized_keys = merged.unrecognized_keys().collect_vec();
663 if !unrecognized_keys.is_empty() {
664 bail!("unrecognized configs: {:?}", unrecognized_keys);
665 }
666 }
667
668 Ok(res)
669 }
670}
671
#[cfg(test)]
mod test {
    use expect_test::expect;

    use super::*;

    /// Minimal config struct exercising the `SessionConfig` derive macro with
    /// multiple aliases and the `NO_ALTER_SYS` flag.
    #[derive(SessionConfig)]
    struct TestConfig {
        #[parameter(default = 1, flags = "NO_ALTER_SYS", alias = "test_param_alias" | "alias_param_test")]
        test_param: i32,
    }

    #[test]
    fn test_session_config_alias() {
        let mut config = TestConfig::default();
        // Setting via the canonical name is observable through an alias...
        config.set("test_param", "2".to_owned(), &mut ()).unwrap();
        assert_eq!(config.get("test_param_alias").unwrap(), "2");
        // ...and setting via one alias is observable through another.
        config
            .set("alias_param_test", "3".to_owned(), &mut ())
            .unwrap();
        assert_eq!(config.get("test_param_alias").unwrap(), "3");
        // `check_no_alter_sys` returns true for entries flagged `NO_ALTER_SYS`.
        assert!(TestConfig::check_no_alter_sys("test_param").unwrap());
    }

    #[test]
    fn test_initial_streaming_config_override() {
        // Override two `[streaming.developer]`-backed entries on a default config.
        let mut config = SessionConfig::default();
        config
            .set_streaming_join_encoding(Some(JoinEncodingType::Cpu).into(), &mut ())
            .unwrap();
        config
            .set_streaming_over_window_cache_policy(
                Some(OverWindowCachePolicy::RecentFirstN).into(),
                &mut (),
            )
            .unwrap();

        // Check the converted config override string.
        let override_str = config.to_initial_streaming_config_override().unwrap();
        expect![[r#"
            [streaming.developer]
            join_encoding_type = "cpu_optimized"
            over_window_cache_policy = "recent_first_n"
        "#]]
        .assert_eq(&override_str);

        // Try merging it to the default streaming config.
        let merged = merge_streaming_config_section(&StreamingConfig::default(), &override_str)
            .unwrap()
            .unwrap();
        assert_eq!(merged.developer.join_encoding_type, JoinEncodingType::Cpu);
        assert_eq!(
            merged.developer.over_window_cache_policy,
            OverWindowCachePolicy::RecentFirstN
        );
    }
}
728}