risingwave_meta/controller/catalog/
create_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::system_param::{OverrideValidate, Validate};
16use risingwave_common::util::epoch::Epoch;
17
18use super::*;
19use crate::barrier::SnapshotBackfillInfo;
20
21impl CatalogController {
22    pub(crate) async fn create_object(
23        txn: &DatabaseTransaction,
24        obj_type: ObjectType,
25        owner_id: UserId,
26        database_id: Option<DatabaseId>,
27        schema_id: Option<SchemaId>,
28    ) -> MetaResult<object::Model> {
29        let active_db = object::ActiveModel {
30            oid: Default::default(),
31            obj_type: Set(obj_type),
32            owner_id: Set(owner_id),
33            schema_id: Set(schema_id),
34            database_id: Set(database_id),
35            initialized_at: Default::default(),
36            created_at: Default::default(),
37            initialized_at_cluster_version: Set(Some(current_cluster_version())),
38            created_at_cluster_version: Set(Some(current_cluster_version())),
39        };
40        Ok(active_db.insert(txn).await?)
41    }
42
43    pub async fn create_database(
44        &self,
45        db: PbDatabase,
46    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
47        // validate first
48        if let Some(ref interval) = db.barrier_interval_ms {
49            OverrideValidate::barrier_interval_ms(interval).map_err(|e| anyhow::anyhow!(e))?;
50        }
51        if let Some(ref frequency) = db.checkpoint_frequency {
52            OverrideValidate::checkpoint_frequency(frequency).map_err(|e| anyhow::anyhow!(e))?;
53        }
54
55        let inner = self.inner.write().await;
56        let owner_id = db.owner as _;
57        let txn = inner.db.begin().await?;
58        ensure_user_id(owner_id, &txn).await?;
59        check_database_name_duplicate(&db.name, &txn).await?;
60
61        let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
62        let mut db: database::ActiveModel = db.into();
63        db.database_id = Set(db_obj.oid);
64        let db = db.insert(&txn).await?;
65
66        let mut schemas = vec![];
67        for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
68            let schema_obj =
69                Self::create_object(&txn, ObjectType::Schema, owner_id, Some(db_obj.oid), None)
70                    .await?;
71            let schema = schema::ActiveModel {
72                schema_id: Set(schema_obj.oid),
73                name: Set(schema_name.into()),
74            };
75            let schema = schema.insert(&txn).await?;
76            schemas.push(ObjectModel(schema, schema_obj).into());
77        }
78        txn.commit().await?;
79
80        let mut version = self
81            .notify_frontend(
82                NotificationOperation::Add,
83                NotificationInfo::Database(ObjectModel(db.clone(), db_obj).into()),
84            )
85            .await;
86        for schema in schemas {
87            version = self
88                .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
89                .await;
90        }
91
92        Ok((version, db))
93    }
94
95    pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
96        let inner = self.inner.write().await;
97        let owner_id = schema.owner as _;
98        let txn = inner.db.begin().await?;
99        ensure_user_id(owner_id, &txn).await?;
100        ensure_object_id(ObjectType::Database, schema.database_id as _, &txn).await?;
101        check_schema_name_duplicate(&schema.name, schema.database_id as _, &txn).await?;
102
103        let schema_obj = Self::create_object(
104            &txn,
105            ObjectType::Schema,
106            owner_id,
107            Some(schema.database_id as _),
108            None,
109        )
110        .await?;
111        let mut schema: schema::ActiveModel = schema.into();
112        schema.schema_id = Set(schema_obj.oid);
113        let schema = schema.insert(&txn).await?;
114
115        let updated_user_info =
116            grant_default_privileges_automatically(&txn, schema_obj.oid).await?;
117
118        txn.commit().await?;
119
120        let mut version = self
121            .notify_frontend(
122                NotificationOperation::Add,
123                NotificationInfo::Schema(ObjectModel(schema, schema_obj).into()),
124            )
125            .await;
126
127        // notify default privileges for schemas
128        if !updated_user_info.is_empty() {
129            version = self.notify_users_update(updated_user_info).await;
130        }
131
132        Ok(version)
133    }
134
135    pub async fn create_subscription_catalog(
136        &self,
137        pb_subscription: &mut PbSubscription,
138    ) -> MetaResult<()> {
139        let inner = self.inner.write().await;
140        let txn = inner.db.begin().await?;
141
142        ensure_user_id(pb_subscription.owner as _, &txn).await?;
143        ensure_object_id(ObjectType::Database, pb_subscription.database_id as _, &txn).await?;
144        ensure_object_id(ObjectType::Schema, pb_subscription.schema_id as _, &txn).await?;
145        check_subscription_name_duplicate(pb_subscription, &txn).await?;
146
147        let obj = Self::create_object(
148            &txn,
149            ObjectType::Subscription,
150            pb_subscription.owner as _,
151            Some(pb_subscription.database_id as _),
152            Some(pb_subscription.schema_id as _),
153        )
154        .await?;
155        pb_subscription.id = obj.oid as _;
156        let subscription: subscription::ActiveModel = pb_subscription.clone().into();
157        Subscription::insert(subscription).exec(&txn).await?;
158
159        // record object dependency.
160        ObjectDependency::insert(object_dependency::ActiveModel {
161            oid: Set(pb_subscription.dependent_table_id as _),
162            used_by: Set(pb_subscription.id as _),
163            ..Default::default()
164        })
165        .exec(&txn)
166        .await?;
167        txn.commit().await?;
168        Ok(())
169    }
170
171    pub async fn create_source(
172        &self,
173        mut pb_source: PbSource,
174    ) -> MetaResult<(SourceId, NotificationVersion)> {
175        let inner = self.inner.write().await;
176        let owner_id = pb_source.owner as _;
177        let txn = inner.db.begin().await?;
178        ensure_user_id(owner_id, &txn).await?;
179        ensure_object_id(ObjectType::Database, pb_source.database_id as _, &txn).await?;
180        ensure_object_id(ObjectType::Schema, pb_source.schema_id as _, &txn).await?;
181        check_relation_name_duplicate(
182            &pb_source.name,
183            pb_source.database_id as _,
184            pb_source.schema_id as _,
185            &txn,
186        )
187        .await?;
188
189        // handle secret ref
190        let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
191        let connection_ids = get_referred_connection_ids_from_source(&pb_source);
192
193        let source_obj = Self::create_object(
194            &txn,
195            ObjectType::Source,
196            owner_id,
197            Some(pb_source.database_id as _),
198            Some(pb_source.schema_id as _),
199        )
200        .await?;
201        let source_id = source_obj.oid;
202        pb_source.id = source_id as _;
203        let source: source::ActiveModel = pb_source.clone().into();
204        Source::insert(source).exec(&txn).await?;
205
206        // add secret and connection dependency
207        let dep_relation_ids = secret_ids.iter().chain(connection_ids.iter());
208        if !secret_ids.is_empty() || !connection_ids.is_empty() {
209            ObjectDependency::insert_many(dep_relation_ids.map(|id| {
210                object_dependency::ActiveModel {
211                    oid: Set(*id as _),
212                    used_by: Set(source_id as _),
213                    ..Default::default()
214                }
215            }))
216            .exec(&txn)
217            .await?;
218        }
219
220        let updated_user_info = grant_default_privileges_automatically(&txn, source_id).await?;
221
222        txn.commit().await?;
223
224        let mut version = self
225            .notify_frontend_relation_info(
226                NotificationOperation::Add,
227                PbObjectInfo::Source(pb_source),
228            )
229            .await;
230
231        // notify default privileges for source
232        if !updated_user_info.is_empty() {
233            version = self.notify_users_update(updated_user_info).await;
234        }
235
236        Ok((source_id, version))
237    }
238
239    pub async fn create_function(
240        &self,
241        mut pb_function: PbFunction,
242    ) -> MetaResult<NotificationVersion> {
243        let inner = self.inner.write().await;
244        let owner_id = pb_function.owner as _;
245        let txn = inner.db.begin().await?;
246        ensure_user_id(owner_id, &txn).await?;
247        ensure_object_id(ObjectType::Database, pb_function.database_id as _, &txn).await?;
248        ensure_object_id(ObjectType::Schema, pb_function.schema_id as _, &txn).await?;
249        check_function_signature_duplicate(&pb_function, &txn).await?;
250
251        let function_obj = Self::create_object(
252            &txn,
253            ObjectType::Function,
254            owner_id,
255            Some(pb_function.database_id as _),
256            Some(pb_function.schema_id as _),
257        )
258        .await?;
259        pb_function.id = function_obj.oid as _;
260        pb_function.created_at_epoch = Some(
261            Epoch::from_unix_millis(function_obj.created_at.and_utc().timestamp_millis() as _).0,
262        );
263        pb_function.created_at_cluster_version = function_obj.created_at_cluster_version;
264        let function: function::ActiveModel = pb_function.clone().into();
265        Function::insert(function).exec(&txn).await?;
266
267        let updated_user_info =
268            grant_default_privileges_automatically(&txn, function_obj.oid).await?;
269
270        txn.commit().await?;
271
272        let mut version = self
273            .notify_frontend(
274                NotificationOperation::Add,
275                NotificationInfo::Function(pb_function),
276            )
277            .await;
278
279        // notify default privileges for functions
280        if !updated_user_info.is_empty() {
281            version = self.notify_users_update(updated_user_info).await;
282        }
283
284        Ok(version)
285    }
286
287    pub async fn create_connection(
288        &self,
289        mut pb_connection: PbConnection,
290    ) -> MetaResult<NotificationVersion> {
291        let inner = self.inner.write().await;
292        let owner_id = pb_connection.owner as _;
293        let txn = inner.db.begin().await?;
294        ensure_user_id(owner_id, &txn).await?;
295        ensure_object_id(ObjectType::Database, pb_connection.database_id as _, &txn).await?;
296        ensure_object_id(ObjectType::Schema, pb_connection.schema_id as _, &txn).await?;
297        check_connection_name_duplicate(&pb_connection, &txn).await?;
298
299        let mut dep_secrets = HashSet::new();
300        if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
301            dep_secrets.extend(
302                params
303                    .secret_refs
304                    .values()
305                    .map(|secret_ref| secret_ref.secret_id),
306            );
307        }
308
309        let conn_obj = Self::create_object(
310            &txn,
311            ObjectType::Connection,
312            owner_id,
313            Some(pb_connection.database_id as _),
314            Some(pb_connection.schema_id as _),
315        )
316        .await?;
317        pb_connection.id = conn_obj.oid as _;
318        let connection: connection::ActiveModel = pb_connection.clone().into();
319        Connection::insert(connection).exec(&txn).await?;
320
321        for secret_id in dep_secrets {
322            ObjectDependency::insert(object_dependency::ActiveModel {
323                oid: Set(secret_id as _),
324                used_by: Set(conn_obj.oid),
325                ..Default::default()
326            })
327            .exec(&txn)
328            .await?;
329        }
330
331        let updated_user_info = grant_default_privileges_automatically(&txn, conn_obj.oid).await?;
332
333        txn.commit().await?;
334
335        {
336            // call meta telemetry here to report the connection creation
337            report_event(
338                PbTelemetryEventStage::Unspecified,
339                "connection_create",
340                pb_connection.get_id() as _,
341                {
342                    pb_connection.info.as_ref().and_then(|info| match info {
343                        ConnectionInfo::ConnectionParams(params) => {
344                            Some(params.connection_type().as_str_name().to_owned())
345                        }
346                        _ => None,
347                    })
348                },
349                None,
350                None,
351            );
352        }
353
354        let mut version = self
355            .notify_frontend(
356                NotificationOperation::Add,
357                NotificationInfo::Connection(pb_connection),
358            )
359            .await;
360
361        // notify default privileges for connections
362        if !updated_user_info.is_empty() {
363            version = self.notify_users_update(updated_user_info).await;
364        }
365
366        Ok(version)
367    }
368
369    pub async fn create_secret(
370        &self,
371        mut pb_secret: PbSecret,
372        secret_plain_payload: Vec<u8>,
373    ) -> MetaResult<NotificationVersion> {
374        let inner = self.inner.write().await;
375        let owner_id = pb_secret.owner as _;
376        let txn = inner.db.begin().await?;
377        ensure_user_id(owner_id, &txn).await?;
378        ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
379        ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
380        check_secret_name_duplicate(&pb_secret, &txn).await?;
381
382        let secret_obj = Self::create_object(
383            &txn,
384            ObjectType::Secret,
385            owner_id,
386            Some(pb_secret.database_id as _),
387            Some(pb_secret.schema_id as _),
388        )
389        .await?;
390        pb_secret.id = secret_obj.oid as _;
391        let secret: secret::ActiveModel = pb_secret.clone().into();
392        Secret::insert(secret).exec(&txn).await?;
393
394        let updated_user_info =
395            grant_default_privileges_automatically(&txn, secret_obj.oid).await?;
396
397        txn.commit().await?;
398
399        // Notify the compute and frontend node plain secret
400        let mut secret_plain = pb_secret;
401        secret_plain.value.clone_from(&secret_plain_payload);
402
403        LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
404        self.env
405            .notification_manager()
406            .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
407
408        let mut version = self
409            .notify_frontend(
410                NotificationOperation::Add,
411                NotificationInfo::Secret(secret_plain),
412            )
413            .await;
414
415        // notify default privileges for secrets
416        if !updated_user_info.is_empty() {
417            version = self.notify_users_update(updated_user_info).await;
418        }
419
420        Ok(version)
421    }
422
423    pub async fn create_view(
424        &self,
425        mut pb_view: PbView,
426        dependencies: HashSet<ObjectId>,
427    ) -> MetaResult<NotificationVersion> {
428        let inner = self.inner.write().await;
429        let owner_id = pb_view.owner as _;
430        let txn = inner.db.begin().await?;
431        ensure_user_id(owner_id, &txn).await?;
432        ensure_object_id(ObjectType::Database, pb_view.database_id as _, &txn).await?;
433        ensure_object_id(ObjectType::Schema, pb_view.schema_id as _, &txn).await?;
434        check_relation_name_duplicate(
435            &pb_view.name,
436            pb_view.database_id as _,
437            pb_view.schema_id as _,
438            &txn,
439        )
440        .await?;
441
442        let view_obj = Self::create_object(
443            &txn,
444            ObjectType::View,
445            owner_id,
446            Some(pb_view.database_id as _),
447            Some(pb_view.schema_id as _),
448        )
449        .await?;
450        pb_view.id = view_obj.oid as _;
451        let view: view::ActiveModel = pb_view.clone().into();
452        View::insert(view).exec(&txn).await?;
453
454        for obj_id in dependencies {
455            ObjectDependency::insert(object_dependency::ActiveModel {
456                oid: Set(obj_id),
457                used_by: Set(view_obj.oid),
458                ..Default::default()
459            })
460            .exec(&txn)
461            .await?;
462        }
463        let updated_user_info = grant_default_privileges_automatically(&txn, view_obj.oid).await?;
464
465        txn.commit().await?;
466
467        let mut version = self
468            .notify_frontend_relation_info(NotificationOperation::Add, PbObjectInfo::View(pb_view))
469            .await;
470
471        // notify default privileges for views
472        if !updated_user_info.is_empty() {
473            version = self.notify_users_update(updated_user_info).await;
474        }
475
476        Ok(version)
477    }
478
479    pub async fn validate_cross_db_snapshot_backfill(
480        &self,
481        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
482    ) -> MetaResult<()> {
483        if cross_db_snapshot_backfill_info
484            .upstream_mv_table_id_to_backfill_epoch
485            .is_empty()
486        {
487            return Ok(());
488        }
489
490        let inner = self.inner.read().await;
491        let table_ids = cross_db_snapshot_backfill_info
492            .upstream_mv_table_id_to_backfill_epoch
493            .keys()
494            .map(|t| t.table_id as ObjectId)
495            .collect_vec();
496        let cnt = Subscription::find()
497            .select_only()
498            .column(subscription::Column::DependentTableId)
499            .distinct()
500            .filter(subscription::Column::DependentTableId.is_in(table_ids))
501            .count(&inner.db)
502            .await? as usize;
503
504        if cnt
505            < cross_db_snapshot_backfill_info
506                .upstream_mv_table_id_to_backfill_epoch
507                .keys()
508                .count()
509        {
510            return Err(MetaError::permission_denied(
511                "Some upstream tables are not subscribed".to_owned(),
512            ));
513        }
514
515        Ok(())
516    }
517}