// risingwave_common/config/streaming.rs
use super::*;
16
17#[derive(Debug, Default, Clone, Copy, ValueEnum, Serialize, Deserialize)]
18pub enum AsyncStackTraceOption {
19 Off,
21 On,
23 #[default]
26 #[clap(alias = "verbose")]
27 ReleaseVerbose,
28}
29
30impl AsyncStackTraceOption {
31 pub fn is_verbose(self) -> Option<bool> {
32 match self {
33 Self::Off => None,
34 Self::On => Some(false),
35 Self::ReleaseVerbose => Some(!cfg!(debug_assertions)),
36 }
37 }
38}
39
40#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
42pub struct StreamingConfig {
43 #[serde(default = "default::streaming::in_flight_barrier_nums")]
45 pub in_flight_barrier_nums: usize,
46
47 #[serde(default)]
50 pub actor_runtime_worker_threads_num: Option<usize>,
51
52 #[serde(default = "default::streaming::async_stack_trace")]
54 pub async_stack_trace: AsyncStackTraceOption,
55
56 #[serde(default, with = "streaming_prefix")]
57 #[config_doc(omitted)]
58 pub developer: StreamingDeveloperConfig,
59
60 #[serde(default = "default::streaming::unique_user_stream_errors")]
62 pub unique_user_stream_errors: usize,
63
64 #[serde(default = "default::streaming::unsafe_enable_strict_consistency")]
66 pub unsafe_enable_strict_consistency: bool,
67
68 #[serde(default, flatten)]
69 #[config_doc(omitted)]
70 pub unrecognized: Unrecognized<Self>,
71}
72
73serde_with::with_prefix!(streaming_prefix "stream_");
74
75#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
79pub struct StreamingDeveloperConfig {
80 #[serde(default = "default::developer::stream_enable_executor_row_count")]
84 pub enable_executor_row_count: bool,
85
86 #[serde(default = "default::developer::connector_message_buffer_size")]
89 pub connector_message_buffer_size: usize,
90
91 #[serde(default = "default::developer::unsafe_stream_extreme_cache_size")]
93 pub unsafe_extreme_cache_size: usize,
94
95 #[serde(default = "default::developer::stream_chunk_size")]
97 pub chunk_size: usize,
98
99 #[serde(default = "default::developer::stream_exchange_initial_permits")]
102 pub exchange_initial_permits: usize,
103
104 #[serde(default = "default::developer::stream_exchange_batched_permits")]
107 pub exchange_batched_permits: usize,
108
109 #[serde(default = "default::developer::stream_exchange_concurrent_barriers")]
111 pub exchange_concurrent_barriers: usize,
112
113 #[serde(default = "default::developer::stream_exchange_concurrent_dispatchers")]
118 pub exchange_concurrent_dispatchers: usize,
119
120 #[serde(default = "default::developer::stream_dml_channel_initial_permits")]
123 pub dml_channel_initial_permits: usize,
124
125 #[serde(default = "default::developer::stream_hash_agg_max_dirty_groups_heap_size")]
127 pub hash_agg_max_dirty_groups_heap_size: usize,
128
129 #[serde(default = "default::developer::memory_controller_threshold_aggressive")]
130 pub memory_controller_threshold_aggressive: f64,
131
132 #[serde(default = "default::developer::memory_controller_threshold_graceful")]
133 pub memory_controller_threshold_graceful: f64,
134
135 #[serde(default = "default::developer::memory_controller_threshold_stable")]
136 pub memory_controller_threshold_stable: f64,
137
138 #[serde(default = "default::developer::memory_controller_eviction_factor_aggressive")]
139 pub memory_controller_eviction_factor_aggressive: f64,
140
141 #[serde(default = "default::developer::memory_controller_eviction_factor_graceful")]
142 pub memory_controller_eviction_factor_graceful: f64,
143
144 #[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
145 pub memory_controller_eviction_factor_stable: f64,
146
147 #[serde(default = "default::developer::memory_controller_update_interval_ms")]
148 pub memory_controller_update_interval_ms: usize,
149
150 #[serde(default = "default::developer::memory_controller_sequence_tls_step")]
151 pub memory_controller_sequence_tls_step: u64,
152
153 #[serde(default = "default::developer::memory_controller_sequence_tls_lag")]
154 pub memory_controller_sequence_tls_lag: u64,
155
156 #[serde(default = "default::developer::stream_enable_arrangement_backfill")]
157 pub enable_arrangement_backfill: bool,
162
163 #[serde(default = "default::developer::stream_high_join_amplification_threshold")]
164 pub high_join_amplification_threshold: usize,
167
168 #[serde(default = "default::developer::enable_actor_tokio_metrics")]
170 pub enable_actor_tokio_metrics: bool,
171
172 #[serde(default = "default::developer::stream_exchange_connection_pool_size")]
175 pub(super) exchange_connection_pool_size: Option<u16>,
176
177 #[serde(default = "default::developer::stream_enable_auto_schema_change")]
179 pub enable_auto_schema_change: bool,
180
181 #[serde(default = "default::developer::enable_shared_source")]
182 pub enable_shared_source: bool,
187
188 #[serde(default = "default::developer::switch_jdbc_pg_to_native")]
189 pub switch_jdbc_pg_to_native: bool,
192
193 #[serde(default = "default::developer::stream_max_barrier_batch_size")]
195 pub max_barrier_batch_size: u32,
196
197 #[serde(default = "default::developer::streaming_hash_join_entry_state_max_rows")]
200 pub hash_join_entry_state_max_rows: usize,
201
202 #[serde(default = "default::developer::enable_explain_analyze_stats")]
204 pub enable_explain_analyze_stats: bool,
205
206 #[serde(default)]
207 pub compute_client_config: RpcClientConfig,
208
209 #[serde(default = "default::developer::iceberg_list_interval_sec")]
211 pub iceberg_list_interval_sec: u64,
212
213 #[serde(default = "default::developer::iceberg_fetch_batch_size")]
215 pub iceberg_fetch_batch_size: u64,
216
217 #[serde(default = "default::developer::iceberg_sink_positional_delete_cache_size")]
219 pub iceberg_sink_positional_delete_cache_size: usize,
220
221 #[serde(default = "default::developer::iceberg_sink_write_parquet_max_row_group_rows")]
223 pub iceberg_sink_write_parquet_max_row_group_rows: usize,
224}
225
226pub mod default {
227 pub use crate::config::default::developer;
228
229 pub mod streaming {
230 use crate::config::AsyncStackTraceOption;
231
232 pub fn in_flight_barrier_nums() -> usize {
233 10000
236 }
237
238 pub fn async_stack_trace() -> AsyncStackTraceOption {
239 AsyncStackTraceOption::default()
240 }
241
242 pub fn unique_user_stream_errors() -> usize {
243 10
244 }
245
246 pub fn unsafe_enable_strict_consistency() -> bool {
247 true
248 }
249 }
250}