risedev/task/
meta_node_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::path::{Path, PathBuf};
17use std::process::Command;
18use std::sync::LazyLock;
19
20use anyhow::{Context, Result, anyhow, bail};
21use itertools::Itertools;
22use sqlx::{ConnectOptions, Database};
23use url::Url;
24
25use super::{ExecuteContext, Task};
26use crate::util::{get_program_args, get_program_env_cmd, get_program_name, is_env_set};
27use crate::{
28    Application, HummockInMemoryStrategy, MetaBackend, MetaNodeConfig, add_hummock_backend,
29    add_tempo_endpoint,
30};
31
32/// URL for connecting to the SQL meta store, retrieved from the env var `RISEDEV_SQL_ENDPOINT`.
33/// If it is not set, a temporary sqlite file is created and used.
34///
35/// # Examples
36///
37/// - `mysql://root:my-secret-pw@127.0.0.1:3306/metastore`
38/// - `postgresql://localhost:5432/metastore`
39/// - `sqlite:///path/to/file.db`
40/// - `sqlite::memory:`
41fn sql_endpoint_from_env() -> String {
42    static SQL_ENDPOINT: LazyLock<String> = LazyLock::new(|| {
43        if let Ok(endpoint) = env::var("RISEDEV_SQL_ENDPOINT") {
44            tracing::info!(
45                "sql endpoint from env RISEDEV_SQL_ENDPOINT resolved to `{}`",
46                endpoint
47            );
48            endpoint
49        } else {
50            // `meta-backend: env` is specified, but env var is not set.
51            // Act as if `meta-backend: sqlite` is specified.
52            // Not using a temporary file because we want to persist the data across restarts.
53            let prefix_data = env::var("PREFIX_DATA").unwrap();
54            let dir = PathBuf::from(&prefix_data).join("meta-backend-env-fallback-sqlite");
55            fs_err::create_dir_all(&dir).unwrap();
56
57            let path = dir.join("metadata.db");
58            let sqlite_endpoint = format!("sqlite://{}?mode=rwc", path.to_string_lossy());
59            tracing::warn!(
60                "env RISEDEV_SQL_ENDPOINT not set, use fallback sqlite `{}`",
61                sqlite_endpoint
62            );
63            sqlite_endpoint
64        }
65    });
66
67    SQL_ENDPOINT.to_owned()
68}
69
70pub struct MetaNodeService {
71    config: MetaNodeConfig,
72}
73
74impl MetaNodeService {
75    pub fn new(config: MetaNodeConfig) -> Result<Self> {
76        Ok(Self { config })
77    }
78
79    /// Apply command args according to config
80    pub fn apply_command_args(
81        cmd: &mut Command,
82        config: &MetaNodeConfig,
83        hummock_in_memory_strategy: HummockInMemoryStrategy,
84    ) -> Result<()> {
85        cmd.arg("--listen-addr")
86            .arg(format!("{}:{}", config.listen_address, config.port))
87            .arg("--advertise-addr")
88            .arg(format!("{}:{}", config.address, config.port))
89            .arg("--dashboard-host")
90            .arg(format!(
91                "{}:{}",
92                config.listen_address, config.dashboard_port
93            ));
94
95        cmd.arg("--prometheus-host").arg(format!(
96            "{}:{}",
97            config.listen_address, config.exporter_port
98        ));
99
100        match config.provide_prometheus.as_ref().unwrap().as_slice() {
101            [] => {}
102            [prometheus] => {
103                cmd.arg("--prometheus-endpoint")
104                    .arg(format!("http://{}:{}", prometheus.address, prometheus.port));
105            }
106            _ => {
107                return Err(anyhow!(
108                    "unexpected prometheus config {:?}, only 1 instance is supported",
109                    config.provide_prometheus
110                ));
111            }
112        }
113
114        let mut is_persistent_meta_store = false;
115
116        match &config.meta_backend {
117            MetaBackend::Memory => {
118                cmd.arg("--backend").arg("mem");
119            }
120            MetaBackend::Sqlite => {
121                let sqlite_config = config.provide_sqlite_backend.as_ref().unwrap();
122                assert_eq!(
123                    sqlite_config.len(),
124                    1,
125                    "should have exactly 1 sqlite config"
126                );
127                is_persistent_meta_store = true;
128
129                let prefix_data = env::var("PREFIX_DATA")?;
130                let file_path = PathBuf::from(&prefix_data)
131                    .join(&sqlite_config[0].id)
132                    .join(&sqlite_config[0].file);
133                cmd.arg("--backend")
134                    .arg("sqlite")
135                    .arg("--sql-endpoint")
136                    .arg(file_path);
137            }
138            MetaBackend::Postgres => {
139                let pg_config = config.provide_postgres_backend.as_ref().unwrap();
140                let pg_store_config = pg_config
141                    .iter()
142                    .filter(|c| c.application == Application::Metastore)
143                    .exactly_one()
144                    .expect("more than one or no pg store config found for metastore");
145                is_persistent_meta_store = true;
146
147                cmd.arg("--backend")
148                    .arg("postgres")
149                    .arg("--sql-endpoint")
150                    .arg(format!(
151                        "{}:{}",
152                        pg_store_config.address, pg_store_config.port,
153                    ))
154                    .arg("--sql-username")
155                    .arg(&pg_store_config.user)
156                    .arg("--sql-password")
157                    .arg(&pg_store_config.password)
158                    .arg("--sql-database")
159                    .arg(&pg_store_config.database);
160            }
161            MetaBackend::Mysql => {
162                let mysql_config = config.provide_mysql_backend.as_ref().unwrap();
163                let mysql_store_config = mysql_config
164                    .iter()
165                    .filter(|c| c.application == Application::Metastore)
166                    .exactly_one()
167                    .expect("more than one or no mysql store config found for metastore");
168                is_persistent_meta_store = true;
169
170                cmd.arg("--backend")
171                    .arg("mysql")
172                    .arg("--sql-endpoint")
173                    .arg(format!(
174                        "{}:{}",
175                        mysql_store_config.address, mysql_store_config.port,
176                    ))
177                    .arg("--sql-username")
178                    .arg(&mysql_store_config.user)
179                    .arg("--sql-password")
180                    .arg(&mysql_store_config.password)
181                    .arg("--sql-database")
182                    .arg(&mysql_store_config.database);
183            }
184            MetaBackend::Env => {
185                let endpoint = sql_endpoint_from_env();
186                is_persistent_meta_store = true;
187
188                cmd.arg("--backend")
189                    .arg("sql")
190                    .arg("--sql-endpoint")
191                    .arg(endpoint);
192            }
193        }
194
195        let provide_minio = config.provide_minio.as_ref().unwrap();
196        let provide_opendal = config.provide_opendal.as_ref().unwrap();
197        let provide_aws_s3 = config.provide_aws_s3.as_ref().unwrap();
198        let provide_moat = config.provide_moat.as_ref().unwrap();
199
200        let provide_compute_node = config.provide_compute_node.as_ref().unwrap();
201        let provide_compactor = config.provide_compactor.as_ref().unwrap();
202
203        let (is_shared_backend, is_persistent_backend) = add_hummock_backend(
204            &config.id,
205            provide_opendal,
206            provide_minio,
207            provide_aws_s3,
208            provide_moat,
209            hummock_in_memory_strategy,
210            cmd,
211        )?;
212
213        if (provide_compute_node.len() > 1 || !provide_compactor.is_empty()) && !is_shared_backend {
214            return Err(anyhow!(
215                "Hummock storage may behave incorrectly with in-memory backend for multiple compute-node or compactor-enabled configuration. Should use a shared backend (e.g. MinIO) instead. Consider adding `use: minio` in risedev config."
216            ));
217        }
218
219        let provide_compactor = config.provide_compactor.as_ref().unwrap();
220        if is_shared_backend && provide_compactor.is_empty() {
221            return Err(anyhow!(
222                "When using a shared backend (minio, aws-s3, or shared in-memory with `risedev playground`), at least one compactor is required. Consider adding `use: compactor` in risedev config."
223            ));
224        }
225        if is_persistent_meta_store && !is_persistent_backend {
226            return Err(anyhow!(
227                "When using a persistent meta store (sql), a persistent state store is required (e.g. minio, aws-s3, etc.)."
228            ));
229        }
230
231        cmd.arg("--data-directory").arg("hummock_001");
232
233        let provide_tempo = config.provide_tempo.as_ref().unwrap();
234        add_tempo_endpoint(provide_tempo, cmd)?;
235
236        Ok(())
237    }
238}
239
240impl Task for MetaNodeService {
241    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
242        ctx.service(self);
243        ctx.pb.set_message("starting...");
244
245        let mut cmd = ctx.risingwave_cmd("meta-node")?;
246
247        if crate::util::is_env_set("RISEDEV_ENABLE_PROFILE") {
248            cmd.env(
249                "RW_PROFILE_PATH",
250                Path::new(&env::var("PREFIX_LOG")?).join(format!("profile-{}", self.id())),
251            );
252        }
253
254        if crate::util::is_env_set("RISEDEV_ENABLE_HEAP_PROFILE") {
255            // See https://linux.die.net/man/3/jemalloc for the descriptions of profiling options
256            let conf = "prof:true,lg_prof_interval:32,lg_prof_sample:19,prof_prefix:meta-node";
257            cmd.env("_RJEM_MALLOC_CONF", conf); // prefixed for macos
258            cmd.env("MALLOC_CONF", conf); // unprefixed for linux
259        }
260
261        Self::apply_command_args(&mut cmd, &self.config, HummockInMemoryStrategy::Allowed)?;
262
263        if let MetaBackend::Env = self.config.meta_backend
264            && is_env_set("RISEDEV_CLEAN_START")
265        {
266            ctx.pb.set_message("initializing meta store from env...");
267            initialize_meta_store()?;
268        }
269
270        if !self.config.user_managed {
271            ctx.run_command(ctx.tmux_run(cmd)?)?;
272            ctx.pb.set_message("started");
273        } else {
274            ctx.pb.set_message("user managed");
275            writeln!(
276                &mut ctx.log,
277                "Please use the following parameters to start the meta:\n{}\n{} {}\n\n",
278                get_program_env_cmd(&cmd),
279                get_program_name(&cmd),
280                get_program_args(&cmd)
281            )?;
282        }
283
284        Ok(())
285    }
286
287    fn id(&self) -> String {
288        self.config.id.clone()
289    }
290}
291
292fn initialize_meta_store() -> Result<(), anyhow::Error> {
293    let rt = tokio::runtime::Builder::new_current_thread()
294        .enable_all()
295        .build()?;
296
297    let endpoint: Url = sql_endpoint_from_env()
298        .parse()
299        .context("invalid url for SQL endpoint")?;
300    let scheme = endpoint.scheme();
301
302    // Retrieve the database name to use for the meta store.
303    // Modify the URL to establish a temporary connection to initialize that database.
304    let (db, init_url) = if sqlx::Postgres::URL_SCHEMES.contains(&scheme) {
305        let options = sqlx::postgres::PgConnectOptions::from_url(&endpoint)
306            .context("invalid database url for Postgres meta backend")?;
307
308        let db = options
309            .get_database()
310            .unwrap_or_else(|| options.get_username()) // PG defaults to username if no database is specified
311            .to_owned();
312        // https://www.postgresql.org/docs/current/manage-ag-templatedbs.html
313        let init_options = options.database("template1");
314        let init_url = init_options.to_url_lossy();
315
316        (db, init_url)
317    } else if sqlx::MySql::URL_SCHEMES.contains(&scheme) {
318        let options = sqlx::mysql::MySqlConnectOptions::from_url(&endpoint)
319            .context("invalid database url for MySQL meta backend")?;
320
321        let db = options
322            .get_database()
323            .context("database not specified for MySQL meta backend")?
324            .to_owned();
325        // Effectively unset the database field when converting back to URL, meaning connect to no database.
326        let init_options = options.database("");
327        let init_url = init_options.to_url_lossy();
328
329        (db, init_url)
330    } else if sqlx::Sqlite::URL_SCHEMES.contains(&scheme) {
331        // For SQLite, simply empty the file.
332        let options = sqlx::sqlite::SqliteConnectOptions::from_url(&endpoint)
333            .context("invalid database url for SQLite meta backend")?;
334
335        if endpoint.as_str().contains(":memory:") || endpoint.as_str().contains("mode=memory") {
336            // SQLite in-memory database does not need initialization.
337        } else {
338            let filename = options.get_filename();
339            if std::fs::exists(filename)? {
340                fs_err::write(filename, b"").context("failed to empty SQLite file")?;
341            }
342        }
343
344        return Ok(());
345    } else {
346        bail!("unsupported SQL scheme for meta backend: {}", scheme);
347    };
348
349    rt.block_on(async move {
350        use sqlx::any::*;
351        install_default_drivers();
352
353        let options = sqlx::any::AnyConnectOptions::from_url(&init_url)?
354            .log_statements(log::LevelFilter::Debug);
355
356        let mut conn = options
357            .connect()
358            .await
359            .context("failed to connect to a template database for meta store")?;
360
361        // Intentionally not executing in a transaction because Postgres does not allow it.
362        sqlx::raw_sql(&format!("DROP DATABASE IF EXISTS {};", db))
363            .execute(&mut conn)
364            .await?;
365        sqlx::raw_sql(&format!("CREATE DATABASE {};", db))
366            .execute(&mut conn)
367            .await?;
368
369        Ok::<_, anyhow::Error>(())
370    })
371    .context("failed to initialize database for meta store")?;
372
373    Ok(())
374}