// risingwave_common/config/streaming.rs
use risingwave_common_proc_macro::serde_prefix_all;
16
17use super::*;
18
19mod async_stack_trace;
20mod join_encoding_type;
21mod over_window;
22
23pub use async_stack_trace::*;
24pub use join_encoding_type::*;
25pub use over_window::*;
26
27#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
29pub struct StreamingConfig {
30 #[serde(default = "default::streaming::in_flight_barrier_nums")]
32 pub in_flight_barrier_nums: usize,
33
34 #[serde(default)]
37 pub actor_runtime_worker_threads_num: Option<usize>,
38
39 #[serde(default = "default::streaming::async_stack_trace")]
41 pub async_stack_trace: AsyncStackTraceOption,
42
43 #[serde(default)]
44 #[config_doc(omitted)]
45 pub developer: StreamingDeveloperConfig,
46
47 #[serde(default = "default::streaming::unique_user_stream_errors")]
49 pub unique_user_stream_errors: usize,
50
51 #[serde(default = "default::streaming::unsafe_enable_strict_consistency")]
53 pub unsafe_enable_strict_consistency: bool,
54
55 #[serde(default, flatten)]
56 #[config_doc(omitted)]
57 pub unrecognized: Unrecognized<Self>,
58}
59
60#[serde_prefix_all("stream_", mode = "alias")]
64#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
65pub struct StreamingDeveloperConfig {
66 #[serde(default = "default::developer::stream_enable_executor_row_count")]
70 pub enable_executor_row_count: bool,
71
72 #[serde(default = "default::developer::connector_message_buffer_size")]
75 pub connector_message_buffer_size: usize,
76
77 #[serde(default = "default::developer::unsafe_stream_extreme_cache_size")]
79 pub unsafe_extreme_cache_size: usize,
80
81 #[serde(default = "default::developer::stream_topn_cache_min_capacity")]
83 pub topn_cache_min_capacity: usize,
84
85 #[serde(default = "default::developer::stream_chunk_size")]
87 pub chunk_size: usize,
88
89 #[serde(default = "default::developer::stream_exchange_initial_permits")]
92 pub exchange_initial_permits: usize,
93
94 #[serde(default = "default::developer::stream_exchange_batched_permits")]
97 pub exchange_batched_permits: usize,
98
99 #[serde(default = "default::developer::stream_exchange_concurrent_barriers")]
101 pub exchange_concurrent_barriers: usize,
102
103 #[serde(default = "default::developer::stream_exchange_concurrent_dispatchers")]
108 pub exchange_concurrent_dispatchers: usize,
109
110 #[serde(default = "default::developer::stream_dml_channel_initial_permits")]
113 pub dml_channel_initial_permits: usize,
114
115 #[serde(default = "default::developer::stream_hash_agg_max_dirty_groups_heap_size")]
117 pub hash_agg_max_dirty_groups_heap_size: usize,
118
119 #[serde(default = "default::developer::memory_controller_threshold_aggressive")]
120 pub memory_controller_threshold_aggressive: f64,
121
122 #[serde(default = "default::developer::memory_controller_threshold_graceful")]
123 pub memory_controller_threshold_graceful: f64,
124
125 #[serde(default = "default::developer::memory_controller_threshold_stable")]
126 pub memory_controller_threshold_stable: f64,
127
128 #[serde(default = "default::developer::memory_controller_eviction_factor_aggressive")]
129 pub memory_controller_eviction_factor_aggressive: f64,
130
131 #[serde(default = "default::developer::memory_controller_eviction_factor_graceful")]
132 pub memory_controller_eviction_factor_graceful: f64,
133
134 #[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
135 pub memory_controller_eviction_factor_stable: f64,
136
137 #[serde(default = "default::developer::memory_controller_update_interval_ms")]
138 pub memory_controller_update_interval_ms: usize,
139
140 #[serde(default = "default::developer::memory_controller_sequence_tls_step")]
141 pub memory_controller_sequence_tls_step: u64,
142
143 #[serde(default = "default::developer::memory_controller_sequence_tls_lag")]
144 pub memory_controller_sequence_tls_lag: u64,
145
146 #[serde(default = "default::developer::stream_enable_arrangement_backfill")]
147 pub enable_arrangement_backfill: bool,
152
153 #[serde(default = "default::developer::stream_high_join_amplification_threshold")]
154 pub high_join_amplification_threshold: usize,
157
158 #[serde(default = "default::developer::enable_actor_tokio_metrics")]
160 pub enable_actor_tokio_metrics: bool,
161
162 #[serde(default = "default::developer::stream_exchange_connection_pool_size")]
165 pub(super) exchange_connection_pool_size: Option<u16>,
166
167 #[serde(default = "default::developer::stream_enable_auto_schema_change")]
169 pub enable_auto_schema_change: bool,
170
171 #[serde(default = "default::developer::enable_shared_source")]
172 pub enable_shared_source: bool,
177
178 #[serde(default = "default::developer::switch_jdbc_pg_to_native")]
179 pub switch_jdbc_pg_to_native: bool,
182
183 #[serde(default = "default::developer::stream_max_barrier_batch_size")]
185 pub max_barrier_batch_size: u32,
186
187 #[serde(default = "default::developer::streaming_hash_join_entry_state_max_rows")]
190 pub hash_join_entry_state_max_rows: usize,
191
192 #[serde(default = "default::developer::streaming_now_progress_ratio")]
193 pub now_progress_ratio: Option<f32>,
194
195 #[serde(default = "default::developer::enable_explain_analyze_stats")]
197 pub enable_explain_analyze_stats: bool,
198
199 #[serde(default)]
200 pub compute_client_config: RpcClientConfig,
201
202 #[serde(default = "default::developer::iceberg_list_interval_sec")]
204 pub iceberg_list_interval_sec: u64,
205
206 #[serde(default = "default::developer::iceberg_fetch_batch_size")]
208 pub iceberg_fetch_batch_size: u64,
209
210 #[serde(default = "default::developer::iceberg_sink_positional_delete_cache_size")]
212 pub iceberg_sink_positional_delete_cache_size: usize,
213
214 #[serde(default = "default::developer::iceberg_sink_write_parquet_max_row_group_rows")]
216 pub iceberg_sink_write_parquet_max_row_group_rows: usize,
217
218 #[serde(default = "default::streaming::default_enable_mem_preload_state_table")]
221 pub default_enable_mem_preload_state_table: bool,
222
223 #[serde(default)]
226 pub mem_preload_state_table_ids_whitelist: Vec<u32>,
227
228 #[serde(default)]
231 pub mem_preload_state_table_ids_blacklist: Vec<u32>,
232
233 #[serde(default)]
236 pub aggressive_noop_update_elimination: bool,
237
238 #[serde(default = "default::developer::refresh_scheduler_interval_sec")]
240 pub refresh_scheduler_interval_sec: u64,
241
242 #[serde(default)]
244 pub join_encoding_type: JoinEncodingType,
245
246 #[serde(default = "default::developer::sync_log_store_pause_duration_ms")]
250 pub sync_log_store_pause_duration_ms: usize,
251
252 #[serde(default = "default::developer::sync_log_store_buffer_size")]
254 pub sync_log_store_buffer_size: usize,
255
256 #[serde(default)]
259 pub over_window_cache_policy: OverWindowCachePolicy,
260
261 #[serde(default, flatten)]
262 #[serde_prefix_all(skip)]
263 #[config_doc(omitted)]
264 pub unrecognized: Unrecognized<Self>,
265}
266
267impl StreamingConfig {
268 pub fn unrecognized_keys(&self) -> impl Iterator<Item = String> {
270 std::iter::from_coroutine(
271 #[coroutine]
272 || {
273 for k in self.unrecognized.inner().keys() {
274 yield format!("streaming.{k}");
275 }
276 for k in self.developer.unrecognized.inner().keys() {
277 yield format!("streaming.developer.{k}");
278 }
279 },
280 )
281 }
282}
283
284pub mod default {
285 pub use crate::config::default::developer;
286
287 pub mod streaming {
288 use tracing::info;
289
290 use crate::config::AsyncStackTraceOption;
291 use crate::util::env_var::env_var_is_true;
292
293 pub fn in_flight_barrier_nums() -> usize {
294 10000
297 }
298
299 pub fn async_stack_trace() -> AsyncStackTraceOption {
300 AsyncStackTraceOption::default()
301 }
302
303 pub fn unique_user_stream_errors() -> usize {
304 10
305 }
306
307 pub fn unsafe_enable_strict_consistency() -> bool {
308 true
309 }
310
311 pub fn default_enable_mem_preload_state_table() -> bool {
312 if env_var_is_true("DEFAULT_ENABLE_MEM_PRELOAD_STATE_TABLE") {
313 info!("enabled mem_preload_state_table globally by env var");
314 true
315 } else {
316 false
317 }
318 }
319 }
320}