1mod iceberg_query_storage_mode;
16mod non_zero64;
17mod opt;
18pub mod parallelism;
19mod query_mode;
20mod search_path;
21pub mod sink_decouple;
22mod statement_timeout;
23mod transaction_isolation_level;
24mod visibility_mode;
25
26use chrono_tz::Tz;
27pub use iceberg_query_storage_mode::IcebergQueryStorageMode;
28use itertools::Itertools;
29pub use opt::OptionConfig;
30pub use query_mode::QueryMode;
31use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
32pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
33use serde::{Deserialize, Serialize};
34pub use statement_timeout::StatementTimeout;
35use thiserror::Error;
36
37use self::non_zero64::ConfigNonZeroU64;
38use crate::config::mutate::TomlTableMutateExt;
39use crate::config::streaming::{JoinEncodingType, OverWindowCachePolicy};
40use crate::config::{ConfigMergeError, StreamingConfig, merge_streaming_config_section};
41use crate::hash::VirtualNode;
42use crate::session_config::parallelism::{ConfigBackfillParallelism, ConfigParallelism};
43use crate::session_config::sink_decouple::SinkDecouple;
44use crate::session_config::transaction_isolation_level::IsolationLevel;
45pub use crate::session_config::visibility_mode::VisibilityMode;
46use crate::{PG_VERSION, SERVER_ENCODING, SERVER_VERSION_NUM, STANDARD_CONFORMING_STRINGS};
47
/// Separator string (`", "`) exported for session config list values.
///
/// NOTE(review): no use of this constant is visible in this part of the file; presumably it
/// joins/splits list-valued entries — confirm against callers.
pub const SESSION_CONFIG_LIST_SEP: &str = ", ";
49
/// Errors produced when reading or writing session config entries.
#[derive(Error, Debug)]
pub enum SessionConfigError {
    /// `value` was rejected for the config entry named `entry`.
    ///
    /// `source` carries the underlying reason; for entries with a `check_hook`, the tests in
    /// this file show it holding the message returned by that hook.
    #[error("Invalid value `{value}` for `{entry}`")]
    InvalidValue {
        entry: &'static str,
        value: String,
        source: anyhow::Error,
    },

    /// The given name (the tuple field) does not correspond to any config entry.
    #[error("Unrecognized config entry `{0}`")]
    UnrecognizedEntry(String),
}
62
/// Shorthand for a `Result` whose error type is [`SessionConfigError`].
type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
64
// Defaults of the four `*_rate_limit` entries of `SessionConfig`.
// NOTE(review): by name, `-1` is the sentinel meaning "no rate limit"; how consumers interpret
// it is not visible in this file — confirm before relying on it.
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
const DISABLE_DML_RATE_LIMIT: i32 = -1;
const DISABLE_SINK_RATE_LIMIT: i32 = -1;

// Default of `bypass_cluster_limits`: `true` when compiled with debug assertions enabled
// (e.g. ordinary debug builds) and `false` otherwise.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
74
// Session-level configuration entries.
//
// Each field is one entry declared through `#[parameter(...)]`:
//   * `default`    - the initial value of the entry;
//   * `alias`      - additional names accepted by `get`/`set` (exercised by
//                    `test_session_config_alias` in this file);
//   * `check_hook` - a validator from this file; when it rejects a value, `set` fails with
//                    `SessionConfigError::InvalidValue` (see the backfill-parallelism tests).
// NOTE(review): the exact semantics of `rename` and of the individual `flags` values are defined
// by the `SessionConfig` derive macro, which is not visible here — confirm there.
// NOTE(review): plain `//` comments are used on purpose. The `SessionConfig`/`ConfigDoc` derives
// may read `///` doc attributes, so adding doc comments could change generated output.
//
// The `serde_with::apply` rule below matches every field (`_`) and, as far as the attribute
// shows, (de)serializes each one through its `Display`/`FromStr` implementations.
#[serde_with::apply(_ => #[serde_as(as = "serde_with::DisplayFromStr")] )]
#[serde_with::serde_as]
#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
pub struct SessionConfig {
    // ---- DML / flush behavior ----
    #[parameter(default = false, alias = "rw_implicit_flush")]
    implicit_flush: bool,

    #[parameter(default = false)]
    dml_wait_persistence: bool,

    #[parameter(default = false)]
    create_compaction_group_for_mv: bool,

    // ---- Query execution modes ----
    #[parameter(default = QueryMode::default())]
    query_mode: QueryMode,

    #[parameter(default = IcebergQueryStorageMode::default())]
    iceberg_query_storage_mode: IcebergQueryStorageMode,

    // ---- Client-facing formatting / identification ----
    #[parameter(default = 1)]
    extra_float_digits: i32,

    #[parameter(default = "", flags = "REPORT")]
    application_name: String,

    #[parameter(default = "", rename = "datestyle")]
    date_style: String,

    // ---- Batch engine switches ----
    #[parameter(default = true, alias = "rw_batch_enable_lookup_join")]
    batch_enable_lookup_join: bool,

    #[parameter(default = true, alias = "rw_batch_enable_sort_agg")]
    batch_enable_sort_agg: bool,

    #[parameter(default = false, rename = "batch_enable_distributed_dml")]
    batch_enable_distributed_dml: bool,

    #[parameter(default = true)]
    batch_expr_strict_mode: bool,

    #[parameter(default = 8)]
    max_split_range_gap: i32,

    #[parameter(default = SearchPath::default())]
    search_path: SearchPath,

    #[parameter(default = VisibilityMode::default())]
    visibility_mode: VisibilityMode,

    #[parameter(default = IsolationLevel::default())]
    transaction_isolation: IsolationLevel,

    #[parameter(default = ConfigNonZeroU64::default())]
    query_epoch: ConfigNonZeroU64,

    // New values must be accepted by `check_timezone`.
    #[parameter(default = "UTC", check_hook = check_timezone)]
    timezone: String,

    // ---- Streaming parallelism: a general entry plus per-object-kind entries. All default to
    // the `Default` variant and carry the `SESSION_INIT` flag. ----
    #[parameter(default = ConfigParallelism::Default, flags = "SESSION_INIT")]
    streaming_parallelism: ConfigParallelism,

    // Uses its own value type; `check_streaming_parallelism_for_backfill` only lets `default`
    // and fixed values through.
    #[parameter(
        default = ConfigBackfillParallelism::Default,
        check_hook = check_streaming_parallelism_for_backfill,
        flags = "SESSION_INIT"
    )]
    streaming_parallelism_for_backfill: ConfigBackfillParallelism,

    #[parameter(default = ConfigParallelism::Default, flags = "SESSION_INIT")]
    streaming_parallelism_for_table: ConfigParallelism,

    #[parameter(default = ConfigParallelism::Default, flags = "SESSION_INIT")]
    streaming_parallelism_for_sink: ConfigParallelism,

    #[parameter(default = ConfigParallelism::Default, flags = "SESSION_INIT")]
    streaming_parallelism_for_index: ConfigParallelism,

    #[parameter(default = ConfigParallelism::Default, flags = "SESSION_INIT")]
    streaming_parallelism_for_source: ConfigParallelism,

    #[parameter(default = ConfigParallelism::Default, flags = "SESSION_INIT")]
    streaming_parallelism_for_materialized_view: ConfigParallelism,

    // ---- Streaming planner / executor switches ----
    #[parameter(default = false, alias = "rw_streaming_enable_delta_join")]
    streaming_enable_delta_join: bool,

    #[parameter(default = true, alias = "rw_streaming_enable_bushy_join")]
    streaming_enable_bushy_join: bool,

    #[parameter(default = false, alias = "rw_streaming_force_filter_inside_join")]
    streaming_force_filter_inside_join: bool,

    #[parameter(default = true)]
    streaming_use_arrangement_backfill: bool,

    #[parameter(default = true)]
    streaming_use_snapshot_backfill: bool,

    #[parameter(default = false)]
    enable_serverless_backfill: bool,

    #[parameter(default = false, alias = "rw_streaming_allow_jsonb_in_stream_key")]
    streaming_allow_jsonb_in_stream_key: bool,

    #[parameter(default = false)]
    streaming_unsafe_allow_unmaterialized_impure_expr: bool,

    #[parameter(default = false)]
    streaming_separate_consecutive_join: bool,

    #[parameter(default = false)]
    streaming_separate_sink: bool,

    // When set, exported by `to_initial_streaming_config_override` as
    // `streaming.developer.join_encoding_type`.
    #[parameter(default = None)]
    streaming_join_encoding: OptionConfig<JoinEncodingType>,

    #[parameter(default = true, alias = "rw_enable_join_ordering")]
    enable_join_ordering: bool,

    // The two entries below are flagged `SETTER`; their public setters are hand-written in the
    // `impl SessionConfig` block of this file and delegate to `set_*_inner`.
    #[parameter(default = true, flags = "SETTER", alias = "rw_enable_two_phase_agg")]
    enable_two_phase_agg: bool,

    #[parameter(default = false, flags = "SETTER", alias = "rw_force_two_phase_agg")]
    force_two_phase_agg: bool,

    #[parameter(default = true, alias = "rw_enable_share_plan")]
    enable_share_plan: bool,

    #[parameter(default = false, alias = "rw_force_split_distinct_agg")]
    force_split_distinct_agg: bool,

    #[parameter(default = "", rename = "intervalstyle")]
    interval_style: String,

    #[parameter(default = ConfigNonZeroU64::default())]
    batch_parallelism: ConfigNonZeroU64,

    // ---- Server identification, defaulting to crate-level constants ----
    #[parameter(default = PG_VERSION)]
    server_version: String,

    #[parameter(default = SERVER_VERSION_NUM)]
    server_version_num: i32,

    #[parameter(default = "notice")]
    client_min_messages: String,

    // New values must be accepted by `check_client_encoding` (UTF8 spellings only).
    #[parameter(default = SERVER_ENCODING, check_hook = check_client_encoding)]
    client_encoding: String,

    #[parameter(default = SinkDecouple::default())]
    sink_decouple: SinkDecouple,

    #[parameter(default = false)]
    synchronize_seqscans: bool,

    // ---- Timeouts ----
    // NOTE(review): units are not stated in this file — confirm against the consumers.
    #[parameter(default = StatementTimeout::default())]
    statement_timeout: StatementTimeout,

    #[parameter(default = 60000u32)]
    idle_in_transaction_session_timeout: u32,

    #[parameter(default = 0)]
    lock_timeout: i32,

    #[parameter(default = 60)]
    cdc_source_wait_streaming_start_timeout: i32,

    #[parameter(default = true)]
    row_security: bool,

    #[parameter(default = STANDARD_CONFORMING_STRINGS)]
    standard_conforming_strings: String,

    // ---- Rate limits: each defaults to its `DISABLE_*_RATE_LIMIT` constant (-1) ----
    #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
    backfill_rate_limit: i32,

    #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
    source_rate_limit: i32,

    #[parameter(default = DISABLE_DML_RATE_LIMIT)]
    dml_rate_limit: i32,

    #[parameter(default = DISABLE_SINK_RATE_LIMIT)]
    sink_rate_limit: i32,

    // When set, exported by `to_initial_streaming_config_override` as
    // `streaming.developer.over_window_cache_policy`.
    #[parameter(default = None, alias = "rw_streaming_over_window_cache_policy")]
    streaming_over_window_cache_policy: OptionConfig<OverWindowCachePolicy>,

    #[parameter(default = false)]
    background_ddl: bool,

    #[parameter(default = true)]
    streaming_use_shared_source: bool,

    #[parameter(default = true)]
    streaming_asof_join_use_cache: bool,

    #[parameter(default = SERVER_ENCODING)]
    server_encoding: String,

    // New values must be accepted by `check_bytea_output` (only "hex").
    #[parameter(default = "hex", check_hook = check_bytea_output)]
    bytea_output: String,

    // Defaults to `BYPASS_CLUSTER_LIMITS`, i.e. on only when built with debug assertions.
    #[parameter(default = BYPASS_CLUSTER_LIMITS)]
    bypass_cluster_limits: bool,

    // `check_streaming_max_parallelism` restricts values to `2..=VirtualNode::MAX_COUNT`.
    #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_streaming_max_parallelism)]
    streaming_max_parallelism: usize,

    // Empty (the default) or a value accepted by `check_iceberg_engine_connection`.
    #[parameter(default = "", check_hook = check_iceberg_engine_connection)]
    iceberg_engine_connection: String,

    #[parameter(default = false)]
    streaming_enable_unaligned_join: bool,

    // When set, the two entries below are exported by `to_initial_streaming_config_override` as
    // `streaming.developer.sync_log_store_pause_duration_ms` / `..._buffer_size`.
    #[parameter(default = None)]
    streaming_sync_log_store_pause_duration_ms: OptionConfig<usize>,

    #[parameter(default = None)]
    streaming_sync_log_store_buffer_size: OptionConfig<usize>,

    #[parameter(default = false, flags = "NO_ALTER_SYS")]
    disable_purify_definition: bool,

    #[parameter(default = 40_usize)] batch_hnsw_ef_search: usize,

    // ---- Optimizer / engine selection switches ----
    #[parameter(default = true)]
    enable_index_selection: bool,

    #[parameter(default = false)]
    enable_mv_selection: bool,

    #[parameter(default = false)]
    enable_locality_backfill: bool,

    #[parameter(default = 30u32)]
    slow_ddl_notification_secs: u32,

    #[parameter(default = false)]
    unsafe_enable_storage_retention_for_non_append_only_tables: bool,

    #[parameter(default = true)]
    enable_datafusion_engine: bool,

    #[parameter(default = true)]
    datafusion_prefer_hash_join: bool,

    #[parameter(default = false)]
    upsert_dml: bool,
}
513
/// Validates a new value for the `iceberg_engine_connection` entry.
///
/// The empty string is accepted (it is the entry's default). Any other value must have the
/// form `schema_name.connection_name`: exactly one `.` with a non-empty name on each side.
///
/// # Errors
///
/// Returns a message describing the expected format when the value has no dot, more than one
/// dot, or an empty schema/connection part.
fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
    if val.is_empty() {
        return Ok(());
    }

    // Counting dot-separated parts alone is not enough: ".conn", "schema." and "." all split
    // into two parts even though one of the names is missing, so check both halves explicitly.
    let well_formed = matches!(
        val.split_once('.'),
        Some((schema, connection))
            if !schema.is_empty() && !connection.is_empty() && !connection.contains('.')
    );

    if well_formed {
        Ok(())
    } else {
        Err("Invalid iceberg engine connection format, Should be set to this format: schema_name.connection_name.".to_owned())
    }
}
526
527fn check_timezone(val: &str) -> Result<(), String> {
528 Tz::from_str_insensitive(val).map_err(|_e| "Not a valid timezone")?;
530 Ok(())
531}
532
/// Validates a new value for the `client_encoding` entry.
///
/// Only spellings of UTF8 are accepted. The comparison ignores ASCII case and every character
/// that is not an ASCII letter or digit, so `utf-8`, `UTF_8` and `Utf8` all pass.
///
/// # Errors
///
/// Returns a message stating that only `UTF8` is supported for any other value.
fn check_client_encoding(val: &str) -> Result<(), String> {
    // Keep only ASCII letters and digits before comparing.
    let normalized: String = val.chars().filter(char::is_ascii_alphanumeric).collect();

    if normalized.eq_ignore_ascii_case("UTF8") {
        Ok(())
    } else {
        Err("Only support 'UTF8' for CLIENT_ENCODING".to_owned())
    }
}
542
/// Validates a new value for the `bytea_output` entry.
///
/// The only accepted value is the exact, lowercase string `hex`.
///
/// # Errors
///
/// Returns a message stating that only `hex` is supported for any other value.
fn check_bytea_output(val: &str) -> Result<(), String> {
    match val {
        "hex" => Ok(()),
        _ => Err("Only support 'hex' for BYTEA_OUTPUT".to_owned()),
    }
}
550
551fn check_streaming_max_parallelism(val: &usize) -> Result<(), String> {
553 match val {
554 0 | 1 => Err("STREAMING_MAX_PARALLELISM must be greater than 1".to_owned()),
557 2..=VirtualNode::MAX_COUNT => Ok(()),
558 _ => Err(format!(
559 "STREAMING_MAX_PARALLELISM must be less than or equal to {}",
560 VirtualNode::MAX_COUNT
561 )),
562 }
563}
564
565fn check_streaming_parallelism_for_backfill(val: &ConfigBackfillParallelism) -> Result<(), String> {
566 match val {
567 ConfigBackfillParallelism::Default | ConfigBackfillParallelism::Fixed(_) => Ok(()),
568 ConfigBackfillParallelism::Adaptive
569 | ConfigBackfillParallelism::Bounded(_)
570 | ConfigBackfillParallelism::Ratio(_) => Err(
571 "Only `default` or fixed backfill parallelism is supported here; adaptive backfill strategy is deferred to a later change.".to_owned(),
572 ),
573 }
574}
575
576impl SessionConfig {
577 pub fn set_force_two_phase_agg(
578 &mut self,
579 val: bool,
580 reporter: &mut impl ConfigReporter,
581 ) -> SessionConfigResult<bool> {
582 let set_val = self.set_force_two_phase_agg_inner(val, reporter)?;
583 if self.force_two_phase_agg {
584 self.set_enable_two_phase_agg(true, reporter)
585 } else {
586 Ok(set_val)
587 }
588 }
589
590 pub fn set_enable_two_phase_agg(
591 &mut self,
592 val: bool,
593 reporter: &mut impl ConfigReporter,
594 ) -> SessionConfigResult<bool> {
595 let set_val = self.set_enable_two_phase_agg_inner(val, reporter)?;
596 if !self.force_two_phase_agg {
597 self.set_force_two_phase_agg(false, reporter)
598 } else {
599 Ok(set_val)
600 }
601 }
602}
603
/// Plain record describing one config variable as three strings.
///
/// NOTE(review): nothing in this part of the file constructs it; presumably it is produced by
/// derive-generated listing code — confirm against the macro.
pub struct VariableInfo {
    /// Name of the variable.
    pub name: String,
    /// Value of the variable, as a string.
    pub setting: String,
    /// Descriptive text for the variable.
    pub description: String,
}
609
/// Sink passed to config setters (e.g. `set`, `reset`, `set_force_two_phase_agg`) so they can
/// report value changes.
pub trait ConfigReporter {
    /// Receives a config `key` together with its new value rendered as a string.
    ///
    /// NOTE(review): which entries trigger a report is decided by generated code not visible
    /// here (presumably those flagged `REPORT`) — confirm against the derive macro.
    fn report_status(&mut self, key: &str, new_val: String);
}
614
/// No-op reporter: `()` can be passed wherever reports are not needed (as the tests in this
/// file do with `&mut ()`).
impl ConfigReporter for () {
    /// Discards the report.
    fn report_status(&mut self, _key: &str, _new_val: String) {}
}
619
// Declares `SessionConfigToOverrideError`, the error type returned by
// `SessionConfig::to_initial_streaming_config_override`. The listed source types are the ones
// that function propagates with `?`: TOML serialization errors (given the context message
// below) and `ConfigMergeError` (marked `transparent`).
def_anyhow_newtype! {
    pub SessionConfigToOverrideError,
    toml::ser::Error => "failed to serialize session config",
    ConfigMergeError => transparent,
}
625
626impl SessionConfig {
627 pub fn to_initial_streaming_config_override(
629 &self,
630 ) -> Result<String, SessionConfigToOverrideError> {
631 let mut table = toml::Table::new();
632
633 if let Some(v) = self.streaming_join_encoding.as_ref() {
636 table
637 .upsert("streaming.developer.join_encoding_type", v)
638 .unwrap();
639 }
640 if let Some(v) = self.streaming_sync_log_store_pause_duration_ms.as_ref() {
641 table
642 .upsert("streaming.developer.sync_log_store_pause_duration_ms", v)
643 .unwrap();
644 }
645 if let Some(v) = self.streaming_sync_log_store_buffer_size.as_ref() {
646 table
647 .upsert("streaming.developer.sync_log_store_buffer_size", v)
648 .unwrap();
649 }
650 if let Some(v) = self.streaming_over_window_cache_policy.as_ref() {
651 table
652 .upsert("streaming.developer.over_window_cache_policy", v)
653 .unwrap();
654 }
655
656 let res = toml::to_string(&table)?;
657
658 if !res.is_empty() {
660 let merged =
661 merge_streaming_config_section(&StreamingConfig::default(), res.as_str())?.unwrap();
662
663 let unrecognized_keys = merged.unrecognized_keys().collect_vec();
664 if !unrecognized_keys.is_empty() {
665 bail!("unrecognized configs: {:?}", unrecognized_keys);
666 }
667 }
668
669 Ok(res)
670 }
671}
672
// Unit tests for the derive-generated accessors and the hand-written helpers of this module.
#[cfg(test)]
mod test {
    use expect_test::expect;

    use super::*;

    // Minimal config with one entry that has a flag and two aliases.
    #[derive(SessionConfig)]
    struct TestConfig {
        #[parameter(default = 1, flags = "NO_ALTER_SYS", alias = "test_param_alias" | "alias_param_test")]
        test_param: i32,
    }

    // The primary name and every alias must read and write the same entry, and the
    // `NO_ALTER_SYS` flag must be visible through `check_no_alter_sys`.
    #[test]
    fn test_session_config_alias() {
        let mut config = TestConfig::default();
        config.set("test_param", "2".to_owned(), &mut ()).unwrap();
        assert_eq!(config.get("test_param_alias").unwrap(), "2");
        config
            .set("alias_param_test", "3".to_owned(), &mut ())
            .unwrap();
        assert_eq!(config.get("test_param_alias").unwrap(), "3");
        assert!(TestConfig::check_no_alter_sys("test_param").unwrap());
    }

    // Entries that are set must appear in the generated TOML override, and that override must
    // merge back into the default streaming config with the same values.
    #[test]
    fn test_initial_streaming_config_override() {
        let mut config = SessionConfig::default();
        config
            .set_streaming_join_encoding(Some(JoinEncodingType::Cpu).into(), &mut ())
            .unwrap();
        config
            .set_streaming_over_window_cache_policy(
                Some(OverWindowCachePolicy::RecentFirstN).into(),
                &mut (),
            )
            .unwrap();

        // Check the converted config override string.
        let override_str = config.to_initial_streaming_config_override().unwrap();
        expect![[r#"
            [streaming.developer]
            join_encoding_type = "cpu_optimized"
            over_window_cache_policy = "recent_first_n"
        "#]]
        .assert_eq(&override_str);

        // Try merging it into the default streaming config.
        let merged = merge_streaming_config_section(&StreamingConfig::default(), &override_str)
            .unwrap()
            .unwrap();
        assert_eq!(merged.developer.join_encoding_type, JoinEncodingType::Cpu);
        assert_eq!(
            merged.developer.over_window_cache_policy,
            OverWindowCachePolicy::RecentFirstN
        );
    }

    // Every `ConfigParallelism`-typed entry starts out as `ConfigParallelism::Default`.
    #[test]
    fn test_streaming_parallelism_defaults() {
        let config = SessionConfig::default();

        assert_eq!(config.streaming_parallelism(), ConfigParallelism::Default);
        assert_eq!(
            config.streaming_parallelism_for_table(),
            ConfigParallelism::Default
        );
        assert_eq!(
            config.streaming_parallelism_for_source(),
            ConfigParallelism::Default
        );
        assert_eq!(
            config.streaming_parallelism_for_sink(),
            ConfigParallelism::Default
        );
        assert_eq!(
            config.streaming_parallelism_for_index(),
            ConfigParallelism::Default
        );
        assert_eq!(
            config.streaming_parallelism_for_materialized_view(),
            ConfigParallelism::Default
        );
    }

    // The string form "default" must survive get -> set -> get, and `reset` must return to it
    // after the entries were changed to bounded values.
    #[test]
    fn test_streaming_parallelism_default_round_trip() {
        let mut config = SessionConfig::default();

        assert_eq!(config.get("streaming_parallelism").unwrap(), "default");
        assert_eq!(
            config.get("streaming_parallelism_for_table").unwrap(),
            "default"
        );
        assert_eq!(
            config.get("streaming_parallelism_for_source").unwrap(),
            "default"
        );

        config
            .set("streaming_parallelism", "default".to_owned(), &mut ())
            .unwrap();
        assert_eq!(config.get("streaming_parallelism").unwrap(), "default");

        // Move away from the default so that the resets below have something to undo.
        config
            .set("streaming_parallelism", "bounded(16)".to_owned(), &mut ())
            .unwrap();
        config
            .set(
                "streaming_parallelism_for_table",
                "bounded(8)".to_owned(),
                &mut (),
            )
            .unwrap();
        config
            .set(
                "streaming_parallelism_for_source",
                "bounded(8)".to_owned(),
                &mut (),
            )
            .unwrap();

        assert_eq!(
            config.reset("streaming_parallelism", &mut ()).unwrap(),
            "default"
        );
        assert_eq!(
            config
                .reset("streaming_parallelism_for_table", &mut ())
                .unwrap(),
            "default"
        );
        assert_eq!(
            config
                .reset("streaming_parallelism_for_source", &mut ())
                .unwrap(),
            "default"
        );
    }

    // `default` and a fixed number pass the backfill check hook.
    #[test]
    fn test_streaming_parallelism_for_backfill_accepts_default_and_fixed() {
        let mut config = SessionConfig::default();

        config
            .set(
                "streaming_parallelism_for_backfill",
                "default".to_owned(),
                &mut (),
            )
            .unwrap();
        assert_eq!(
            config.get("streaming_parallelism_for_backfill").unwrap(),
            "default"
        );

        config
            .set(
                "streaming_parallelism_for_backfill",
                "2".to_owned(),
                &mut (),
            )
            .unwrap();
        assert_eq!(config.streaming_parallelism_for_backfill().to_string(), "2");
    }

    // Adaptive, bounded and ratio values are rejected by the check hook, and the hook's message
    // is surfaced as the `source` of `SessionConfigError::InvalidValue`.
    #[test]
    fn test_streaming_parallelism_for_backfill_rejects_adaptive_modes() {
        let mut config = SessionConfig::default();
        let expected = "Only `default` or fixed backfill parallelism is supported here; adaptive backfill strategy is deferred to a later change.";

        for value in ["adaptive", "bounded(2)", "ratio(0.5)"] {
            let err = config
                .set(
                    "streaming_parallelism_for_backfill",
                    value.to_owned(),
                    &mut (),
                )
                .unwrap_err();

            match err {
                SessionConfigError::InvalidValue {
                    entry,
                    value: actual_value,
                    source,
                } => {
                    assert_eq!(entry, "streaming_parallelism_for_backfill");
                    assert_eq!(actual_value, value);
                    assert_eq!(source.to_string(), expected);
                }
                other => panic!("unexpected error: {other:?}"),
            }
        }
    }
}