risingwave_common/config/
batch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common_proc_macro::serde_prefix_all;
16
17use super::*;
18
19/// The section `[batch]` in `risingwave.toml`.
20#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
21pub struct BatchConfig {
22    /// The thread number of the batch task runtime in the compute node. The default value is
23    /// decided by `tokio`.
24    #[serde(default)]
25    pub worker_threads_num: Option<usize>,
26
27    #[serde(default)]
28    #[config_doc(omitted)]
29    pub developer: BatchDeveloperConfig,
30
31    /// This is the max number of queries per sql session.
32    #[serde(default)]
33    pub distributed_query_limit: Option<u64>,
34
35    /// This is the max number of batch queries per frontend node.
36    #[serde(default)]
37    pub max_batch_queries_per_frontend_node: Option<u64>,
38
39    #[serde(default = "default::batch::enable_barrier_read")]
40    pub enable_barrier_read: bool,
41
42    /// Timeout for a batch query in seconds.
43    #[serde(default = "default::batch::statement_timeout_in_sec")]
44    pub statement_timeout_in_sec: u32,
45
46    #[serde(default, flatten)]
47    #[config_doc(omitted)]
48    pub unrecognized: Unrecognized<Self>,
49
50    #[serde(default)]
51    /// frontend compute runtime worker threads
52    pub frontend_compute_runtime_worker_threads: Option<usize>,
53
54    /// This is the secs used to mask a worker unavailable temporarily.
55    #[serde(default = "default::batch::mask_worker_temporary_secs")]
56    pub mask_worker_temporary_secs: usize,
57
58    /// Keywords on which SQL option redaction is based in the query log.
59    /// A SQL option with a name containing any of these keywords will be redacted.
60    #[serde(default = "default::batch::redact_sql_option_keywords")]
61    pub redact_sql_option_keywords: Vec<String>,
62
63    /// Enable the spill out to disk feature for batch queries.
64    #[serde(default = "default::batch::enable_spill")]
65    pub enable_spill: bool,
66}
67
68/// The subsections `[batch.developer]`.
69///
70/// It is put at [`BatchConfig::developer`].
71#[serde_prefix_all("batch_", mode = "alias")]
72#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
73pub struct BatchDeveloperConfig {
74    /// The capacity of the chunks in the channel that connects between `ConnectorSource` and
75    /// `SourceExecutor`.
76    #[serde(default = "default::developer::connector_message_buffer_size")]
77    pub connector_message_buffer_size: usize,
78
79    /// The size of the channel used for output to exchange/shuffle.
80    #[serde(default = "default::developer::batch_output_channel_size")]
81    pub output_channel_size: usize,
82
83    #[serde(default = "default::developer::batch_receiver_channel_size")]
84    pub receiver_channel_size: usize,
85
86    #[serde(default = "default::developer::batch_root_stage_channel_size")]
87    pub root_stage_channel_size: usize,
88
89    /// The size of a chunk produced by `RowSeqScanExecutor`
90    #[serde(default = "default::developer::batch_chunk_size")]
91    pub chunk_size: usize,
92
93    /// The number of the connections for batch remote exchange between two nodes.
94    /// If not specified, the value of `server.connection_pool_size` will be used.
95    #[serde(default = "default::developer::batch_exchange_connection_pool_size")]
96    pub(super) exchange_connection_pool_size: Option<u16>,
97
98    #[serde(default)]
99    pub compute_client_config: RpcClientConfig,
100
101    #[serde(default)]
102    pub frontend_client_config: RpcClientConfig,
103
104    #[serde(default = "default::developer::batch_local_execute_buffer_size")]
105    pub local_execute_buffer_size: usize,
106}
107
108pub mod default {
109    pub use crate::config::default::developer;
110
111    pub mod batch {
112        pub fn enable_barrier_read() -> bool {
113            false
114        }
115
116        pub fn enable_spill() -> bool {
117            true
118        }
119
120        pub fn statement_timeout_in_sec() -> u32 {
121            // 1 hour
122            60 * 60
123        }
124
125        pub fn mask_worker_temporary_secs() -> usize {
126            30
127        }
128
129        pub fn redact_sql_option_keywords() -> Vec<String> {
130            [
131                "credential",
132                "key",
133                "password",
134                "private",
135                "secret",
136                "token",
137            ]
138            .into_iter()
139            .map(str::to_string)
140            .collect()
141        }
142    }
143}