risingwave_common/config/
batch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use risingwave_common_proc_macro::serde_prefix_all;
16
17use super::*;
18
/// The section `[batch]` in `risingwave.toml`.
// Every `Option<_>` field is (de)serialized through `none_as_empty_string`, applied to all of
// them at once by `serde_with::apply` below.
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct BatchConfig {
    /// The thread number of the batch task runtime in the compute node. The default value is
    /// decided by `tokio`.
    #[serde(default)]
    pub worker_threads_num: Option<usize>,

    /// Developer options, configured in the `[batch.developer]` subsection.
    #[serde(default)]
    #[config_doc(nested)]
    pub developer: BatchDeveloperConfig,

    /// This is the max number of queries per sql session.
    #[serde(default)]
    pub distributed_query_limit: Option<u64>,

    /// This is the max number of batch queries per frontend node.
    #[serde(default)]
    pub max_batch_queries_per_frontend_node: Option<u64>,

    /// Whether to enable barrier read. Defaults to `false`.
    #[serde(default = "default::batch::enable_barrier_read")]
    pub enable_barrier_read: bool,

    /// Timeout for a batch query in seconds. Defaults to 3600 (1 hour).
    #[serde(default = "default::batch::statement_timeout_in_sec")]
    pub statement_timeout_in_sec: u32,

    /// Catch-all for entries in `[batch]` that match no other field. They are collected through
    /// `#[serde(flatten)]`; this assumes `Unrecognized` accepts arbitrary keys (confirm).
    /// The field is marked `omitted` for `ConfigDoc`.
    #[serde(default, flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,

    /// The number of worker threads of the frontend compute runtime.
    /// `None` when not configured.
    #[serde(default)]
    pub frontend_compute_runtime_worker_threads: Option<usize>,

    /// Number of seconds for which a worker is temporarily masked as unavailable.
    /// Defaults to 30.
    #[serde(default = "default::batch::mask_worker_temporary_secs")]
    pub mask_worker_temporary_secs: usize,

    /// Keywords on which SQL option redaction is based in the query log.
    /// A SQL option with a name containing any of these keywords will be redacted.
    /// Defaults to `credential`, `key`, `password`, `private`, `secret` and `token`.
    #[serde(default = "default::batch::redact_sql_option_keywords")]
    pub redact_sql_option_keywords: Vec<String>,

    /// Enable the spill out to disk feature for batch queries. Defaults to `true`.
    #[serde(default = "default::batch::enable_spill")]
    pub enable_spill: bool,
}
68
/// The subsection `[batch.developer]`.
///
/// It is put at [`BatchConfig::developer`].
// NOTE(review): `mode = "alias"` presumably lets each field also be read under a `batch_`-prefixed
// name (e.g. `batch_chunk_size`) for compatibility — confirm against `serde_prefix_all`.
#[serde_prefix_all("batch_", mode = "alias")]
// Every `Option<_>` field is (de)serialized through `none_as_empty_string`.
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct BatchDeveloperConfig {
    /// The capacity of the chunks in the channel that connects between `ConnectorSource` and
    /// `SourceExecutor`.
    #[serde(default = "default::developer::connector_message_buffer_size")]
    pub connector_message_buffer_size: usize,

    /// The size of the channel used for output to exchange/shuffle.
    #[serde(default = "default::developer::batch_output_channel_size")]
    pub output_channel_size: usize,

    /// The size of the batch receiver channel.
    // NOTE(review): presumably the receiving side of exchange — confirm.
    #[serde(default = "default::developer::batch_receiver_channel_size")]
    pub receiver_channel_size: usize,

    /// The size of the channel used by the root stage of a batch query.
    #[serde(default = "default::developer::batch_root_stage_channel_size")]
    pub root_stage_channel_size: usize,

    /// The size of a chunk produced by `RowSeqScanExecutor`.
    #[serde(default = "default::developer::batch_chunk_size")]
    pub chunk_size: usize,

    /// The number of the connections for batch remote exchange between two nodes.
    /// If not specified, the value of `server.connection_pool_size` will be used.
    // `pub(super)`: only reachable from the parent `config` module, not by external callers.
    #[serde(default = "default::developer::batch_exchange_connection_pool_size")]
    pub(super) exchange_connection_pool_size: Option<u16>,

    /// RPC client settings for the compute client.
    // NOTE(review): meaning inferred from the field name — confirm where this is consumed.
    #[serde(default)]
    pub compute_client_config: RpcClientConfig,

    /// RPC client settings for the frontend client.
    // NOTE(review): meaning inferred from the field name — confirm where this is consumed.
    #[serde(default)]
    pub frontend_client_config: RpcClientConfig,

    /// The buffer size used for local execution.
    // NOTE(review): presumably local (frontend-side) batch execution — confirm.
    #[serde(default = "default::developer::batch_local_execute_buffer_size")]
    pub local_execute_buffer_size: usize,
}
109
110pub mod default {
111    pub use crate::config::default::developer;
112
113    pub mod batch {
114        pub fn enable_barrier_read() -> bool {
115            false
116        }
117
118        pub fn enable_spill() -> bool {
119            true
120        }
121
122        pub fn statement_timeout_in_sec() -> u32 {
123            // 1 hour
124            60 * 60
125        }
126
127        pub fn mask_worker_temporary_secs() -> usize {
128            30
129        }
130
131        pub fn redact_sql_option_keywords() -> Vec<String> {
132            [
133                "credential",
134                "key",
135                "password",
136                "private",
137                "secret",
138                "token",
139            ]
140            .into_iter()
141            .map(str::to_string)
142            .collect()
143        }
144    }
145}