// risingwave_common/config/streaming.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use risingwave_common_proc_macro::serde_prefix_all;
16
17use super::*;
18
19mod async_stack_trace;
20mod join_encoding_type;
21mod over_window;
22
23pub use async_stack_trace::*;
24pub use join_encoding_type::*;
25pub use over_window::*;
26
27/// The section `[streaming]` in `risingwave.toml`.
28#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
29pub struct StreamingConfig {
30    /// The maximum number of barriers in-flight in the compute nodes.
31    #[serde(default = "default::streaming::in_flight_barrier_nums")]
32    pub in_flight_barrier_nums: usize,
33
34    /// The thread number of the streaming actor runtime in the compute node. The default value is
35    /// decided by `tokio`.
36    #[serde(default)]
37    pub actor_runtime_worker_threads_num: Option<usize>,
38
39    /// Enable async stack tracing through `await-tree` for risectl.
40    #[serde(default = "default::streaming::async_stack_trace")]
41    pub async_stack_trace: AsyncStackTraceOption,
42
43    #[serde(default)]
44    #[config_doc(omitted)]
45    pub developer: StreamingDeveloperConfig,
46
47    /// Max unique user stream errors per actor
48    #[serde(default = "default::streaming::unique_user_stream_errors")]
49    pub unique_user_stream_errors: usize,
50
51    /// Control the strictness of stream consistency.
52    #[serde(default = "default::streaming::unsafe_enable_strict_consistency")]
53    pub unsafe_enable_strict_consistency: bool,
54
55    #[serde(default, flatten)]
56    #[config_doc(omitted)]
57    pub unrecognized: Unrecognized<Self>,
58}
59
60/// The subsections `[streaming.developer]`.
61///
62/// It is put at [`StreamingConfig::developer`].
63#[serde_prefix_all("stream_", mode = "alias")]
64#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
65pub struct StreamingDeveloperConfig {
66    /// Set to true to enable per-executor row count metrics. This will produce a lot of timeseries
67    /// and might affect the prometheus performance. If you only need actor input and output
68    /// rows data, see `stream_actor_in_record_cnt` and `stream_actor_out_record_cnt` instead.
69    #[serde(default = "default::developer::stream_enable_executor_row_count")]
70    pub enable_executor_row_count: bool,
71
72    /// The capacity of the chunks in the channel that connects between `ConnectorSource` and
73    /// `SourceExecutor`.
74    #[serde(default = "default::developer::connector_message_buffer_size")]
75    pub connector_message_buffer_size: usize,
76
77    /// Limit number of the cached entries in an extreme aggregation call.
78    #[serde(default = "default::developer::unsafe_stream_extreme_cache_size")]
79    pub unsafe_extreme_cache_size: usize,
80
81    /// Minimum cache size for TopN cache per group key.
82    #[serde(default = "default::developer::stream_topn_cache_min_capacity")]
83    pub topn_cache_min_capacity: usize,
84
85    /// The maximum size of the chunk produced by executor at a time.
86    #[serde(default = "default::developer::stream_chunk_size")]
87    pub chunk_size: usize,
88
89    /// The initial permits that a channel holds, i.e., the maximum row count can be buffered in
90    /// the channel.
91    #[serde(default = "default::developer::stream_exchange_initial_permits")]
92    pub exchange_initial_permits: usize,
93
94    /// The permits that are batched to add back, for reducing the backward `AddPermits` messages
95    /// in remote exchange.
96    #[serde(default = "default::developer::stream_exchange_batched_permits")]
97    pub exchange_batched_permits: usize,
98
99    /// The maximum number of concurrent barriers in an exchange channel.
100    #[serde(default = "default::developer::stream_exchange_concurrent_barriers")]
101    pub exchange_concurrent_barriers: usize,
102
103    /// The concurrency for dispatching messages to different downstream jobs.
104    ///
105    /// - `1` means no concurrency, i.e., dispatch messages to downstream jobs one by one.
106    /// - `0` means unlimited concurrency.
107    #[serde(default = "default::developer::stream_exchange_concurrent_dispatchers")]
108    pub exchange_concurrent_dispatchers: usize,
109
110    /// The initial permits for a dml channel, i.e., the maximum row count can be buffered in
111    /// the channel.
112    #[serde(default = "default::developer::stream_dml_channel_initial_permits")]
113    pub dml_channel_initial_permits: usize,
114
115    /// The max heap size of dirty groups of `HashAggExecutor`.
116    #[serde(default = "default::developer::stream_hash_agg_max_dirty_groups_heap_size")]
117    pub hash_agg_max_dirty_groups_heap_size: usize,
118
119    #[serde(default = "default::developer::memory_controller_threshold_aggressive")]
120    pub memory_controller_threshold_aggressive: f64,
121
122    #[serde(default = "default::developer::memory_controller_threshold_graceful")]
123    pub memory_controller_threshold_graceful: f64,
124
125    #[serde(default = "default::developer::memory_controller_threshold_stable")]
126    pub memory_controller_threshold_stable: f64,
127
128    #[serde(default = "default::developer::memory_controller_eviction_factor_aggressive")]
129    pub memory_controller_eviction_factor_aggressive: f64,
130
131    #[serde(default = "default::developer::memory_controller_eviction_factor_graceful")]
132    pub memory_controller_eviction_factor_graceful: f64,
133
134    #[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
135    pub memory_controller_eviction_factor_stable: f64,
136
137    #[serde(default = "default::developer::memory_controller_update_interval_ms")]
138    pub memory_controller_update_interval_ms: usize,
139
140    #[serde(default = "default::developer::memory_controller_sequence_tls_step")]
141    pub memory_controller_sequence_tls_step: u64,
142
143    #[serde(default = "default::developer::memory_controller_sequence_tls_lag")]
144    pub memory_controller_sequence_tls_lag: u64,
145
146    #[serde(default = "default::developer::stream_enable_arrangement_backfill")]
147    /// Enable arrangement backfill
148    /// If false, the arrangement backfill will be disabled,
149    /// even if session variable set.
150    /// If true, it's decided by session variable `streaming_use_arrangement_backfill` (default true)
151    pub enable_arrangement_backfill: bool,
152
153    #[serde(default = "default::developer::stream_high_join_amplification_threshold")]
154    /// If number of hash join matches exceeds this threshold number,
155    /// it will be logged.
156    pub high_join_amplification_threshold: usize,
157
158    /// Actor tokio metrics is enabled if `enable_actor_tokio_metrics` is set or metrics level >= Debug.
159    #[serde(default = "default::developer::enable_actor_tokio_metrics")]
160    pub enable_actor_tokio_metrics: bool,
161
162    /// The number of the connections for streaming remote exchange between two nodes.
163    /// If not specified, the value of `server.connection_pool_size` will be used.
164    #[serde(default = "default::developer::stream_exchange_connection_pool_size")]
165    pub(super) exchange_connection_pool_size: Option<u16>,
166
167    /// A flag to allow disabling the auto schema change handling
168    #[serde(default = "default::developer::stream_enable_auto_schema_change")]
169    pub enable_auto_schema_change: bool,
170
171    #[serde(default = "default::developer::enable_shared_source")]
172    /// Enable shared source
173    /// If false, the shared source will be disabled,
174    /// even if session variable set.
175    /// If true, it's decided by session variable `streaming_use_shared_source` (default true)
176    pub enable_shared_source: bool,
177
178    #[serde(default = "default::developer::switch_jdbc_pg_to_native")]
179    /// When true, all jdbc sinks with connector='jdbc' and jdbc.url="jdbc:postgresql://..."
180    /// will be switched from jdbc postgresql sinks to rust native (connector='postgres') sinks.
181    pub switch_jdbc_pg_to_native: bool,
182
183    /// The maximum number of consecutive barriers allowed in a message when sent between actors.
184    #[serde(default = "default::developer::stream_max_barrier_batch_size")]
185    pub max_barrier_batch_size: u32,
186
187    /// Configure the system-wide cache row cardinality of hash join.
188    /// For example, if this is set to 1000, it means we can have at most 1000 rows in cache.
189    #[serde(default = "default::developer::streaming_hash_join_entry_state_max_rows")]
190    pub hash_join_entry_state_max_rows: usize,
191
192    #[serde(default = "default::developer::streaming_now_progress_ratio")]
193    pub now_progress_ratio: Option<f32>,
194
195    /// Enable / Disable profiling stats used by `EXPLAIN ANALYZE`
196    #[serde(default = "default::developer::enable_explain_analyze_stats")]
197    pub enable_explain_analyze_stats: bool,
198
199    #[serde(default)]
200    pub compute_client_config: RpcClientConfig,
201
202    /// `IcebergListExecutor`: The interval in seconds for Iceberg source to list new files.
203    #[serde(default = "default::developer::iceberg_list_interval_sec")]
204    pub iceberg_list_interval_sec: u64,
205
206    /// `IcebergFetchExecutor`: The number of files the executor will fetch concurrently in a batch.
207    #[serde(default = "default::developer::iceberg_fetch_batch_size")]
208    pub iceberg_fetch_batch_size: u64,
209
210    /// `IcebergSink`: The size of the cache for positional delete in the sink.
211    #[serde(default = "default::developer::iceberg_sink_positional_delete_cache_size")]
212    pub iceberg_sink_positional_delete_cache_size: usize,
213
214    /// `IcebergSink`: The maximum number of rows in a row group when writing Parquet files.
215    #[serde(default = "default::developer::iceberg_sink_write_parquet_max_row_group_rows")]
216    pub iceberg_sink_write_parquet_max_row_group_rows: usize,
217
218    /// Whether by default enable preloading all rows in memory for state table.
219    /// If true, all capable state tables will preload its state to memory
220    #[serde(default = "default::streaming::default_enable_mem_preload_state_table")]
221    pub default_enable_mem_preload_state_table: bool,
222
223    /// The list of state table ids to *enable* preloading all rows in memory for state table.
224    /// Only takes effect when `default_enable_mem_preload_state_table` is false.
225    #[serde(default)]
226    pub mem_preload_state_table_ids_whitelist: Vec<u32>,
227
228    /// The list of state table ids to *disable* preloading all rows in memory for state table.
229    /// Only takes effect when `default_enable_mem_preload_state_table` is true.
230    #[serde(default)]
231    pub mem_preload_state_table_ids_blacklist: Vec<u32>,
232
233    /// Eliminate unnecessary updates aggressively, even if it impacts performance. Enable this
234    /// only if it's confirmed that no-op updates are causing significant streaming amplification.
235    #[serde(default)]
236    pub aggressive_noop_update_elimination: bool,
237
238    /// The interval in seconds for the refresh scheduler to check and trigger scheduled refreshes.
239    #[serde(default = "default::developer::refresh_scheduler_interval_sec")]
240    pub refresh_scheduler_interval_sec: u64,
241
242    /// Determine which encoding will be used to encode join rows in operator cache.
243    #[serde(default)]
244    pub join_encoding_type: JoinEncodingType,
245
246    /// The timeout for reading from the buffer of the sync log store on barrier.
247    /// Every epoch we will attempt to read the full buffer of the sync log store.
248    /// If we hit the timeout, we will stop reading and continue.
249    #[serde(default = "default::developer::sync_log_store_pause_duration_ms")]
250    pub sync_log_store_pause_duration_ms: usize,
251
252    /// The max buffer size for sync logstore, before we start flushing.
253    #[serde(default = "default::developer::sync_log_store_buffer_size")]
254    pub sync_log_store_buffer_size: usize,
255
256    /// Cache policy for partition cache in streaming over window.
257    /// Can be `full`, `recent`, `recent_first_n` or `recent_last_n`.
258    #[serde(default)]
259    pub over_window_cache_policy: OverWindowCachePolicy,
260
261    #[serde(default, flatten)]
262    #[serde_prefix_all(skip)]
263    #[config_doc(omitted)]
264    pub unrecognized: Unrecognized<Self>,
265}
266
267impl StreamingConfig {
268    /// Returns the dot-separated keys of all unrecognized fields, including those in `developer` section.
269    pub fn unrecognized_keys(&self) -> impl Iterator<Item = String> {
270        std::iter::from_coroutine(
271            #[coroutine]
272            || {
273                for k in self.unrecognized.inner().keys() {
274                    yield format!("streaming.{k}");
275                }
276                for k in self.developer.unrecognized.inner().keys() {
277                    yield format!("streaming.developer.{k}");
278                }
279            },
280        )
281    }
282}
283
284pub mod default {
285    pub use crate::config::default::developer;
286
287    pub mod streaming {
288        use tracing::info;
289
290        use crate::config::AsyncStackTraceOption;
291        use crate::util::env_var::env_var_is_true;
292
293        pub fn in_flight_barrier_nums() -> usize {
294            // quick fix
295            // TODO: remove this limitation from code
296            10000
297        }
298
299        pub fn async_stack_trace() -> AsyncStackTraceOption {
300            AsyncStackTraceOption::default()
301        }
302
303        pub fn unique_user_stream_errors() -> usize {
304            10
305        }
306
307        pub fn unsafe_enable_strict_consistency() -> bool {
308            true
309        }
310
311        pub fn default_enable_mem_preload_state_table() -> bool {
312            if env_var_is_true("DEFAULT_ENABLE_MEM_PRELOAD_STATE_TABLE") {
313                info!("enabled mem_preload_state_table globally by env var");
314                true
315            } else {
316                false
317            }
318        }
319    }
320}