risingwave_meta/controller/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use risingwave_common::bail;
20use risingwave_common::hash::VnodeCount;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_meta_model::{
23    PrivateLinkService, connection, database, function, index, object, schema, secret, sink,
24    source, subscription, table, view,
25};
26use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
27use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo;
28use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
29use risingwave_pb::catalog::table::{
30    CdcTableType as PbCdcTableType, PbEngine, PbOptionalAssociatedSourceId, PbTableType,
31};
32use risingwave_pb::catalog::{
33    PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex,
34    PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable,
35    PbView,
36};
37use sea_orm::{ConnectOptions, DatabaseConnection, DbBackend, ModelTrait};
38
39use crate::{MetaError, MetaResult, MetaStoreBackend};
40
41pub mod catalog;
42pub mod cluster;
43pub mod fragment;
44pub mod id;
45pub mod rename;
46pub mod scale;
47pub mod session_params;
48pub mod streaming_job;
49pub mod system_param;
50pub mod user;
51pub mod utils;
52
53// todo: refine the error transform.
54impl From<sea_orm::DbErr> for MetaError {
55    fn from(err: sea_orm::DbErr) -> Self {
56        if let Some(err) = err.sql_err() {
57            return anyhow!(err).into();
58        }
59        anyhow!(err).into()
60    }
61}
62
/// Handle to the SQL-backed meta store.
///
/// Bundles the `sea_orm` connection with the endpoint string it was opened with.
#[derive(Clone)]
pub struct SqlMetaStore {
    /// Connection used for every query and migration against the meta store.
    pub conn: DatabaseConnection,
    /// Endpoint the connection was created from. For the `Mem` backend this is the
    /// in-memory SQLite URL chosen by `connect`; for the `Sql` backend it is the
    /// endpoint supplied by the caller.
    pub endpoint: String,
}
68
69impl SqlMetaStore {
70    /// Connect to the SQL meta store based on the given configuration.
71    pub async fn connect(backend: MetaStoreBackend) -> MetaResult<Self> {
72        const MAX_DURATION: Duration = Duration::new(u64::MAX / 4, 0);
73
74        #[easy_ext::ext]
75        impl ConnectOptions {
76            /// Apply common settings for `SQLite` connections.
77            fn sqlite_common(&mut self) -> &mut Self {
78                self
79                    // Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
80                    // here we forcibly specify the number of connections as 1.
81                    .min_connections(1)
82                    .max_connections(1)
83                    // Workaround for https://github.com/risingwavelabs/risingwave/issues/18966.
84                    // Note: don't quite get the point but `acquire_timeout` and `connect_timeout` maps to the
85                    //       same underlying setting in `sqlx` under current implementation.
86                    .acquire_timeout(MAX_DURATION)
87                    .connect_timeout(MAX_DURATION)
88            }
89        }
90
91        Ok(match backend {
92            MetaStoreBackend::Mem => {
93                const IN_MEMORY_STORE: &str = "sqlite::memory:";
94
95                let mut options = ConnectOptions::new(IN_MEMORY_STORE);
96
97                options
98                    .sqlite_common()
99                    // Releasing the connection to in-memory SQLite database is unacceptable
100                    // because it will clear the database. Set a large enough timeout to prevent it.
101                    // `sqlx` actually supports disabling these timeouts by passing a `None`, but
102                    // `sea-orm` does not expose this option.
103                    .idle_timeout(MAX_DURATION)
104                    .max_lifetime(MAX_DURATION);
105
106                let conn = sea_orm::Database::connect(options).await?;
107                Self {
108                    conn,
109                    endpoint: IN_MEMORY_STORE.to_owned(),
110                }
111            }
112            MetaStoreBackend::Sql { endpoint, config } => {
113                let mut options = ConnectOptions::new(endpoint.clone());
114                options
115                    .max_connections(config.max_connections)
116                    .min_connections(config.min_connections)
117                    .connect_timeout(Duration::from_secs(config.connection_timeout_sec))
118                    .idle_timeout(Duration::from_secs(config.idle_timeout_sec))
119                    .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));
120
121                if DbBackend::Sqlite.is_prefix_of(&endpoint) {
122                    if endpoint.contains(":memory:") || endpoint.contains("mode=memory") {
123                        bail!(
124                            "use the `mem` backend instead of specifying a URL of in-memory SQLite"
125                        );
126                    }
127                    options.sqlite_common();
128                }
129
130                let conn = sea_orm::Database::connect(options).await?;
131                Self { conn, endpoint }
132            }
133        })
134    }
135
136    #[cfg(any(test, feature = "test"))]
137    pub async fn for_test() -> Self {
138        let this = Self::connect(MetaStoreBackend::Mem).await.unwrap();
139        Migrator::up(&this.conn, None).await.unwrap();
140        this
141    }
142
143    /// Check whether the cluster, which uses SQL as the backend, is a new cluster.
144    /// It determines this by inspecting the applied migrations. If the migration `m20230908_072257_init` has been applied,
145    /// then it is considered an old cluster.
146    ///
147    /// Note: this check should be performed before [`Self::up()`].
148    async fn is_first_launch(&self) -> MetaResult<bool> {
149        let migrations = Migrator::get_applied_migrations(&self.conn)
150            .await
151            .context("failed to get applied migrations")?;
152        for migration in migrations {
153            if migration.name() == "m20230908_072257_init"
154                && migration.status() == MigrationStatus::Applied
155            {
156                return Ok(false);
157            }
158        }
159        Ok(true)
160    }
161
162    /// Apply all the migrations to the meta store before starting the service.
163    ///
164    /// Returns whether the cluster is the first launch.
165    pub async fn up(&self) -> MetaResult<bool> {
166        let cluster_first_launch = self.is_first_launch().await?;
167        // Try to upgrade if any new model changes are added.
168        Migrator::up(&self.conn, None)
169            .await
170            .context("failed to upgrade models in meta store")?;
171
172        Ok(cluster_first_launch)
173    }
174}
175
/// A catalog-specific model `M` paired with its `object::Model`.
///
/// The `From` conversions in this file read entity-specific fields from the first
/// element and shared metadata (owner, schema/database ids, timestamps, cluster
/// versions) from the second.
pub struct ObjectModel<M: ModelTrait>(M, object::Model);
177
178impl From<ObjectModel<database::Model>> for PbDatabase {
179    fn from(value: ObjectModel<database::Model>) -> Self {
180        Self {
181            id: value.0.database_id as _,
182            name: value.0.name,
183            owner: value.1.owner_id as _,
184            resource_group: value.0.resource_group.clone(),
185            barrier_interval_ms: value.0.barrier_interval_ms.map(|v| v as u32),
186            checkpoint_frequency: value.0.checkpoint_frequency.map(|v| v as u64),
187        }
188    }
189}
190
191impl From<ObjectModel<secret::Model>> for PbSecret {
192    fn from(value: ObjectModel<secret::Model>) -> Self {
193        Self {
194            id: value.0.secret_id as _,
195            name: value.0.name,
196            database_id: value.1.database_id.unwrap() as _,
197            value: value.0.value,
198            owner: value.1.owner_id as _,
199            schema_id: value.1.schema_id.unwrap() as _,
200        }
201    }
202}
203
204impl From<ObjectModel<schema::Model>> for PbSchema {
205    fn from(value: ObjectModel<schema::Model>) -> Self {
206        Self {
207            id: value.0.schema_id as _,
208            name: value.0.name,
209            database_id: value.1.database_id.unwrap() as _,
210            owner: value.1.owner_id as _,
211        }
212    }
213}
214
215impl From<ObjectModel<table::Model>> for PbTable {
216    fn from(value: ObjectModel<table::Model>) -> Self {
217        Self {
218            id: value.0.table_id as _,
219            schema_id: value.1.schema_id.unwrap() as _,
220            database_id: value.1.database_id.unwrap() as _,
221            name: value.0.name,
222            columns: value.0.columns.to_protobuf(),
223            pk: value.0.pk.to_protobuf(),
224            table_type: PbTableType::from(value.0.table_type) as _,
225            distribution_key: value.0.distribution_key.0,
226            stream_key: value.0.stream_key.0,
227            append_only: value.0.append_only,
228            owner: value.1.owner_id as _,
229            fragment_id: value.0.fragment_id.unwrap_or_default() as u32,
230            vnode_col_index: value.0.vnode_col_index.map(|index| index as _),
231            row_id_index: value.0.row_id_index.map(|index| index as _),
232            value_indices: value.0.value_indices.0,
233            definition: value.0.definition,
234            handle_pk_conflict_behavior: PbHandleConflictBehavior::from(
235                value.0.handle_pk_conflict_behavior,
236            ) as _,
237            version_column_indices: value
238                .0
239                .version_column_indices
240                .unwrap_or_default()
241                .0
242                .iter()
243                .map(|&idx| idx as u32)
244                .collect(),
245            read_prefix_len_hint: value.0.read_prefix_len_hint as _,
246            watermark_indices: value.0.watermark_indices.0,
247            dist_key_in_pk: value.0.dist_key_in_pk.0,
248            dml_fragment_id: value.0.dml_fragment_id.map(|id| id as u32),
249            cardinality: value
250                .0
251                .cardinality
252                .map(|cardinality| cardinality.to_protobuf()),
253            initialized_at_epoch: Some(
254                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
255            ),
256            created_at_epoch: Some(
257                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
258            ),
259            cleaned_by_watermark: value.0.cleaned_by_watermark,
260            stream_job_status: PbStreamJobStatus::Created as _,
261            create_type: PbCreateType::Foreground as _,
262            version: value.0.version.map(|v| v.to_protobuf()),
263            optional_associated_source_id: value
264                .0
265                .optional_associated_source_id
266                .map(|id| PbOptionalAssociatedSourceId::AssociatedSourceId(id as _)),
267            description: value.0.description,
268            #[expect(deprecated)]
269            incoming_sinks: vec![],
270            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
271            created_at_cluster_version: value.1.created_at_cluster_version,
272            retention_seconds: value.0.retention_seconds.map(|id| id as u32),
273            cdc_table_id: value.0.cdc_table_id,
274            maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(),
275            webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()),
276            job_id: value.0.belongs_to_job_id.map(|id| id as _),
277            engine: value.0.engine.map(|engine| PbEngine::from(engine) as i32),
278            clean_watermark_index_in_pk: value.0.clean_watermark_index_in_pk,
279            refreshable: value.0.refreshable,
280            vector_index_info: value.0.vector_index_info.map(|index| index.to_protobuf()),
281            cdc_table_type: value
282                .0
283                .cdc_table_type
284                .map(|cdc_type| PbCdcTableType::from(cdc_type) as i32),
285            refresh_state: Some(risingwave_pb::catalog::RefreshState::from(
286                value
287                    .0
288                    .refresh_state
289                    .unwrap_or(risingwave_meta_model::table::RefreshState::Idle),
290            ) as i32),
291        }
292    }
293}
294
295impl From<ObjectModel<source::Model>> for PbSource {
296    fn from(value: ObjectModel<source::Model>) -> Self {
297        let mut secret_ref_map = BTreeMap::new();
298        if let Some(secret_ref) = value.0.secret_ref {
299            secret_ref_map = secret_ref.to_protobuf();
300        }
301        Self {
302            id: value.0.source_id as _,
303            schema_id: value.1.schema_id.unwrap() as _,
304            database_id: value.1.database_id.unwrap() as _,
305            name: value.0.name,
306            row_id_index: value.0.row_id_index.map(|id| id as _),
307            columns: value.0.columns.to_protobuf(),
308            pk_column_ids: value.0.pk_column_ids.0,
309            with_properties: value.0.with_properties.0,
310            owner: value.1.owner_id as _,
311            info: value.0.source_info.map(|info| info.to_protobuf()),
312            watermark_descs: value.0.watermark_descs.to_protobuf(),
313            definition: value.0.definition,
314            connection_id: value.0.connection_id.map(|id| id as _),
315            // todo: using the timestamp from the database directly.
316            initialized_at_epoch: Some(
317                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
318            ),
319            created_at_epoch: Some(
320                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
321            ),
322            version: value.0.version as _,
323            optional_associated_table_id: value
324                .0
325                .optional_associated_table_id
326                .map(|id| PbOptionalAssociatedTableId::AssociatedTableId(id as _)),
327            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
328            created_at_cluster_version: value.1.created_at_cluster_version,
329            secret_refs: secret_ref_map,
330            rate_limit: value.0.rate_limit.map(|v| v as _),
331        }
332    }
333}
334
335impl From<ObjectModel<sink::Model>> for PbSink {
336    fn from(value: ObjectModel<sink::Model>) -> Self {
337        let mut secret_ref_map = BTreeMap::new();
338        if let Some(secret_ref) = value.0.secret_ref {
339            secret_ref_map = secret_ref.to_protobuf();
340        }
341        Self {
342            id: value.0.sink_id as _,
343            schema_id: value.1.schema_id.unwrap() as _,
344            database_id: value.1.database_id.unwrap() as _,
345            name: value.0.name,
346            columns: value.0.columns.to_protobuf(),
347            plan_pk: value.0.plan_pk.to_protobuf(),
348            distribution_key: value.0.distribution_key.0,
349            downstream_pk: value.0.downstream_pk.0,
350            sink_type: PbSinkType::from(value.0.sink_type) as _,
351            owner: value.1.owner_id as _,
352            properties: value.0.properties.0,
353            definition: value.0.definition,
354            connection_id: value.0.connection_id.map(|id| id as _),
355            initialized_at_epoch: Some(
356                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
357            ),
358            created_at_epoch: Some(
359                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
360            ),
361            db_name: value.0.db_name,
362            sink_from_name: value.0.sink_from_name,
363            stream_job_status: PbStreamJobStatus::Created as _,
364            format_desc: value.0.sink_format_desc.map(|desc| desc.to_protobuf()),
365            target_table: value.0.target_table.map(|id| id as _),
366            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
367            created_at_cluster_version: value.1.created_at_cluster_version,
368            create_type: PbCreateType::Foreground as _,
369            secret_refs: secret_ref_map,
370            original_target_columns: value
371                .0
372                .original_target_columns
373                .map(|cols| cols.to_protobuf())
374                .unwrap_or_default(),
375            auto_refresh_schema_from_table: value
376                .0
377                .auto_refresh_schema_from_table
378                .map(|id| id as _),
379        }
380    }
381}
382
383impl From<ObjectModel<subscription::Model>> for PbSubscription {
384    fn from(value: ObjectModel<subscription::Model>) -> Self {
385        Self {
386            id: value.0.subscription_id as _,
387            schema_id: value.1.schema_id.unwrap() as _,
388            database_id: value.1.database_id.unwrap() as _,
389            name: value.0.name,
390            owner: value.1.owner_id as _,
391            retention_seconds: value.0.retention_seconds as _,
392            definition: value.0.definition,
393            initialized_at_epoch: Some(
394                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
395            ),
396            created_at_epoch: Some(
397                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
398            ),
399            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
400            created_at_cluster_version: value.1.created_at_cluster_version,
401            dependent_table_id: value.0.dependent_table_id as _,
402            subscription_state: value.0.subscription_state as _,
403        }
404    }
405}
406
407impl From<ObjectModel<index::Model>> for PbIndex {
408    fn from(value: ObjectModel<index::Model>) -> Self {
409        Self {
410            id: value.0.index_id as _,
411            schema_id: value.1.schema_id.unwrap() as _,
412            database_id: value.1.database_id.unwrap() as _,
413            name: value.0.name,
414            owner: value.1.owner_id as _,
415            index_table_id: value.0.index_table_id as _,
416            primary_table_id: value.0.primary_table_id as _,
417            index_item: value.0.index_items.to_protobuf(),
418            index_column_properties: value
419                .0
420                .index_column_properties
421                .map(|p| p.to_protobuf())
422                .unwrap_or_default(),
423            index_columns_len: value.0.index_columns_len as _,
424            initialized_at_epoch: Some(
425                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
426            ),
427            created_at_epoch: Some(
428                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
429            ),
430            stream_job_status: PbStreamJobStatus::Created as _,
431            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
432            created_at_cluster_version: value.1.created_at_cluster_version,
433            create_type: risingwave_pb::catalog::CreateType::Foreground.into(), /* Default for existing indexes */
434        }
435    }
436}
437
438impl From<ObjectModel<view::Model>> for PbView {
439    fn from(value: ObjectModel<view::Model>) -> Self {
440        Self {
441            id: value.0.view_id as _,
442            schema_id: value.1.schema_id.unwrap() as _,
443            database_id: value.1.database_id.unwrap() as _,
444            name: value.0.name,
445            owner: value.1.owner_id as _,
446            properties: value.0.properties.0,
447            sql: value.0.definition,
448            columns: value.0.columns.to_protobuf(),
449        }
450    }
451}
452
453impl From<ObjectModel<connection::Model>> for PbConnection {
454    fn from(value: ObjectModel<connection::Model>) -> Self {
455        let info: PbConnectionInfo = if value.0.info == PrivateLinkService::default() {
456            PbConnectionInfo::ConnectionParams(value.0.params.to_protobuf())
457        } else {
458            PbConnectionInfo::PrivateLinkService(value.0.info.to_protobuf())
459        };
460        Self {
461            id: value.1.oid as _,
462            schema_id: value.1.schema_id.unwrap() as _,
463            database_id: value.1.database_id.unwrap() as _,
464            name: value.0.name,
465            owner: value.1.owner_id as _,
466            info: Some(info),
467        }
468    }
469}
470
471impl From<ObjectModel<function::Model>> for PbFunction {
472    fn from(value: ObjectModel<function::Model>) -> Self {
473        Self {
474            id: value.0.function_id as _,
475            schema_id: value.1.schema_id.unwrap() as _,
476            database_id: value.1.database_id.unwrap() as _,
477            name: value.0.name,
478            owner: value.1.owner_id as _,
479            arg_names: value.0.arg_names.split(',').map(|s| s.to_owned()).collect(),
480            arg_types: value.0.arg_types.to_protobuf(),
481            return_type: Some(value.0.return_type.to_protobuf()),
482            language: value.0.language,
483            runtime: value.0.runtime,
484            link: value.0.link,
485            name_in_runtime: value.0.name_in_runtime,
486            body: value.0.body,
487            compressed_binary: value.0.compressed_binary,
488            kind: Some(value.0.kind.into()),
489            always_retry_on_network_error: value.0.always_retry_on_network_error,
490            is_async: value
491                .0
492                .options
493                .as_ref()
494                .and_then(|o| o.0.get("async").map(|v| v == "true")),
495            is_batched: value
496                .0
497                .options
498                .as_ref()
499                .and_then(|o| o.0.get("batch").map(|v| v == "true")),
500            created_at_epoch: Some(
501                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
502            ),
503            created_at_cluster_version: value.1.created_at_cluster_version,
504        }
505    }
506}