risingwave_common/config/
streaming.rs1use super::*;
16
17#[derive(Debug, Default, Clone, Copy, ValueEnum, Serialize, Deserialize)]
18pub enum AsyncStackTraceOption {
19 Off,
21 On,
23 #[default]
26 #[clap(alias = "verbose")]
27 ReleaseVerbose,
28}
29
30impl AsyncStackTraceOption {
31 pub fn is_verbose(self) -> Option<bool> {
32 match self {
33 Self::Off => None,
34 Self::On => Some(false),
35 Self::ReleaseVerbose => Some(!cfg!(debug_assertions)),
36 }
37 }
38}
39
40#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
42pub struct StreamingConfig {
43 #[serde(default = "default::streaming::in_flight_barrier_nums")]
45 pub in_flight_barrier_nums: usize,
46
47 #[serde(default)]
50 pub actor_runtime_worker_threads_num: Option<usize>,
51
52 #[serde(default = "default::streaming::async_stack_trace")]
54 pub async_stack_trace: AsyncStackTraceOption,
55
56 #[serde(default, with = "streaming_prefix")]
57 #[config_doc(omitted)]
58 pub developer: StreamingDeveloperConfig,
59
60 #[serde(default = "default::streaming::unique_user_stream_errors")]
62 pub unique_user_stream_errors: usize,
63
64 #[serde(default = "default::streaming::unsafe_enable_strict_consistency")]
66 pub unsafe_enable_strict_consistency: bool,
67
68 #[serde(default, flatten)]
69 #[config_doc(omitted)]
70 pub unrecognized: Unrecognized<Self>,
71}
72
73serde_with::with_prefix!(streaming_prefix "stream_");
74
75#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
79pub struct StreamingDeveloperConfig {
80 #[serde(default = "default::developer::stream_enable_executor_row_count")]
84 pub enable_executor_row_count: bool,
85
86 #[serde(default = "default::developer::connector_message_buffer_size")]
89 pub connector_message_buffer_size: usize,
90
91 #[serde(default = "default::developer::unsafe_stream_extreme_cache_size")]
93 pub unsafe_extreme_cache_size: usize,
94
95 #[serde(default = "default::developer::stream_topn_cache_min_capacity")]
97 pub topn_cache_min_capacity: usize,
98
99 #[serde(default = "default::developer::stream_chunk_size")]
101 pub chunk_size: usize,
102
103 #[serde(default = "default::developer::stream_exchange_initial_permits")]
106 pub exchange_initial_permits: usize,
107
108 #[serde(default = "default::developer::stream_exchange_batched_permits")]
111 pub exchange_batched_permits: usize,
112
113 #[serde(default = "default::developer::stream_exchange_concurrent_barriers")]
115 pub exchange_concurrent_barriers: usize,
116
117 #[serde(default = "default::developer::stream_exchange_concurrent_dispatchers")]
122 pub exchange_concurrent_dispatchers: usize,
123
124 #[serde(default = "default::developer::stream_dml_channel_initial_permits")]
127 pub dml_channel_initial_permits: usize,
128
129 #[serde(default = "default::developer::stream_hash_agg_max_dirty_groups_heap_size")]
131 pub hash_agg_max_dirty_groups_heap_size: usize,
132
133 #[serde(default = "default::developer::memory_controller_threshold_aggressive")]
134 pub memory_controller_threshold_aggressive: f64,
135
136 #[serde(default = "default::developer::memory_controller_threshold_graceful")]
137 pub memory_controller_threshold_graceful: f64,
138
139 #[serde(default = "default::developer::memory_controller_threshold_stable")]
140 pub memory_controller_threshold_stable: f64,
141
142 #[serde(default = "default::developer::memory_controller_eviction_factor_aggressive")]
143 pub memory_controller_eviction_factor_aggressive: f64,
144
145 #[serde(default = "default::developer::memory_controller_eviction_factor_graceful")]
146 pub memory_controller_eviction_factor_graceful: f64,
147
148 #[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
149 pub memory_controller_eviction_factor_stable: f64,
150
151 #[serde(default = "default::developer::memory_controller_update_interval_ms")]
152 pub memory_controller_update_interval_ms: usize,
153
154 #[serde(default = "default::developer::memory_controller_sequence_tls_step")]
155 pub memory_controller_sequence_tls_step: u64,
156
157 #[serde(default = "default::developer::memory_controller_sequence_tls_lag")]
158 pub memory_controller_sequence_tls_lag: u64,
159
160 #[serde(default = "default::developer::stream_enable_arrangement_backfill")]
161 pub enable_arrangement_backfill: bool,
166
167 #[serde(default = "default::developer::stream_high_join_amplification_threshold")]
168 pub high_join_amplification_threshold: usize,
171
172 #[serde(default = "default::developer::enable_actor_tokio_metrics")]
174 pub enable_actor_tokio_metrics: bool,
175
176 #[serde(default = "default::developer::stream_exchange_connection_pool_size")]
179 pub(super) exchange_connection_pool_size: Option<u16>,
180
181 #[serde(default = "default::developer::stream_enable_auto_schema_change")]
183 pub enable_auto_schema_change: bool,
184
185 #[serde(default = "default::developer::enable_shared_source")]
186 pub enable_shared_source: bool,
191
192 #[serde(default = "default::developer::switch_jdbc_pg_to_native")]
193 pub switch_jdbc_pg_to_native: bool,
196
197 #[serde(default = "default::developer::stream_max_barrier_batch_size")]
199 pub max_barrier_batch_size: u32,
200
201 #[serde(default = "default::developer::streaming_hash_join_entry_state_max_rows")]
204 pub hash_join_entry_state_max_rows: usize,
205
206 #[serde(default = "default::developer::streaming_now_progress_ratio")]
207 pub now_progress_ratio: Option<f32>,
208
209 #[serde(default = "default::developer::enable_explain_analyze_stats")]
211 pub enable_explain_analyze_stats: bool,
212
213 #[serde(default)]
214 pub compute_client_config: RpcClientConfig,
215
216 #[serde(default = "default::developer::iceberg_list_interval_sec")]
218 pub iceberg_list_interval_sec: u64,
219
220 #[serde(default = "default::developer::iceberg_fetch_batch_size")]
222 pub iceberg_fetch_batch_size: u64,
223
224 #[serde(default = "default::developer::iceberg_sink_positional_delete_cache_size")]
226 pub iceberg_sink_positional_delete_cache_size: usize,
227
228 #[serde(default = "default::developer::iceberg_sink_write_parquet_max_row_group_rows")]
230 pub iceberg_sink_write_parquet_max_row_group_rows: usize,
231
232 #[serde(default = "default::streaming::default_enable_mem_preload_state_table")]
235 pub default_enable_mem_preload_state_table: bool,
236
237 #[serde(default)]
240 pub mem_preload_state_table_ids_whitelist: Vec<u32>,
241
242 #[serde(default)]
245 pub mem_preload_state_table_ids_blacklist: Vec<u32>,
246
247 #[serde(default)]
250 pub aggressive_noop_update_elimination: bool,
251}
252
253pub mod default {
254 pub use crate::config::default::developer;
255
256 pub mod streaming {
257 use tracing::info;
258
259 use crate::config::AsyncStackTraceOption;
260 use crate::util::env_var::env_var_is_true;
261
262 pub fn in_flight_barrier_nums() -> usize {
263 10000
266 }
267
268 pub fn async_stack_trace() -> AsyncStackTraceOption {
269 AsyncStackTraceOption::default()
270 }
271
272 pub fn unique_user_stream_errors() -> usize {
273 10
274 }
275
276 pub fn unsafe_enable_strict_consistency() -> bool {
277 true
278 }
279
280 pub fn default_enable_mem_preload_state_table() -> bool {
281 if env_var_is_true("DEFAULT_ENABLE_MEM_PRELOAD_STATE_TABLE") {
282 info!("enabled mem_preload_state_table globally by env var");
283 true
284 } else {
285 false
286 }
287 }
288 }
289}