1mod iceberg_query_storage_mode;
16mod non_zero64;
17mod opt;
18pub mod parallelism;
19mod query_mode;
20mod search_path;
21pub mod sink_decouple;
22mod statement_timeout;
23mod transaction_isolation_level;
24mod visibility_mode;
25
26use chrono_tz::Tz;
27pub use iceberg_query_storage_mode::IcebergQueryStorageMode;
28use itertools::Itertools;
29pub use opt::OptionConfig;
30pub use query_mode::QueryMode;
31use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
32pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
33use serde::{Deserialize, Serialize};
34pub use statement_timeout::StatementTimeout;
35use thiserror::Error;
36
37use self::non_zero64::ConfigNonZeroU64;
38use crate::config::mutate::TomlTableMutateExt;
39use crate::config::streaming::{JoinEncodingType, OverWindowCachePolicy};
40use crate::config::{ConfigMergeError, StreamingConfig, merge_streaming_config_section};
41use crate::hash::VirtualNode;
42use crate::session_config::parallelism::{ConfigBackfillParallelism, ConfigParallelism};
43use crate::session_config::sink_decouple::SinkDecouple;
44use crate::session_config::transaction_isolation_level::IsolationLevel;
45pub use crate::session_config::visibility_mode::VisibilityMode;
46use crate::{PG_VERSION, SERVER_ENCODING, SERVER_VERSION_NUM, STANDARD_CONFORMING_STRINGS};
47
48pub const SESSION_CONFIG_LIST_SEP: &str = ", ";
49
50#[derive(Error, Debug)]
51pub enum SessionConfigError {
52 #[error("Invalid value `{value}` for `{entry}`")]
53 InvalidValue {
54 entry: &'static str,
55 value: String,
56 source: anyhow::Error,
57 },
58
59 #[error("Unrecognized config entry `{0}`")]
60 UnrecognizedEntry(String),
61}
62
63type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
64
65const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
68const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
69const DISABLE_DML_RATE_LIMIT: i32 = -1;
70const DISABLE_SINK_RATE_LIMIT: i32 = -1;
71
72const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
74
75#[serde_with::apply(_ => #[serde_as(as = "serde_with::DisplayFromStr")] )]
87#[serde_with::serde_as]
88#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
89pub struct SessionConfig {
90 #[parameter(default = false, alias = "rw_implicit_flush")]
94 implicit_flush: bool,
95
96 #[parameter(default = false)]
99 create_compaction_group_for_mv: bool,
100
101 #[parameter(default = QueryMode::default())]
105 query_mode: QueryMode,
106
107 #[parameter(default = IcebergQueryStorageMode::default())]
110 iceberg_query_storage_mode: IcebergQueryStorageMode,
111
112 #[parameter(default = 1)]
115 extra_float_digits: i32,
116
117 #[parameter(default = "", flags = "REPORT")]
120 application_name: String,
121
122 #[parameter(default = "", rename = "datestyle")]
125 date_style: String,
126
127 #[parameter(default = true, alias = "rw_batch_enable_lookup_join")]
129 batch_enable_lookup_join: bool,
130
131 #[parameter(default = true, alias = "rw_batch_enable_sort_agg")]
134 batch_enable_sort_agg: bool,
135
136 #[parameter(default = false, rename = "batch_enable_distributed_dml")]
139 batch_enable_distributed_dml: bool,
140
141 #[parameter(default = true)]
145 batch_expr_strict_mode: bool,
146
147 #[parameter(default = 8)]
149 max_split_range_gap: i32,
150
151 #[parameter(default = SearchPath::default())]
155 search_path: SearchPath,
156
157 #[parameter(default = VisibilityMode::default())]
159 visibility_mode: VisibilityMode,
160
161 #[parameter(default = IsolationLevel::default())]
163 transaction_isolation: IsolationLevel,
164
165 #[parameter(default = ConfigNonZeroU64::default())]
168 query_epoch: ConfigNonZeroU64,
169
170 #[parameter(default = "UTC", check_hook = check_timezone)]
172 timezone: String,
173
174 #[parameter(default = ConfigParallelism::Default)]
178 streaming_parallelism: ConfigParallelism,
179
180 #[parameter(
183 default = ConfigBackfillParallelism::Default,
184 check_hook = check_streaming_parallelism_for_backfill
185 )]
186 streaming_parallelism_for_backfill: ConfigBackfillParallelism,
187
188 #[parameter(default = ConfigParallelism::Default)]
192 streaming_parallelism_for_table: ConfigParallelism,
193
194 #[parameter(default = ConfigParallelism::Default)]
196 streaming_parallelism_for_sink: ConfigParallelism,
197
198 #[parameter(default = ConfigParallelism::Default)]
200 streaming_parallelism_for_index: ConfigParallelism,
201
202 #[parameter(default = ConfigParallelism::Default)]
206 streaming_parallelism_for_source: ConfigParallelism,
207
208 #[parameter(default = ConfigParallelism::Default)]
210 streaming_parallelism_for_materialized_view: ConfigParallelism,
211
212 #[parameter(default = false, alias = "rw_streaming_enable_delta_join")]
214 streaming_enable_delta_join: bool,
215
216 #[parameter(default = true, alias = "rw_streaming_enable_bushy_join")]
218 streaming_enable_bushy_join: bool,
219
220 #[parameter(default = false, alias = "rw_streaming_force_filter_inside_join")]
223 streaming_force_filter_inside_join: bool,
224
225 #[parameter(default = true)]
231 streaming_use_arrangement_backfill: bool,
232
233 #[parameter(default = true)]
234 streaming_use_snapshot_backfill: bool,
235
236 #[parameter(default = false)]
238 enable_serverless_backfill: bool,
239
240 #[parameter(default = false, alias = "rw_streaming_allow_jsonb_in_stream_key")]
242 streaming_allow_jsonb_in_stream_key: bool,
243
244 #[parameter(default = false)]
248 streaming_unsafe_allow_unmaterialized_impure_expr: bool,
249
250 #[parameter(default = false)]
252 streaming_separate_consecutive_join: bool,
253
254 #[parameter(default = false)]
256 streaming_separate_sink: bool,
257
258 #[parameter(default = None)]
263 streaming_join_encoding: OptionConfig<JoinEncodingType>,
264
265 #[parameter(default = true, alias = "rw_enable_join_ordering")]
267 enable_join_ordering: bool,
268
269 #[parameter(default = true, flags = "SETTER", alias = "rw_enable_two_phase_agg")]
272 enable_two_phase_agg: bool,
273
274 #[parameter(default = false, flags = "SETTER", alias = "rw_force_two_phase_agg")]
278 force_two_phase_agg: bool,
279
280 #[parameter(default = true, alias = "rw_enable_share_plan")]
283 enable_share_plan: bool,
285
286 #[parameter(default = false, alias = "rw_force_split_distinct_agg")]
288 force_split_distinct_agg: bool,
289
290 #[parameter(default = "", rename = "intervalstyle")]
292 interval_style: String,
293
294 #[parameter(default = ConfigNonZeroU64::default())]
296 batch_parallelism: ConfigNonZeroU64,
297
298 #[parameter(default = PG_VERSION)]
300 server_version: String,
301
302 #[parameter(default = SERVER_VERSION_NUM)]
304 server_version_num: i32,
305
306 #[parameter(default = "notice")]
308 client_min_messages: String,
309
310 #[parameter(default = SERVER_ENCODING, check_hook = check_client_encoding)]
312 client_encoding: String,
313
314 #[parameter(default = SinkDecouple::default())]
316 sink_decouple: SinkDecouple,
317
318 #[parameter(default = false)]
321 synchronize_seqscans: bool,
322
323 #[parameter(default = StatementTimeout::default())]
328 statement_timeout: StatementTimeout,
329
330 #[parameter(default = 60000u32)]
332 idle_in_transaction_session_timeout: u32,
333
334 #[parameter(default = 0)]
337 lock_timeout: i32,
338
339 #[parameter(default = 60)]
341 cdc_source_wait_streaming_start_timeout: i32,
342
343 #[parameter(default = true)]
346 row_security: bool,
347
348 #[parameter(default = STANDARD_CONFORMING_STRINGS)]
350 standard_conforming_strings: String,
351
352 #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
356 backfill_rate_limit: i32,
357
358 #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
362 source_rate_limit: i32,
363
364 #[parameter(default = DISABLE_DML_RATE_LIMIT)]
368 dml_rate_limit: i32,
369
370 #[parameter(default = DISABLE_SINK_RATE_LIMIT)]
374 sink_rate_limit: i32,
375
376 #[parameter(default = None, alias = "rw_streaming_over_window_cache_policy")]
382 streaming_over_window_cache_policy: OptionConfig<OverWindowCachePolicy>,
383
384 #[parameter(default = false)]
386 background_ddl: bool,
387
388 #[parameter(default = true)]
393 streaming_use_shared_source: bool,
394
395 #[parameter(default = true)]
402 streaming_asof_join_use_cache: bool,
403
404 #[parameter(default = SERVER_ENCODING)]
406 server_encoding: String,
407
408 #[parameter(default = "hex", check_hook = check_bytea_output)]
409 bytea_output: String,
410
411 #[parameter(default = BYPASS_CLUSTER_LIMITS)]
415 bypass_cluster_limits: bool,
416
417 #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_streaming_max_parallelism)]
427 streaming_max_parallelism: usize,
428
429 #[parameter(default = "", check_hook = check_iceberg_engine_connection)]
432 iceberg_engine_connection: String,
433
434 #[parameter(default = false)]
436 streaming_enable_unaligned_join: bool,
437
438 #[parameter(default = None)]
445 streaming_sync_log_store_pause_duration_ms: OptionConfig<usize>,
446
447 #[parameter(default = None)]
452 streaming_sync_log_store_buffer_size: OptionConfig<usize>,
453
454 #[parameter(default = false, flags = "NO_ALTER_SYS")]
458 disable_purify_definition: bool,
459
460 #[parameter(default = 40_usize)] batch_hnsw_ef_search: usize,
463
464 #[parameter(default = true)]
466 enable_index_selection: bool,
467
468 #[parameter(default = false)]
470 enable_mv_selection: bool,
471
472 #[parameter(default = false)]
474 enable_locality_backfill: bool,
475
476 #[parameter(default = 30u32)]
479 slow_ddl_notification_secs: u32,
480
481 #[parameter(default = false)]
485 unsafe_enable_storage_retention_for_non_append_only_tables: bool,
486
487 #[parameter(default = true)]
490 enable_datafusion_engine: bool,
491
492 #[parameter(default = true)]
496 datafusion_prefer_hash_join: bool,
497
498 #[parameter(default = false)]
505 upsert_dml: bool,
506}
507
508fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
509 if val.is_empty() {
510 return Ok(());
511 }
512
513 let parts: Vec<&str> = val.split('.').collect();
514 if parts.len() != 2 {
515 return Err("Invalid iceberg engine connection format, Should be set to this format: schema_name.connection_name.".to_owned());
516 }
517
518 Ok(())
519}
520
521fn check_timezone(val: &str) -> Result<(), String> {
522 Tz::from_str_insensitive(val).map_err(|_e| "Not a valid timezone")?;
524 Ok(())
525}
526
527fn check_client_encoding(val: &str) -> Result<(), String> {
528 let clean = val.replace(|c: char| !c.is_ascii_alphanumeric(), "");
530 if !clean.eq_ignore_ascii_case("UTF8") {
531 Err("Only support 'UTF8' for CLIENT_ENCODING".to_owned())
532 } else {
533 Ok(())
534 }
535}
536
537fn check_bytea_output(val: &str) -> Result<(), String> {
538 if val == "hex" {
539 Ok(())
540 } else {
541 Err("Only support 'hex' for BYTEA_OUTPUT".to_owned())
542 }
543}
544
545fn check_streaming_max_parallelism(val: &usize) -> Result<(), String> {
547 match val {
548 0 | 1 => Err("STREAMING_MAX_PARALLELISM must be greater than 1".to_owned()),
551 2..=VirtualNode::MAX_COUNT => Ok(()),
552 _ => Err(format!(
553 "STREAMING_MAX_PARALLELISM must be less than or equal to {}",
554 VirtualNode::MAX_COUNT
555 )),
556 }
557}
558
559fn check_streaming_parallelism_for_backfill(val: &ConfigBackfillParallelism) -> Result<(), String> {
560 match val {
561 ConfigBackfillParallelism::Default | ConfigBackfillParallelism::Fixed(_) => Ok(()),
562 ConfigBackfillParallelism::Adaptive
563 | ConfigBackfillParallelism::Bounded(_)
564 | ConfigBackfillParallelism::Ratio(_) => Err(
565 "Only `default` or fixed backfill parallelism is supported here; adaptive backfill strategy is deferred to a later change.".to_owned(),
566 ),
567 }
568}
569
570impl SessionConfig {
571 pub fn set_force_two_phase_agg(
572 &mut self,
573 val: bool,
574 reporter: &mut impl ConfigReporter,
575 ) -> SessionConfigResult<bool> {
576 let set_val = self.set_force_two_phase_agg_inner(val, reporter)?;
577 if self.force_two_phase_agg {
578 self.set_enable_two_phase_agg(true, reporter)
579 } else {
580 Ok(set_val)
581 }
582 }
583
584 pub fn set_enable_two_phase_agg(
585 &mut self,
586 val: bool,
587 reporter: &mut impl ConfigReporter,
588 ) -> SessionConfigResult<bool> {
589 let set_val = self.set_enable_two_phase_agg_inner(val, reporter)?;
590 if !self.force_two_phase_agg {
591 self.set_force_two_phase_agg(false, reporter)
592 } else {
593 Ok(set_val)
594 }
595 }
596}
597
598pub struct VariableInfo {
599 pub name: String,
600 pub setting: String,
601 pub description: String,
602}
603
604pub trait ConfigReporter {
606 fn report_status(&mut self, key: &str, new_val: String);
607}
608
609impl ConfigReporter for () {
611 fn report_status(&mut self, _key: &str, _new_val: String) {}
612}
613
614def_anyhow_newtype! {
615 pub SessionConfigToOverrideError,
616 toml::ser::Error => "failed to serialize session config",
617 ConfigMergeError => transparent,
618}
619
620impl SessionConfig {
621 pub fn to_initial_streaming_config_override(
623 &self,
624 ) -> Result<String, SessionConfigToOverrideError> {
625 let mut table = toml::Table::new();
626
627 if let Some(v) = self.streaming_join_encoding.as_ref() {
630 table
631 .upsert("streaming.developer.join_encoding_type", v)
632 .unwrap();
633 }
634 if let Some(v) = self.streaming_sync_log_store_pause_duration_ms.as_ref() {
635 table
636 .upsert("streaming.developer.sync_log_store_pause_duration_ms", v)
637 .unwrap();
638 }
639 if let Some(v) = self.streaming_sync_log_store_buffer_size.as_ref() {
640 table
641 .upsert("streaming.developer.sync_log_store_buffer_size", v)
642 .unwrap();
643 }
644 if let Some(v) = self.streaming_over_window_cache_policy.as_ref() {
645 table
646 .upsert("streaming.developer.over_window_cache_policy", v)
647 .unwrap();
648 }
649
650 let res = toml::to_string(&table)?;
651
652 if !res.is_empty() {
654 let merged =
655 merge_streaming_config_section(&StreamingConfig::default(), res.as_str())?.unwrap();
656
657 let unrecognized_keys = merged.unrecognized_keys().collect_vec();
658 if !unrecognized_keys.is_empty() {
659 bail!("unrecognized configs: {:?}", unrecognized_keys);
660 }
661 }
662
663 Ok(res)
664 }
665}
666
667#[cfg(test)]
668mod test {
669 use expect_test::expect;
670
671 use super::*;
672
673 #[derive(SessionConfig)]
674 struct TestConfig {
675 #[parameter(default = 1, flags = "NO_ALTER_SYS", alias = "test_param_alias" | "alias_param_test")]
676 test_param: i32,
677 }
678
679 #[test]
680 fn test_session_config_alias() {
681 let mut config = TestConfig::default();
682 config.set("test_param", "2".to_owned(), &mut ()).unwrap();
683 assert_eq!(config.get("test_param_alias").unwrap(), "2");
684 config
685 .set("alias_param_test", "3".to_owned(), &mut ())
686 .unwrap();
687 assert_eq!(config.get("test_param_alias").unwrap(), "3");
688 assert!(TestConfig::check_no_alter_sys("test_param").unwrap());
689 }
690
691 #[test]
692 fn test_initial_streaming_config_override() {
693 let mut config = SessionConfig::default();
694 config
695 .set_streaming_join_encoding(Some(JoinEncodingType::Cpu).into(), &mut ())
696 .unwrap();
697 config
698 .set_streaming_over_window_cache_policy(
699 Some(OverWindowCachePolicy::RecentFirstN).into(),
700 &mut (),
701 )
702 .unwrap();
703
704 let override_str = config.to_initial_streaming_config_override().unwrap();
706 expect![[r#"
707 [streaming.developer]
708 join_encoding_type = "cpu_optimized"
709 over_window_cache_policy = "recent_first_n"
710 "#]]
711 .assert_eq(&override_str);
712
713 let merged = merge_streaming_config_section(&StreamingConfig::default(), &override_str)
715 .unwrap()
716 .unwrap();
717 assert_eq!(merged.developer.join_encoding_type, JoinEncodingType::Cpu);
718 assert_eq!(
719 merged.developer.over_window_cache_policy,
720 OverWindowCachePolicy::RecentFirstN
721 );
722 }
723
724 #[test]
725 fn test_streaming_parallelism_defaults() {
726 let config = SessionConfig::default();
727
728 assert_eq!(config.streaming_parallelism(), ConfigParallelism::Default);
729 assert_eq!(
730 config.streaming_parallelism_for_table(),
731 ConfigParallelism::Default
732 );
733 assert_eq!(
734 config.streaming_parallelism_for_source(),
735 ConfigParallelism::Default
736 );
737 assert_eq!(
738 config.streaming_parallelism_for_sink(),
739 ConfigParallelism::Default
740 );
741 assert_eq!(
742 config.streaming_parallelism_for_index(),
743 ConfigParallelism::Default
744 );
745 assert_eq!(
746 config.streaming_parallelism_for_materialized_view(),
747 ConfigParallelism::Default
748 );
749 }
750
751 #[test]
752 fn test_streaming_parallelism_default_round_trip() {
753 let mut config = SessionConfig::default();
754
755 assert_eq!(config.get("streaming_parallelism").unwrap(), "default");
756 assert_eq!(
757 config.get("streaming_parallelism_for_table").unwrap(),
758 "default"
759 );
760 assert_eq!(
761 config.get("streaming_parallelism_for_source").unwrap(),
762 "default"
763 );
764
765 config
766 .set("streaming_parallelism", "default".to_owned(), &mut ())
767 .unwrap();
768 assert_eq!(config.get("streaming_parallelism").unwrap(), "default");
769
770 config
771 .set("streaming_parallelism", "bounded(16)".to_owned(), &mut ())
772 .unwrap();
773 config
774 .set(
775 "streaming_parallelism_for_table",
776 "bounded(8)".to_owned(),
777 &mut (),
778 )
779 .unwrap();
780 config
781 .set(
782 "streaming_parallelism_for_source",
783 "bounded(8)".to_owned(),
784 &mut (),
785 )
786 .unwrap();
787
788 assert_eq!(
789 config.reset("streaming_parallelism", &mut ()).unwrap(),
790 "default"
791 );
792 assert_eq!(
793 config
794 .reset("streaming_parallelism_for_table", &mut ())
795 .unwrap(),
796 "default"
797 );
798 assert_eq!(
799 config
800 .reset("streaming_parallelism_for_source", &mut ())
801 .unwrap(),
802 "default"
803 );
804 }
805
806 #[test]
807 fn test_streaming_parallelism_for_backfill_accepts_default_and_fixed() {
808 let mut config = SessionConfig::default();
809
810 config
811 .set(
812 "streaming_parallelism_for_backfill",
813 "default".to_owned(),
814 &mut (),
815 )
816 .unwrap();
817 assert_eq!(
818 config.get("streaming_parallelism_for_backfill").unwrap(),
819 "default"
820 );
821
822 config
823 .set(
824 "streaming_parallelism_for_backfill",
825 "2".to_owned(),
826 &mut (),
827 )
828 .unwrap();
829 assert_eq!(config.streaming_parallelism_for_backfill().to_string(), "2");
830 }
831
832 #[test]
833 fn test_streaming_parallelism_for_backfill_rejects_adaptive_modes() {
834 let mut config = SessionConfig::default();
835 let expected = "Only `default` or fixed backfill parallelism is supported here; adaptive backfill strategy is deferred to a later change.";
836
837 for value in ["adaptive", "bounded(2)", "ratio(0.5)"] {
838 let err = config
839 .set(
840 "streaming_parallelism_for_backfill",
841 value.to_owned(),
842 &mut (),
843 )
844 .unwrap_err();
845
846 match err {
847 SessionConfigError::InvalidValue {
848 entry,
849 value: actual_value,
850 source,
851 } => {
852 assert_eq!(entry, "streaming_parallelism_for_backfill");
853 assert_eq!(actual_value, value);
854 assert_eq!(source.to_string(), expected);
855 }
856 other => panic!("unexpected error: {other:?}"),
857 }
858 }
859 }
860}