risedev/task/meta_node_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::path::{Path, PathBuf};
17use std::process::Command;
18use std::sync::LazyLock;
19
20use anyhow::{Context, Result, anyhow, bail};
21use itertools::Itertools;
22use sqlx::{ConnectOptions, Database};
23use url::Url;
24
25use super::{ExecuteContext, Task, risingwave_cmd};
26use crate::util::{get_program_args, get_program_env_cmd, get_program_name, is_env_set};
27use crate::{
28    Application, HummockInMemoryStrategy, MetaBackend, MetaNodeConfig, add_hummock_backend,
29    add_tempo_endpoint,
30};
31
32/// URL for connecting to the SQL meta store, retrieved from the env var `RISEDEV_SQL_ENDPOINT`.
33/// If it is not set, a temporary sqlite file is created and used.
34///
35/// # Examples
36///
37/// - `mysql://root:my-secret-pw@127.0.0.1:3306/metastore`
38/// - `postgresql://localhost:5432/metastore`
39/// - `sqlite:///path/to/file.db`
40/// - `sqlite::memory:`
41fn sql_endpoint_from_env() -> String {
42    static SQL_ENDPOINT: LazyLock<String> = LazyLock::new(|| {
43        if let Ok(endpoint) = env::var("RISEDEV_SQL_ENDPOINT") {
44            tracing::info!(
45                "sql endpoint from env RISEDEV_SQL_ENDPOINT resolved to `{}`",
46                endpoint
47            );
48            endpoint
49        } else {
50            // `meta-backend: env` is specified, but env var is not set.
51            // Act as if `meta-backend: sqlite` is specified.
52            // Not using a temporary file because we want to persist the data across restarts.
53            let prefix_data = env::var("PREFIX_DATA").unwrap();
54            let dir = PathBuf::from(&prefix_data).join("meta-backend-env-fallback-sqlite");
55            fs_err::create_dir_all(&dir).unwrap();
56
57            let path = dir.join("metadata.db");
58            let sqlite_endpoint = format!("sqlite://{}?mode=rwc", path.to_string_lossy());
59            tracing::warn!(
60                "env RISEDEV_SQL_ENDPOINT not set, use fallback sqlite `{}`",
61                sqlite_endpoint
62            );
63            sqlite_endpoint
64        }
65    });
66
67    SQL_ENDPOINT.to_owned()
68}
69
70pub struct MetaNodeService {
71    config: MetaNodeConfig,
72}
73
74impl MetaNodeService {
75    pub fn new(config: MetaNodeConfig) -> Result<Self> {
76        Ok(Self { config })
77    }
78
79    /// Apply command args according to config
80    pub fn apply_command_args(
81        cmd: &mut Command,
82        config: &MetaNodeConfig,
83        hummock_in_memory_strategy: HummockInMemoryStrategy,
84    ) -> Result<()> {
85        cmd.arg("--listen-addr")
86            .arg(format!("{}:{}", config.listen_address, config.port))
87            .arg("--advertise-addr")
88            .arg(format!("{}:{}", config.address, config.port))
89            .arg("--dashboard-host")
90            .arg(format!(
91                "{}:{}",
92                config.listen_address, config.dashboard_port
93            ));
94
95        cmd.arg("--prometheus-host").arg(format!(
96            "{}:{}",
97            config.listen_address, config.exporter_port
98        ));
99
100        match config.provide_prometheus.as_ref().unwrap().as_slice() {
101            [] => {}
102            [prometheus] => {
103                cmd.arg("--prometheus-endpoint")
104                    .arg(format!("http://{}:{}", prometheus.address, prometheus.port));
105            }
106            _ => {
107                return Err(anyhow!(
108                    "unexpected prometheus config {:?}, only 1 instance is supported",
109                    config.provide_prometheus
110                ));
111            }
112        }
113
114        let mut is_persistent_meta_store = false;
115
116        match &config.meta_backend {
117            MetaBackend::Memory => {
118                cmd.arg("--backend")
119                    .arg("sql")
120                    .arg("--sql-endpoint")
121                    .arg("sqlite::memory:");
122            }
123            MetaBackend::Sqlite => {
124                let sqlite_config = config.provide_sqlite_backend.as_ref().unwrap();
125                assert_eq!(sqlite_config.len(), 1);
126                is_persistent_meta_store = true;
127
128                let prefix_data = env::var("PREFIX_DATA")?;
129                let file_path = PathBuf::from(&prefix_data)
130                    .join(&sqlite_config[0].id)
131                    .join(&sqlite_config[0].file);
132                cmd.arg("--backend")
133                    .arg("sqlite")
134                    .arg("--sql-endpoint")
135                    .arg(file_path);
136            }
137            MetaBackend::Postgres => {
138                let pg_config = config.provide_postgres_backend.as_ref().unwrap();
139                let pg_store_config = pg_config
140                    .iter()
141                    .filter(|c| c.application == Application::Metastore)
142                    .exactly_one()
143                    .expect("more than one or no pg store config found for metastore");
144                is_persistent_meta_store = true;
145
146                cmd.arg("--backend")
147                    .arg("postgres")
148                    .arg("--sql-endpoint")
149                    .arg(format!(
150                        "{}:{}",
151                        pg_store_config.address, pg_store_config.port,
152                    ))
153                    .arg("--sql-username")
154                    .arg(&pg_store_config.user)
155                    .arg("--sql-password")
156                    .arg(&pg_store_config.password)
157                    .arg("--sql-database")
158                    .arg(&pg_store_config.database);
159            }
160            MetaBackend::Mysql => {
161                let mysql_config = config.provide_mysql_backend.as_ref().unwrap();
162                let mysql_store_config = mysql_config
163                    .iter()
164                    .filter(|c| c.application == Application::Metastore)
165                    .exactly_one()
166                    .expect("more than one or no mysql store config found for metastore");
167                is_persistent_meta_store = true;
168
169                cmd.arg("--backend")
170                    .arg("mysql")
171                    .arg("--sql-endpoint")
172                    .arg(format!(
173                        "{}:{}",
174                        mysql_store_config.address, mysql_store_config.port,
175                    ))
176                    .arg("--sql-username")
177                    .arg(&mysql_store_config.user)
178                    .arg("--sql-password")
179                    .arg(&mysql_store_config.password)
180                    .arg("--sql-database")
181                    .arg(&mysql_store_config.database);
182            }
183            MetaBackend::Env => {
184                let endpoint = sql_endpoint_from_env();
185                is_persistent_meta_store = true;
186
187                cmd.arg("--backend")
188                    .arg("sql")
189                    .arg("--sql-endpoint")
190                    .arg(endpoint);
191            }
192        }
193
194        let provide_minio = config.provide_minio.as_ref().unwrap();
195        let provide_opendal = config.provide_opendal.as_ref().unwrap();
196        let provide_aws_s3 = config.provide_aws_s3.as_ref().unwrap();
197
198        let provide_compute_node = config.provide_compute_node.as_ref().unwrap();
199        let provide_compactor = config.provide_compactor.as_ref().unwrap();
200
201        let (is_shared_backend, is_persistent_backend) = match (
202            config.enable_in_memory_kv_state_backend,
203            provide_minio.as_slice(),
204            provide_aws_s3.as_slice(),
205            provide_opendal.as_slice(),
206        ) {
207            (true, [], [], []) => {
208                cmd.arg("--state-store").arg("in-memory");
209                (false, false)
210            }
211            (true, _, _, _) => {
212                return Err(anyhow!(
213                    "When `enable_in_memory_kv_state_backend` is enabled, no minio and aws-s3 should be provided.",
214                ));
215            }
216            (_, provide_minio, provide_aws_s3, provide_opendal) => add_hummock_backend(
217                &config.id,
218                provide_opendal,
219                provide_minio,
220                provide_aws_s3,
221                hummock_in_memory_strategy,
222                cmd,
223            )?,
224        };
225
226        if (provide_compute_node.len() > 1 || !provide_compactor.is_empty()) && !is_shared_backend {
227            if config.enable_in_memory_kv_state_backend {
228                // Using a non-shared backend with multiple compute nodes will be problematic for
229                // state sharing like scaling. However, for distributed end-to-end tests with
230                // in-memory state store, this is acceptable.
231            } else {
232                return Err(anyhow!(
233                    "Hummock storage may behave incorrectly with in-memory backend for multiple compute-node or compactor-enabled configuration. Should use a shared backend (e.g. MinIO) instead. Consider adding `use: minio` in risedev config."
234                ));
235            }
236        }
237
238        let provide_compactor = config.provide_compactor.as_ref().unwrap();
239        if is_shared_backend && provide_compactor.is_empty() {
240            return Err(anyhow!(
241                "When using a shared backend (minio, aws-s3, or shared in-memory with `risedev playground`), at least one compactor is required. Consider adding `use: compactor` in risedev config."
242            ));
243        }
244        if is_persistent_meta_store && !is_persistent_backend {
245            return Err(anyhow!(
246                "When using a persistent meta store (sql), a persistent state store is required (e.g. minio, aws-s3, etc.)."
247            ));
248        }
249
250        cmd.arg("--data-directory").arg("hummock_001");
251
252        let provide_tempo = config.provide_tempo.as_ref().unwrap();
253        add_tempo_endpoint(provide_tempo, cmd)?;
254
255        Ok(())
256    }
257}
258
259impl Task for MetaNodeService {
260    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
261        ctx.service(self);
262        ctx.pb.set_message("starting...");
263
264        let mut cmd = risingwave_cmd("meta-node")?;
265
266        if crate::util::is_env_set("RISEDEV_ENABLE_PROFILE") {
267            cmd.env(
268                "RW_PROFILE_PATH",
269                Path::new(&env::var("PREFIX_LOG")?).join(format!("profile-{}", self.id())),
270            );
271        }
272
273        if crate::util::is_env_set("RISEDEV_ENABLE_HEAP_PROFILE") {
274            // See https://linux.die.net/man/3/jemalloc for the descriptions of profiling options
275            let conf = "prof:true,lg_prof_interval:32,lg_prof_sample:19,prof_prefix:meta-node";
276            cmd.env("_RJEM_MALLOC_CONF", conf); // prefixed for macos
277            cmd.env("MALLOC_CONF", conf); // unprefixed for linux
278        }
279
280        Self::apply_command_args(&mut cmd, &self.config, HummockInMemoryStrategy::Isolated)?;
281
282        if let MetaBackend::Env = self.config.meta_backend {
283            if is_env_set("RISEDEV_CLEAN_START") {
284                ctx.pb.set_message("initializing meta store from env...");
285                initialize_meta_store()?;
286            }
287        }
288
289        if !self.config.user_managed {
290            ctx.run_command(ctx.tmux_run(cmd)?)?;
291            ctx.pb.set_message("started");
292        } else {
293            ctx.pb.set_message("user managed");
294            writeln!(
295                &mut ctx.log,
296                "Please use the following parameters to start the meta:\n{}\n{} {}\n\n",
297                get_program_env_cmd(&cmd),
298                get_program_name(&cmd),
299                get_program_args(&cmd)
300            )?;
301        }
302
303        Ok(())
304    }
305
306    fn id(&self) -> String {
307        self.config.id.clone()
308    }
309}
310
311fn initialize_meta_store() -> Result<(), anyhow::Error> {
312    let rt = tokio::runtime::Builder::new_current_thread()
313        .enable_all()
314        .build()?;
315
316    let endpoint: Url = sql_endpoint_from_env()
317        .parse()
318        .context("invalid url for SQL endpoint")?;
319    let scheme = endpoint.scheme();
320
321    // Retrieve the database name to use for the meta store.
322    // Modify the URL to establish a temporary connection to initialize that database.
323    let (db, init_url) = if sqlx::Postgres::URL_SCHEMES.contains(&scheme) {
324        let options = sqlx::postgres::PgConnectOptions::from_url(&endpoint)
325            .context("invalid database url for Postgres meta backend")?;
326
327        let db = options
328            .get_database()
329            .unwrap_or_else(|| options.get_username()) // PG defaults to username if no database is specified
330            .to_owned();
331        // https://www.postgresql.org/docs/current/manage-ag-templatedbs.html
332        let init_options = options.database("template1");
333        let init_url = init_options.to_url_lossy();
334
335        (db, init_url)
336    } else if sqlx::MySql::URL_SCHEMES.contains(&scheme) {
337        let options = sqlx::mysql::MySqlConnectOptions::from_url(&endpoint)
338            .context("invalid database url for MySQL meta backend")?;
339
340        let db = options
341            .get_database()
342            .context("database not specified for MySQL meta backend")?
343            .to_owned();
344        // Effectively unset the database field when converting back to URL, meaning connect to no database.
345        let init_options = options.database("");
346        let init_url = init_options.to_url_lossy();
347
348        (db, init_url)
349    } else if sqlx::Sqlite::URL_SCHEMES.contains(&scheme) {
350        // For SQLite, simply empty the file.
351        let options = sqlx::sqlite::SqliteConnectOptions::from_url(&endpoint)
352            .context("invalid database url for SQLite meta backend")?;
353
354        if endpoint.as_str().contains(":memory:") || endpoint.as_str().contains("mode=memory") {
355            // SQLite in-memory database does not need initialization.
356        } else {
357            let filename = options.get_filename();
358            if std::fs::exists(filename)? {
359                fs_err::write(filename, b"").context("failed to empty SQLite file")?;
360            }
361        }
362
363        return Ok(());
364    } else {
365        bail!("unsupported SQL scheme for meta backend: {}", scheme);
366    };
367
368    rt.block_on(async move {
369        use sqlx::any::*;
370        install_default_drivers();
371
372        let options = sqlx::any::AnyConnectOptions::from_url(&init_url)?
373            .log_statements(log::LevelFilter::Debug);
374
375        let mut conn = options
376            .connect()
377            .await
378            .context("failed to connect to a template database for meta store")?;
379
380        // Intentionally not executing in a transaction because Postgres does not allow it.
381        sqlx::raw_sql(&format!("DROP DATABASE IF EXISTS {};", db))
382            .execute(&mut conn)
383            .await?;
384        sqlx::raw_sql(&format!("CREATE DATABASE {};", db))
385            .execute(&mut conn)
386            .await?;
387
388        Ok::<_, anyhow::Error>(())
389    })
390    .context("failed to initialize database for meta store")?;
391
392    Ok(())
393}