risingwave_cmd_all/
single_node.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use clap::Parser;
16use home::home_dir;
17use risingwave_common::config::MetaBackend;
18use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
19use risingwave_compactor::CompactorOpts;
20use risingwave_compute::ComputeNodeOpts;
21use risingwave_compute::memory::config::gradient_reserve_memory_bytes;
22use risingwave_frontend::FrontendOpts;
23use risingwave_meta_node::MetaNodeOpts;
24
25use crate::ParsedStandaloneOpts;
26
27#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
28#[command(
29    version,
30    about = "[default] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified"
31)]
32/// Here we define our own defaults for the single node mode.
33pub struct SingleNodeOpts {
34    /// The address prometheus polls metrics from.
35    #[clap(long, env = "RW_SINGLE_NODE_PROMETHEUS_LISTENER_ADDR")]
36    prometheus_listener_addr: Option<String>,
37
38    /// The path to the cluster configuration file.
39    #[clap(long, env = "RW_SINGLE_NODE_CONFIG_PATH")]
40    config_path: Option<String>,
41
42    /// The store directory used by meta store and object store.
43    #[clap(long, env = "RW_SINGLE_NODE_STORE_DIRECTORY")]
44    store_directory: Option<String>,
45
46    /// Start RisingWave in-memory.
47    #[clap(long, env = "RW_SINGLE_NODE_IN_MEMORY")]
48    in_memory: bool,
49
50    /// Exit RisingWave after specified seconds of inactivity.
51    #[clap(long, hide = true, env = "RW_SINGLE_NODE_MAX_IDLE_SECS")]
52    max_idle_secs: Option<u64>,
53
54    #[clap(flatten)]
55    node_opts: NodeSpecificOpts,
56}
57
58impl SingleNodeOpts {
59    pub fn new_for_playground() -> Self {
60        let empty_args = vec![] as Vec<String>;
61        let mut opts = SingleNodeOpts::parse_from(empty_args);
62        opts.in_memory = true;
63        opts
64    }
65}
66
67/// # Node-Specific Options
68///
69/// ## Which node-specific options should be here?
70///
71/// This only includes the options displayed in the CLI parameters.
72///
73/// 1. An option that will be forced to override by single-node deployment should not be here.
74///    - e.g. `meta_addr` will be automatically set to localhost.
75/// 2. An option defined in the config file and hidden in the command-line help should not be here.
76///    - e.g. `sql_endpoint` is encouraged to be set from config file or environment variables.
77/// 3. An option that is only for cloud deployment should not be here.
78///    - e.g. `proxy_rpc_endpoint` is used in cloud deployment only, and should not be used by open-source users.
79///
80/// ## How to add an new option here?
81///
82/// Options for the other nodes are defined with following convention:
83///
84/// 1. The option name is the same as the definition in node's `Opts` struct. May add a prefix to avoid conflicts when necessary.
85/// 2. The option doesn't have a default value and must be `Option<T>`, so that the default value in the node's `Opts` struct can be used.
86/// 3. The option doesn't need to read from environment variables, which will be done in the node's `Opts` struct.
87#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
88pub struct NodeSpecificOpts {
89    // ------- Compute Node Options -------
90    /// Total available memory for all the nodes
91    #[clap(long)]
92    pub total_memory_bytes: Option<usize>,
93
94    /// The parallelism that the compute node will register to the scheduler of the meta service.
95    #[clap(long)]
96    pub parallelism: Option<usize>,
97
98    // ------- Frontend Node Options -------
99    /// The address that this service listens to.
100    /// Usually the localhost + desired port.
101    #[clap(long)]
102    pub listen_addr: Option<String>,
103
104    // ------- Meta Node Options -------
105    /// The HTTP REST-API address of the Prometheus instance associated to this cluster.
106    /// This address is used to serve `PromQL` queries to Prometheus.
107    /// It is also used by Grafana Dashboard Service to fetch metrics and visualize them.
108    #[clap(long)]
109    pub prometheus_endpoint: Option<String>,
110
111    /// The additional selector used when querying Prometheus.
112    ///
113    /// The format is same as `PromQL`. Example: `instance="foo",namespace="bar"`
114    #[clap(long)]
115    pub prometheus_selector: Option<String>,
116
117    // ------- Compactor Node Options -------
118    #[clap(long)]
119    pub compaction_worker_threads_number: Option<usize>,
120}
121
122pub fn map_single_node_opts_to_standalone_opts(opts: SingleNodeOpts) -> ParsedStandaloneOpts {
123    // Parse from empty strings to get the default values.
124    // Note that environment variables will be used if they are set.
125    let empty_args = vec![] as Vec<String>;
126    let mut meta_opts = MetaNodeOpts::parse_from(&empty_args);
127    let mut compute_opts = ComputeNodeOpts::parse_from(&empty_args);
128    let mut frontend_opts = FrontendOpts::parse_from(&empty_args);
129    let mut compactor_opts = CompactorOpts::parse_from(&empty_args);
130
131    if let Some(max_idle_secs) = opts.max_idle_secs {
132        meta_opts.dangerous_max_idle_secs = Some(max_idle_secs);
133    }
134
135    if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr {
136        meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
137        compute_opts
138            .prometheus_listener_addr
139            .clone_from(prometheus_listener_addr);
140        frontend_opts
141            .prometheus_listener_addr
142            .clone_from(prometheus_listener_addr);
143        compactor_opts
144            .prometheus_listener_addr
145            .clone_from(prometheus_listener_addr);
146    }
147    if let Some(config_path) = &opts.config_path {
148        meta_opts.config_path.clone_from(config_path);
149        compute_opts.config_path.clone_from(config_path);
150        frontend_opts.config_path.clone_from(config_path);
151        compactor_opts.config_path.clone_from(config_path);
152    }
153
154    let store_directory = opts.store_directory.unwrap_or_else(|| {
155        let mut home_path = home_dir().unwrap();
156        home_path.push(".risingwave");
157        home_path.to_str().unwrap().to_owned()
158    });
159
160    // Set state store for meta (if not set). It could be set by environment variables before this.
161    if meta_opts.state_store.is_none() {
162        if opts.in_memory {
163            meta_opts.state_store = Some("hummock+memory".to_owned());
164        } else {
165            let state_store_dir = format!("{}/state_store", &store_directory);
166            std::fs::create_dir_all(&state_store_dir).unwrap();
167            let state_store_url = format!("hummock+fs://{}", &state_store_dir);
168            meta_opts.state_store = Some(state_store_url);
169        }
170
171        // FIXME: otherwise it reports: missing system param "data_directory", but I think it should be set by this way...
172        meta_opts.data_directory = Some("hummock_001".to_owned());
173    }
174
175    // Set meta store for meta (if not set). It could be set by environment variables before this.
176    let meta_backend_is_set = match meta_opts.backend {
177        Some(MetaBackend::Sql)
178        | Some(MetaBackend::Sqlite)
179        | Some(MetaBackend::Postgres)
180        | Some(MetaBackend::Mysql) => meta_opts.sql_endpoint.is_some(),
181        Some(MetaBackend::Mem) => true,
182        None => false,
183    };
184    if !meta_backend_is_set {
185        if opts.in_memory {
186            meta_opts.backend = Some(MetaBackend::Mem);
187        } else {
188            meta_opts.backend = Some(MetaBackend::Sqlite);
189            let meta_store_dir = format!("{}/meta_store", &store_directory);
190            std::fs::create_dir_all(&meta_store_dir).unwrap();
191            let meta_store_endpoint = format!("{}/single_node.db", &meta_store_dir);
192            meta_opts.sql_endpoint = Some(meta_store_endpoint.into());
193        }
194    }
195
196    // Set listen addresses (force to override)
197    meta_opts.listen_addr = "127.0.0.1:5690".to_owned();
198    meta_opts.advertise_addr = "127.0.0.1:5690".to_owned();
199    meta_opts.dashboard_host = Some("0.0.0.0:5691".to_owned());
200    compute_opts.listen_addr = "127.0.0.1:5688".to_owned();
201    compactor_opts.listen_addr = "127.0.0.1:6660".to_owned();
202
203    // Set Meta addresses for all nodes (force to override)
204    let meta_addr = "http://127.0.0.1:5690".to_owned();
205    compute_opts.meta_address = meta_addr.parse().unwrap();
206    frontend_opts.meta_addr = meta_addr.parse().unwrap();
207    compactor_opts.meta_address = meta_addr.parse().unwrap();
208
209    // Allocate memory for each node
210    let total_memory_bytes = if let Some(total_memory_bytes) = opts.node_opts.total_memory_bytes {
211        total_memory_bytes
212    } else {
213        system_memory_available_bytes()
214    };
215    frontend_opts.frontend_total_memory_bytes = memory_for_frontend(total_memory_bytes);
216    compactor_opts.compactor_total_memory_bytes = memory_for_compactor(total_memory_bytes);
217    compute_opts.total_memory_bytes = total_memory_bytes
218        - memory_for_frontend(total_memory_bytes)
219        - memory_for_compactor(total_memory_bytes);
220    compute_opts.memory_manager_target_bytes =
221        Some(gradient_reserve_memory_bytes(total_memory_bytes));
222
223    // Apply node-specific options
224    if let Some(parallelism) = opts.node_opts.parallelism {
225        compute_opts.parallelism = parallelism;
226    }
227    if let Some(listen_addr) = opts.node_opts.listen_addr {
228        frontend_opts.listen_addr = listen_addr;
229    }
230    if let Some(prometheus_endpoint) = opts.node_opts.prometheus_endpoint {
231        meta_opts.prometheus_endpoint = Some(prometheus_endpoint);
232    }
233    if let Some(prometheus_selector) = opts.node_opts.prometheus_selector {
234        meta_opts.prometheus_selector = Some(prometheus_selector);
235    }
236    if let Some(n) = opts.node_opts.compaction_worker_threads_number {
237        compactor_opts.compaction_worker_threads_number = Some(n);
238    }
239
240    ParsedStandaloneOpts {
241        meta_opts: Some(meta_opts),
242        compute_opts: Some(compute_opts),
243        frontend_opts: Some(frontend_opts),
244        compactor_opts: Some(compactor_opts),
245    }
246}
247
/// Returns the number of bytes given to the frontend node out of `total_memory_bytes`.
///
/// The share is one eighth of the first 16 GiB plus one sixteenth of everything above it.
fn memory_for_frontend(total_memory_bytes: usize) -> usize {
    const TIER_LIMIT: usize = 16 << 30;

    match total_memory_bytes.checked_sub(TIER_LIMIT) {
        // Strictly above the limit: the excess is charged at the lower rate.
        Some(excess) if excess > 0 => excess / 16 + TIER_LIMIT / 8,
        // At or below the limit.
        _ => total_memory_bytes / 8,
    }
}
255
/// Returns the number of bytes given to the compactor node out of `total_memory_bytes`.
///
/// Up to 16 GiB the compactor receives one eighth of the total; memory beyond 16 GiB
/// contributes only one sixteenth.
fn memory_for_compactor(total_memory_bytes: usize) -> usize {
    const SIXTEEN_GIB: usize = 16 << 30;

    if total_memory_bytes > SIXTEEN_GIB {
        let above = total_memory_bytes - SIXTEEN_GIB;
        return above / 16 + SIXTEEN_GIB / 8;
    }
    total_memory_bytes / 8
}