risingwave_meta/controller/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use risingwave_common::bail;
20use risingwave_common::hash::VnodeCount;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_meta_model::{
23    PrivateLinkService, connection, database, function, index, object, schema, secret, sink,
24    source, subscription, table, view,
25};
26use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
27use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo;
28use risingwave_pb::catalog::table::{CdcTableType as PbCdcTableType, PbEngine, PbTableType};
29use risingwave_pb::catalog::{
30    PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex,
31    PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable,
32    PbView,
33};
34use sea_orm::{ConnectOptions, DatabaseConnection, DbBackend, ModelTrait};
35
36use crate::{MetaError, MetaResult, MetaStoreBackend};
37
38pub mod catalog;
39pub mod cluster;
40pub mod fragment;
41pub mod id;
42pub mod rename;
43pub mod scale;
44pub mod session_params;
45pub mod streaming_job;
46pub mod system_param;
47pub mod user;
48pub mod utils;
49
50// todo: refine the error transform.
51impl From<sea_orm::DbErr> for MetaError {
52    fn from(err: sea_orm::DbErr) -> Self {
53        if let Some(err) = err.sql_err() {
54            return anyhow!(err).into();
55        }
56        anyhow!(err).into()
57    }
58}
59
/// Handle to the SQL meta store: a database connection plus the endpoint it was opened with.
#[derive(Clone)]
pub struct SqlMetaStore {
    /// The `sea-orm` connection used for all queries and migrations.
    pub conn: DatabaseConnection,
    /// The endpoint passed to the connection; `"sqlite::memory:"` for the `Mem` backend.
    pub endpoint: String,
}
65
66impl SqlMetaStore {
67    /// Connect to the SQL meta store based on the given configuration.
68    pub async fn connect(backend: MetaStoreBackend) -> MetaResult<Self> {
69        const MAX_DURATION: Duration = Duration::new(u64::MAX / 4, 0);
70
71        #[easy_ext::ext]
72        impl ConnectOptions {
73            /// Apply common settings for `SQLite` connections.
74            fn sqlite_common(&mut self) -> &mut Self {
75                self
76                    // Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
77                    // here we forcibly specify the number of connections as 1.
78                    .min_connections(1)
79                    .max_connections(1)
80                    // Workaround for https://github.com/risingwavelabs/risingwave/issues/18966.
81                    // Note: don't quite get the point but `acquire_timeout` and `connect_timeout` maps to the
82                    //       same underlying setting in `sqlx` under current implementation.
83                    .acquire_timeout(MAX_DURATION)
84                    .connect_timeout(MAX_DURATION)
85            }
86        }
87
88        Ok(match backend {
89            MetaStoreBackend::Mem => {
90                const IN_MEMORY_STORE: &str = "sqlite::memory:";
91
92                let mut options = ConnectOptions::new(IN_MEMORY_STORE);
93
94                options
95                    .sqlite_common()
96                    // Releasing the connection to in-memory SQLite database is unacceptable
97                    // because it will clear the database. Set a large enough timeout to prevent it.
98                    // `sqlx` actually supports disabling these timeouts by passing a `None`, but
99                    // `sea-orm` does not expose this option.
100                    .idle_timeout(MAX_DURATION)
101                    .max_lifetime(MAX_DURATION);
102
103                let conn = sea_orm::Database::connect(options).await?;
104                Self {
105                    conn,
106                    endpoint: IN_MEMORY_STORE.to_owned(),
107                }
108            }
109            MetaStoreBackend::Sql { endpoint, config } => {
110                let mut options = ConnectOptions::new(endpoint.clone());
111                options
112                    .max_connections(config.max_connections)
113                    .min_connections(config.min_connections)
114                    .connect_timeout(Duration::from_secs(config.connection_timeout_sec))
115                    .idle_timeout(Duration::from_secs(config.idle_timeout_sec))
116                    .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));
117
118                if DbBackend::Sqlite.is_prefix_of(&endpoint) {
119                    if endpoint.contains(":memory:") || endpoint.contains("mode=memory") {
120                        bail!(
121                            "use the `mem` backend instead of specifying a URL of in-memory SQLite"
122                        );
123                    }
124                    options.sqlite_common();
125                }
126
127                let conn = sea_orm::Database::connect(options).await?;
128                Self { conn, endpoint }
129            }
130        })
131    }
132
133    #[cfg(any(test, feature = "test"))]
134    pub async fn for_test() -> Self {
135        let this = Self::connect(MetaStoreBackend::Mem).await.unwrap();
136        Migrator::up(&this.conn, None).await.unwrap();
137        this
138    }
139
140    /// Check whether the cluster, which uses SQL as the backend, is a new cluster.
141    /// It determines this by inspecting the applied migrations. If the migration `m20230908_072257_init` has been applied,
142    /// then it is considered an old cluster.
143    ///
144    /// Note: this check should be performed before [`Self::up()`].
145    async fn is_first_launch(&self) -> MetaResult<bool> {
146        let migrations = Migrator::get_applied_migrations(&self.conn)
147            .await
148            .context("failed to get applied migrations")?;
149        for migration in migrations {
150            if migration.name() == "m20230908_072257_init"
151                && migration.status() == MigrationStatus::Applied
152            {
153                return Ok(false);
154            }
155        }
156        Ok(true)
157    }
158
159    /// Apply all the migrations to the meta store before starting the service.
160    ///
161    /// Returns whether the cluster is the first launch.
162    pub async fn up(&self) -> MetaResult<bool> {
163        let cluster_first_launch = self.is_first_launch().await?;
164        // Try to upgrade if any new model changes are added.
165        Migrator::up(&self.conn, None)
166            .await
167            .context("failed to upgrade models in meta store")?;
168
169        Ok(cluster_first_launch)
170    }
171}
172
/// Pairs a type-specific model `M` (field `0`) with an `object::Model` (field `1`).
///
/// The `From` conversions in this file read the owner, the schema/database ids and the
/// creation metadata from the `object::Model` half when building the protobuf catalog types.
pub struct ObjectModel<M: ModelTrait>(M, object::Model);
174
175impl From<ObjectModel<database::Model>> for PbDatabase {
176    fn from(value: ObjectModel<database::Model>) -> Self {
177        Self {
178            id: value.0.database_id,
179            name: value.0.name,
180            owner: value.1.owner_id as _,
181            resource_group: value.0.resource_group.clone(),
182            barrier_interval_ms: value.0.barrier_interval_ms.map(|v| v as u32),
183            checkpoint_frequency: value.0.checkpoint_frequency.map(|v| v as u64),
184        }
185    }
186}
187
188impl From<ObjectModel<secret::Model>> for PbSecret {
189    fn from(value: ObjectModel<secret::Model>) -> Self {
190        Self {
191            id: value.0.secret_id,
192            name: value.0.name,
193            database_id: value.1.database_id.unwrap(),
194            value: value.0.value,
195            owner: value.1.owner_id as _,
196            schema_id: value.1.schema_id.unwrap(),
197        }
198    }
199}
200
201impl From<ObjectModel<schema::Model>> for PbSchema {
202    fn from(value: ObjectModel<schema::Model>) -> Self {
203        Self {
204            id: value.0.schema_id,
205            name: value.0.name,
206            database_id: value.1.database_id.unwrap(),
207            owner: value.1.owner_id as _,
208        }
209    }
210}
211
212impl From<ObjectModel<table::Model>> for PbTable {
213    fn from(value: ObjectModel<table::Model>) -> Self {
214        Self {
215            id: value.0.table_id,
216            schema_id: value.1.schema_id.unwrap(),
217            database_id: value.1.database_id.unwrap(),
218            name: value.0.name,
219            columns: value.0.columns.to_protobuf(),
220            pk: value.0.pk.to_protobuf(),
221            table_type: PbTableType::from(value.0.table_type) as _,
222            distribution_key: value.0.distribution_key.0,
223            stream_key: value.0.stream_key.0,
224            append_only: value.0.append_only,
225            owner: value.1.owner_id as _,
226            fragment_id: value.0.fragment_id.unwrap_or_default(),
227            vnode_col_index: value.0.vnode_col_index.map(|index| index as _),
228            row_id_index: value.0.row_id_index.map(|index| index as _),
229            value_indices: value.0.value_indices.0,
230            definition: value.0.definition,
231            handle_pk_conflict_behavior: PbHandleConflictBehavior::from(
232                value.0.handle_pk_conflict_behavior,
233            ) as _,
234            version_column_indices: value
235                .0
236                .version_column_indices
237                .unwrap_or_default()
238                .0
239                .iter()
240                .map(|&idx| idx as u32)
241                .collect(),
242            read_prefix_len_hint: value.0.read_prefix_len_hint as _,
243            watermark_indices: value.0.watermark_indices.0,
244            dist_key_in_pk: value.0.dist_key_in_pk.0,
245            dml_fragment_id: value.0.dml_fragment_id,
246            cardinality: value
247                .0
248                .cardinality
249                .map(|cardinality| cardinality.to_protobuf()),
250            initialized_at_epoch: Some(
251                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
252            ),
253            created_at_epoch: Some(
254                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
255            ),
256            cleaned_by_watermark: value.0.cleaned_by_watermark,
257            stream_job_status: PbStreamJobStatus::Created as _,
258            create_type: PbCreateType::Foreground as _,
259            version: value.0.version.map(|v| v.to_protobuf()),
260            optional_associated_source_id: value.0.optional_associated_source_id.map(Into::into),
261            description: value.0.description,
262            #[expect(deprecated)]
263            incoming_sinks: vec![],
264            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
265            created_at_cluster_version: value.1.created_at_cluster_version,
266            retention_seconds: value.0.retention_seconds.map(|id| id as u32),
267            cdc_table_id: value.0.cdc_table_id,
268            maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(),
269            webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()),
270            job_id: value.0.belongs_to_job_id,
271            engine: value.0.engine.map(|engine| PbEngine::from(engine) as i32),
272            #[expect(deprecated)]
273            clean_watermark_index_in_pk: value.0.clean_watermark_index_in_pk,
274            clean_watermark_indices: value
275                .0
276                .clean_watermark_indices
277                .map(|indices| indices.0.iter().map(|&x| x as u32).collect())
278                .unwrap_or_default(),
279            refreshable: value.0.refreshable,
280            vector_index_info: value.0.vector_index_info.map(|index| index.to_protobuf()),
281            cdc_table_type: value
282                .0
283                .cdc_table_type
284                .map(|cdc_type| PbCdcTableType::from(cdc_type) as i32),
285        }
286    }
287}
288
289impl From<ObjectModel<source::Model>> for PbSource {
290    fn from(value: ObjectModel<source::Model>) -> Self {
291        let mut secret_ref_map = BTreeMap::new();
292        if let Some(secret_ref) = value.0.secret_ref {
293            secret_ref_map = secret_ref.to_protobuf();
294        }
295        Self {
296            id: value.0.source_id as _,
297            schema_id: value.1.schema_id.unwrap(),
298            database_id: value.1.database_id.unwrap(),
299            name: value.0.name,
300            row_id_index: value.0.row_id_index.map(|id| id as _),
301            columns: value.0.columns.to_protobuf(),
302            pk_column_ids: value.0.pk_column_ids.0,
303            with_properties: value.0.with_properties.0,
304            owner: value.1.owner_id as _,
305            info: value.0.source_info.map(|info| info.to_protobuf()),
306            watermark_descs: value.0.watermark_descs.to_protobuf(),
307            definition: value.0.definition,
308            connection_id: value.0.connection_id,
309            // todo: using the timestamp from the database directly.
310            initialized_at_epoch: Some(
311                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
312            ),
313            created_at_epoch: Some(
314                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
315            ),
316            version: value.0.version as _,
317            optional_associated_table_id: value.0.optional_associated_table_id.map(Into::into),
318            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
319            created_at_cluster_version: value.1.created_at_cluster_version,
320            secret_refs: secret_ref_map,
321            rate_limit: value.0.rate_limit.map(|v| v as _),
322            refresh_mode: value
323                .0
324                .refresh_mode
325                .map(|refresh_mode| refresh_mode.to_protobuf()),
326        }
327    }
328}
329
330impl From<ObjectModel<sink::Model>> for PbSink {
331    fn from(value: ObjectModel<sink::Model>) -> Self {
332        let mut secret_ref_map = BTreeMap::new();
333        if let Some(secret_ref) = value.0.secret_ref {
334            secret_ref_map = secret_ref.to_protobuf();
335        }
336        Self {
337            id: value.0.sink_id as _,
338            schema_id: value.1.schema_id.unwrap(),
339            database_id: value.1.database_id.unwrap(),
340            name: value.0.name,
341            columns: value.0.columns.to_protobuf(),
342            plan_pk: value.0.plan_pk.to_protobuf(),
343            distribution_key: value.0.distribution_key.0,
344            downstream_pk: value.0.downstream_pk.0,
345            sink_type: PbSinkType::from(value.0.sink_type) as _,
346            owner: value.1.owner_id as _,
347            properties: value.0.properties.0,
348            definition: value.0.definition,
349            connection_id: value.0.connection_id,
350            initialized_at_epoch: Some(
351                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
352            ),
353            created_at_epoch: Some(
354                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
355            ),
356            db_name: value.0.db_name,
357            sink_from_name: value.0.sink_from_name,
358            stream_job_status: PbStreamJobStatus::Created as _,
359            format_desc: value.0.sink_format_desc.map(|desc| desc.to_protobuf()),
360            target_table: value.0.target_table,
361            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
362            created_at_cluster_version: value.1.created_at_cluster_version,
363            create_type: PbCreateType::Foreground as _,
364            secret_refs: secret_ref_map,
365            original_target_columns: value
366                .0
367                .original_target_columns
368                .map(|cols| cols.to_protobuf())
369                .unwrap_or_default(),
370            auto_refresh_schema_from_table: value.0.auto_refresh_schema_from_table,
371        }
372    }
373}
374
375impl From<ObjectModel<subscription::Model>> for PbSubscription {
376    fn from(value: ObjectModel<subscription::Model>) -> Self {
377        Self {
378            id: value.0.subscription_id as _,
379            schema_id: value.1.schema_id.unwrap(),
380            database_id: value.1.database_id.unwrap(),
381            name: value.0.name,
382            owner: value.1.owner_id as _,
383            retention_seconds: value.0.retention_seconds as _,
384            definition: value.0.definition,
385            initialized_at_epoch: Some(
386                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
387            ),
388            created_at_epoch: Some(
389                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
390            ),
391            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
392            created_at_cluster_version: value.1.created_at_cluster_version,
393            dependent_table_id: value.0.dependent_table_id,
394            subscription_state: value.0.subscription_state as _,
395        }
396    }
397}
398
399impl From<ObjectModel<index::Model>> for PbIndex {
400    fn from(value: ObjectModel<index::Model>) -> Self {
401        Self {
402            id: value.0.index_id as _,
403            schema_id: value.1.schema_id.unwrap(),
404            database_id: value.1.database_id.unwrap(),
405            name: value.0.name,
406            owner: value.1.owner_id as _,
407            index_table_id: value.0.index_table_id,
408            primary_table_id: value.0.primary_table_id,
409            index_item: value.0.index_items.to_protobuf(),
410            index_column_properties: value
411                .0
412                .index_column_properties
413                .map(|p| p.to_protobuf())
414                .unwrap_or_default(),
415            index_columns_len: value.0.index_columns_len as _,
416            initialized_at_epoch: Some(
417                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
418            ),
419            created_at_epoch: Some(
420                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
421            ),
422            stream_job_status: PbStreamJobStatus::Created as _,
423            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
424            created_at_cluster_version: value.1.created_at_cluster_version,
425            create_type: risingwave_pb::catalog::CreateType::Foreground.into(), /* Default for existing indexes */
426        }
427    }
428}
429
430impl From<ObjectModel<view::Model>> for PbView {
431    fn from(value: ObjectModel<view::Model>) -> Self {
432        Self {
433            id: value.0.view_id as _,
434            schema_id: value.1.schema_id.unwrap(),
435            database_id: value.1.database_id.unwrap(),
436            name: value.0.name,
437            owner: value.1.owner_id as _,
438            properties: value.0.properties.0,
439            sql: value.0.definition,
440            columns: value.0.columns.to_protobuf(),
441            created_at_epoch: Some(
442                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
443            ),
444            created_at_cluster_version: value.1.created_at_cluster_version,
445        }
446    }
447}
448
449impl From<ObjectModel<connection::Model>> for PbConnection {
450    fn from(value: ObjectModel<connection::Model>) -> Self {
451        let info: PbConnectionInfo = if value.0.info == PrivateLinkService::default() {
452            PbConnectionInfo::ConnectionParams(value.0.params.to_protobuf())
453        } else {
454            PbConnectionInfo::PrivateLinkService(value.0.info.to_protobuf())
455        };
456        Self {
457            id: value.1.oid.as_connection_id(),
458            schema_id: value.1.schema_id.unwrap(),
459            database_id: value.1.database_id.unwrap(),
460            name: value.0.name,
461            owner: value.1.owner_id as _,
462            info: Some(info),
463        }
464    }
465}
466
467impl From<ObjectModel<function::Model>> for PbFunction {
468    fn from(value: ObjectModel<function::Model>) -> Self {
469        Self {
470            id: value.0.function_id as _,
471            schema_id: value.1.schema_id.unwrap(),
472            database_id: value.1.database_id.unwrap(),
473            name: value.0.name,
474            owner: value.1.owner_id as _,
475            arg_names: value.0.arg_names.split(',').map(|s| s.to_owned()).collect(),
476            arg_types: value.0.arg_types.to_protobuf(),
477            return_type: Some(value.0.return_type.to_protobuf()),
478            language: value.0.language,
479            runtime: value.0.runtime,
480            link: value.0.link,
481            name_in_runtime: value.0.name_in_runtime,
482            body: value.0.body,
483            compressed_binary: value.0.compressed_binary,
484            kind: Some(value.0.kind.into()),
485            always_retry_on_network_error: value.0.always_retry_on_network_error,
486            is_async: value
487                .0
488                .options
489                .as_ref()
490                .and_then(|o| o.0.get("async").map(|v| v == "true")),
491            is_batched: value
492                .0
493                .options
494                .as_ref()
495                .and_then(|o| o.0.get("batch").map(|v| v == "true")),
496            created_at_epoch: Some(
497                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
498            ),
499            created_at_cluster_version: value.1.created_at_cluster_version,
500        }
501    }
502}