1use std::env;
16use std::path::{Path, PathBuf};
17use std::process::Command;
18use std::sync::LazyLock;
19
20use anyhow::{Context, Result, anyhow, bail};
21use itertools::Itertools;
22use sqlx::{ConnectOptions, Database};
23use url::Url;
24
25use super::{ExecuteContext, Task, risingwave_cmd};
26use crate::util::{get_program_args, get_program_env_cmd, get_program_name, is_env_set};
27use crate::{
28 Application, HummockInMemoryStrategy, MetaBackend, MetaNodeConfig, add_hummock_backend,
29 add_tempo_endpoint,
30};
31
32fn sql_endpoint_from_env() -> String {
42 static SQL_ENDPOINT: LazyLock<String> = LazyLock::new(|| {
43 if let Ok(endpoint) = env::var("RISEDEV_SQL_ENDPOINT") {
44 tracing::info!(
45 "sql endpoint from env RISEDEV_SQL_ENDPOINT resolved to `{}`",
46 endpoint
47 );
48 endpoint
49 } else {
50 let prefix_data = env::var("PREFIX_DATA").unwrap();
54 let dir = PathBuf::from(&prefix_data).join("meta-backend-env-fallback-sqlite");
55 fs_err::create_dir_all(&dir).unwrap();
56
57 let path = dir.join("metadata.db");
58 let sqlite_endpoint = format!("sqlite://{}?mode=rwc", path.to_string_lossy());
59 tracing::warn!(
60 "env RISEDEV_SQL_ENDPOINT not set, use fallback sqlite `{}`",
61 sqlite_endpoint
62 );
63 sqlite_endpoint
64 }
65 });
66
67 SQL_ENDPOINT.to_owned()
68}
69
70pub struct MetaNodeService {
71 config: MetaNodeConfig,
72}
73
74impl MetaNodeService {
75 pub fn new(config: MetaNodeConfig) -> Result<Self> {
76 Ok(Self { config })
77 }
78
79 pub fn apply_command_args(
81 cmd: &mut Command,
82 config: &MetaNodeConfig,
83 hummock_in_memory_strategy: HummockInMemoryStrategy,
84 ) -> Result<()> {
85 cmd.arg("--listen-addr")
86 .arg(format!("{}:{}", config.listen_address, config.port))
87 .arg("--advertise-addr")
88 .arg(format!("{}:{}", config.address, config.port))
89 .arg("--dashboard-host")
90 .arg(format!(
91 "{}:{}",
92 config.listen_address, config.dashboard_port
93 ));
94
95 cmd.arg("--prometheus-host").arg(format!(
96 "{}:{}",
97 config.listen_address, config.exporter_port
98 ));
99
100 match config.provide_prometheus.as_ref().unwrap().as_slice() {
101 [] => {}
102 [prometheus] => {
103 cmd.arg("--prometheus-endpoint")
104 .arg(format!("http://{}:{}", prometheus.address, prometheus.port));
105 }
106 _ => {
107 return Err(anyhow!(
108 "unexpected prometheus config {:?}, only 1 instance is supported",
109 config.provide_prometheus
110 ));
111 }
112 }
113
114 let mut is_persistent_meta_store = false;
115
116 match &config.meta_backend {
117 MetaBackend::Memory => {
118 cmd.arg("--backend")
119 .arg("sql")
120 .arg("--sql-endpoint")
121 .arg("sqlite::memory:");
122 }
123 MetaBackend::Sqlite => {
124 let sqlite_config = config.provide_sqlite_backend.as_ref().unwrap();
125 assert_eq!(sqlite_config.len(), 1);
126 is_persistent_meta_store = true;
127
128 let prefix_data = env::var("PREFIX_DATA")?;
129 let file_path = PathBuf::from(&prefix_data)
130 .join(&sqlite_config[0].id)
131 .join(&sqlite_config[0].file);
132 cmd.arg("--backend")
133 .arg("sqlite")
134 .arg("--sql-endpoint")
135 .arg(file_path);
136 }
137 MetaBackend::Postgres => {
138 let pg_config = config.provide_postgres_backend.as_ref().unwrap();
139 let pg_store_config = pg_config
140 .iter()
141 .filter(|c| c.application == Application::Metastore)
142 .exactly_one()
143 .expect("more than one or no pg store config found for metastore");
144 is_persistent_meta_store = true;
145
146 cmd.arg("--backend")
147 .arg("postgres")
148 .arg("--sql-endpoint")
149 .arg(format!(
150 "{}:{}",
151 pg_store_config.address, pg_store_config.port,
152 ))
153 .arg("--sql-username")
154 .arg(&pg_store_config.user)
155 .arg("--sql-password")
156 .arg(&pg_store_config.password)
157 .arg("--sql-database")
158 .arg(&pg_store_config.database);
159 }
160 MetaBackend::Mysql => {
161 let mysql_config = config.provide_mysql_backend.as_ref().unwrap();
162 let mysql_store_config = mysql_config
163 .iter()
164 .filter(|c| c.application == Application::Metastore)
165 .exactly_one()
166 .expect("more than one or no mysql store config found for metastore");
167 is_persistent_meta_store = true;
168
169 cmd.arg("--backend")
170 .arg("mysql")
171 .arg("--sql-endpoint")
172 .arg(format!(
173 "{}:{}",
174 mysql_store_config.address, mysql_store_config.port,
175 ))
176 .arg("--sql-username")
177 .arg(&mysql_store_config.user)
178 .arg("--sql-password")
179 .arg(&mysql_store_config.password)
180 .arg("--sql-database")
181 .arg(&mysql_store_config.database);
182 }
183 MetaBackend::Env => {
184 let endpoint = sql_endpoint_from_env();
185 is_persistent_meta_store = true;
186
187 cmd.arg("--backend")
188 .arg("sql")
189 .arg("--sql-endpoint")
190 .arg(endpoint);
191 }
192 }
193
194 let provide_minio = config.provide_minio.as_ref().unwrap();
195 let provide_opendal = config.provide_opendal.as_ref().unwrap();
196 let provide_aws_s3 = config.provide_aws_s3.as_ref().unwrap();
197
198 let provide_compute_node = config.provide_compute_node.as_ref().unwrap();
199 let provide_compactor = config.provide_compactor.as_ref().unwrap();
200
201 let (is_shared_backend, is_persistent_backend) = match (
202 config.enable_in_memory_kv_state_backend,
203 provide_minio.as_slice(),
204 provide_aws_s3.as_slice(),
205 provide_opendal.as_slice(),
206 ) {
207 (true, [], [], []) => {
208 cmd.arg("--state-store").arg("in-memory");
209 (false, false)
210 }
211 (true, _, _, _) => {
212 return Err(anyhow!(
213 "When `enable_in_memory_kv_state_backend` is enabled, no minio and aws-s3 should be provided.",
214 ));
215 }
216 (_, provide_minio, provide_aws_s3, provide_opendal) => add_hummock_backend(
217 &config.id,
218 provide_opendal,
219 provide_minio,
220 provide_aws_s3,
221 hummock_in_memory_strategy,
222 cmd,
223 )?,
224 };
225
226 if (provide_compute_node.len() > 1 || !provide_compactor.is_empty()) && !is_shared_backend {
227 if config.enable_in_memory_kv_state_backend {
228 } else {
232 return Err(anyhow!(
233 "Hummock storage may behave incorrectly with in-memory backend for multiple compute-node or compactor-enabled configuration. Should use a shared backend (e.g. MinIO) instead. Consider adding `use: minio` in risedev config."
234 ));
235 }
236 }
237
238 let provide_compactor = config.provide_compactor.as_ref().unwrap();
239 if is_shared_backend && provide_compactor.is_empty() {
240 return Err(anyhow!(
241 "When using a shared backend (minio, aws-s3, or shared in-memory with `risedev playground`), at least one compactor is required. Consider adding `use: compactor` in risedev config."
242 ));
243 }
244 if is_persistent_meta_store && !is_persistent_backend {
245 return Err(anyhow!(
246 "When using a persistent meta store (sql), a persistent state store is required (e.g. minio, aws-s3, etc.)."
247 ));
248 }
249
250 cmd.arg("--data-directory").arg("hummock_001");
251
252 let provide_tempo = config.provide_tempo.as_ref().unwrap();
253 add_tempo_endpoint(provide_tempo, cmd)?;
254
255 Ok(())
256 }
257}
258
259impl Task for MetaNodeService {
260 fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
261 ctx.service(self);
262 ctx.pb.set_message("starting...");
263
264 let mut cmd = risingwave_cmd("meta-node")?;
265
266 if crate::util::is_env_set("RISEDEV_ENABLE_PROFILE") {
267 cmd.env(
268 "RW_PROFILE_PATH",
269 Path::new(&env::var("PREFIX_LOG")?).join(format!("profile-{}", self.id())),
270 );
271 }
272
273 if crate::util::is_env_set("RISEDEV_ENABLE_HEAP_PROFILE") {
274 let conf = "prof:true,lg_prof_interval:32,lg_prof_sample:19,prof_prefix:meta-node";
276 cmd.env("_RJEM_MALLOC_CONF", conf); cmd.env("MALLOC_CONF", conf); }
279
280 Self::apply_command_args(&mut cmd, &self.config, HummockInMemoryStrategy::Isolated)?;
281
282 if let MetaBackend::Env = self.config.meta_backend {
283 if is_env_set("RISEDEV_CLEAN_START") {
284 ctx.pb.set_message("initializing meta store from env...");
285 initialize_meta_store()?;
286 }
287 }
288
289 if !self.config.user_managed {
290 ctx.run_command(ctx.tmux_run(cmd)?)?;
291 ctx.pb.set_message("started");
292 } else {
293 ctx.pb.set_message("user managed");
294 writeln!(
295 &mut ctx.log,
296 "Please use the following parameters to start the meta:\n{}\n{} {}\n\n",
297 get_program_env_cmd(&cmd),
298 get_program_name(&cmd),
299 get_program_args(&cmd)
300 )?;
301 }
302
303 Ok(())
304 }
305
306 fn id(&self) -> String {
307 self.config.id.clone()
308 }
309}
310
311fn initialize_meta_store() -> Result<(), anyhow::Error> {
312 let rt = tokio::runtime::Builder::new_current_thread()
313 .enable_all()
314 .build()?;
315
316 let endpoint: Url = sql_endpoint_from_env()
317 .parse()
318 .context("invalid url for SQL endpoint")?;
319 let scheme = endpoint.scheme();
320
321 let (db, init_url) = if sqlx::Postgres::URL_SCHEMES.contains(&scheme) {
324 let options = sqlx::postgres::PgConnectOptions::from_url(&endpoint)
325 .context("invalid database url for Postgres meta backend")?;
326
327 let db = options
328 .get_database()
329 .unwrap_or_else(|| options.get_username()) .to_owned();
331 let init_options = options.database("template1");
333 let init_url = init_options.to_url_lossy();
334
335 (db, init_url)
336 } else if sqlx::MySql::URL_SCHEMES.contains(&scheme) {
337 let options = sqlx::mysql::MySqlConnectOptions::from_url(&endpoint)
338 .context("invalid database url for MySQL meta backend")?;
339
340 let db = options
341 .get_database()
342 .context("database not specified for MySQL meta backend")?
343 .to_owned();
344 let init_options = options.database("");
346 let init_url = init_options.to_url_lossy();
347
348 (db, init_url)
349 } else if sqlx::Sqlite::URL_SCHEMES.contains(&scheme) {
350 let options = sqlx::sqlite::SqliteConnectOptions::from_url(&endpoint)
352 .context("invalid database url for SQLite meta backend")?;
353
354 if endpoint.as_str().contains(":memory:") || endpoint.as_str().contains("mode=memory") {
355 } else {
357 let filename = options.get_filename();
358 if std::fs::exists(filename)? {
359 fs_err::write(filename, b"").context("failed to empty SQLite file")?;
360 }
361 }
362
363 return Ok(());
364 } else {
365 bail!("unsupported SQL scheme for meta backend: {}", scheme);
366 };
367
368 rt.block_on(async move {
369 use sqlx::any::*;
370 install_default_drivers();
371
372 let options = sqlx::any::AnyConnectOptions::from_url(&init_url)?
373 .log_statements(log::LevelFilter::Debug);
374
375 let mut conn = options
376 .connect()
377 .await
378 .context("failed to connect to a template database for meta store")?;
379
380 sqlx::raw_sql(&format!("DROP DATABASE IF EXISTS {};", db))
382 .execute(&mut conn)
383 .await?;
384 sqlx::raw_sql(&format!("CREATE DATABASE {};", db))
385 .execute(&mut conn)
386 .await?;
387
388 Ok::<_, anyhow::Error>(())
389 })
390 .context("failed to initialize database for meta store")?;
391
392 Ok(())
393}