risingwave_common/config/
streaming.rs1use std::time::Duration;
16
17use risingwave_common_proc_macro::serde_prefix_all;
18
19use super::*;
20
21mod async_stack_trace;
22mod join_encoding_type;
23mod over_window;
24
25pub use async_stack_trace::*;
26pub use join_encoding_type::*;
27pub use over_window::*;
28
/// Streaming section of the configuration.
///
/// Every field has a serde default, so missing keys fall back to the functions in
/// `default::streaming` or to the field type's `Default`. Because of the
/// `serde_with::apply` attribute below, every `Option` field is (de)serialized through
/// `none_as_empty_string`.
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct StreamingConfig {
    // Defaults to the value returned by `default::streaming::in_flight_barrier_nums`.
    #[serde(default = "default::streaming::in_flight_barrier_nums")]
    pub in_flight_barrier_nums: usize,

    // Optional; absent means `None` (plain `#[serde(default)]` on an `Option`).
    #[serde(default)]
    pub actor_runtime_worker_threads_num: Option<usize>,

    // Defaults to `AsyncStackTraceOption::default()` via `default::streaming::async_stack_trace`.
    #[serde(default = "default::streaming::async_stack_trace")]
    pub async_stack_trace: AsyncStackTraceOption,

    // Nested `developer` sub-section. When the whole sub-section is absent it falls back to
    // `StreamingDeveloperConfig::default()`.
    // NOTE(review): `config_doc(nested)` presumably makes the doc generator descend into it.
    #[serde(default)]
    #[config_doc(nested)]
    pub developer: StreamingDeveloperConfig,

    // Defaults to the value returned by `default::streaming::unique_user_stream_errors`.
    #[serde(default = "default::streaming::unique_user_stream_errors")]
    pub unique_user_stream_errors: usize,

    // Defaults to `false` (see `default::streaming::unsafe_disable_strict_consistency`).
    #[serde(default = "default::streaming::unsafe_disable_strict_consistency")]
    pub unsafe_disable_strict_consistency: bool,

    // Flattened catch-all: keys of this section that match no field above end up here.
    // `StreamingConfig::unrecognized_keys` reports them with a `streaming.` prefix.
    // NOTE(review): `config_doc(omitted)` presumably hides this field from generated docs.
    #[serde(default, flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
62
/// Developer-level knobs of the streaming section (the `developer` field of
/// `StreamingConfig`).
///
/// Every field has a serde default, taken either from a function under
/// `default::developer` / `default::streaming` or from the field type's `Default`.
/// `Option` fields are (de)serialized through `none_as_empty_string`.
// NOTE(review): `serde_prefix_all("stream_", mode = "alias")` presumably registers a
// `stream_`-prefixed alias for each field name, so both spellings are accepted — confirm
// against the proc-macro. The `unrecognized` field opts out with `serde_prefix_all(skip)`.
#[serde_prefix_all("stream_", mode = "alias")]
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct StreamingDeveloperConfig {
    // --- General executor settings ---
    #[serde(default = "default::developer::stream_enable_executor_row_count")]
    pub enable_executor_row_count: bool,

    #[serde(default = "default::developer::connector_message_buffer_size")]
    pub connector_message_buffer_size: usize,

    #[serde(default = "default::developer::unsafe_stream_extreme_cache_size")]
    pub unsafe_extreme_cache_size: usize,

    #[serde(default = "default::developer::stream_topn_cache_min_capacity")]
    pub topn_cache_min_capacity: usize,

    #[serde(default = "default::developer::stream_chunk_size")]
    pub chunk_size: usize,

    // --- Exchange / DML channel settings ---
    #[serde(default = "default::developer::stream_exchange_initial_permits")]
    pub exchange_initial_permits: usize,

    #[serde(default = "default::developer::stream_exchange_batched_permits")]
    pub exchange_batched_permits: usize,

    #[serde(default = "default::developer::stream_exchange_concurrent_barriers")]
    pub exchange_concurrent_barriers: usize,

    #[serde(default = "default::developer::stream_exchange_concurrent_dispatchers")]
    pub exchange_concurrent_dispatchers: usize,

    #[serde(default = "default::developer::stream_dml_channel_initial_permits")]
    pub dml_channel_initial_permits: usize,

    #[serde(default = "default::developer::stream_hash_agg_max_dirty_groups_heap_size")]
    pub hash_agg_max_dirty_groups_heap_size: usize,

    // --- Memory controller settings ---
    // One threshold and one eviction factor per level: aggressive, graceful, stable.
    #[serde(default = "default::developer::memory_controller_threshold_aggressive")]
    pub memory_controller_threshold_aggressive: f64,

    #[serde(default = "default::developer::memory_controller_threshold_graceful")]
    pub memory_controller_threshold_graceful: f64,

    #[serde(default = "default::developer::memory_controller_threshold_stable")]
    pub memory_controller_threshold_stable: f64,

    #[serde(default = "default::developer::memory_controller_eviction_factor_aggressive")]
    pub memory_controller_eviction_factor_aggressive: f64,

    #[serde(default = "default::developer::memory_controller_eviction_factor_graceful")]
    pub memory_controller_eviction_factor_graceful: f64,

    #[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
    pub memory_controller_eviction_factor_stable: f64,

    // Unit is milliseconds, per the `_ms` suffix.
    #[serde(default = "default::developer::memory_controller_update_interval_ms")]
    pub memory_controller_update_interval_ms: usize,

    #[serde(default = "default::developer::memory_controller_sequence_tls_step")]
    pub memory_controller_sequence_tls_step: u64,

    #[serde(default = "default::developer::memory_controller_sequence_tls_lag")]
    pub memory_controller_sequence_tls_lag: u64,

    // --- Backfill feature switches ---
    #[serde(default = "default::developer::stream_enable_arrangement_backfill")]
    pub enable_arrangement_backfill: bool,

    #[serde(default = "default::developer::stream_enable_snapshot_backfill")]
    pub enable_snapshot_backfill: bool,

    // --- Amplification thresholds ---
    #[serde(default = "default::developer::stream_high_join_amplification_threshold")]
    pub high_join_amplification_threshold: usize,

    #[serde(default = "default::developer::stream_high_gap_fill_amplification_threshold")]
    pub high_gap_fill_amplification_threshold: usize,

    #[serde(default = "default::developer::enable_actor_tokio_metrics")]
    pub enable_actor_tokio_metrics: bool,

    // `pub(super)`: only code in the parent module can read this field directly.
    #[serde(default = "default::developer::stream_exchange_connection_pool_size")]
    pub(super) exchange_connection_pool_size: Option<u16>,

    #[serde(default = "default::developer::stream_enable_auto_schema_change")]
    pub enable_auto_schema_change: bool,

    #[serde(default = "default::developer::enable_shared_source")]
    pub enable_shared_source: bool,

    #[serde(default = "default::developer::switch_jdbc_pg_to_native")]
    pub switch_jdbc_pg_to_native: bool,

    #[serde(default = "default::developer::stream_max_barrier_batch_size")]
    pub max_barrier_batch_size: u32,

    // --- Join / `now` settings ---
    // Note: these three default functions use a `streaming_` prefix, unlike the `stream_`
    // prefix used by most other defaults in this struct.
    #[serde(default = "default::developer::streaming_hash_join_entry_state_max_rows")]
    pub hash_join_entry_state_max_rows: usize,

    #[serde(default = "default::developer::streaming_join_hash_map_evict_interval_rows")]
    pub join_hash_map_evict_interval_rows: u32,

    #[serde(default = "default::developer::streaming_now_progress_ratio")]
    pub now_progress_ratio: Option<f32>,

    #[serde(default = "default::developer::enable_explain_analyze_stats")]
    pub enable_explain_analyze_stats: bool,

    // Falls back to `RpcClientConfig::default()` when absent.
    #[serde(default)]
    pub compute_client_config: RpcClientConfig,

    // Unit is seconds. Read through `snapshot_iter_rebuild_interval()`, which raises any
    // value below 10 to 10 and logs a warning.
    #[serde(default = "default::developer::stream_snapshot_iter_rebuild_interval_secs")]
    pub snapshot_iter_rebuild_interval_secs: u64,

    // --- Iceberg settings ---
    #[serde(default = "default::developer::iceberg_list_interval_sec")]
    pub iceberg_list_interval_sec: u64,

    #[serde(default = "default::developer::iceberg_fetch_batch_size")]
    pub iceberg_fetch_batch_size: u64,

    #[serde(default = "default::developer::iceberg_sink_positional_delete_cache_size")]
    pub iceberg_sink_positional_delete_cache_size: usize,

    #[serde(default = "default::developer::iceberg_sink_write_parquet_max_row_group_rows")]
    pub iceberg_sink_write_parquet_max_row_group_rows: usize,

    #[serde(default = "default::developer::materialize_force_overwrite_on_no_check")]
    pub materialize_force_overwrite_on_no_check: bool,

    // --- In-memory state table preload ---
    // The default function lives under `default::streaming` (not `default::developer`) and
    // returns `true` only when the `DEFAULT_ENABLE_MEM_PRELOAD_STATE_TABLE` env var is true.
    #[serde(default = "default::streaming::default_enable_mem_preload_state_table")]
    pub default_enable_mem_preload_state_table: bool,

    // Empty list when absent.
    #[serde(default)]
    pub mem_preload_state_table_ids_whitelist: Vec<u32>,

    // Empty list when absent.
    #[serde(default)]
    pub mem_preload_state_table_ids_blacklist: Vec<u32>,

    // `false` when absent.
    #[serde(default)]
    pub aggressive_noop_update_elimination: bool,

    #[serde(default = "default::developer::refresh_scheduler_interval_sec")]
    pub refresh_scheduler_interval_sec: u64,

    // Falls back to `JoinEncodingType::default()` when absent.
    #[serde(default)]
    pub join_encoding_type: JoinEncodingType,

    // --- Sync log store settings ---
    #[serde(default = "default::developer::sync_log_store_pause_duration_ms")]
    pub sync_log_store_pause_duration_ms: usize,

    #[serde(default = "default::developer::sync_log_store_buffer_size")]
    pub sync_log_store_buffer_size: usize,

    #[serde(default = "default::developer::disable_sync_log_store_dispatcher")]
    pub disable_sync_log_store_dispatcher: bool,

    // Falls back to `OverWindowCachePolicy::default()` when absent.
    #[serde(default)]
    pub over_window_cache_policy: OverWindowCachePolicy,

    // --- Vnode statistics ---
    #[serde(default = "default::developer::enable_state_table_vnode_stats_pruning")]
    pub enable_state_table_vnode_stats_pruning: bool,

    #[serde(default = "default::developer::enable_vnode_key_stats_for_materialize")]
    pub enable_vnode_key_stats_for_materialize: bool,

    #[serde(default = "default::developer::max_concurrent_kv_log_store_historical_read")]
    pub max_concurrent_kv_log_store_historical_read: usize,

    // Flattened catch-all: keys of this sub-section that match no field above end up here.
    // `StreamingConfig::unrecognized_keys` reports them with a `streaming.developer.` prefix.
    // Excluded from the `stream_` prefix handling via `serde_prefix_all(skip)`.
    #[serde(default, flatten)]
    #[serde_prefix_all(skip)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
319
320impl StreamingDeveloperConfig {
321 pub fn snapshot_iter_rebuild_interval(&self) -> Duration {
322 let rebuild_interval = if self.snapshot_iter_rebuild_interval_secs < 10 {
323 tracing::warn!(
324 "too small rebuild_interval {} second. rewrite to 10",
325 self.snapshot_iter_rebuild_interval_secs
326 );
327 10
328 } else {
329 self.snapshot_iter_rebuild_interval_secs
330 };
331 Duration::from_secs(rebuild_interval)
332 }
333}
334
335impl StreamingConfig {
336 pub fn unrecognized_keys(&self) -> impl Iterator<Item = String> {
338 std::iter::from_coroutine(
339 #[coroutine]
340 || {
341 for k in self.unrecognized.inner().keys() {
342 yield format!("streaming.{k}");
343 }
344 for k in self.developer.unrecognized.inner().keys() {
345 yield format!("streaming.developer.{k}");
346 }
347 },
348 )
349 }
350}
351
352pub mod default {
353 pub use crate::config::default::developer;
354
355 pub mod streaming {
356 use tracing::info;
357
358 use crate::config::AsyncStackTraceOption;
359 use crate::util::env_var::env_var_is_true;
360
361 pub fn in_flight_barrier_nums() -> usize {
362 10000
365 }
366
367 pub fn async_stack_trace() -> AsyncStackTraceOption {
368 AsyncStackTraceOption::default()
369 }
370
371 pub fn unique_user_stream_errors() -> usize {
372 10
373 }
374
375 pub fn unsafe_disable_strict_consistency() -> bool {
376 false
377 }
378
379 pub fn default_enable_mem_preload_state_table() -> bool {
380 if env_var_is_true("DEFAULT_ENABLE_MEM_PRELOAD_STATE_TABLE") {
381 info!("enabled mem_preload_state_table globally by env var");
382 true
383 } else {
384 false
385 }
386 }
387 }
388}