Skip to main content

risingwave_ctl/cmd_impl/meta/
create_meta_store_schema.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use risingwave_common::config::{MetaBackend, MetaStoreConfig};
17use risingwave_meta::MetaStoreBackend;
18use risingwave_meta::controller::SqlMetaStore;
19
20/// Options for `MetaCommands::CreateMetaStoreSchema`.
21///
22/// Mirrors the meta-store-selection subset of `RestoreOpts` so the command can
23/// connect to the same backends supported by the meta node.
24#[derive(clap::Args, Debug, Clone)]
25pub struct CreateMetaStoreSchemaOpts {
26    /// Type of meta store to apply schema changes to.
27    #[clap(long, value_enum, default_value_t = MetaBackend::Mem)]
28    pub meta_store_type: MetaBackend,
29    /// Endpoint of the sql backend. For sqlite this is the file path; for
30    /// postgres/mysql it is the `host[:port]` (username/password/database are
31    /// passed separately).
32    #[clap(long, default_value_t = String::from(""))]
33    pub sql_endpoint: String,
34    /// Username of sql backend, required when meta backend is set to MySQL or PostgreSQL.
35    #[clap(long, default_value = "")]
36    pub sql_username: String,
37    /// Password of sql backend, required when meta backend is set to MySQL or PostgreSQL.
38    #[clap(long, default_value = "")]
39    pub sql_password: String,
40    /// Database of sql backend, required when meta backend is set to MySQL or PostgreSQL.
41    #[clap(long, default_value = "")]
42    pub sql_database: String,
43    /// Extra URL parameters for the sql connection, e.g. `sslmode=disable`.
44    /// Example: `param1=value1&param2=value2`.
45    #[clap(long)]
46    pub sql_url_params: Option<String>,
47}
48
49fn build_backend(opts: &CreateMetaStoreSchemaOpts) -> anyhow::Result<MetaStoreBackend> {
50    let params_suffix = |params: &Option<String>| match params {
51        Some(p) if !p.is_empty() => format!("?{p}"),
52        _ => String::new(),
53    };
54
55    Ok(match opts.meta_store_type {
56        MetaBackend::Mem => MetaStoreBackend::Mem,
57        MetaBackend::Sql => MetaStoreBackend::Sql {
58            endpoint: opts.sql_endpoint.clone(),
59            config: MetaStoreConfig::default(),
60        },
61        MetaBackend::Sqlite => MetaStoreBackend::Sql {
62            endpoint: format!("sqlite://{}?mode=rwc", opts.sql_endpoint),
63            config: MetaStoreConfig::default(),
64        },
65        MetaBackend::Postgres => MetaStoreBackend::Sql {
66            endpoint: format!(
67                "postgres://{}:{}@{}/{}{}",
68                opts.sql_username,
69                opts.sql_password,
70                opts.sql_endpoint,
71                opts.sql_database,
72                params_suffix(&opts.sql_url_params),
73            ),
74            config: MetaStoreConfig::default(),
75        },
76        MetaBackend::Mysql => MetaStoreBackend::Sql {
77            endpoint: format!(
78                "mysql://{}:{}@{}/{}{}",
79                opts.sql_username,
80                opts.sql_password,
81                opts.sql_endpoint,
82                opts.sql_database,
83                params_suffix(&opts.sql_url_params),
84            ),
85            config: MetaStoreConfig::default(),
86        },
87    })
88}
89
90/// Apply all schema changes located under `src/meta/model/migration` to the
91/// meta store, mirroring the behavior of `SqlMetaStore::up` without needing a
92/// running meta node.
93pub async fn create_meta_store_schema(opts: CreateMetaStoreSchemaOpts) -> anyhow::Result<()> {
94    let backend = build_backend(&opts)?;
95    let store = SqlMetaStore::connect(backend)
96        .await
97        .context("failed to connect to meta store")?;
98    let first_launch = store
99        .up()
100        .await
101        .context("failed to apply meta store schema")?;
102    if first_launch {
103        tracing::info!("meta store schema initialized (first launch)");
104    } else {
105        tracing::info!("meta store schema upgraded to the latest version");
106    }
107    Ok(())
108}