risedev/task/
meta_node_service.rs1use std::env;
16use std::path::{Path, PathBuf};
17use std::process::Command;
18use std::sync::LazyLock;
19
20use anyhow::{Context, Result, anyhow, bail};
21use itertools::Itertools;
22use sqlx::{ConnectOptions, Database};
23use url::Url;
24
25use super::{ExecuteContext, Task};
26use crate::util::{get_program_args, get_program_env_cmd, get_program_name, is_env_set};
27use crate::{
28 Application, HummockInMemoryStrategy, MetaBackend, MetaNodeConfig, add_hummock_backend,
29 add_tempo_endpoint,
30};
31
32fn sql_endpoint_from_env() -> String {
42 static SQL_ENDPOINT: LazyLock<String> = LazyLock::new(|| {
43 if let Ok(endpoint) = env::var("RISEDEV_SQL_ENDPOINT") {
44 tracing::info!(
45 "sql endpoint from env RISEDEV_SQL_ENDPOINT resolved to `{}`",
46 endpoint
47 );
48 endpoint
49 } else {
50 let prefix_data = env::var("PREFIX_DATA").unwrap();
54 let dir = PathBuf::from(&prefix_data).join("meta-backend-env-fallback-sqlite");
55 fs_err::create_dir_all(&dir).unwrap();
56
57 let path = dir.join("metadata.db");
58 let sqlite_endpoint = format!("sqlite://{}?mode=rwc", path.to_string_lossy());
59 tracing::warn!(
60 "env RISEDEV_SQL_ENDPOINT not set, use fallback sqlite `{}`",
61 sqlite_endpoint
62 );
63 sqlite_endpoint
64 }
65 });
66
67 SQL_ENDPOINT.to_owned()
68}
69
70pub struct MetaNodeService {
71 config: MetaNodeConfig,
72}
73
74impl MetaNodeService {
75 pub fn new(config: MetaNodeConfig) -> Result<Self> {
76 Ok(Self { config })
77 }
78
79 pub fn apply_command_args(
81 cmd: &mut Command,
82 config: &MetaNodeConfig,
83 hummock_in_memory_strategy: HummockInMemoryStrategy,
84 ) -> Result<()> {
85 cmd.arg("--listen-addr")
86 .arg(format!("{}:{}", config.listen_address, config.port))
87 .arg("--advertise-addr")
88 .arg(format!("{}:{}", config.address, config.port))
89 .arg("--dashboard-host")
90 .arg(format!(
91 "{}:{}",
92 config.listen_address, config.dashboard_port
93 ));
94
95 cmd.arg("--prometheus-host").arg(format!(
96 "{}:{}",
97 config.listen_address, config.exporter_port
98 ));
99
100 match config.provide_prometheus.as_ref().unwrap().as_slice() {
101 [] => {}
102 [prometheus] => {
103 cmd.arg("--prometheus-endpoint")
104 .arg(format!("http://{}:{}", prometheus.address, prometheus.port));
105 }
106 _ => {
107 return Err(anyhow!(
108 "unexpected prometheus config {:?}, only 1 instance is supported",
109 config.provide_prometheus
110 ));
111 }
112 }
113
114 let mut is_persistent_meta_store = false;
115
116 match &config.meta_backend {
117 MetaBackend::Memory => {
118 cmd.arg("--backend").arg("mem");
119 }
120 MetaBackend::Sqlite => {
121 let sqlite_config = config.provide_sqlite_backend.as_ref().unwrap();
122 assert_eq!(
123 sqlite_config.len(),
124 1,
125 "should have exactly 1 sqlite config"
126 );
127 is_persistent_meta_store = true;
128
129 let prefix_data = env::var("PREFIX_DATA")?;
130 let file_path = PathBuf::from(&prefix_data)
131 .join(&sqlite_config[0].id)
132 .join(&sqlite_config[0].file);
133 cmd.arg("--backend")
134 .arg("sqlite")
135 .arg("--sql-endpoint")
136 .arg(file_path);
137 }
138 MetaBackend::Postgres => {
139 let pg_config = config.provide_postgres_backend.as_ref().unwrap();
140 let pg_store_config = pg_config
141 .iter()
142 .filter(|c| c.application == Application::Metastore)
143 .exactly_one()
144 .expect("more than one or no pg store config found for metastore");
145 is_persistent_meta_store = true;
146
147 cmd.arg("--backend")
148 .arg("postgres")
149 .arg("--sql-endpoint")
150 .arg(format!(
151 "{}:{}",
152 pg_store_config.address, pg_store_config.port,
153 ))
154 .arg("--sql-username")
155 .arg(&pg_store_config.user)
156 .arg("--sql-password")
157 .arg(&pg_store_config.password)
158 .arg("--sql-database")
159 .arg(&pg_store_config.database);
160 }
161 MetaBackend::Mysql => {
162 let mysql_config = config.provide_mysql_backend.as_ref().unwrap();
163 let mysql_store_config = mysql_config
164 .iter()
165 .filter(|c| c.application == Application::Metastore)
166 .exactly_one()
167 .expect("more than one or no mysql store config found for metastore");
168 is_persistent_meta_store = true;
169
170 cmd.arg("--backend")
171 .arg("mysql")
172 .arg("--sql-endpoint")
173 .arg(format!(
174 "{}:{}",
175 mysql_store_config.address, mysql_store_config.port,
176 ))
177 .arg("--sql-username")
178 .arg(&mysql_store_config.user)
179 .arg("--sql-password")
180 .arg(&mysql_store_config.password)
181 .arg("--sql-database")
182 .arg(&mysql_store_config.database);
183 }
184 MetaBackend::Env => {
185 let endpoint = sql_endpoint_from_env();
186 is_persistent_meta_store = true;
187
188 cmd.arg("--backend")
189 .arg("sql")
190 .arg("--sql-endpoint")
191 .arg(endpoint);
192 }
193 }
194
195 let provide_minio = config.provide_minio.as_ref().unwrap();
196 let provide_opendal = config.provide_opendal.as_ref().unwrap();
197 let provide_aws_s3 = config.provide_aws_s3.as_ref().unwrap();
198
199 let provide_compute_node = config.provide_compute_node.as_ref().unwrap();
200 let provide_compactor = config.provide_compactor.as_ref().unwrap();
201
202 let (is_shared_backend, is_persistent_backend) = add_hummock_backend(
203 &config.id,
204 provide_opendal,
205 provide_minio,
206 provide_aws_s3,
207 hummock_in_memory_strategy,
208 cmd,
209 )?;
210
211 if (provide_compute_node.len() > 1 || !provide_compactor.is_empty()) && !is_shared_backend {
212 return Err(anyhow!(
213 "Hummock storage may behave incorrectly with in-memory backend for multiple compute-node or compactor-enabled configuration. Should use a shared backend (e.g. MinIO) instead. Consider adding `use: minio` in risedev config."
214 ));
215 }
216
217 let provide_compactor = config.provide_compactor.as_ref().unwrap();
218 if is_shared_backend && provide_compactor.is_empty() {
219 return Err(anyhow!(
220 "When using a shared backend (minio, aws-s3, or shared in-memory with `risedev playground`), at least one compactor is required. Consider adding `use: compactor` in risedev config."
221 ));
222 }
223 if is_persistent_meta_store && !is_persistent_backend {
224 return Err(anyhow!(
225 "When using a persistent meta store (sql), a persistent state store is required (e.g. minio, aws-s3, etc.)."
226 ));
227 }
228
229 cmd.arg("--data-directory").arg("hummock_001");
230
231 let provide_tempo = config.provide_tempo.as_ref().unwrap();
232 add_tempo_endpoint(provide_tempo, cmd)?;
233
234 Ok(())
235 }
236}
237
238impl Task for MetaNodeService {
239 fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
240 ctx.service(self);
241 ctx.pb.set_message("starting...");
242
243 let mut cmd = ctx.risingwave_cmd("meta-node")?;
244
245 if crate::util::is_env_set("RISEDEV_ENABLE_PROFILE") {
246 cmd.env(
247 "RW_PROFILE_PATH",
248 Path::new(&env::var("PREFIX_LOG")?).join(format!("profile-{}", self.id())),
249 );
250 }
251
252 if crate::util::is_env_set("RISEDEV_ENABLE_HEAP_PROFILE") {
253 let conf = "prof:true,lg_prof_interval:32,lg_prof_sample:19,prof_prefix:meta-node";
255 cmd.env("_RJEM_MALLOC_CONF", conf); cmd.env("MALLOC_CONF", conf); }
258
259 Self::apply_command_args(&mut cmd, &self.config, HummockInMemoryStrategy::Allowed)?;
260
261 if let MetaBackend::Env = self.config.meta_backend {
262 if is_env_set("RISEDEV_CLEAN_START") {
263 ctx.pb.set_message("initializing meta store from env...");
264 initialize_meta_store()?;
265 }
266 }
267
268 if !self.config.user_managed {
269 ctx.run_command(ctx.tmux_run(cmd)?)?;
270 ctx.pb.set_message("started");
271 } else {
272 ctx.pb.set_message("user managed");
273 writeln!(
274 &mut ctx.log,
275 "Please use the following parameters to start the meta:\n{}\n{} {}\n\n",
276 get_program_env_cmd(&cmd),
277 get_program_name(&cmd),
278 get_program_args(&cmd)
279 )?;
280 }
281
282 Ok(())
283 }
284
285 fn id(&self) -> String {
286 self.config.id.clone()
287 }
288}
289
290fn initialize_meta_store() -> Result<(), anyhow::Error> {
291 let rt = tokio::runtime::Builder::new_current_thread()
292 .enable_all()
293 .build()?;
294
295 let endpoint: Url = sql_endpoint_from_env()
296 .parse()
297 .context("invalid url for SQL endpoint")?;
298 let scheme = endpoint.scheme();
299
300 let (db, init_url) = if sqlx::Postgres::URL_SCHEMES.contains(&scheme) {
303 let options = sqlx::postgres::PgConnectOptions::from_url(&endpoint)
304 .context("invalid database url for Postgres meta backend")?;
305
306 let db = options
307 .get_database()
308 .unwrap_or_else(|| options.get_username()) .to_owned();
310 let init_options = options.database("template1");
312 let init_url = init_options.to_url_lossy();
313
314 (db, init_url)
315 } else if sqlx::MySql::URL_SCHEMES.contains(&scheme) {
316 let options = sqlx::mysql::MySqlConnectOptions::from_url(&endpoint)
317 .context("invalid database url for MySQL meta backend")?;
318
319 let db = options
320 .get_database()
321 .context("database not specified for MySQL meta backend")?
322 .to_owned();
323 let init_options = options.database("");
325 let init_url = init_options.to_url_lossy();
326
327 (db, init_url)
328 } else if sqlx::Sqlite::URL_SCHEMES.contains(&scheme) {
329 let options = sqlx::sqlite::SqliteConnectOptions::from_url(&endpoint)
331 .context("invalid database url for SQLite meta backend")?;
332
333 if endpoint.as_str().contains(":memory:") || endpoint.as_str().contains("mode=memory") {
334 } else {
336 let filename = options.get_filename();
337 if std::fs::exists(filename)? {
338 fs_err::write(filename, b"").context("failed to empty SQLite file")?;
339 }
340 }
341
342 return Ok(());
343 } else {
344 bail!("unsupported SQL scheme for meta backend: {}", scheme);
345 };
346
347 rt.block_on(async move {
348 use sqlx::any::*;
349 install_default_drivers();
350
351 let options = sqlx::any::AnyConnectOptions::from_url(&init_url)?
352 .log_statements(log::LevelFilter::Debug);
353
354 let mut conn = options
355 .connect()
356 .await
357 .context("failed to connect to a template database for meta store")?;
358
359 sqlx::raw_sql(&format!("DROP DATABASE IF EXISTS {};", db))
361 .execute(&mut conn)
362 .await?;
363 sqlx::raw_sql(&format!("CREATE DATABASE {};", db))
364 .execute(&mut conn)
365 .await?;
366
367 Ok::<_, anyhow::Error>(())
368 })
369 .context("failed to initialize database for meta store")?;
370
371 Ok(())
372}