risingwave_cmd_all/
single_node.rs1use clap::Parser;
16use home::home_dir;
17use risingwave_common::config::MetaBackend;
18use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
19use risingwave_compactor::CompactorOpts;
20use risingwave_compute::ComputeNodeOpts;
21use risingwave_compute::memory::config::gradient_reserve_memory_bytes;
22use risingwave_frontend::FrontendOpts;
23use risingwave_meta_node::MetaNodeOpts;
24
25use crate::ParsedStandaloneOpts;
26
// Process-level options for single-node mode. Every option can be given as a long CLI flag or
// through the `RW_SINGLE_NODE_*` environment variable named in its attribute.
//
// NOTE(review): plain `//` comments are used here on purpose. clap's derive turns `///` doc
// comments into `--help` text, so promoting these would change the CLI output; do that
// deliberately if help text is wanted.
#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
#[command(
    version,
    about = "[default] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified"
)]
pub struct SingleNodeOpts {
    // When set, copied into `prometheus_listener_addr` of the meta, compute, frontend and
    // compactor options.
    #[clap(long, env = "RW_SINGLE_NODE_PROMETHEUS_LISTENER_ADDR")]
    prometheus_listener_addr: Option<String>,

    // When set, copied into `config_path` of the meta, compute, frontend and compactor options.
    #[clap(long, env = "RW_SINGLE_NODE_CONFIG_PATH")]
    config_path: Option<String>,

    // Base directory for on-disk data. When unset, `map_single_node_opts_to_standalone_opts`
    // falls back to `.risingwave` under the home directory. The `state_store` and `meta_store`
    // sub-directories are created beneath it when the on-disk defaults are used.
    #[clap(long, env = "RW_SINGLE_NODE_STORE_DIRECTORY")]
    store_directory: Option<String>,

    // Selects the in-memory defaults: the `hummock+memory` state store and the `Mem` meta
    // backend, each applied only if that setting is not already configured.
    #[clap(long, env = "RW_SINGLE_NODE_IN_MEMORY")]
    in_memory: bool,

    // Hidden from `--help` (`hide = true`). Forwarded to the meta node's
    // `dangerous_max_idle_secs` when set.
    #[clap(long, hide = true, env = "RW_SINGLE_NODE_MAX_IDLE_SECS")]
    max_idle_secs: Option<u64>,

    // Flattened, so the flags of `NodeSpecificOpts` are accepted directly on this command.
    #[clap(flatten)]
    node_opts: NodeSpecificOpts,
}
57
58impl SingleNodeOpts {
59 pub fn new_for_playground() -> Self {
60 let empty_args = vec![] as Vec<String>;
61 let mut opts = SingleNodeOpts::parse_from(empty_args);
62 opts.in_memory = true;
63 opts
64 }
65}
66
// Optional overrides that `map_single_node_opts_to_standalone_opts` forwards to one specific
// service. Every field is a long flag with no environment variable fallback.
//
// NOTE(review): plain `//` comments are used here on purpose. clap's derive turns `///` doc
// comments into `--help` text, so promoting these would change the CLI output.
#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
pub struct NodeSpecificOpts {
    // Memory budget that is split between the frontend, the compactor and the compute node.
    // Falls back to `system_memory_available_bytes()` when unset.
    #[clap(long)]
    pub total_memory_bytes: Option<usize>,

    // Overrides the compute node's `parallelism` when set.
    #[clap(long)]
    pub parallelism: Option<usize>,

    // Overrides the frontend's `listen_addr` when set.
    #[clap(long)]
    pub listen_addr: Option<String>,

    // Forwarded to the meta node's `prometheus_endpoint` when set.
    #[clap(long)]
    pub prometheus_endpoint: Option<String>,

    // Forwarded to the meta node's `prometheus_selector` when set.
    #[clap(long)]
    pub prometheus_selector: Option<String>,

    // Forwarded to the compactor's `compaction_worker_threads_number` when set.
    #[clap(long)]
    pub compaction_worker_threads_number: Option<usize>,
}
121
/// State store URL written to the meta options when `in_memory` is set and no state store is
/// configured. When the effective state store URL starts with this value, no compactor options
/// are produced.
const HUMMOCK_IN_MEMORY: &str = "hummock+memory";
123
124pub fn map_single_node_opts_to_standalone_opts(opts: SingleNodeOpts) -> ParsedStandaloneOpts {
125 let empty_args = vec![] as Vec<String>;
128 let mut meta_opts = MetaNodeOpts::parse_from(&empty_args);
129 let mut compute_opts = ComputeNodeOpts::parse_from(&empty_args);
130 let mut frontend_opts = FrontendOpts::parse_from(&empty_args);
131 let mut compactor_opts = CompactorOpts::parse_from(&empty_args);
132
133 if let Some(max_idle_secs) = opts.max_idle_secs {
134 meta_opts.dangerous_max_idle_secs = Some(max_idle_secs);
135 }
136
137 if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr {
138 meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
139 compute_opts
140 .prometheus_listener_addr
141 .clone_from(prometheus_listener_addr);
142 frontend_opts
143 .prometheus_listener_addr
144 .clone_from(prometheus_listener_addr);
145 compactor_opts
146 .prometheus_listener_addr
147 .clone_from(prometheus_listener_addr);
148 }
149 if let Some(config_path) = &opts.config_path {
150 meta_opts.config_path.clone_from(config_path);
151 compute_opts.config_path.clone_from(config_path);
152 frontend_opts.config_path.clone_from(config_path);
153 compactor_opts.config_path.clone_from(config_path);
154 }
155
156 let store_directory = opts.store_directory.unwrap_or_else(|| {
157 let mut home_path = home_dir().unwrap();
158 home_path.push(".risingwave");
159 home_path.to_str().unwrap().to_owned()
160 });
161
162 if meta_opts.state_store.is_none() {
164 if opts.in_memory {
165 meta_opts.state_store = Some(HUMMOCK_IN_MEMORY.to_owned());
166 } else {
167 let state_store_dir = format!("{}/state_store", &store_directory);
168 std::fs::create_dir_all(&state_store_dir).unwrap();
169 let state_store_url = format!("hummock+fs://{}", &state_store_dir);
170 meta_opts.state_store = Some(state_store_url);
171 }
172
173 meta_opts.data_directory = Some("hummock_001".to_owned());
175 }
176
177 let meta_backend_is_set = match meta_opts.backend {
179 Some(MetaBackend::Sql)
180 | Some(MetaBackend::Sqlite)
181 | Some(MetaBackend::Postgres)
182 | Some(MetaBackend::Mysql) => meta_opts.sql_endpoint.is_some(),
183 Some(MetaBackend::Mem) => true,
184 None => false,
185 };
186 if !meta_backend_is_set {
187 if opts.in_memory {
188 meta_opts.backend = Some(MetaBackend::Mem);
189 } else {
190 meta_opts.backend = Some(MetaBackend::Sqlite);
191 let meta_store_dir = format!("{}/meta_store", &store_directory);
192 std::fs::create_dir_all(&meta_store_dir).unwrap();
193 let meta_store_endpoint = format!("{}/single_node.db", &meta_store_dir);
194 meta_opts.sql_endpoint = Some(meta_store_endpoint.into());
195 }
196 }
197
198 meta_opts.listen_addr = "127.0.0.1:5690".to_owned();
200 meta_opts.advertise_addr = "127.0.0.1:5690".to_owned();
201 meta_opts.dashboard_host = Some("0.0.0.0:5691".to_owned());
202 compute_opts.listen_addr = "127.0.0.1:5688".to_owned();
203 compactor_opts.listen_addr = "127.0.0.1:6660".to_owned();
204
205 let meta_addr = "http://127.0.0.1:5690".to_owned();
207 compute_opts.meta_address = meta_addr.parse().unwrap();
208 frontend_opts.meta_addr = meta_addr.parse().unwrap();
209 compactor_opts.meta_address = meta_addr.parse().unwrap();
210
211 let total_memory_bytes = if let Some(total_memory_bytes) = opts.node_opts.total_memory_bytes {
213 total_memory_bytes
214 } else {
215 system_memory_available_bytes()
216 };
217 frontend_opts.frontend_total_memory_bytes = memory_for_frontend(total_memory_bytes);
218 compactor_opts.compactor_total_memory_bytes = memory_for_compactor(total_memory_bytes);
219 compute_opts.total_memory_bytes = total_memory_bytes
220 - memory_for_frontend(total_memory_bytes)
221 - memory_for_compactor(total_memory_bytes);
222 compute_opts.memory_manager_target_bytes =
223 Some(gradient_reserve_memory_bytes(total_memory_bytes));
224
225 if let Some(parallelism) = opts.node_opts.parallelism {
227 compute_opts.parallelism = parallelism;
228 }
229 if let Some(listen_addr) = opts.node_opts.listen_addr {
230 frontend_opts.listen_addr = listen_addr;
231 }
232 if let Some(prometheus_endpoint) = opts.node_opts.prometheus_endpoint {
233 meta_opts.prometheus_endpoint = Some(prometheus_endpoint);
234 }
235 if let Some(prometheus_selector) = opts.node_opts.prometheus_selector {
236 meta_opts.prometheus_selector = Some(prometheus_selector);
237 }
238 if let Some(n) = opts.node_opts.compaction_worker_threads_number {
239 compactor_opts.compaction_worker_threads_number = Some(n);
240 }
241
242 let in_memory_state_store = meta_opts
243 .state_store
244 .as_ref()
245 .unwrap()
246 .starts_with(HUMMOCK_IN_MEMORY);
247
248 ParsedStandaloneOpts {
249 meta_opts: Some(meta_opts),
250 compute_opts: Some(compute_opts),
251 frontend_opts: Some(frontend_opts),
252 compactor_opts: if in_memory_state_store {
254 None
255 } else {
256 Some(compactor_opts)
257 },
258 }
259}
260
/// Returns the frontend's share of `total_memory_bytes`: one eighth of the first 16 GiB plus one
/// sixteenth of everything above 16 GiB.
fn memory_for_frontend(total_memory_bytes: usize) -> usize {
    const THRESHOLD: usize = 16 << 30;
    match total_memory_bytes.checked_sub(THRESHOLD) {
        // At or below the threshold the whole budget is charged at 1/8.
        None | Some(0) => total_memory_bytes / 8,
        // Above it, only the excess is charged at the lower 1/16 rate.
        Some(excess) => excess / 16 + THRESHOLD / 8,
    }
}
268
/// Returns the compactor's share of `total_memory_bytes`: one eighth of the first 16 GiB plus one
/// sixteenth of everything above 16 GiB.
fn memory_for_compactor(total_memory_bytes: usize) -> usize {
    let knee: usize = 16 << 30;
    // Portion of the budget up to the knee, charged at 1/8.
    let below_knee = total_memory_bytes.min(knee);
    // Portion beyond the knee (zero when the budget is smaller), charged at 1/16.
    let above_knee = total_memory_bytes.saturating_sub(knee);
    above_knee / 16 + below_knee / 8
}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Playground options must map to the in-memory state store and produce no compactor options.
    #[test]
    fn test_playground_in_memory_state_store() {
        let standalone_opts =
            map_single_node_opts_to_standalone_opts(SingleNodeOpts::new_for_playground());

        // The in-memory state store runs without a compactor.
        assert!(standalone_opts.compactor_opts.is_none());

        let meta_opts = standalone_opts.meta_opts.as_ref().unwrap();
        assert_eq!(meta_opts.state_store.as_deref(), Some(HUMMOCK_IN_MEMORY));
    }
}