risingwave_meta/controller/catalog/
create_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::system_param::{OverrideValidate, Validate};
16
17use super::*;
18use crate::barrier::SnapshotBackfillInfo;
19
20impl CatalogController {
21    pub(crate) async fn create_object(
22        txn: &DatabaseTransaction,
23        obj_type: ObjectType,
24        owner_id: UserId,
25        database_id: Option<DatabaseId>,
26        schema_id: Option<SchemaId>,
27    ) -> MetaResult<object::Model> {
28        let active_db = object::ActiveModel {
29            oid: Default::default(),
30            obj_type: Set(obj_type),
31            owner_id: Set(owner_id),
32            schema_id: Set(schema_id),
33            database_id: Set(database_id),
34            initialized_at: Default::default(),
35            created_at: Default::default(),
36            initialized_at_cluster_version: Set(Some(current_cluster_version())),
37            created_at_cluster_version: Set(Some(current_cluster_version())),
38        };
39        Ok(active_db.insert(txn).await?)
40    }
41
42    pub async fn create_database(
43        &self,
44        db: PbDatabase,
45    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
46        // validate first
47        if let Some(ref interval) = db.barrier_interval_ms {
48            OverrideValidate::barrier_interval_ms(interval).map_err(|e| anyhow::anyhow!(e))?;
49        }
50        if let Some(ref frequency) = db.checkpoint_frequency {
51            OverrideValidate::checkpoint_frequency(frequency).map_err(|e| anyhow::anyhow!(e))?;
52        }
53
54        let inner = self.inner.write().await;
55        let owner_id = db.owner as _;
56        let txn = inner.db.begin().await?;
57        ensure_user_id(owner_id, &txn).await?;
58        check_database_name_duplicate(&db.name, &txn).await?;
59
60        let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
61        let mut db: database::ActiveModel = db.into();
62        db.database_id = Set(db_obj.oid);
63        let db = db.insert(&txn).await?;
64
65        let mut schemas = vec![];
66        for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
67            let schema_obj =
68                Self::create_object(&txn, ObjectType::Schema, owner_id, Some(db_obj.oid), None)
69                    .await?;
70            let schema = schema::ActiveModel {
71                schema_id: Set(schema_obj.oid),
72                name: Set(schema_name.into()),
73            };
74            let schema = schema.insert(&txn).await?;
75            schemas.push(ObjectModel(schema, schema_obj).into());
76        }
77        txn.commit().await?;
78
79        let mut version = self
80            .notify_frontend(
81                NotificationOperation::Add,
82                NotificationInfo::Database(ObjectModel(db.clone(), db_obj).into()),
83            )
84            .await;
85        for schema in schemas {
86            version = self
87                .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
88                .await;
89        }
90
91        Ok((version, db))
92    }
93
94    pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
95        let inner = self.inner.write().await;
96        let owner_id = schema.owner as _;
97        let txn = inner.db.begin().await?;
98        ensure_user_id(owner_id, &txn).await?;
99        ensure_object_id(ObjectType::Database, schema.database_id as _, &txn).await?;
100        check_schema_name_duplicate(&schema.name, schema.database_id as _, &txn).await?;
101
102        let schema_obj = Self::create_object(
103            &txn,
104            ObjectType::Schema,
105            owner_id,
106            Some(schema.database_id as _),
107            None,
108        )
109        .await?;
110        let mut schema: schema::ActiveModel = schema.into();
111        schema.schema_id = Set(schema_obj.oid);
112        let schema = schema.insert(&txn).await?;
113
114        let updated_user_info =
115            grant_default_privileges_automatically(&txn, schema_obj.oid).await?;
116
117        txn.commit().await?;
118
119        let mut version = self
120            .notify_frontend(
121                NotificationOperation::Add,
122                NotificationInfo::Schema(ObjectModel(schema, schema_obj).into()),
123            )
124            .await;
125
126        // notify default privileges for schemas
127        if !updated_user_info.is_empty() {
128            version = self.notify_users_update(updated_user_info).await;
129        }
130
131        Ok(version)
132    }
133
134    pub async fn create_subscription_catalog(
135        &self,
136        pb_subscription: &mut PbSubscription,
137    ) -> MetaResult<()> {
138        let inner = self.inner.write().await;
139        let txn = inner.db.begin().await?;
140
141        ensure_user_id(pb_subscription.owner as _, &txn).await?;
142        ensure_object_id(ObjectType::Database, pb_subscription.database_id as _, &txn).await?;
143        ensure_object_id(ObjectType::Schema, pb_subscription.schema_id as _, &txn).await?;
144        check_subscription_name_duplicate(pb_subscription, &txn).await?;
145
146        let obj = Self::create_object(
147            &txn,
148            ObjectType::Subscription,
149            pb_subscription.owner as _,
150            Some(pb_subscription.database_id as _),
151            Some(pb_subscription.schema_id as _),
152        )
153        .await?;
154        pb_subscription.id = obj.oid as _;
155        let subscription: subscription::ActiveModel = pb_subscription.clone().into();
156        Subscription::insert(subscription).exec(&txn).await?;
157
158        // record object dependency.
159        ObjectDependency::insert(object_dependency::ActiveModel {
160            oid: Set(pb_subscription.dependent_table_id as _),
161            used_by: Set(pb_subscription.id as _),
162            ..Default::default()
163        })
164        .exec(&txn)
165        .await?;
166        txn.commit().await?;
167        Ok(())
168    }
169
170    pub async fn create_source(
171        &self,
172        mut pb_source: PbSource,
173    ) -> MetaResult<(SourceId, NotificationVersion)> {
174        let inner = self.inner.write().await;
175        let owner_id = pb_source.owner as _;
176        let txn = inner.db.begin().await?;
177        ensure_user_id(owner_id, &txn).await?;
178        ensure_object_id(ObjectType::Database, pb_source.database_id as _, &txn).await?;
179        ensure_object_id(ObjectType::Schema, pb_source.schema_id as _, &txn).await?;
180        check_relation_name_duplicate(
181            &pb_source.name,
182            pb_source.database_id as _,
183            pb_source.schema_id as _,
184            &txn,
185        )
186        .await?;
187
188        // handle secret ref
189        let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
190        let connection_ids = get_referred_connection_ids_from_source(&pb_source);
191
192        let source_obj = Self::create_object(
193            &txn,
194            ObjectType::Source,
195            owner_id,
196            Some(pb_source.database_id as _),
197            Some(pb_source.schema_id as _),
198        )
199        .await?;
200        let source_id = source_obj.oid;
201        pb_source.id = source_id as _;
202        let source: source::ActiveModel = pb_source.clone().into();
203        Source::insert(source).exec(&txn).await?;
204
205        // add secret and connection dependency
206        let dep_relation_ids = secret_ids.iter().chain(connection_ids.iter());
207        if !secret_ids.is_empty() || !connection_ids.is_empty() {
208            ObjectDependency::insert_many(dep_relation_ids.map(|id| {
209                object_dependency::ActiveModel {
210                    oid: Set(*id as _),
211                    used_by: Set(source_id as _),
212                    ..Default::default()
213                }
214            }))
215            .exec(&txn)
216            .await?;
217        }
218
219        let updated_user_info = grant_default_privileges_automatically(&txn, source_id).await?;
220
221        txn.commit().await?;
222
223        let mut version = self
224            .notify_frontend_relation_info(
225                NotificationOperation::Add,
226                PbObjectInfo::Source(pb_source),
227            )
228            .await;
229
230        // notify default privileges for source
231        if !updated_user_info.is_empty() {
232            version = self.notify_users_update(updated_user_info).await;
233        }
234
235        Ok((source_id, version))
236    }
237
238    pub async fn create_function(
239        &self,
240        mut pb_function: PbFunction,
241    ) -> MetaResult<NotificationVersion> {
242        let inner = self.inner.write().await;
243        let owner_id = pb_function.owner as _;
244        let txn = inner.db.begin().await?;
245        ensure_user_id(owner_id, &txn).await?;
246        ensure_object_id(ObjectType::Database, pb_function.database_id as _, &txn).await?;
247        ensure_object_id(ObjectType::Schema, pb_function.schema_id as _, &txn).await?;
248        check_function_signature_duplicate(&pb_function, &txn).await?;
249
250        let function_obj = Self::create_object(
251            &txn,
252            ObjectType::Function,
253            owner_id,
254            Some(pb_function.database_id as _),
255            Some(pb_function.schema_id as _),
256        )
257        .await?;
258        pb_function.id = function_obj.oid as _;
259        let function: function::ActiveModel = pb_function.clone().into();
260        Function::insert(function).exec(&txn).await?;
261
262        let updated_user_info =
263            grant_default_privileges_automatically(&txn, function_obj.oid).await?;
264
265        txn.commit().await?;
266
267        let mut version = self
268            .notify_frontend(
269                NotificationOperation::Add,
270                NotificationInfo::Function(pb_function),
271            )
272            .await;
273
274        // notify default privileges for functions
275        if !updated_user_info.is_empty() {
276            version = self.notify_users_update(updated_user_info).await;
277        }
278
279        Ok(version)
280    }
281
282    pub async fn create_connection(
283        &self,
284        mut pb_connection: PbConnection,
285    ) -> MetaResult<NotificationVersion> {
286        let inner = self.inner.write().await;
287        let owner_id = pb_connection.owner as _;
288        let txn = inner.db.begin().await?;
289        ensure_user_id(owner_id, &txn).await?;
290        ensure_object_id(ObjectType::Database, pb_connection.database_id as _, &txn).await?;
291        ensure_object_id(ObjectType::Schema, pb_connection.schema_id as _, &txn).await?;
292        check_connection_name_duplicate(&pb_connection, &txn).await?;
293
294        let mut dep_secrets = HashSet::new();
295        if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
296            dep_secrets.extend(
297                params
298                    .secret_refs
299                    .values()
300                    .map(|secret_ref| secret_ref.secret_id),
301            );
302        }
303
304        let conn_obj = Self::create_object(
305            &txn,
306            ObjectType::Connection,
307            owner_id,
308            Some(pb_connection.database_id as _),
309            Some(pb_connection.schema_id as _),
310        )
311        .await?;
312        pb_connection.id = conn_obj.oid as _;
313        let connection: connection::ActiveModel = pb_connection.clone().into();
314        Connection::insert(connection).exec(&txn).await?;
315
316        for secret_id in dep_secrets {
317            ObjectDependency::insert(object_dependency::ActiveModel {
318                oid: Set(secret_id as _),
319                used_by: Set(conn_obj.oid),
320                ..Default::default()
321            })
322            .exec(&txn)
323            .await?;
324        }
325
326        let updated_user_info = grant_default_privileges_automatically(&txn, conn_obj.oid).await?;
327
328        txn.commit().await?;
329
330        {
331            // call meta telemetry here to report the connection creation
332            report_event(
333                PbTelemetryEventStage::Unspecified,
334                "connection_create",
335                pb_connection.get_id() as _,
336                {
337                    pb_connection.info.as_ref().and_then(|info| match info {
338                        ConnectionInfo::ConnectionParams(params) => {
339                            Some(params.connection_type().as_str_name().to_owned())
340                        }
341                        _ => None,
342                    })
343                },
344                None,
345                None,
346            );
347        }
348
349        let mut version = self
350            .notify_frontend(
351                NotificationOperation::Add,
352                NotificationInfo::Connection(pb_connection),
353            )
354            .await;
355
356        // notify default privileges for connections
357        if !updated_user_info.is_empty() {
358            version = self.notify_users_update(updated_user_info).await;
359        }
360
361        Ok(version)
362    }
363
364    pub async fn create_secret(
365        &self,
366        mut pb_secret: PbSecret,
367        secret_plain_payload: Vec<u8>,
368    ) -> MetaResult<NotificationVersion> {
369        let inner = self.inner.write().await;
370        let owner_id = pb_secret.owner as _;
371        let txn = inner.db.begin().await?;
372        ensure_user_id(owner_id, &txn).await?;
373        ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
374        ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
375        check_secret_name_duplicate(&pb_secret, &txn).await?;
376
377        let secret_obj = Self::create_object(
378            &txn,
379            ObjectType::Secret,
380            owner_id,
381            Some(pb_secret.database_id as _),
382            Some(pb_secret.schema_id as _),
383        )
384        .await?;
385        pb_secret.id = secret_obj.oid as _;
386        let secret: secret::ActiveModel = pb_secret.clone().into();
387        Secret::insert(secret).exec(&txn).await?;
388
389        let updated_user_info =
390            grant_default_privileges_automatically(&txn, secret_obj.oid).await?;
391
392        txn.commit().await?;
393
394        // Notify the compute and frontend node plain secret
395        let mut secret_plain = pb_secret;
396        secret_plain.value.clone_from(&secret_plain_payload);
397
398        LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
399        self.env
400            .notification_manager()
401            .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
402
403        let mut version = self
404            .notify_frontend(
405                NotificationOperation::Add,
406                NotificationInfo::Secret(secret_plain),
407            )
408            .await;
409
410        // notify default privileges for secrets
411        if !updated_user_info.is_empty() {
412            version = self.notify_users_update(updated_user_info).await;
413        }
414
415        Ok(version)
416    }
417
418    pub async fn create_view(
419        &self,
420        mut pb_view: PbView,
421        dependencies: HashSet<ObjectId>,
422    ) -> MetaResult<NotificationVersion> {
423        let inner = self.inner.write().await;
424        let owner_id = pb_view.owner as _;
425        let txn = inner.db.begin().await?;
426        ensure_user_id(owner_id, &txn).await?;
427        ensure_object_id(ObjectType::Database, pb_view.database_id as _, &txn).await?;
428        ensure_object_id(ObjectType::Schema, pb_view.schema_id as _, &txn).await?;
429        check_relation_name_duplicate(
430            &pb_view.name,
431            pb_view.database_id as _,
432            pb_view.schema_id as _,
433            &txn,
434        )
435        .await?;
436
437        let view_obj = Self::create_object(
438            &txn,
439            ObjectType::View,
440            owner_id,
441            Some(pb_view.database_id as _),
442            Some(pb_view.schema_id as _),
443        )
444        .await?;
445        pb_view.id = view_obj.oid as _;
446        let view: view::ActiveModel = pb_view.clone().into();
447        View::insert(view).exec(&txn).await?;
448
449        for obj_id in dependencies {
450            ObjectDependency::insert(object_dependency::ActiveModel {
451                oid: Set(obj_id),
452                used_by: Set(view_obj.oid),
453                ..Default::default()
454            })
455            .exec(&txn)
456            .await?;
457        }
458        let updated_user_info = grant_default_privileges_automatically(&txn, view_obj.oid).await?;
459
460        txn.commit().await?;
461
462        let mut version = self
463            .notify_frontend_relation_info(NotificationOperation::Add, PbObjectInfo::View(pb_view))
464            .await;
465
466        // notify default privileges for views
467        if !updated_user_info.is_empty() {
468            version = self.notify_users_update(updated_user_info).await;
469        }
470
471        Ok(version)
472    }
473
474    pub async fn validate_cross_db_snapshot_backfill(
475        &self,
476        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
477    ) -> MetaResult<()> {
478        if cross_db_snapshot_backfill_info
479            .upstream_mv_table_id_to_backfill_epoch
480            .is_empty()
481        {
482            return Ok(());
483        }
484
485        let inner = self.inner.read().await;
486        let table_ids = cross_db_snapshot_backfill_info
487            .upstream_mv_table_id_to_backfill_epoch
488            .keys()
489            .map(|t| t.table_id as ObjectId)
490            .collect_vec();
491        let cnt = Subscription::find()
492            .select_only()
493            .column(subscription::Column::DependentTableId)
494            .distinct()
495            .filter(subscription::Column::DependentTableId.is_in(table_ids))
496            .count(&inner.db)
497            .await? as usize;
498
499        if cnt
500            < cross_db_snapshot_backfill_info
501                .upstream_mv_table_id_to_backfill_epoch
502                .keys()
503                .count()
504        {
505            return Err(MetaError::permission_denied(
506                "Some upstream tables are not subscribed".to_owned(),
507            ));
508        }
509
510        Ok(())
511    }
512}