risingwave_cmd_all/
single_node.rs1use clap::Parser;
16use home::home_dir;
17use risingwave_common::config::MetaBackend;
18use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
19use risingwave_compactor::CompactorOpts;
20use risingwave_compute::ComputeNodeOpts;
21use risingwave_compute::memory::config::gradient_reserve_memory_bytes;
22use risingwave_frontend::FrontendOpts;
23use risingwave_meta_node::MetaNodeOpts;
24
25use crate::ParsedStandaloneOpts;
26
27#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
28#[command(
29 version,
30 about = "[default] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified"
31)]
32pub struct SingleNodeOpts {
34 #[clap(long, env = "RW_SINGLE_NODE_PROMETHEUS_LISTENER_ADDR")]
36 prometheus_listener_addr: Option<String>,
37
38 #[clap(long, env = "RW_SINGLE_NODE_CONFIG_PATH")]
40 config_path: Option<String>,
41
42 #[clap(long, env = "RW_SINGLE_NODE_STORE_DIRECTORY")]
44 store_directory: Option<String>,
45
46 #[clap(long, env = "RW_SINGLE_NODE_IN_MEMORY")]
48 in_memory: bool,
49
50 #[clap(long, hide = true, env = "RW_SINGLE_NODE_MAX_IDLE_SECS")]
52 max_idle_secs: Option<u64>,
53
54 #[clap(flatten)]
55 node_opts: NodeSpecificOpts,
56}
57
58impl SingleNodeOpts {
59 pub fn new_for_playground() -> Self {
60 let empty_args = vec![] as Vec<String>;
61 let mut opts = SingleNodeOpts::parse_from(empty_args);
62 opts.in_memory = true;
63 opts
64 }
65}
66
67#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
88pub struct NodeSpecificOpts {
89 #[clap(long)]
92 pub total_memory_bytes: Option<usize>,
93
94 #[clap(long)]
96 pub parallelism: Option<usize>,
97
98 #[clap(long)]
102 pub listen_addr: Option<String>,
103
104 #[clap(long)]
109 pub prometheus_endpoint: Option<String>,
110
111 #[clap(long)]
115 pub prometheus_selector: Option<String>,
116
117 #[clap(long)]
119 pub compaction_worker_threads_number: Option<usize>,
120}
121
122pub fn map_single_node_opts_to_standalone_opts(opts: SingleNodeOpts) -> ParsedStandaloneOpts {
123 let empty_args = vec![] as Vec<String>;
126 let mut meta_opts = MetaNodeOpts::parse_from(&empty_args);
127 let mut compute_opts = ComputeNodeOpts::parse_from(&empty_args);
128 let mut frontend_opts = FrontendOpts::parse_from(&empty_args);
129 let mut compactor_opts = CompactorOpts::parse_from(&empty_args);
130
131 if let Some(max_idle_secs) = opts.max_idle_secs {
132 meta_opts.dangerous_max_idle_secs = Some(max_idle_secs);
133 }
134
135 if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr {
136 meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
137 compute_opts
138 .prometheus_listener_addr
139 .clone_from(prometheus_listener_addr);
140 frontend_opts
141 .prometheus_listener_addr
142 .clone_from(prometheus_listener_addr);
143 compactor_opts
144 .prometheus_listener_addr
145 .clone_from(prometheus_listener_addr);
146 }
147 if let Some(config_path) = &opts.config_path {
148 meta_opts.config_path.clone_from(config_path);
149 compute_opts.config_path.clone_from(config_path);
150 frontend_opts.config_path.clone_from(config_path);
151 compactor_opts.config_path.clone_from(config_path);
152 }
153
154 let store_directory = opts.store_directory.unwrap_or_else(|| {
155 let mut home_path = home_dir().unwrap();
156 home_path.push(".risingwave");
157 home_path.to_str().unwrap().to_owned()
158 });
159
160 if meta_opts.state_store.is_none() {
162 if opts.in_memory {
163 meta_opts.state_store = Some("hummock+memory".to_owned());
164 } else {
165 let state_store_dir = format!("{}/state_store", &store_directory);
166 std::fs::create_dir_all(&state_store_dir).unwrap();
167 let state_store_url = format!("hummock+fs://{}", &state_store_dir);
168 meta_opts.state_store = Some(state_store_url);
169 }
170
171 meta_opts.data_directory = Some("hummock_001".to_owned());
173 }
174
175 let meta_backend_is_set = match meta_opts.backend {
177 Some(MetaBackend::Sql)
178 | Some(MetaBackend::Sqlite)
179 | Some(MetaBackend::Postgres)
180 | Some(MetaBackend::Mysql) => meta_opts.sql_endpoint.is_some(),
181 Some(MetaBackend::Mem) => true,
182 None => false,
183 };
184 if !meta_backend_is_set {
185 if opts.in_memory {
186 meta_opts.backend = Some(MetaBackend::Mem);
187 } else {
188 meta_opts.backend = Some(MetaBackend::Sqlite);
189 let meta_store_dir = format!("{}/meta_store", &store_directory);
190 std::fs::create_dir_all(&meta_store_dir).unwrap();
191 let meta_store_endpoint = format!("{}/single_node.db", &meta_store_dir);
192 meta_opts.sql_endpoint = Some(meta_store_endpoint.into());
193 }
194 }
195
196 meta_opts.listen_addr = "127.0.0.1:5690".to_owned();
198 meta_opts.advertise_addr = "127.0.0.1:5690".to_owned();
199 meta_opts.dashboard_host = Some("0.0.0.0:5691".to_owned());
200 compute_opts.listen_addr = "127.0.0.1:5688".to_owned();
201 compactor_opts.listen_addr = "127.0.0.1:6660".to_owned();
202
203 let meta_addr = "http://127.0.0.1:5690".to_owned();
205 compute_opts.meta_address = meta_addr.parse().unwrap();
206 frontend_opts.meta_addr = meta_addr.parse().unwrap();
207 compactor_opts.meta_address = meta_addr.parse().unwrap();
208
209 let total_memory_bytes = if let Some(total_memory_bytes) = opts.node_opts.total_memory_bytes {
211 total_memory_bytes
212 } else {
213 system_memory_available_bytes()
214 };
215 frontend_opts.frontend_total_memory_bytes = memory_for_frontend(total_memory_bytes);
216 compactor_opts.compactor_total_memory_bytes = memory_for_compactor(total_memory_bytes);
217 compute_opts.total_memory_bytes = total_memory_bytes
218 - memory_for_frontend(total_memory_bytes)
219 - memory_for_compactor(total_memory_bytes);
220 compute_opts.memory_manager_target_bytes =
221 Some(gradient_reserve_memory_bytes(total_memory_bytes));
222
223 if let Some(parallelism) = opts.node_opts.parallelism {
225 compute_opts.parallelism = parallelism;
226 }
227 if let Some(listen_addr) = opts.node_opts.listen_addr {
228 frontend_opts.listen_addr = listen_addr;
229 }
230 if let Some(prometheus_endpoint) = opts.node_opts.prometheus_endpoint {
231 meta_opts.prometheus_endpoint = Some(prometheus_endpoint);
232 }
233 if let Some(prometheus_selector) = opts.node_opts.prometheus_selector {
234 meta_opts.prometheus_selector = Some(prometheus_selector);
235 }
236 if let Some(n) = opts.node_opts.compaction_worker_threads_number {
237 compactor_opts.compaction_worker_threads_number = Some(n);
238 }
239
240 ParsedStandaloneOpts {
241 meta_opts: Some(meta_opts),
242 compute_opts: Some(compute_opts),
243 frontend_opts: Some(frontend_opts),
244 compactor_opts: Some(compactor_opts),
245 }
246}
247
248fn memory_for_frontend(total_memory_bytes: usize) -> usize {
249 if total_memory_bytes <= (16 << 30) {
250 total_memory_bytes / 8
251 } else {
252 (total_memory_bytes - (16 << 30)) / 16 + (16 << 30) / 8
253 }
254}
255
256fn memory_for_compactor(total_memory_bytes: usize) -> usize {
257 if total_memory_bytes <= (16 << 30) {
258 total_memory_bytes / 8
259 } else {
260 (total_memory_bytes - (16 << 30)) / 16 + (16 << 30) / 8
261 }
262}