risingwave_common/config/
streaming.rs1use super::*;
16
17#[derive(Debug, Default, Clone, Copy, ValueEnum, Serialize, Deserialize)]
18pub enum AsyncStackTraceOption {
19 Off,
21 On,
23 #[default]
26 #[clap(alias = "verbose")]
27 ReleaseVerbose,
28}
29
30impl AsyncStackTraceOption {
31 pub fn is_verbose(self) -> Option<bool> {
32 match self {
33 Self::Off => None,
34 Self::On => Some(false),
35 Self::ReleaseVerbose => Some(!cfg!(debug_assertions)),
36 }
37 }
38}
39
40#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
42pub struct StreamingConfig {
43 #[serde(default = "default::streaming::in_flight_barrier_nums")]
45 pub in_flight_barrier_nums: usize,
46
47 #[serde(default)]
50 pub actor_runtime_worker_threads_num: Option<usize>,
51
52 #[serde(default = "default::streaming::async_stack_trace")]
54 pub async_stack_trace: AsyncStackTraceOption,
55
56 #[serde(default, with = "streaming_prefix")]
57 #[config_doc(omitted)]
58 pub developer: StreamingDeveloperConfig,
59
60 #[serde(default = "default::streaming::unique_user_stream_errors")]
62 pub unique_user_stream_errors: usize,
63
64 #[serde(default = "default::streaming::unsafe_enable_strict_consistency")]
66 pub unsafe_enable_strict_consistency: bool,
67
68 #[serde(default, flatten)]
69 #[config_doc(omitted)]
70 pub unrecognized: Unrecognized<Self>,
71}
72
73serde_with::with_prefix!(streaming_prefix "stream_");
74
75#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
79pub struct StreamingDeveloperConfig {
80 #[serde(default = "default::developer::stream_enable_executor_row_count")]
84 pub enable_executor_row_count: bool,
85
86 #[serde(default = "default::developer::connector_message_buffer_size")]
89 pub connector_message_buffer_size: usize,
90
91 #[serde(default = "default::developer::unsafe_stream_extreme_cache_size")]
93 pub unsafe_extreme_cache_size: usize,
94
95 #[serde(default = "default::developer::stream_chunk_size")]
97 pub chunk_size: usize,
98
99 #[serde(default = "default::developer::stream_exchange_initial_permits")]
102 pub exchange_initial_permits: usize,
103
104 #[serde(default = "default::developer::stream_exchange_batched_permits")]
107 pub exchange_batched_permits: usize,
108
109 #[serde(default = "default::developer::stream_exchange_concurrent_barriers")]
111 pub exchange_concurrent_barriers: usize,
112
113 #[serde(default = "default::developer::stream_exchange_concurrent_dispatchers")]
118 pub exchange_concurrent_dispatchers: usize,
119
120 #[serde(default = "default::developer::stream_dml_channel_initial_permits")]
123 pub dml_channel_initial_permits: usize,
124
125 #[serde(default = "default::developer::stream_hash_agg_max_dirty_groups_heap_size")]
127 pub hash_agg_max_dirty_groups_heap_size: usize,
128
129 #[serde(default = "default::developer::memory_controller_threshold_aggressive")]
130 pub memory_controller_threshold_aggressive: f64,
131
132 #[serde(default = "default::developer::memory_controller_threshold_graceful")]
133 pub memory_controller_threshold_graceful: f64,
134
135 #[serde(default = "default::developer::memory_controller_threshold_stable")]
136 pub memory_controller_threshold_stable: f64,
137
138 #[serde(default = "default::developer::memory_controller_eviction_factor_aggressive")]
139 pub memory_controller_eviction_factor_aggressive: f64,
140
141 #[serde(default = "default::developer::memory_controller_eviction_factor_graceful")]
142 pub memory_controller_eviction_factor_graceful: f64,
143
144 #[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
145 pub memory_controller_eviction_factor_stable: f64,
146
147 #[serde(default = "default::developer::memory_controller_update_interval_ms")]
148 pub memory_controller_update_interval_ms: usize,
149
150 #[serde(default = "default::developer::memory_controller_sequence_tls_step")]
151 pub memory_controller_sequence_tls_step: u64,
152
153 #[serde(default = "default::developer::memory_controller_sequence_tls_lag")]
154 pub memory_controller_sequence_tls_lag: u64,
155
156 #[serde(default = "default::developer::stream_enable_arrangement_backfill")]
157 pub enable_arrangement_backfill: bool,
162
163 #[serde(default = "default::developer::stream_high_join_amplification_threshold")]
164 pub high_join_amplification_threshold: usize,
167
168 #[serde(default = "default::developer::enable_actor_tokio_metrics")]
170 pub enable_actor_tokio_metrics: bool,
171
172 #[serde(default = "default::developer::stream_exchange_connection_pool_size")]
175 pub(super) exchange_connection_pool_size: Option<u16>,
176
177 #[serde(default = "default::developer::stream_enable_auto_schema_change")]
179 pub enable_auto_schema_change: bool,
180
181 #[serde(default = "default::developer::enable_shared_source")]
182 pub enable_shared_source: bool,
187
188 #[serde(default = "default::developer::switch_jdbc_pg_to_native")]
189 pub switch_jdbc_pg_to_native: bool,
192
193 #[serde(default = "default::developer::stream_max_barrier_batch_size")]
195 pub max_barrier_batch_size: u32,
196
197 #[serde(default = "default::developer::streaming_hash_join_entry_state_max_rows")]
200 pub hash_join_entry_state_max_rows: usize,
201
202 #[serde(default = "default::developer::streaming_now_progress_ratio")]
203 pub now_progress_ratio: Option<f32>,
204
205 #[serde(default = "default::developer::enable_explain_analyze_stats")]
207 pub enable_explain_analyze_stats: bool,
208
209 #[serde(default)]
210 pub compute_client_config: RpcClientConfig,
211
212 #[serde(default = "default::developer::iceberg_list_interval_sec")]
214 pub iceberg_list_interval_sec: u64,
215
216 #[serde(default = "default::developer::iceberg_fetch_batch_size")]
218 pub iceberg_fetch_batch_size: u64,
219
220 #[serde(default = "default::developer::iceberg_sink_positional_delete_cache_size")]
222 pub iceberg_sink_positional_delete_cache_size: usize,
223
224 #[serde(default = "default::developer::iceberg_sink_write_parquet_max_row_group_rows")]
226 pub iceberg_sink_write_parquet_max_row_group_rows: usize,
227
228 #[serde(default = "default::streaming::default_enable_mem_preload_state_table")]
231 pub default_enable_mem_preload_state_table: bool,
232
233 #[serde(default)]
236 pub mem_preload_state_table_ids_whitelist: Vec<u32>,
237
238 #[serde(default)]
241 pub mem_preload_state_table_ids_blacklist: Vec<u32>,
242}
243
244pub mod default {
245 pub use crate::config::default::developer;
246
247 pub mod streaming {
248 use tracing::info;
249
250 use crate::config::AsyncStackTraceOption;
251 use crate::util::env_var::env_var_is_true;
252
253 pub fn in_flight_barrier_nums() -> usize {
254 10000
257 }
258
259 pub fn async_stack_trace() -> AsyncStackTraceOption {
260 AsyncStackTraceOption::default()
261 }
262
263 pub fn unique_user_stream_errors() -> usize {
264 10
265 }
266
267 pub fn unsafe_enable_strict_consistency() -> bool {
268 true
269 }
270
271 pub fn default_enable_mem_preload_state_table() -> bool {
272 if env_var_is_true("DEFAULT_ENABLE_MEM_PRELOAD_STATE_TABLE") {
273 info!("enabled mem_preload_state_table globally by env var");
274 true
275 } else {
276 false
277 }
278 }
279 }
280}