// risingwave_common/config/streaming.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use risingwave_common_proc_macro::serde_prefix_all;
18
19use super::*;
20
21mod async_stack_trace;
22mod join_encoding_type;
23mod over_window;
24
25pub use async_stack_trace::*;
26pub use join_encoding_type::*;
27pub use over_window::*;
28
29/// The section `[streaming]` in `risingwave.toml`.
30#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
31#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
32pub struct StreamingConfig {
33    /// The maximum number of barriers in-flight in the compute nodes.
34    #[serde(default = "default::streaming::in_flight_barrier_nums")]
35    pub in_flight_barrier_nums: usize,
36
37    /// The thread number of the streaming actor runtime in the compute node. The default value is
38    /// decided by `tokio`.
39    #[serde(default)]
40    pub actor_runtime_worker_threads_num: Option<usize>,
41
42    /// Enable async stack tracing through `await-tree` for risectl.
43    #[serde(default = "default::streaming::async_stack_trace")]
44    pub async_stack_trace: AsyncStackTraceOption,
45
46    #[serde(default)]
47    #[config_doc(nested)]
48    pub developer: StreamingDeveloperConfig,
49
50    /// Max unique user stream errors per actor
51    #[serde(default = "default::streaming::unique_user_stream_errors")]
52    pub unique_user_stream_errors: usize,
53
54    /// Disable strict stream consistency checks.
55    #[serde(default = "default::streaming::unsafe_disable_strict_consistency")]
56    pub unsafe_disable_strict_consistency: bool,
57
58    #[serde(default, flatten)]
59    #[config_doc(omitted)]
60    pub unrecognized: Unrecognized<Self>,
61}
62
63/// The subsections `[streaming.developer]`.
64///
65/// It is put at [`StreamingConfig::developer`].
66#[serde_prefix_all("stream_", mode = "alias")]
67#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
68#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
69pub struct StreamingDeveloperConfig {
70    /// Set to true to enable per-executor row count metrics. This will produce a lot of timeseries
71    /// and might affect the prometheus performance. If you only need actor input and output
72    /// rows data, see `stream_actor_in_record_cnt` and `stream_actor_out_record_cnt` instead.
73    #[serde(default = "default::developer::stream_enable_executor_row_count")]
74    pub enable_executor_row_count: bool,
75
76    /// The capacity of the chunks in the channel that connects between `ConnectorSource` and
77    /// `SourceExecutor`.
78    #[serde(default = "default::developer::connector_message_buffer_size")]
79    pub connector_message_buffer_size: usize,
80
81    /// Limit number of the cached entries in an extreme aggregation call.
82    #[serde(default = "default::developer::unsafe_stream_extreme_cache_size")]
83    pub unsafe_extreme_cache_size: usize,
84
85    /// Minimum cache size for TopN cache per group key.
86    #[serde(default = "default::developer::stream_topn_cache_min_capacity")]
87    pub topn_cache_min_capacity: usize,
88
89    /// The maximum size of the chunk produced by executor at a time.
90    #[serde(default = "default::developer::stream_chunk_size")]
91    pub chunk_size: usize,
92
93    /// The initial permits that a channel holds, i.e., the maximum row count can be buffered in
94    /// the channel.
95    #[serde(default = "default::developer::stream_exchange_initial_permits")]
96    pub exchange_initial_permits: usize,
97
98    /// The permits that are batched to add back, for reducing the backward `AddPermits` messages
99    /// in remote exchange.
100    #[serde(default = "default::developer::stream_exchange_batched_permits")]
101    pub exchange_batched_permits: usize,
102
103    /// The maximum number of concurrent barriers in an exchange channel.
104    #[serde(default = "default::developer::stream_exchange_concurrent_barriers")]
105    pub exchange_concurrent_barriers: usize,
106
107    /// The concurrency for dispatching messages to different downstream jobs.
108    ///
109    /// - `1` means no concurrency, i.e., dispatch messages to downstream jobs one by one.
110    /// - `0` means unlimited concurrency.
111    #[serde(default = "default::developer::stream_exchange_concurrent_dispatchers")]
112    pub exchange_concurrent_dispatchers: usize,
113
114    /// The initial permits for a dml channel, i.e., the maximum row count can be buffered in
115    /// the channel.
116    #[serde(default = "default::developer::stream_dml_channel_initial_permits")]
117    pub dml_channel_initial_permits: usize,
118
119    /// The max heap size of dirty groups of `HashAggExecutor`.
120    #[serde(default = "default::developer::stream_hash_agg_max_dirty_groups_heap_size")]
121    pub hash_agg_max_dirty_groups_heap_size: usize,
122
123    #[serde(default = "default::developer::memory_controller_threshold_aggressive")]
124    pub memory_controller_threshold_aggressive: f64,
125
126    #[serde(default = "default::developer::memory_controller_threshold_graceful")]
127    pub memory_controller_threshold_graceful: f64,
128
129    #[serde(default = "default::developer::memory_controller_threshold_stable")]
130    pub memory_controller_threshold_stable: f64,
131
132    #[serde(default = "default::developer::memory_controller_eviction_factor_aggressive")]
133    pub memory_controller_eviction_factor_aggressive: f64,
134
135    #[serde(default = "default::developer::memory_controller_eviction_factor_graceful")]
136    pub memory_controller_eviction_factor_graceful: f64,
137
138    #[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
139    pub memory_controller_eviction_factor_stable: f64,
140
141    #[serde(default = "default::developer::memory_controller_update_interval_ms")]
142    pub memory_controller_update_interval_ms: usize,
143
144    #[serde(default = "default::developer::memory_controller_sequence_tls_step")]
145    pub memory_controller_sequence_tls_step: u64,
146
147    #[serde(default = "default::developer::memory_controller_sequence_tls_lag")]
148    pub memory_controller_sequence_tls_lag: u64,
149
150    #[serde(default = "default::developer::stream_enable_arrangement_backfill")]
151    /// Enable arrangement backfill
152    /// If false, the arrangement backfill will be disabled,
153    /// even if session variable set.
154    /// If true, it's decided by session variable `streaming_use_arrangement_backfill` (default true)
155    pub enable_arrangement_backfill: bool,
156
157    #[serde(default = "default::developer::stream_enable_snapshot_backfill")]
158    /// Enable snapshot backfill
159    /// If false, the snapshot backfill will be disabled,
160    /// even if session variable set.
161    /// If true, it's decided by session variable `streaming_use_snapshot_backfill` (default true)
162    pub enable_snapshot_backfill: bool,
163
164    #[serde(default = "default::developer::stream_high_join_amplification_threshold")]
165    /// If number of hash join matches exceeds this threshold number,
166    /// it will be logged.
167    pub high_join_amplification_threshold: usize,
168
169    #[serde(default = "default::developer::stream_high_gap_fill_amplification_threshold")]
170    /// If number of rows generated by gap fill between two anchor rows exceeds this threshold
171    /// number, it will be logged.
172    pub high_gap_fill_amplification_threshold: usize,
173
174    /// Actor tokio metrics is enabled if `enable_actor_tokio_metrics` is set or metrics level >= Debug.
175    #[serde(default = "default::developer::enable_actor_tokio_metrics")]
176    pub enable_actor_tokio_metrics: bool,
177
178    /// The number of the connections for streaming remote exchange between two nodes.
179    /// If not specified, the value of `server.connection_pool_size` will be used.
180    #[serde(default = "default::developer::stream_exchange_connection_pool_size")]
181    pub(super) exchange_connection_pool_size: Option<u16>,
182
183    /// A flag to allow disabling the auto schema change handling
184    #[serde(default = "default::developer::stream_enable_auto_schema_change")]
185    pub enable_auto_schema_change: bool,
186
187    #[serde(default = "default::developer::enable_shared_source")]
188    /// Enable shared source
189    /// If false, the shared source will be disabled,
190    /// even if session variable set.
191    /// If true, it's decided by session variable `streaming_use_shared_source` (default true)
192    pub enable_shared_source: bool,
193
194    #[serde(default = "default::developer::switch_jdbc_pg_to_native")]
195    /// When true, all jdbc sinks with connector='jdbc' and jdbc.url="jdbc:postgresql://..."
196    /// will be switched from jdbc postgresql sinks to rust native (connector='postgres') sinks.
197    pub switch_jdbc_pg_to_native: bool,
198
199    /// The maximum number of consecutive barriers allowed in a message when sent between actors.
200    #[serde(default = "default::developer::stream_max_barrier_batch_size")]
201    pub max_barrier_batch_size: u32,
202
203    /// Configure the system-wide cache row cardinality of hash join.
204    /// For example, if this is set to 1000, it means we can have at most 1000 rows in cache.
205    #[serde(default = "default::developer::streaming_hash_join_entry_state_max_rows")]
206    pub hash_join_entry_state_max_rows: usize,
207
208    /// Number of processed rows between periodic join cache evictions.
209    /// Values smaller than 1 will be clamped to 1 by the executor.
210    #[serde(default = "default::developer::streaming_join_hash_map_evict_interval_rows")]
211    pub join_hash_map_evict_interval_rows: u32,
212
213    #[serde(default = "default::developer::streaming_now_progress_ratio")]
214    pub now_progress_ratio: Option<f32>,
215
216    /// Enable / Disable profiling stats used by `EXPLAIN ANALYZE`
217    #[serde(default = "default::developer::enable_explain_analyze_stats")]
218    pub enable_explain_analyze_stats: bool,
219
220    #[serde(default)]
221    pub compute_client_config: RpcClientConfig,
222
223    /// The interval in seconds to rebuild snapshot iterators during snapshot backfill.
224    #[serde(default = "default::developer::stream_snapshot_iter_rebuild_interval_secs")]
225    pub snapshot_iter_rebuild_interval_secs: u64,
226
227    /// `IcebergListExecutor`: The interval in seconds for Iceberg source to list new files.
228    #[serde(default = "default::developer::iceberg_list_interval_sec")]
229    pub iceberg_list_interval_sec: u64,
230
231    /// `IcebergFetchExecutor`: The number of files the executor will fetch concurrently in a batch.
232    #[serde(default = "default::developer::iceberg_fetch_batch_size")]
233    pub iceberg_fetch_batch_size: u64,
234
235    /// `IcebergSink`: The size of the cache for positional delete in the sink.
236    #[serde(default = "default::developer::iceberg_sink_positional_delete_cache_size")]
237    pub iceberg_sink_positional_delete_cache_size: usize,
238
239    /// `IcebergSink`: The maximum number of rows in a row group when writing Parquet files.
240    #[serde(default = "default::developer::iceberg_sink_write_parquet_max_row_group_rows")]
241    pub iceberg_sink_write_parquet_max_row_group_rows: usize,
242
243    /// When enabled, materialized views using default `NoCheck` conflict behavior will be forced
244    /// to use `Overwrite`. Useful to avoid propagating inconsistent changelog downstream.
245    #[serde(default = "default::developer::materialize_force_overwrite_on_no_check")]
246    pub materialize_force_overwrite_on_no_check: bool,
247
248    /// Whether by default enable preloading all rows in memory for state table.
249    /// If true, all capable state tables will preload its state to memory
250    #[serde(default = "default::streaming::default_enable_mem_preload_state_table")]
251    pub default_enable_mem_preload_state_table: bool,
252
253    /// The list of state table ids to *enable* preloading all rows in memory for state table.
254    /// Only takes effect when `default_enable_mem_preload_state_table` is false.
255    #[serde(default)]
256    pub mem_preload_state_table_ids_whitelist: Vec<u32>,
257
258    /// The list of state table ids to *disable* preloading all rows in memory for state table.
259    /// Only takes effect when `default_enable_mem_preload_state_table` is true.
260    #[serde(default)]
261    pub mem_preload_state_table_ids_blacklist: Vec<u32>,
262
263    /// Eliminate unnecessary updates aggressively, even if it impacts performance. Enable this
264    /// only if it's confirmed that no-op updates are causing significant streaming amplification.
265    #[serde(default)]
266    pub aggressive_noop_update_elimination: bool,
267
268    /// The interval in seconds for the refresh scheduler to check and trigger scheduled refreshes.
269    #[serde(default = "default::developer::refresh_scheduler_interval_sec")]
270    pub refresh_scheduler_interval_sec: u64,
271
272    /// Determine which encoding will be used to encode join rows in operator cache.
273    #[serde(default)]
274    pub join_encoding_type: JoinEncodingType,
275
276    /// The timeout for reading from the buffer of the sync log store on barrier.
277    /// Every epoch we will attempt to read the full buffer of the sync log store.
278    /// If we hit the timeout, we will stop reading and continue.
279    #[serde(default = "default::developer::sync_log_store_pause_duration_ms")]
280    pub sync_log_store_pause_duration_ms: usize,
281
282    /// The max buffer size for sync logstore, before we start flushing.
283    #[serde(default = "default::developer::sync_log_store_buffer_size")]
284    pub sync_log_store_buffer_size: usize,
285
286    /// Disable the optimized dispatcher path for sync log store.
287    #[serde(default = "default::developer::disable_sync_log_store_dispatcher")]
288    pub disable_sync_log_store_dispatcher: bool,
289
290    /// Cache policy for partition cache in streaming over window.
291    /// Can be `full`, `recent`, `recent_first_n` or `recent_last_n`.
292    #[serde(default)]
293    pub over_window_cache_policy: OverWindowCachePolicy,
294
295    /// When enabled, vnode stats pruning is applied in production.
296    /// When disabled, vnode stats pruning is in dry-run mode: we still maintain vnode stats
297    /// and verify that pruning would be correct, but we don't actually use the pruning
298    /// results — we still use cache and storage to fulfill the read. This is useful for
299    /// validating the correctness of vnode stats pruning before enabling it in production.
300    #[serde(default = "default::developer::enable_state_table_vnode_stats_pruning")]
301    pub enable_state_table_vnode_stats_pruning: bool,
302
303    /// Whether `MaterializeExecutor` enables vnode key stats for its state table.
304    #[serde(default = "default::developer::enable_vnode_key_stats_for_materialize")]
305    pub enable_vnode_key_stats_for_materialize: bool,
306
307    /// The maximum number of kv log store readers that can concurrently read historical data
308    /// (i.e., from the state store) during initialization. A reader is considered "initializing"
309    /// until it has read at least one row from the historical stream or the stream returns empty.
310    /// Set to 0 to disable the limit (unlimited concurrency).
311    #[serde(default = "default::developer::max_concurrent_kv_log_store_historical_read")]
312    pub max_concurrent_kv_log_store_historical_read: usize,
313
314    #[serde(default, flatten)]
315    #[serde_prefix_all(skip)]
316    #[config_doc(omitted)]
317    pub unrecognized: Unrecognized<Self>,
318}
319
320impl StreamingDeveloperConfig {
321    pub fn snapshot_iter_rebuild_interval(&self) -> Duration {
322        let rebuild_interval = if self.snapshot_iter_rebuild_interval_secs < 10 {
323            tracing::warn!(
324                "too small rebuild_interval {} second. rewrite to 10",
325                self.snapshot_iter_rebuild_interval_secs
326            );
327            10
328        } else {
329            self.snapshot_iter_rebuild_interval_secs
330        };
331        Duration::from_secs(rebuild_interval)
332    }
333}
334
335impl StreamingConfig {
336    /// Returns the dot-separated keys of all unrecognized fields, including those in `developer` section.
337    pub fn unrecognized_keys(&self) -> impl Iterator<Item = String> {
338        std::iter::from_coroutine(
339            #[coroutine]
340            || {
341                for k in self.unrecognized.inner().keys() {
342                    yield format!("streaming.{k}");
343                }
344                for k in self.developer.unrecognized.inner().keys() {
345                    yield format!("streaming.developer.{k}");
346                }
347            },
348        )
349    }
350}
351
352pub mod default {
353    pub use crate::config::default::developer;
354
355    pub mod streaming {
356        use tracing::info;
357
358        use crate::config::AsyncStackTraceOption;
359        use crate::util::env_var::env_var_is_true;
360
361        pub fn in_flight_barrier_nums() -> usize {
362            // quick fix
363            // TODO: remove this limitation from code
364            10000
365        }
366
367        pub fn async_stack_trace() -> AsyncStackTraceOption {
368            AsyncStackTraceOption::default()
369        }
370
371        pub fn unique_user_stream_errors() -> usize {
372            10
373        }
374
375        pub fn unsafe_disable_strict_consistency() -> bool {
376            false
377        }
378
379        pub fn default_enable_mem_preload_state_table() -> bool {
380            if env_var_is_true("DEFAULT_ENABLE_MEM_PRELOAD_STATE_TABLE") {
381                info!("enabled mem_preload_state_table globally by env var");
382                true
383            } else {
384                false
385            }
386        }
387    }
388}