risingwave_common/config/
streaming.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use super::*;
16
17#[derive(Debug, Default, Clone, Copy, ValueEnum, Serialize, Deserialize)]
18pub enum AsyncStackTraceOption {
19    /// Disabled.
20    Off,
21    /// Enabled with basic instruments.
22    On,
23    /// Enabled with extra verbose instruments in release build.
24    /// Behaves the same as `on` in debug build due to performance concern.
25    #[default]
26    #[clap(alias = "verbose")]
27    ReleaseVerbose,
28}
29
30impl AsyncStackTraceOption {
31    pub fn is_verbose(self) -> Option<bool> {
32        match self {
33            Self::Off => None,
34            Self::On => Some(false),
35            Self::ReleaseVerbose => Some(!cfg!(debug_assertions)),
36        }
37    }
38}
39
40/// The section `[streaming]` in `risingwave.toml`.
41#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
42pub struct StreamingConfig {
43    /// The maximum number of barriers in-flight in the compute nodes.
44    #[serde(default = "default::streaming::in_flight_barrier_nums")]
45    pub in_flight_barrier_nums: usize,
46
47    /// The thread number of the streaming actor runtime in the compute node. The default value is
48    /// decided by `tokio`.
49    #[serde(default)]
50    pub actor_runtime_worker_threads_num: Option<usize>,
51
52    /// Enable async stack tracing through `await-tree` for risectl.
53    #[serde(default = "default::streaming::async_stack_trace")]
54    pub async_stack_trace: AsyncStackTraceOption,
55
56    #[serde(default, with = "streaming_prefix")]
57    #[config_doc(omitted)]
58    pub developer: StreamingDeveloperConfig,
59
60    /// Max unique user stream errors per actor
61    #[serde(default = "default::streaming::unique_user_stream_errors")]
62    pub unique_user_stream_errors: usize,
63
64    /// Control the strictness of stream consistency.
65    #[serde(default = "default::streaming::unsafe_enable_strict_consistency")]
66    pub unsafe_enable_strict_consistency: bool,
67
68    #[serde(default, flatten)]
69    #[config_doc(omitted)]
70    pub unrecognized: Unrecognized<Self>,
71}
72
73serde_with::with_prefix!(streaming_prefix "stream_");
74
75/// The subsections `[streaming.developer]`.
76///
77/// It is put at [`StreamingConfig::developer`].
78#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
79pub struct StreamingDeveloperConfig {
80    /// Set to true to enable per-executor row count metrics. This will produce a lot of timeseries
81    /// and might affect the prometheus performance. If you only need actor input and output
82    /// rows data, see `stream_actor_in_record_cnt` and `stream_actor_out_record_cnt` instead.
83    #[serde(default = "default::developer::stream_enable_executor_row_count")]
84    pub enable_executor_row_count: bool,
85
86    /// The capacity of the chunks in the channel that connects between `ConnectorSource` and
87    /// `SourceExecutor`.
88    #[serde(default = "default::developer::connector_message_buffer_size")]
89    pub connector_message_buffer_size: usize,
90
91    /// Limit number of the cached entries in an extreme aggregation call.
92    #[serde(default = "default::developer::unsafe_stream_extreme_cache_size")]
93    pub unsafe_extreme_cache_size: usize,
94
95    /// The maximum size of the chunk produced by executor at a time.
96    #[serde(default = "default::developer::stream_chunk_size")]
97    pub chunk_size: usize,
98
99    /// The initial permits that a channel holds, i.e., the maximum row count can be buffered in
100    /// the channel.
101    #[serde(default = "default::developer::stream_exchange_initial_permits")]
102    pub exchange_initial_permits: usize,
103
104    /// The permits that are batched to add back, for reducing the backward `AddPermits` messages
105    /// in remote exchange.
106    #[serde(default = "default::developer::stream_exchange_batched_permits")]
107    pub exchange_batched_permits: usize,
108
109    /// The maximum number of concurrent barriers in an exchange channel.
110    #[serde(default = "default::developer::stream_exchange_concurrent_barriers")]
111    pub exchange_concurrent_barriers: usize,
112
113    /// The concurrency for dispatching messages to different downstream jobs.
114    ///
115    /// - `1` means no concurrency, i.e., dispatch messages to downstream jobs one by one.
116    /// - `0` means unlimited concurrency.
117    #[serde(default = "default::developer::stream_exchange_concurrent_dispatchers")]
118    pub exchange_concurrent_dispatchers: usize,
119
120    /// The initial permits for a dml channel, i.e., the maximum row count can be buffered in
121    /// the channel.
122    #[serde(default = "default::developer::stream_dml_channel_initial_permits")]
123    pub dml_channel_initial_permits: usize,
124
125    /// The max heap size of dirty groups of `HashAggExecutor`.
126    #[serde(default = "default::developer::stream_hash_agg_max_dirty_groups_heap_size")]
127    pub hash_agg_max_dirty_groups_heap_size: usize,
128
129    #[serde(default = "default::developer::memory_controller_threshold_aggressive")]
130    pub memory_controller_threshold_aggressive: f64,
131
132    #[serde(default = "default::developer::memory_controller_threshold_graceful")]
133    pub memory_controller_threshold_graceful: f64,
134
135    #[serde(default = "default::developer::memory_controller_threshold_stable")]
136    pub memory_controller_threshold_stable: f64,
137
138    #[serde(default = "default::developer::memory_controller_eviction_factor_aggressive")]
139    pub memory_controller_eviction_factor_aggressive: f64,
140
141    #[serde(default = "default::developer::memory_controller_eviction_factor_graceful")]
142    pub memory_controller_eviction_factor_graceful: f64,
143
144    #[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
145    pub memory_controller_eviction_factor_stable: f64,
146
147    #[serde(default = "default::developer::memory_controller_update_interval_ms")]
148    pub memory_controller_update_interval_ms: usize,
149
150    #[serde(default = "default::developer::memory_controller_sequence_tls_step")]
151    pub memory_controller_sequence_tls_step: u64,
152
153    #[serde(default = "default::developer::memory_controller_sequence_tls_lag")]
154    pub memory_controller_sequence_tls_lag: u64,
155
156    #[serde(default = "default::developer::stream_enable_arrangement_backfill")]
157    /// Enable arrangement backfill
158    /// If false, the arrangement backfill will be disabled,
159    /// even if session variable set.
160    /// If true, it's decided by session variable `streaming_use_arrangement_backfill` (default true)
161    pub enable_arrangement_backfill: bool,
162
163    #[serde(default = "default::developer::stream_high_join_amplification_threshold")]
164    /// If number of hash join matches exceeds this threshold number,
165    /// it will be logged.
166    pub high_join_amplification_threshold: usize,
167
168    /// Actor tokio metrics is enabled if `enable_actor_tokio_metrics` is set or metrics level >= Debug.
169    #[serde(default = "default::developer::enable_actor_tokio_metrics")]
170    pub enable_actor_tokio_metrics: bool,
171
172    /// The number of the connections for streaming remote exchange between two nodes.
173    /// If not specified, the value of `server.connection_pool_size` will be used.
174    #[serde(default = "default::developer::stream_exchange_connection_pool_size")]
175    pub(super) exchange_connection_pool_size: Option<u16>,
176
177    /// A flag to allow disabling the auto schema change handling
178    #[serde(default = "default::developer::stream_enable_auto_schema_change")]
179    pub enable_auto_schema_change: bool,
180
181    #[serde(default = "default::developer::enable_shared_source")]
182    /// Enable shared source
183    /// If false, the shared source will be disabled,
184    /// even if session variable set.
185    /// If true, it's decided by session variable `streaming_use_shared_source` (default true)
186    pub enable_shared_source: bool,
187
188    #[serde(default = "default::developer::switch_jdbc_pg_to_native")]
189    /// When true, all jdbc sinks with connector='jdbc' and jdbc.url="jdbc:postgresql://..."
190    /// will be switched from jdbc postgresql sinks to rust native (connector='postgres') sinks.
191    pub switch_jdbc_pg_to_native: bool,
192
193    /// The maximum number of consecutive barriers allowed in a message when sent between actors.
194    #[serde(default = "default::developer::stream_max_barrier_batch_size")]
195    pub max_barrier_batch_size: u32,
196
197    /// Configure the system-wide cache row cardinality of hash join.
198    /// For example, if this is set to 1000, it means we can have at most 1000 rows in cache.
199    #[serde(default = "default::developer::streaming_hash_join_entry_state_max_rows")]
200    pub hash_join_entry_state_max_rows: usize,
201
202    /// Enable / Disable profiling stats used by `EXPLAIN ANALYZE`
203    #[serde(default = "default::developer::enable_explain_analyze_stats")]
204    pub enable_explain_analyze_stats: bool,
205
206    #[serde(default)]
207    pub compute_client_config: RpcClientConfig,
208
209    /// `IcebergListExecutor`: The interval in seconds for Iceberg source to list new files.
210    #[serde(default = "default::developer::iceberg_list_interval_sec")]
211    pub iceberg_list_interval_sec: u64,
212
213    /// `IcebergFetchExecutor`: The number of files the executor will fetch concurrently in a batch.
214    #[serde(default = "default::developer::iceberg_fetch_batch_size")]
215    pub iceberg_fetch_batch_size: u64,
216
217    /// `IcebergSink`: The size of the cache for positional delete in the sink.
218    #[serde(default = "default::developer::iceberg_sink_positional_delete_cache_size")]
219    pub iceberg_sink_positional_delete_cache_size: usize,
220
221    /// `IcebergSink`: The maximum number of rows in a row group when writing Parquet files.
222    #[serde(default = "default::developer::iceberg_sink_write_parquet_max_row_group_rows")]
223    pub iceberg_sink_write_parquet_max_row_group_rows: usize,
224}
225
226pub mod default {
227    pub use crate::config::default::developer;
228
229    pub mod streaming {
230        use crate::config::AsyncStackTraceOption;
231
232        pub fn in_flight_barrier_nums() -> usize {
233            // quick fix
234            // TODO: remove this limitation from code
235            10000
236        }
237
238        pub fn async_stack_trace() -> AsyncStackTraceOption {
239            AsyncStackTraceOption::default()
240        }
241
242        pub fn unique_user_stream_errors() -> usize {
243            10
244        }
245
246        pub fn unsafe_enable_strict_consistency() -> bool {
247            true
248        }
249    }
250}