risingwave_common/config/
batch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16
17/// The section `[batch]` in `risingwave.toml`.
18#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
19pub struct BatchConfig {
20    /// The thread number of the batch task runtime in the compute node. The default value is
21    /// decided by `tokio`.
22    #[serde(default)]
23    pub worker_threads_num: Option<usize>,
24
25    #[serde(default, with = "batch_prefix")]
26    #[config_doc(omitted)]
27    pub developer: BatchDeveloperConfig,
28
29    /// This is the max number of queries per sql session.
30    #[serde(default)]
31    pub distributed_query_limit: Option<u64>,
32
33    /// This is the max number of batch queries per frontend node.
34    #[serde(default)]
35    pub max_batch_queries_per_frontend_node: Option<u64>,
36
37    #[serde(default = "default::batch::enable_barrier_read")]
38    pub enable_barrier_read: bool,
39
40    /// Timeout for a batch query in seconds.
41    #[serde(default = "default::batch::statement_timeout_in_sec")]
42    pub statement_timeout_in_sec: u32,
43
44    #[serde(default, flatten)]
45    #[config_doc(omitted)]
46    pub unrecognized: Unrecognized<Self>,
47
48    #[serde(default)]
49    /// frontend compute runtime worker threads
50    pub frontend_compute_runtime_worker_threads: Option<usize>,
51
52    /// This is the secs used to mask a worker unavailable temporarily.
53    #[serde(default = "default::batch::mask_worker_temporary_secs")]
54    pub mask_worker_temporary_secs: usize,
55
56    /// Keywords on which SQL option redaction is based in the query log.
57    /// A SQL option with a name containing any of these keywords will be redacted.
58    #[serde(default = "default::batch::redact_sql_option_keywords")]
59    pub redact_sql_option_keywords: Vec<String>,
60
61    /// Enable the spill out to disk feature for batch queries.
62    #[serde(default = "default::batch::enable_spill")]
63    pub enable_spill: bool,
64}
65
66serde_with::with_prefix!(batch_prefix "batch_");
67
68/// The subsections `[batch.developer]`.
69///
70/// It is put at [`BatchConfig::developer`].
71#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
72pub struct BatchDeveloperConfig {
73    /// The capacity of the chunks in the channel that connects between `ConnectorSource` and
74    /// `SourceExecutor`.
75    #[serde(default = "default::developer::connector_message_buffer_size")]
76    pub connector_message_buffer_size: usize,
77
78    /// The size of the channel used for output to exchange/shuffle.
79    #[serde(default = "default::developer::batch_output_channel_size")]
80    pub output_channel_size: usize,
81
82    #[serde(default = "default::developer::batch_receiver_channel_size")]
83    pub receiver_channel_size: usize,
84
85    #[serde(default = "default::developer::batch_root_stage_channel_size")]
86    pub root_stage_channel_size: usize,
87
88    /// The size of a chunk produced by `RowSeqScanExecutor`
89    #[serde(default = "default::developer::batch_chunk_size")]
90    pub chunk_size: usize,
91
92    /// The number of the connections for batch remote exchange between two nodes.
93    /// If not specified, the value of `server.connection_pool_size` will be used.
94    #[serde(default = "default::developer::batch_exchange_connection_pool_size")]
95    pub(super) exchange_connection_pool_size: Option<u16>,
96
97    #[serde(default)]
98    pub compute_client_config: RpcClientConfig,
99
100    #[serde(default)]
101    pub frontend_client_config: RpcClientConfig,
102
103    #[serde(default = "default::developer::batch_local_execute_buffer_size")]
104    pub local_execute_buffer_size: usize,
105}
106
107pub mod default {
108    pub use crate::config::default::developer;
109
110    pub mod batch {
111        pub fn enable_barrier_read() -> bool {
112            false
113        }
114
115        pub fn enable_spill() -> bool {
116            true
117        }
118
119        pub fn statement_timeout_in_sec() -> u32 {
120            // 1 hour
121            60 * 60
122        }
123
124        pub fn mask_worker_temporary_secs() -> usize {
125            30
126        }
127
128        pub fn redact_sql_option_keywords() -> Vec<String> {
129            [
130                "credential",
131                "key",
132                "password",
133                "private",
134                "secret",
135                "token",
136            ]
137            .into_iter()
138            .map(str::to_string)
139            .collect()
140        }
141    }
142}