risingwave_cmd_all/
single_node.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use clap::Parser;
16use home::home_dir;
17use risingwave_common::config::MetaBackend;
18use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
19use risingwave_compactor::CompactorOpts;
20use risingwave_compute::ComputeNodeOpts;
21use risingwave_compute::memory::config::gradient_reserve_memory_bytes;
22use risingwave_frontend::FrontendOpts;
23use risingwave_meta_node::MetaNodeOpts;
24
25use crate::ParsedStandaloneOpts;
26
27#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
28#[command(
29    version,
30    about = "[default] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified"
31)]
32/// Here we define our own defaults for the single node mode.
33pub struct SingleNodeOpts {
34    /// The address prometheus polls metrics from.
35    #[clap(long, env = "RW_SINGLE_NODE_PROMETHEUS_LISTENER_ADDR")]
36    prometheus_listener_addr: Option<String>,
37
38    /// The path to the cluster configuration file.
39    #[clap(long, env = "RW_SINGLE_NODE_CONFIG_PATH")]
40    config_path: Option<String>,
41
42    /// The store directory used by meta store and object store.
43    #[clap(long, env = "RW_SINGLE_NODE_STORE_DIRECTORY")]
44    store_directory: Option<String>,
45
46    /// Start RisingWave in-memory.
47    #[clap(long, env = "RW_SINGLE_NODE_IN_MEMORY")]
48    in_memory: bool,
49
50    /// Exit RisingWave after specified seconds of inactivity.
51    #[clap(long, hide = true, env = "RW_SINGLE_NODE_MAX_IDLE_SECS")]
52    max_idle_secs: Option<u64>,
53
54    #[clap(flatten)]
55    node_opts: NodeSpecificOpts,
56}
57
58impl SingleNodeOpts {
59    pub fn new_for_playground() -> Self {
60        let empty_args = vec![] as Vec<String>;
61        let mut opts = SingleNodeOpts::parse_from(empty_args);
62        opts.in_memory = true;
63        opts
64    }
65}
66
67/// # Node-Specific Options
68///
69/// ## Which node-specific options should be here?
70///
71/// This only includes the options displayed in the CLI parameters.
72///
/// 1. An option that is forcibly overridden by the single-node deployment should not be here.
74///    - e.g. `meta_addr` will be automatically set to localhost.
75/// 2. An option defined in the config file and hidden in the command-line help should not be here.
76///    - e.g. `sql_endpoint` is encouraged to be set from config file or environment variables.
77/// 3. An option that is only for cloud deployment should not be here.
78///    - e.g. `proxy_rpc_endpoint` is used in cloud deployment only, and should not be used by open-source users.
79///
/// ## How to add a new option here?
81///
82/// Options for the other nodes are defined with following convention:
83///
84/// 1. The option name is the same as the definition in node's `Opts` struct. May add a prefix to avoid conflicts when necessary.
85/// 2. The option doesn't have a default value and must be `Option<T>`, so that the default value in the node's `Opts` struct can be used.
86/// 3. The option doesn't need to read from environment variables, which will be done in the node's `Opts` struct.
87#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
88pub struct NodeSpecificOpts {
89    // ------- Compute Node Options -------
90    /// Total available memory for all the nodes
91    #[clap(long)]
92    pub total_memory_bytes: Option<usize>,
93
94    /// The parallelism that the compute node will register to the scheduler of the meta service.
95    #[clap(long)]
96    pub parallelism: Option<usize>,
97
98    // ------- Frontend Node Options -------
99    /// The address that this service listens to.
100    /// Usually the localhost + desired port.
101    #[clap(long)]
102    pub listen_addr: Option<String>,
103
104    // ------- Meta Node Options -------
105    /// The HTTP REST-API address of the Prometheus instance associated to this cluster.
106    /// This address is used to serve `PromQL` queries to Prometheus.
107    /// It is also used by Grafana Dashboard Service to fetch metrics and visualize them.
108    #[clap(long)]
109    pub prometheus_endpoint: Option<String>,
110
111    /// The additional selector used when querying Prometheus.
112    ///
113    /// The format is same as `PromQL`. Example: `instance="foo",namespace="bar"`
114    #[clap(long)]
115    pub prometheus_selector: Option<String>,
116
117    // ------- Compactor Node Options -------
118    #[clap(long)]
119    pub compaction_worker_threads_number: Option<usize>,
120}
121
/// State store URL written into the meta options when running in-memory.
///
/// It is also used as a prefix test on the effective state store URL to decide
/// whether a standalone compactor should be started.
const HUMMOCK_IN_MEMORY: &str = "hummock+memory";
123
124pub fn map_single_node_opts_to_standalone_opts(opts: SingleNodeOpts) -> ParsedStandaloneOpts {
125    // Parse from empty strings to get the default values.
126    // Note that environment variables will be used if they are set.
127    let empty_args = vec![] as Vec<String>;
128    let mut meta_opts = MetaNodeOpts::parse_from(&empty_args);
129    let mut compute_opts = ComputeNodeOpts::parse_from(&empty_args);
130    let mut frontend_opts = FrontendOpts::parse_from(&empty_args);
131    let mut compactor_opts = CompactorOpts::parse_from(&empty_args);
132
133    if let Some(max_idle_secs) = opts.max_idle_secs {
134        meta_opts.dangerous_max_idle_secs = Some(max_idle_secs);
135    }
136
137    if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr {
138        meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
139        compute_opts
140            .prometheus_listener_addr
141            .clone_from(prometheus_listener_addr);
142        frontend_opts
143            .prometheus_listener_addr
144            .clone_from(prometheus_listener_addr);
145        compactor_opts
146            .prometheus_listener_addr
147            .clone_from(prometheus_listener_addr);
148    }
149    if let Some(config_path) = &opts.config_path {
150        meta_opts.config_path.clone_from(config_path);
151        compute_opts.config_path.clone_from(config_path);
152        frontend_opts.config_path.clone_from(config_path);
153        compactor_opts.config_path.clone_from(config_path);
154    }
155
156    let store_directory = opts.store_directory.unwrap_or_else(|| {
157        let mut home_path = home_dir().unwrap();
158        home_path.push(".risingwave");
159        home_path.to_str().unwrap().to_owned()
160    });
161
162    // Set state store for meta (if not set). It could be set by environment variables before this.
163    if meta_opts.state_store.is_none() {
164        if opts.in_memory {
165            meta_opts.state_store = Some(HUMMOCK_IN_MEMORY.to_owned());
166        } else {
167            let state_store_dir = format!("{}/state_store", &store_directory);
168            std::fs::create_dir_all(&state_store_dir).unwrap();
169            let state_store_url = format!("hummock+fs://{}", &state_store_dir);
170            meta_opts.state_store = Some(state_store_url);
171        }
172
173        // FIXME: otherwise it reports: missing system param "data_directory", but I think it should be set by this way...
174        meta_opts.data_directory = Some("hummock_001".to_owned());
175    }
176
177    // Set meta store for meta (if not set). It could be set by environment variables before this.
178    let meta_backend_is_set = match meta_opts.backend {
179        Some(MetaBackend::Sql)
180        | Some(MetaBackend::Sqlite)
181        | Some(MetaBackend::Postgres)
182        | Some(MetaBackend::Mysql) => meta_opts.sql_endpoint.is_some(),
183        Some(MetaBackend::Mem) => true,
184        None => false,
185    };
186    if !meta_backend_is_set {
187        if opts.in_memory {
188            meta_opts.backend = Some(MetaBackend::Mem);
189        } else {
190            meta_opts.backend = Some(MetaBackend::Sqlite);
191            let meta_store_dir = format!("{}/meta_store", &store_directory);
192            std::fs::create_dir_all(&meta_store_dir).unwrap();
193            let meta_store_endpoint = format!("{}/single_node.db", &meta_store_dir);
194            meta_opts.sql_endpoint = Some(meta_store_endpoint.into());
195        }
196    }
197
198    // Set listen addresses (force to override)
199    meta_opts.listen_addr = "127.0.0.1:5690".to_owned();
200    meta_opts.advertise_addr = "127.0.0.1:5690".to_owned();
201    meta_opts.dashboard_host = Some("0.0.0.0:5691".to_owned());
202    compute_opts.listen_addr = "127.0.0.1:5688".to_owned();
203    compactor_opts.listen_addr = "127.0.0.1:6660".to_owned();
204
205    // Set Meta addresses for all nodes (force to override)
206    let meta_addr = "http://127.0.0.1:5690".to_owned();
207    compute_opts.meta_address = meta_addr.parse().unwrap();
208    frontend_opts.meta_addr = meta_addr.parse().unwrap();
209    compactor_opts.meta_address = meta_addr.parse().unwrap();
210
211    // Allocate memory for each node
212    let total_memory_bytes = if let Some(total_memory_bytes) = opts.node_opts.total_memory_bytes {
213        total_memory_bytes
214    } else {
215        system_memory_available_bytes()
216    };
217    frontend_opts.frontend_total_memory_bytes = memory_for_frontend(total_memory_bytes);
218    compactor_opts.compactor_total_memory_bytes = memory_for_compactor(total_memory_bytes);
219    compute_opts.total_memory_bytes = total_memory_bytes
220        - memory_for_frontend(total_memory_bytes)
221        - memory_for_compactor(total_memory_bytes);
222    compute_opts.memory_manager_target_bytes =
223        Some(gradient_reserve_memory_bytes(total_memory_bytes));
224
225    // Apply node-specific options
226    if let Some(parallelism) = opts.node_opts.parallelism {
227        compute_opts.parallelism = parallelism;
228    }
229    if let Some(listen_addr) = opts.node_opts.listen_addr {
230        frontend_opts.listen_addr = listen_addr;
231    }
232    if let Some(prometheus_endpoint) = opts.node_opts.prometheus_endpoint {
233        meta_opts.prometheus_endpoint = Some(prometheus_endpoint);
234    }
235    if let Some(prometheus_selector) = opts.node_opts.prometheus_selector {
236        meta_opts.prometheus_selector = Some(prometheus_selector);
237    }
238    if let Some(n) = opts.node_opts.compaction_worker_threads_number {
239        compactor_opts.compaction_worker_threads_number = Some(n);
240    }
241
242    let in_memory_state_store = meta_opts
243        .state_store
244        .as_ref()
245        .unwrap()
246        .starts_with(HUMMOCK_IN_MEMORY);
247
248    ParsedStandaloneOpts {
249        meta_opts: Some(meta_opts),
250        compute_opts: Some(compute_opts),
251        frontend_opts: Some(frontend_opts),
252        // If the state store is in-memory, the compute node will start an embedded compactor.
253        compactor_opts: if in_memory_state_store {
254            None
255        } else {
256            Some(compactor_opts)
257        },
258    }
259}
260
/// Returns the number of bytes reserved for the frontend node out of `total_memory_bytes`.
///
/// One eighth of the first 16 GiB is reserved, plus one sixteenth of anything above that.
fn memory_for_frontend(total_memory_bytes: usize) -> usize {
    const TIER_LIMIT: usize = 16 << 30;

    match total_memory_bytes.checked_sub(TIER_LIMIT) {
        // At or below the tier limit: a flat one-eighth share.
        None | Some(0) => total_memory_bytes / 8,
        // Above the limit: full share of the first tier plus a smaller share of the excess.
        Some(excess) => excess / 16 + TIER_LIMIT / 8,
    }
}
268
/// Returns the number of bytes reserved for the compactor node out of `total_memory_bytes`.
///
/// Up to 16 GiB the share is one eighth of the total; every byte beyond 16 GiB
/// contributes only one sixteenth.
fn memory_for_compactor(total_memory_bytes: usize) -> usize {
    let threshold: usize = 16 << 30;

    if total_memory_bytes > threshold {
        let above_threshold = total_memory_bytes - threshold;
        return above_threshold / 16 + threshold / 8;
    }

    total_memory_bytes / 8
}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Playground options must map to an in-memory state store and no standalone compactor.
    #[test]
    fn test_playground_in_memory_state_store() {
        let standalone_opts =
            map_single_node_opts_to_standalone_opts(SingleNodeOpts::new_for_playground());

        // Should not start a compactor.
        assert!(standalone_opts.compactor_opts.is_none());

        let meta_opts = standalone_opts
            .meta_opts
            .as_ref()
            .expect("meta options are always present");
        assert_eq!(meta_opts.state_store.as_deref(), Some(HUMMOCK_IN_MEMORY));
    }
}
297}