risingwave_meta/controller/catalog/
create_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16use crate::barrier::SnapshotBackfillInfo;
17
18impl CatalogController {
19    pub(crate) async fn create_object(
20        txn: &DatabaseTransaction,
21        obj_type: ObjectType,
22        owner_id: UserId,
23        database_id: Option<DatabaseId>,
24        schema_id: Option<SchemaId>,
25    ) -> MetaResult<object::Model> {
26        let active_db = object::ActiveModel {
27            oid: Default::default(),
28            obj_type: Set(obj_type),
29            owner_id: Set(owner_id),
30            schema_id: Set(schema_id),
31            database_id: Set(database_id),
32            initialized_at: Default::default(),
33            created_at: Default::default(),
34            initialized_at_cluster_version: Set(Some(current_cluster_version())),
35            created_at_cluster_version: Set(Some(current_cluster_version())),
36        };
37        Ok(active_db.insert(txn).await?)
38    }
39
40    pub async fn create_database(&self, db: PbDatabase) -> MetaResult<NotificationVersion> {
41        let inner = self.inner.write().await;
42        let owner_id = db.owner as _;
43        let txn = inner.db.begin().await?;
44        ensure_user_id(owner_id, &txn).await?;
45        check_database_name_duplicate(&db.name, &txn).await?;
46
47        let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
48        let mut db: database::ActiveModel = db.into();
49        db.database_id = Set(db_obj.oid);
50        let db = db.insert(&txn).await?;
51
52        let mut schemas = vec![];
53        for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
54            let schema_obj =
55                Self::create_object(&txn, ObjectType::Schema, owner_id, Some(db_obj.oid), None)
56                    .await?;
57            let schema = schema::ActiveModel {
58                schema_id: Set(schema_obj.oid),
59                name: Set(schema_name.into()),
60            };
61            let schema = schema.insert(&txn).await?;
62            schemas.push(ObjectModel(schema, schema_obj).into());
63        }
64        txn.commit().await?;
65
66        let mut version = self
67            .notify_frontend(
68                NotificationOperation::Add,
69                NotificationInfo::Database(ObjectModel(db, db_obj).into()),
70            )
71            .await;
72        for schema in schemas {
73            version = self
74                .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
75                .await;
76        }
77
78        Ok(version)
79    }
80
81    pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
82        let inner = self.inner.write().await;
83        let owner_id = schema.owner as _;
84        let txn = inner.db.begin().await?;
85        ensure_user_id(owner_id, &txn).await?;
86        ensure_object_id(ObjectType::Database, schema.database_id as _, &txn).await?;
87        check_schema_name_duplicate(&schema.name, schema.database_id as _, &txn).await?;
88
89        let schema_obj = Self::create_object(
90            &txn,
91            ObjectType::Schema,
92            owner_id,
93            Some(schema.database_id as _),
94            None,
95        )
96        .await?;
97        let mut schema: schema::ActiveModel = schema.into();
98        schema.schema_id = Set(schema_obj.oid);
99        let schema = schema.insert(&txn).await?;
100        txn.commit().await?;
101
102        let version = self
103            .notify_frontend(
104                NotificationOperation::Add,
105                NotificationInfo::Schema(ObjectModel(schema, schema_obj).into()),
106            )
107            .await;
108        Ok(version)
109    }
110
111    pub async fn create_subscription_catalog(
112        &self,
113        pb_subscription: &mut PbSubscription,
114    ) -> MetaResult<()> {
115        let inner = self.inner.write().await;
116        let txn = inner.db.begin().await?;
117
118        ensure_user_id(pb_subscription.owner as _, &txn).await?;
119        ensure_object_id(ObjectType::Database, pb_subscription.database_id as _, &txn).await?;
120        ensure_object_id(ObjectType::Schema, pb_subscription.schema_id as _, &txn).await?;
121        check_subscription_name_duplicate(pb_subscription, &txn).await?;
122
123        let obj = Self::create_object(
124            &txn,
125            ObjectType::Subscription,
126            pb_subscription.owner as _,
127            Some(pb_subscription.database_id as _),
128            Some(pb_subscription.schema_id as _),
129        )
130        .await?;
131        pb_subscription.id = obj.oid as _;
132        let subscription: subscription::ActiveModel = pb_subscription.clone().into();
133        Subscription::insert(subscription).exec(&txn).await?;
134
135        // record object dependency.
136        ObjectDependency::insert(object_dependency::ActiveModel {
137            oid: Set(pb_subscription.dependent_table_id as _),
138            used_by: Set(pb_subscription.id as _),
139            ..Default::default()
140        })
141        .exec(&txn)
142        .await?;
143        txn.commit().await?;
144        Ok(())
145    }
146
147    pub async fn create_source(
148        &self,
149        mut pb_source: PbSource,
150    ) -> MetaResult<(SourceId, NotificationVersion)> {
151        let inner = self.inner.write().await;
152        let owner_id = pb_source.owner as _;
153        let txn = inner.db.begin().await?;
154        ensure_user_id(owner_id, &txn).await?;
155        ensure_object_id(ObjectType::Database, pb_source.database_id as _, &txn).await?;
156        ensure_object_id(ObjectType::Schema, pb_source.schema_id as _, &txn).await?;
157        check_relation_name_duplicate(
158            &pb_source.name,
159            pb_source.database_id as _,
160            pb_source.schema_id as _,
161            &txn,
162        )
163        .await?;
164
165        // handle secret ref
166        let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
167        let connection_ids = get_referred_connection_ids_from_source(&pb_source);
168
169        let source_obj = Self::create_object(
170            &txn,
171            ObjectType::Source,
172            owner_id,
173            Some(pb_source.database_id as _),
174            Some(pb_source.schema_id as _),
175        )
176        .await?;
177        let source_id = source_obj.oid;
178        pb_source.id = source_id as _;
179        let source: source::ActiveModel = pb_source.clone().into();
180        Source::insert(source).exec(&txn).await?;
181
182        // add secret dependency
183        let dep_relation_ids = secret_ids.iter().chain(connection_ids.iter());
184        if !secret_ids.is_empty() || !connection_ids.is_empty() {
185            ObjectDependency::insert_many(dep_relation_ids.map(|id| {
186                object_dependency::ActiveModel {
187                    oid: Set(*id as _),
188                    used_by: Set(source_id as _),
189                    ..Default::default()
190                }
191            }))
192            .exec(&txn)
193            .await?;
194        }
195
196        txn.commit().await?;
197
198        let version = self
199            .notify_frontend_relation_info(
200                NotificationOperation::Add,
201                PbObjectInfo::Source(pb_source),
202            )
203            .await;
204        Ok((source_id, version))
205    }
206
207    pub async fn create_function(
208        &self,
209        mut pb_function: PbFunction,
210    ) -> MetaResult<NotificationVersion> {
211        let inner = self.inner.write().await;
212        let owner_id = pb_function.owner as _;
213        let txn = inner.db.begin().await?;
214        ensure_user_id(owner_id, &txn).await?;
215        ensure_object_id(ObjectType::Database, pb_function.database_id as _, &txn).await?;
216        ensure_object_id(ObjectType::Schema, pb_function.schema_id as _, &txn).await?;
217        check_function_signature_duplicate(&pb_function, &txn).await?;
218
219        let function_obj = Self::create_object(
220            &txn,
221            ObjectType::Function,
222            owner_id,
223            Some(pb_function.database_id as _),
224            Some(pb_function.schema_id as _),
225        )
226        .await?;
227        pb_function.id = function_obj.oid as _;
228        let function: function::ActiveModel = pb_function.clone().into();
229        Function::insert(function).exec(&txn).await?;
230        txn.commit().await?;
231
232        let version = self
233            .notify_frontend(
234                NotificationOperation::Add,
235                NotificationInfo::Function(pb_function),
236            )
237            .await;
238        Ok(version)
239    }
240
241    pub async fn create_connection(
242        &self,
243        mut pb_connection: PbConnection,
244    ) -> MetaResult<NotificationVersion> {
245        let inner = self.inner.write().await;
246        let owner_id = pb_connection.owner as _;
247        let txn = inner.db.begin().await?;
248        ensure_user_id(owner_id, &txn).await?;
249        ensure_object_id(ObjectType::Database, pb_connection.database_id as _, &txn).await?;
250        ensure_object_id(ObjectType::Schema, pb_connection.schema_id as _, &txn).await?;
251        check_connection_name_duplicate(&pb_connection, &txn).await?;
252
253        let mut dep_secrets = HashSet::new();
254        if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
255            dep_secrets.extend(
256                params
257                    .secret_refs
258                    .values()
259                    .map(|secret_ref| secret_ref.secret_id),
260            );
261        }
262
263        let conn_obj = Self::create_object(
264            &txn,
265            ObjectType::Connection,
266            owner_id,
267            Some(pb_connection.database_id as _),
268            Some(pb_connection.schema_id as _),
269        )
270        .await?;
271        pb_connection.id = conn_obj.oid as _;
272        let connection: connection::ActiveModel = pb_connection.clone().into();
273        Connection::insert(connection).exec(&txn).await?;
274
275        for secret_id in dep_secrets {
276            ObjectDependency::insert(object_dependency::ActiveModel {
277                oid: Set(secret_id as _),
278                used_by: Set(conn_obj.oid),
279                ..Default::default()
280            })
281            .exec(&txn)
282            .await?;
283        }
284
285        txn.commit().await?;
286
287        {
288            // call meta telemetry here to report the connection creation
289            report_event(
290                PbTelemetryEventStage::Unspecified,
291                "connection_create",
292                pb_connection.get_id() as _,
293                {
294                    pb_connection.info.as_ref().and_then(|info| match info {
295                        ConnectionInfo::ConnectionParams(params) => {
296                            Some(params.connection_type().as_str_name().to_owned())
297                        }
298                        _ => None,
299                    })
300                },
301                None,
302                None,
303            );
304        }
305
306        let version = self
307            .notify_frontend(
308                NotificationOperation::Add,
309                NotificationInfo::Connection(pb_connection),
310            )
311            .await;
312        Ok(version)
313    }
314
315    pub async fn create_secret(
316        &self,
317        mut pb_secret: PbSecret,
318        secret_plain_payload: Vec<u8>,
319    ) -> MetaResult<NotificationVersion> {
320        let inner = self.inner.write().await;
321        let owner_id = pb_secret.owner as _;
322        let txn = inner.db.begin().await?;
323        ensure_user_id(owner_id, &txn).await?;
324        ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
325        ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
326        check_secret_name_duplicate(&pb_secret, &txn).await?;
327
328        let secret_obj = Self::create_object(
329            &txn,
330            ObjectType::Secret,
331            owner_id,
332            Some(pb_secret.database_id as _),
333            Some(pb_secret.schema_id as _),
334        )
335        .await?;
336        pb_secret.id = secret_obj.oid as _;
337        let secret: secret::ActiveModel = pb_secret.clone().into();
338        Secret::insert(secret).exec(&txn).await?;
339
340        txn.commit().await?;
341
342        // Notify the compute and frontend node plain secret
343        let mut secret_plain = pb_secret;
344        secret_plain.value.clone_from(&secret_plain_payload);
345
346        LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
347        self.env
348            .notification_manager()
349            .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
350
351        let version = self
352            .notify_frontend(
353                NotificationOperation::Add,
354                NotificationInfo::Secret(secret_plain),
355            )
356            .await;
357
358        Ok(version)
359    }
360
361    pub async fn create_view(&self, mut pb_view: PbView) -> MetaResult<NotificationVersion> {
362        let inner = self.inner.write().await;
363        let owner_id = pb_view.owner as _;
364        let txn = inner.db.begin().await?;
365        ensure_user_id(owner_id, &txn).await?;
366        ensure_object_id(ObjectType::Database, pb_view.database_id as _, &txn).await?;
367        ensure_object_id(ObjectType::Schema, pb_view.schema_id as _, &txn).await?;
368        check_relation_name_duplicate(
369            &pb_view.name,
370            pb_view.database_id as _,
371            pb_view.schema_id as _,
372            &txn,
373        )
374        .await?;
375
376        let view_obj = Self::create_object(
377            &txn,
378            ObjectType::View,
379            owner_id,
380            Some(pb_view.database_id as _),
381            Some(pb_view.schema_id as _),
382        )
383        .await?;
384        pb_view.id = view_obj.oid as _;
385        let view: view::ActiveModel = pb_view.clone().into();
386        View::insert(view).exec(&txn).await?;
387
388        // todo: change `dependent_relations` to `dependent_objects`, which should includes connection and function as well.
389        // todo: shall we need to check existence of them Or let database handle it by FOREIGN KEY constraint.
390        for obj_id in &pb_view.dependent_relations {
391            ObjectDependency::insert(object_dependency::ActiveModel {
392                oid: Set(*obj_id as _),
393                used_by: Set(view_obj.oid),
394                ..Default::default()
395            })
396            .exec(&txn)
397            .await?;
398        }
399
400        txn.commit().await?;
401
402        let version = self
403            .notify_frontend_relation_info(NotificationOperation::Add, PbObjectInfo::View(pb_view))
404            .await;
405        Ok(version)
406    }
407
408    pub async fn validate_cross_db_snapshot_backfill(
409        &self,
410        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
411    ) -> MetaResult<()> {
412        if cross_db_snapshot_backfill_info
413            .upstream_mv_table_id_to_backfill_epoch
414            .is_empty()
415        {
416            return Ok(());
417        }
418
419        let inner = self.inner.read().await;
420        let table_ids = cross_db_snapshot_backfill_info
421            .upstream_mv_table_id_to_backfill_epoch
422            .keys()
423            .map(|t| t.table_id as ObjectId)
424            .collect_vec();
425        let cnt = Subscription::find()
426            .select_only()
427            .column(subscription::Column::DependentTableId)
428            .distinct()
429            .filter(subscription::Column::DependentTableId.is_in(table_ids))
430            .count(&inner.db)
431            .await? as usize;
432
433        if cnt
434            < cross_db_snapshot_backfill_info
435                .upstream_mv_table_id_to_backfill_epoch
436                .keys()
437                .count()
438        {
439            return Err(MetaError::permission_denied(
440                "Some upstream tables are not subscribed".to_owned(),
441            ));
442        }
443
444        Ok(())
445    }
446}