risingwave_ctl/cmd_impl/meta/
create_meta_store_schema.rs1use anyhow::Context;
16use risingwave_common::config::{MetaBackend, MetaStoreConfig};
17use risingwave_meta::MetaStoreBackend;
18use risingwave_meta::controller::SqlMetaStore;
19
20#[derive(clap::Args, Debug, Clone)]
25pub struct CreateMetaStoreSchemaOpts {
26 #[clap(long, value_enum, default_value_t = MetaBackend::Mem)]
28 pub meta_store_type: MetaBackend,
29 #[clap(long, default_value_t = String::from(""))]
33 pub sql_endpoint: String,
34 #[clap(long, default_value = "")]
36 pub sql_username: String,
37 #[clap(long, default_value = "")]
39 pub sql_password: String,
40 #[clap(long, default_value = "")]
42 pub sql_database: String,
43 #[clap(long)]
46 pub sql_url_params: Option<String>,
47}
48
49fn build_backend(opts: &CreateMetaStoreSchemaOpts) -> anyhow::Result<MetaStoreBackend> {
50 let params_suffix = |params: &Option<String>| match params {
51 Some(p) if !p.is_empty() => format!("?{p}"),
52 _ => String::new(),
53 };
54
55 Ok(match opts.meta_store_type {
56 MetaBackend::Mem => MetaStoreBackend::Mem,
57 MetaBackend::Sql => MetaStoreBackend::Sql {
58 endpoint: opts.sql_endpoint.clone(),
59 config: MetaStoreConfig::default(),
60 },
61 MetaBackend::Sqlite => MetaStoreBackend::Sql {
62 endpoint: format!("sqlite://{}?mode=rwc", opts.sql_endpoint),
63 config: MetaStoreConfig::default(),
64 },
65 MetaBackend::Postgres => MetaStoreBackend::Sql {
66 endpoint: format!(
67 "postgres://{}:{}@{}/{}{}",
68 opts.sql_username,
69 opts.sql_password,
70 opts.sql_endpoint,
71 opts.sql_database,
72 params_suffix(&opts.sql_url_params),
73 ),
74 config: MetaStoreConfig::default(),
75 },
76 MetaBackend::Mysql => MetaStoreBackend::Sql {
77 endpoint: format!(
78 "mysql://{}:{}@{}/{}{}",
79 opts.sql_username,
80 opts.sql_password,
81 opts.sql_endpoint,
82 opts.sql_database,
83 params_suffix(&opts.sql_url_params),
84 ),
85 config: MetaStoreConfig::default(),
86 },
87 })
88}
89
90pub async fn create_meta_store_schema(opts: CreateMetaStoreSchemaOpts) -> anyhow::Result<()> {
94 let backend = build_backend(&opts)?;
95 let store = SqlMetaStore::connect(backend)
96 .await
97 .context("failed to connect to meta store")?;
98 let first_launch = store
99 .up()
100 .await
101 .context("failed to apply meta store schema")?;
102 if first_launch {
103 tracing::info!("meta store schema initialized (first launch)");
104 } else {
105 tracing::info!("meta store schema upgraded to the latest version");
106 }
107 Ok(())
108}