1mod iceberg_query_storage_mode;
16mod non_zero64;
17mod opt;
18pub mod parallelism;
19mod query_mode;
20mod search_path;
21pub mod sink_decouple;
22mod statement_timeout;
23mod transaction_isolation_level;
24mod visibility_mode;
25
26use chrono_tz::Tz;
27pub use iceberg_query_storage_mode::IcebergQueryStorageMode;
28use itertools::Itertools;
29pub use opt::OptionConfig;
30pub use query_mode::QueryMode;
31use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
32pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
33use serde::{Deserialize, Serialize};
34pub use statement_timeout::StatementTimeout;
35use thiserror::Error;
36
37use self::non_zero64::ConfigNonZeroU64;
38use crate::config::mutate::TomlTableMutateExt;
39use crate::config::streaming::{JoinEncodingType, OverWindowCachePolicy};
40use crate::config::{ConfigMergeError, StreamingConfig, merge_streaming_config_section};
41use crate::hash::VirtualNode;
42use crate::session_config::parallelism::{ConfigBackfillParallelism, ConfigParallelism};
43use crate::session_config::sink_decouple::SinkDecouple;
44use crate::session_config::transaction_isolation_level::IsolationLevel;
45pub use crate::session_config::visibility_mode::VisibilityMode;
46use crate::{PG_VERSION, SERVER_ENCODING, SERVER_VERSION_NUM, STANDARD_CONFORMING_STRINGS};
47
/// Separator string (`", "`) for session config values that are rendered as a list.
// NOTE(review): the users of this constant are outside this file — confirm which list-valued
// settings rely on it before changing the separator.
pub const SESSION_CONFIG_LIST_SEP: &str = ", ";
49
50#[derive(Error, Debug)]
51pub enum SessionConfigError {
52 #[error("Invalid value `{value}` for `{entry}`")]
53 InvalidValue {
54 entry: &'static str,
55 value: String,
56 source: anyhow::Error,
57 },
58
59 #[error("Unrecognized config entry `{0}`")]
60 UnrecognizedEntry(String),
61}
62
63type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
64
// Default values for the `*_rate_limit` session variables. All four are `-1`.
// NOTE(review): the names indicate that `-1` means "rate limiting disabled"; the code that
// interprets the value is outside this file — confirm there.

/// Default of `backfill_rate_limit`.
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
/// Default of `source_rate_limit`.
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
/// Default of `dml_rate_limit`.
const DISABLE_DML_RATE_LIMIT: i32 = -1;
/// Default of `sink_rate_limit`.
const DISABLE_SINK_RATE_LIMIT: i32 = -1;

/// Default of `bypass_cluster_limits`: `true` when compiled with debug assertions, `false`
/// otherwise.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
74
75#[serde_with::apply(_ => #[serde_as(as = "serde_with::DisplayFromStr")] )]
87#[serde_with::serde_as]
88#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
89pub struct SessionConfig {
90 #[parameter(default = false, alias = "rw_implicit_flush")]
94 implicit_flush: bool,
95
96 #[parameter(default = false)]
99 create_compaction_group_for_mv: bool,
100
101 #[parameter(default = QueryMode::default())]
105 query_mode: QueryMode,
106
107 #[parameter(default = IcebergQueryStorageMode::default())]
110 iceberg_query_storage_mode: IcebergQueryStorageMode,
111
112 #[parameter(default = 1)]
115 extra_float_digits: i32,
116
117 #[parameter(default = "", flags = "REPORT")]
120 application_name: String,
121
122 #[parameter(default = "", rename = "datestyle")]
125 date_style: String,
126
127 #[parameter(default = true, alias = "rw_batch_enable_lookup_join")]
129 batch_enable_lookup_join: bool,
130
131 #[parameter(default = true, alias = "rw_batch_enable_sort_agg")]
134 batch_enable_sort_agg: bool,
135
136 #[parameter(default = false, rename = "batch_enable_distributed_dml")]
139 batch_enable_distributed_dml: bool,
140
141 #[parameter(default = true)]
145 batch_expr_strict_mode: bool,
146
147 #[parameter(default = 8)]
149 max_split_range_gap: i32,
150
151 #[parameter(default = SearchPath::default())]
155 search_path: SearchPath,
156
157 #[parameter(default = VisibilityMode::default())]
159 visibility_mode: VisibilityMode,
160
161 #[parameter(default = IsolationLevel::default())]
163 transaction_isolation: IsolationLevel,
164
165 #[parameter(default = ConfigNonZeroU64::default())]
168 query_epoch: ConfigNonZeroU64,
169
170 #[parameter(default = "UTC", check_hook = check_timezone)]
172 timezone: String,
173
174 #[parameter(default = ConfigParallelism::Default)]
178 streaming_parallelism: ConfigParallelism,
179
180 #[parameter(
183 default = ConfigBackfillParallelism::Default,
184 check_hook = check_streaming_parallelism_for_backfill
185 )]
186 streaming_parallelism_for_backfill: ConfigBackfillParallelism,
187
188 #[parameter(default = ConfigParallelism::Default)]
192 streaming_parallelism_for_table: ConfigParallelism,
193
194 #[parameter(default = ConfigParallelism::Default)]
196 streaming_parallelism_for_sink: ConfigParallelism,
197
198 #[parameter(default = ConfigParallelism::Default)]
200 streaming_parallelism_for_index: ConfigParallelism,
201
202 #[parameter(default = ConfigParallelism::Default)]
206 streaming_parallelism_for_source: ConfigParallelism,
207
208 #[parameter(default = ConfigParallelism::Default)]
210 streaming_parallelism_for_materialized_view: ConfigParallelism,
211
212 #[parameter(default = false, alias = "rw_streaming_enable_delta_join")]
214 streaming_enable_delta_join: bool,
215
216 #[parameter(default = true, alias = "rw_streaming_enable_bushy_join")]
218 streaming_enable_bushy_join: bool,
219
220 #[parameter(default = false, alias = "rw_streaming_force_filter_inside_join")]
223 streaming_force_filter_inside_join: bool,
224
225 #[parameter(default = true)]
231 streaming_use_arrangement_backfill: bool,
232
233 #[parameter(default = true)]
234 streaming_use_snapshot_backfill: bool,
235
236 #[parameter(default = false)]
238 enable_serverless_backfill: bool,
239
240 #[parameter(default = false, alias = "rw_streaming_allow_jsonb_in_stream_key")]
242 streaming_allow_jsonb_in_stream_key: bool,
243
244 #[parameter(default = false)]
248 streaming_unsafe_allow_unmaterialized_impure_expr: bool,
249
250 #[parameter(default = false)]
252 streaming_separate_consecutive_join: bool,
253
254 #[parameter(default = false)]
256 streaming_separate_sink: bool,
257
258 #[parameter(default = None)]
263 streaming_join_encoding: OptionConfig<JoinEncodingType>,
264
265 #[parameter(default = true, alias = "rw_enable_join_ordering")]
267 enable_join_ordering: bool,
268
269 #[parameter(default = true, flags = "SETTER", alias = "rw_enable_two_phase_agg")]
272 enable_two_phase_agg: bool,
273
274 #[parameter(default = false, flags = "SETTER", alias = "rw_force_two_phase_agg")]
278 force_two_phase_agg: bool,
279
280 #[parameter(default = true, alias = "rw_enable_share_plan")]
283 enable_share_plan: bool,
285
286 #[parameter(default = false, alias = "rw_force_split_distinct_agg")]
288 force_split_distinct_agg: bool,
289
290 #[parameter(default = "", rename = "intervalstyle")]
292 interval_style: String,
293
294 #[parameter(default = ConfigNonZeroU64::default())]
296 batch_parallelism: ConfigNonZeroU64,
297
298 #[parameter(default = PG_VERSION)]
300 server_version: String,
301
302 #[parameter(default = SERVER_VERSION_NUM)]
304 server_version_num: i32,
305
306 #[parameter(default = "notice")]
308 client_min_messages: String,
309
310 #[parameter(default = SERVER_ENCODING, check_hook = check_client_encoding)]
312 client_encoding: String,
313
314 #[parameter(default = SinkDecouple::default())]
316 sink_decouple: SinkDecouple,
317
318 #[parameter(default = false)]
321 synchronize_seqscans: bool,
322
323 #[parameter(default = StatementTimeout::default())]
328 statement_timeout: StatementTimeout,
329
330 #[parameter(default = 60000u32)]
332 idle_in_transaction_session_timeout: u32,
333
334 #[parameter(default = 0)]
337 lock_timeout: i32,
338
339 #[parameter(default = 60)]
341 cdc_source_wait_streaming_start_timeout: i32,
342
343 #[parameter(default = true)]
346 row_security: bool,
347
348 #[parameter(default = STANDARD_CONFORMING_STRINGS)]
350 standard_conforming_strings: String,
351
352 #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
356 backfill_rate_limit: i32,
357
358 #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
362 source_rate_limit: i32,
363
364 #[parameter(default = DISABLE_DML_RATE_LIMIT)]
368 dml_rate_limit: i32,
369
370 #[parameter(default = DISABLE_SINK_RATE_LIMIT)]
374 sink_rate_limit: i32,
375
376 #[parameter(default = None, alias = "rw_streaming_over_window_cache_policy")]
382 streaming_over_window_cache_policy: OptionConfig<OverWindowCachePolicy>,
383
384 #[parameter(default = false)]
386 background_ddl: bool,
387
388 #[parameter(default = true)]
393 streaming_use_shared_source: bool,
394
395 #[parameter(default = SERVER_ENCODING)]
397 server_encoding: String,
398
399 #[parameter(default = "hex", check_hook = check_bytea_output)]
400 bytea_output: String,
401
402 #[parameter(default = BYPASS_CLUSTER_LIMITS)]
406 bypass_cluster_limits: bool,
407
408 #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_streaming_max_parallelism)]
418 streaming_max_parallelism: usize,
419
420 #[parameter(default = "", check_hook = check_iceberg_engine_connection)]
423 iceberg_engine_connection: String,
424
425 #[parameter(default = false)]
427 streaming_enable_unaligned_join: bool,
428
429 #[parameter(default = None)]
436 streaming_sync_log_store_pause_duration_ms: OptionConfig<usize>,
437
438 #[parameter(default = None)]
443 streaming_sync_log_store_buffer_size: OptionConfig<usize>,
444
445 #[parameter(default = false, flags = "NO_ALTER_SYS")]
449 disable_purify_definition: bool,
450
451 #[parameter(default = 40_usize)] batch_hnsw_ef_search: usize,
454
455 #[parameter(default = true)]
457 enable_index_selection: bool,
458
459 #[parameter(default = false)]
461 enable_mv_selection: bool,
462
463 #[parameter(default = false)]
465 enable_locality_backfill: bool,
466
467 #[parameter(default = 30u32)]
470 slow_ddl_notification_secs: u32,
471
472 #[parameter(default = false)]
476 unsafe_enable_storage_retention_for_non_append_only_tables: bool,
477
478 #[parameter(default = true)]
481 enable_datafusion_engine: bool,
482
483 #[parameter(default = true)]
487 datafusion_prefer_hash_join: bool,
488
489 #[parameter(default = false)]
496 upsert_dml: bool,
497}
498
/// Check hook for `iceberg_engine_connection`.
///
/// Accepts the empty string (the variable's default) or a value of the form
/// `schema_name.connection_name`: exactly one `.` with a non-empty name on each side.
///
/// # Errors
///
/// Returns the format error message for anything else, including values with a missing schema
/// or connection name such as `"."`, `"schema."` or `".conn"`.
fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
    if val.is_empty() {
        return Ok(());
    }

    // `split_once` splits at the first `.`; a second `.` in the remainder means more than two
    // parts, and an empty side means one of the two names is missing.
    let well_formed = matches!(
        val.split_once('.'),
        Some((schema, connection))
            if !schema.is_empty() && !connection.is_empty() && !connection.contains('.')
    );

    if well_formed {
        Ok(())
    } else {
        Err("Invalid iceberg engine connection format, Should be set to this format: schema_name.connection_name.".to_owned())
    }
}
511
512fn check_timezone(val: &str) -> Result<(), String> {
513 Tz::from_str_insensitive(val).map_err(|_e| "Not a valid timezone")?;
515 Ok(())
516}
517
/// Check hook for `client_encoding`.
///
/// Every character that is not ASCII alphanumeric is dropped first, and the remainder is
/// compared with `UTF8` ignoring ASCII case, so `utf-8`, `UTF_8` and `Utf8` are all accepted.
///
/// # Errors
///
/// Returns an error message for any other encoding name.
fn check_client_encoding(val: &str) -> Result<(), String> {
    let normalized: String = val.chars().filter(char::is_ascii_alphanumeric).collect();

    if normalized.eq_ignore_ascii_case("UTF8") {
        Ok(())
    } else {
        Err("Only support 'UTF8' for CLIENT_ENCODING".to_owned())
    }
}
527
/// Check hook for `bytea_output`.
///
/// Only the exact, case-sensitive value `hex` is accepted.
///
/// # Errors
///
/// Returns an error message for any other value.
fn check_bytea_output(val: &str) -> Result<(), String> {
    match val {
        "hex" => Ok(()),
        _ => Err("Only support 'hex' for BYTEA_OUTPUT".to_owned()),
    }
}
535
536fn check_streaming_max_parallelism(val: &usize) -> Result<(), String> {
538 match val {
539 0 | 1 => Err("STREAMING_MAX_PARALLELISM must be greater than 1".to_owned()),
542 2..=VirtualNode::MAX_COUNT => Ok(()),
543 _ => Err(format!(
544 "STREAMING_MAX_PARALLELISM must be less than or equal to {}",
545 VirtualNode::MAX_COUNT
546 )),
547 }
548}
549
550fn check_streaming_parallelism_for_backfill(val: &ConfigBackfillParallelism) -> Result<(), String> {
551 match val {
552 ConfigBackfillParallelism::Default | ConfigBackfillParallelism::Fixed(_) => Ok(()),
553 ConfigBackfillParallelism::Adaptive
554 | ConfigBackfillParallelism::Bounded(_)
555 | ConfigBackfillParallelism::Ratio(_) => Err(
556 "Only `default` or fixed backfill parallelism is supported here; adaptive backfill strategy is deferred to a later change.".to_owned(),
557 ),
558 }
559}
560
561impl SessionConfig {
562 pub fn set_force_two_phase_agg(
563 &mut self,
564 val: bool,
565 reporter: &mut impl ConfigReporter,
566 ) -> SessionConfigResult<bool> {
567 let set_val = self.set_force_two_phase_agg_inner(val, reporter)?;
568 if self.force_two_phase_agg {
569 self.set_enable_two_phase_agg(true, reporter)
570 } else {
571 Ok(set_val)
572 }
573 }
574
575 pub fn set_enable_two_phase_agg(
576 &mut self,
577 val: bool,
578 reporter: &mut impl ConfigReporter,
579 ) -> SessionConfigResult<bool> {
580 let set_val = self.set_enable_two_phase_agg_inner(val, reporter)?;
581 if !self.force_two_phase_agg {
582 self.set_force_two_phase_agg(false, reporter)
583 } else {
584 Ok(set_val)
585 }
586 }
587}
588
/// Display record for one config variable: its name, current setting and description, all as
/// owned strings.
pub struct VariableInfo {
    /// Name of the variable.
    pub name: String,
    /// The variable's setting, as text.
    pub setting: String,
    /// Description text for the variable.
    pub description: String,
}
594
/// Receiver for config status reports.
///
/// The config setters take a `&mut impl ConfigReporter`; pass `&mut ()` when reports are not
/// needed.
// NOTE(review): which variables trigger a report is decided by generated code outside this file
// (presumably those flagged `REPORT`) — confirm there.
pub trait ConfigReporter {
    /// Called with the variable's key and its new value rendered as text.
    fn report_status(&mut self, key: &str, new_val: String);
}
599
600impl ConfigReporter for () {
602 fn report_status(&mut self, _key: &str, _new_val: String) {}
603}
604
605def_anyhow_newtype! {
606 pub SessionConfigToOverrideError,
607 toml::ser::Error => "failed to serialize session config",
608 ConfigMergeError => transparent,
609}
610
611impl SessionConfig {
612 pub fn to_initial_streaming_config_override(
614 &self,
615 ) -> Result<String, SessionConfigToOverrideError> {
616 let mut table = toml::Table::new();
617
618 if let Some(v) = self.streaming_join_encoding.as_ref() {
621 table
622 .upsert("streaming.developer.join_encoding_type", v)
623 .unwrap();
624 }
625 if let Some(v) = self.streaming_sync_log_store_pause_duration_ms.as_ref() {
626 table
627 .upsert("streaming.developer.sync_log_store_pause_duration_ms", v)
628 .unwrap();
629 }
630 if let Some(v) = self.streaming_sync_log_store_buffer_size.as_ref() {
631 table
632 .upsert("streaming.developer.sync_log_store_buffer_size", v)
633 .unwrap();
634 }
635 if let Some(v) = self.streaming_over_window_cache_policy.as_ref() {
636 table
637 .upsert("streaming.developer.over_window_cache_policy", v)
638 .unwrap();
639 }
640
641 let res = toml::to_string(&table)?;
642
643 if !res.is_empty() {
645 let merged =
646 merge_streaming_config_section(&StreamingConfig::default(), res.as_str())?.unwrap();
647
648 let unrecognized_keys = merged.unrecognized_keys().collect_vec();
649 if !unrecognized_keys.is_empty() {
650 bail!("unrecognized configs: {:?}", unrecognized_keys);
651 }
652 }
653
654 Ok(res)
655 }
656}
657
#[cfg(test)]
mod test {
    use expect_test::expect;

    use super::*;

    // Minimal config with one parameter that has two aliases and the `NO_ALTER_SYS` flag.
    #[derive(SessionConfig)]
    struct TestConfig {
        #[parameter(default = 1, flags = "NO_ALTER_SYS", alias = "test_param_alias" | "alias_param_test")]
        test_param: i32,
    }

    /// A value written through the canonical name or an alias is readable through an alias.
    #[test]
    fn test_session_config_alias() {
        let mut cfg = TestConfig::default();

        cfg.set("test_param", "2".to_owned(), &mut ()).unwrap();
        assert_eq!(cfg.get("test_param_alias").unwrap(), "2");

        cfg.set("alias_param_test", "3".to_owned(), &mut ())
            .unwrap();
        assert_eq!(cfg.get("test_param_alias").unwrap(), "3");

        assert!(TestConfig::check_no_alter_sys("test_param").unwrap());
    }

    /// The generated override string has the expected TOML shape and merges back into the
    /// default streaming config.
    #[test]
    fn test_initial_streaming_config_override() {
        let mut cfg = SessionConfig::default();
        cfg.set_streaming_join_encoding(Some(JoinEncodingType::Cpu).into(), &mut ())
            .unwrap();
        cfg.set_streaming_over_window_cache_policy(
            Some(OverWindowCachePolicy::RecentFirstN).into(),
            &mut (),
        )
        .unwrap();

        // Check the rendered override.
        let rendered = cfg.to_initial_streaming_config_override().unwrap();
        expect![[r#"
            [streaming.developer]
            join_encoding_type = "cpu_optimized"
            over_window_cache_policy = "recent_first_n"
        "#]]
        .assert_eq(&rendered);

        // Merge it into the default streaming config and check the affected fields.
        let merged = merge_streaming_config_section(&StreamingConfig::default(), &rendered)
            .unwrap()
            .unwrap();
        assert_eq!(merged.developer.join_encoding_type, JoinEncodingType::Cpu);
        assert_eq!(
            merged.developer.over_window_cache_policy,
            OverWindowCachePolicy::RecentFirstN
        );
    }

    /// Every `ConfigParallelism` setting starts out as `Default`.
    #[test]
    fn test_streaming_parallelism_defaults() {
        let cfg = SessionConfig::default();

        let observed = [
            ("streaming_parallelism", cfg.streaming_parallelism()),
            (
                "streaming_parallelism_for_table",
                cfg.streaming_parallelism_for_table(),
            ),
            (
                "streaming_parallelism_for_source",
                cfg.streaming_parallelism_for_source(),
            ),
            (
                "streaming_parallelism_for_sink",
                cfg.streaming_parallelism_for_sink(),
            ),
            (
                "streaming_parallelism_for_index",
                cfg.streaming_parallelism_for_index(),
            ),
            (
                "streaming_parallelism_for_materialized_view",
                cfg.streaming_parallelism_for_materialized_view(),
            ),
        ];

        for (name, value) in observed {
            assert_eq!(value, ConfigParallelism::Default, "{name}");
        }
    }

    /// `default` is readable, settable, and is what `reset` reports after an override.
    #[test]
    fn test_streaming_parallelism_default_round_trip() {
        const KEYS: [&str; 3] = [
            "streaming_parallelism",
            "streaming_parallelism_for_table",
            "streaming_parallelism_for_source",
        ];
        const OVERRIDES: [&str; 3] = ["bounded(16)", "bounded(8)", "bounded(8)"];

        let mut cfg = SessionConfig::default();

        for key in KEYS {
            assert_eq!(cfg.get(key).unwrap(), "default");
        }

        cfg.set("streaming_parallelism", "default".to_owned(), &mut ())
            .unwrap();
        assert_eq!(cfg.get("streaming_parallelism").unwrap(), "default");

        for (key, value) in KEYS.into_iter().zip(OVERRIDES) {
            cfg.set(key, value.to_owned(), &mut ()).unwrap();
        }

        for key in KEYS {
            assert_eq!(cfg.reset(key, &mut ()).unwrap(), "default");
        }
    }

    /// The backfill parallelism accepts `default` and a fixed number.
    #[test]
    fn test_streaming_parallelism_for_backfill_accepts_default_and_fixed() {
        const KEY: &str = "streaming_parallelism_for_backfill";
        let mut cfg = SessionConfig::default();

        cfg.set(KEY, "default".to_owned(), &mut ()).unwrap();
        assert_eq!(cfg.get(KEY).unwrap(), "default");

        cfg.set(KEY, "2".to_owned(), &mut ()).unwrap();
        assert_eq!(cfg.streaming_parallelism_for_backfill().to_string(), "2");
    }

    /// `adaptive`, `bounded(..)` and `ratio(..)` are rejected with the check hook's message.
    #[test]
    fn test_streaming_parallelism_for_backfill_rejects_adaptive_modes() {
        let expected = "Only `default` or fixed backfill parallelism is supported here; adaptive backfill strategy is deferred to a later change.";
        let mut cfg = SessionConfig::default();

        for value in ["adaptive", "bounded(2)", "ratio(0.5)"] {
            let outcome = cfg.set(
                "streaming_parallelism_for_backfill",
                value.to_owned(),
                &mut (),
            );

            match outcome.unwrap_err() {
                SessionConfigError::InvalidValue {
                    entry,
                    value: actual_value,
                    source,
                } => {
                    assert_eq!(entry, "streaming_parallelism_for_backfill");
                    assert_eq!(actual_value, value);
                    assert_eq!(source.to_string(), expected);
                }
                other => panic!("unexpected error: {other:?}"),
            }
        }
    }
}