risingwave_meta/controller/catalog/
create_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::system_param::{OverrideValidate, Validate};
16use risingwave_common::util::epoch::Epoch;
17
18use super::*;
19use crate::barrier::SnapshotBackfillInfo;
20
21impl CatalogController {
22    pub(crate) async fn create_object(
23        txn: &DatabaseTransaction,
24        obj_type: ObjectType,
25        owner_id: UserId,
26        database_id: Option<DatabaseId>,
27        schema_id: Option<SchemaId>,
28    ) -> MetaResult<object::Model> {
29        let active_db = object::ActiveModel {
30            oid: Default::default(),
31            obj_type: Set(obj_type),
32            owner_id: Set(owner_id),
33            schema_id: Set(schema_id),
34            database_id: Set(database_id),
35            initialized_at: Default::default(),
36            created_at: Default::default(),
37            initialized_at_cluster_version: Set(Some(current_cluster_version())),
38            created_at_cluster_version: Set(Some(current_cluster_version())),
39        };
40        Ok(active_db.insert(txn).await?)
41    }
42
43    pub async fn create_database(
44        &self,
45        db: PbDatabase,
46    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
47        // validate first
48        if let Some(ref interval) = db.barrier_interval_ms {
49            OverrideValidate::barrier_interval_ms(interval).map_err(|e| anyhow::anyhow!(e))?;
50        }
51        if let Some(ref frequency) = db.checkpoint_frequency {
52            OverrideValidate::checkpoint_frequency(frequency).map_err(|e| anyhow::anyhow!(e))?;
53        }
54
55        let inner = self.inner.write().await;
56        let owner_id = db.owner as _;
57        let txn = inner.db.begin().await?;
58        ensure_user_id(owner_id, &txn).await?;
59        check_database_name_duplicate(&db.name, &txn).await?;
60
61        let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
62        let mut db: database::ActiveModel = db.into();
63        db.database_id = Set(DatabaseId::new(db_obj.oid as _));
64        let db = db.insert(&txn).await?;
65
66        let mut schemas = vec![];
67        for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
68            let schema_obj = Self::create_object(
69                &txn,
70                ObjectType::Schema,
71                owner_id,
72                Some(DatabaseId::new(db_obj.oid as _)),
73                None,
74            )
75            .await?;
76            let schema = schema::ActiveModel {
77                schema_id: Set(SchemaId::new(schema_obj.oid as _)),
78                name: Set(schema_name.into()),
79            };
80            let schema = schema.insert(&txn).await?;
81            schemas.push(ObjectModel(schema, schema_obj).into());
82        }
83        txn.commit().await?;
84
85        let mut version = self
86            .notify_frontend(
87                NotificationOperation::Add,
88                NotificationInfo::Database(ObjectModel(db.clone(), db_obj).into()),
89            )
90            .await;
91        for schema in schemas {
92            version = self
93                .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
94                .await;
95        }
96
97        Ok((version, db))
98    }
99
100    pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
101        let inner = self.inner.write().await;
102        let owner_id = schema.owner as _;
103        let txn = inner.db.begin().await?;
104        ensure_user_id(owner_id, &txn).await?;
105        ensure_object_id(ObjectType::Database, schema.database_id as _, &txn).await?;
106        check_schema_name_duplicate(&schema.name, schema.database_id.into(), &txn).await?;
107
108        let schema_obj = Self::create_object(
109            &txn,
110            ObjectType::Schema,
111            owner_id,
112            Some(schema.database_id.into()),
113            None,
114        )
115        .await?;
116        let mut schema: schema::ActiveModel = schema.into();
117        schema.schema_id = Set(SchemaId::new(schema_obj.oid as _));
118        let schema = schema.insert(&txn).await?;
119
120        let updated_user_info =
121            grant_default_privileges_automatically(&txn, schema_obj.oid).await?;
122
123        txn.commit().await?;
124
125        let mut version = self
126            .notify_frontend(
127                NotificationOperation::Add,
128                NotificationInfo::Schema(ObjectModel(schema, schema_obj).into()),
129            )
130            .await;
131
132        // notify default privileges for schemas
133        if !updated_user_info.is_empty() {
134            version = self.notify_users_update(updated_user_info).await;
135        }
136
137        Ok(version)
138    }
139
140    pub async fn create_subscription_catalog(
141        &self,
142        pb_subscription: &mut PbSubscription,
143    ) -> MetaResult<()> {
144        let inner = self.inner.write().await;
145        let txn = inner.db.begin().await?;
146
147        ensure_user_id(pb_subscription.owner as _, &txn).await?;
148        ensure_object_id(ObjectType::Database, pb_subscription.database_id as _, &txn).await?;
149        ensure_object_id(ObjectType::Schema, pb_subscription.schema_id as _, &txn).await?;
150        check_subscription_name_duplicate(pb_subscription, &txn).await?;
151
152        let obj = Self::create_object(
153            &txn,
154            ObjectType::Subscription,
155            pb_subscription.owner as _,
156            Some(pb_subscription.database_id.into()),
157            Some(pb_subscription.schema_id.into()),
158        )
159        .await?;
160        pb_subscription.id = obj.oid as _;
161        let subscription: subscription::ActiveModel = pb_subscription.clone().into();
162        Subscription::insert(subscription).exec(&txn).await?;
163
164        // record object dependency.
165        ObjectDependency::insert(object_dependency::ActiveModel {
166            oid: Set(pb_subscription.dependent_table_id as _),
167            used_by: Set(pb_subscription.id as _),
168            ..Default::default()
169        })
170        .exec(&txn)
171        .await?;
172        txn.commit().await?;
173        Ok(())
174    }
175
176    pub async fn create_source(
177        &self,
178        mut pb_source: PbSource,
179    ) -> MetaResult<(SourceId, NotificationVersion)> {
180        let inner = self.inner.write().await;
181        let owner_id = pb_source.owner as _;
182        let txn = inner.db.begin().await?;
183        ensure_user_id(owner_id, &txn).await?;
184        ensure_object_id(ObjectType::Database, pb_source.database_id as _, &txn).await?;
185        ensure_object_id(ObjectType::Schema, pb_source.schema_id as _, &txn).await?;
186        check_relation_name_duplicate(
187            &pb_source.name,
188            pb_source.database_id.into(),
189            pb_source.schema_id.into(),
190            &txn,
191        )
192        .await?;
193
194        // handle secret ref
195        let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
196        let connection_ids = get_referred_connection_ids_from_source(&pb_source);
197
198        let source_obj = Self::create_object(
199            &txn,
200            ObjectType::Source,
201            owner_id,
202            Some(pb_source.database_id.into()),
203            Some(pb_source.schema_id.into()),
204        )
205        .await?;
206        let source_id = source_obj.oid;
207        pb_source.id = source_id as _;
208        let source: source::ActiveModel = pb_source.clone().into();
209        Source::insert(source).exec(&txn).await?;
210
211        // add secret and connection dependency
212        let dep_relation_ids = secret_ids.iter().chain(connection_ids.iter());
213        if !secret_ids.is_empty() || !connection_ids.is_empty() {
214            ObjectDependency::insert_many(dep_relation_ids.map(|id| {
215                object_dependency::ActiveModel {
216                    oid: Set(*id as _),
217                    used_by: Set(source_id as _),
218                    ..Default::default()
219                }
220            }))
221            .exec(&txn)
222            .await?;
223        }
224
225        let updated_user_info = grant_default_privileges_automatically(&txn, source_id).await?;
226
227        txn.commit().await?;
228
229        let mut version = self
230            .notify_frontend_relation_info(
231                NotificationOperation::Add,
232                PbObjectInfo::Source(pb_source),
233            )
234            .await;
235
236        // notify default privileges for source
237        if !updated_user_info.is_empty() {
238            version = self.notify_users_update(updated_user_info).await;
239        }
240
241        Ok((source_id, version))
242    }
243
244    pub async fn create_function(
245        &self,
246        mut pb_function: PbFunction,
247    ) -> MetaResult<NotificationVersion> {
248        let inner = self.inner.write().await;
249        let owner_id = pb_function.owner as _;
250        let txn = inner.db.begin().await?;
251        ensure_user_id(owner_id, &txn).await?;
252        ensure_object_id(ObjectType::Database, pb_function.database_id as _, &txn).await?;
253        ensure_object_id(ObjectType::Schema, pb_function.schema_id as _, &txn).await?;
254        check_function_signature_duplicate(&pb_function, &txn).await?;
255
256        let function_obj = Self::create_object(
257            &txn,
258            ObjectType::Function,
259            owner_id,
260            Some(pb_function.database_id.into()),
261            Some(pb_function.schema_id.into()),
262        )
263        .await?;
264        pb_function.id = function_obj.oid as _;
265        pb_function.created_at_epoch = Some(
266            Epoch::from_unix_millis(function_obj.created_at.and_utc().timestamp_millis() as _).0,
267        );
268        pb_function.created_at_cluster_version = function_obj.created_at_cluster_version;
269        let function: function::ActiveModel = pb_function.clone().into();
270        Function::insert(function).exec(&txn).await?;
271
272        let updated_user_info =
273            grant_default_privileges_automatically(&txn, function_obj.oid).await?;
274
275        txn.commit().await?;
276
277        let mut version = self
278            .notify_frontend(
279                NotificationOperation::Add,
280                NotificationInfo::Function(pb_function),
281            )
282            .await;
283
284        // notify default privileges for functions
285        if !updated_user_info.is_empty() {
286            version = self.notify_users_update(updated_user_info).await;
287        }
288
289        Ok(version)
290    }
291
292    pub async fn create_connection(
293        &self,
294        mut pb_connection: PbConnection,
295    ) -> MetaResult<NotificationVersion> {
296        let inner = self.inner.write().await;
297        let owner_id = pb_connection.owner as _;
298        let txn = inner.db.begin().await?;
299        ensure_user_id(owner_id, &txn).await?;
300        ensure_object_id(ObjectType::Database, pb_connection.database_id as _, &txn).await?;
301        ensure_object_id(ObjectType::Schema, pb_connection.schema_id as _, &txn).await?;
302        check_connection_name_duplicate(&pb_connection, &txn).await?;
303
304        let mut dep_secrets = HashSet::new();
305        if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
306            dep_secrets.extend(
307                params
308                    .secret_refs
309                    .values()
310                    .map(|secret_ref| secret_ref.secret_id),
311            );
312        }
313
314        let conn_obj = Self::create_object(
315            &txn,
316            ObjectType::Connection,
317            owner_id,
318            Some(pb_connection.database_id.into()),
319            Some(pb_connection.schema_id.into()),
320        )
321        .await?;
322        pb_connection.id = conn_obj.oid as _;
323        let connection: connection::ActiveModel = pb_connection.clone().into();
324        Connection::insert(connection).exec(&txn).await?;
325
326        for secret_id in dep_secrets {
327            ObjectDependency::insert(object_dependency::ActiveModel {
328                oid: Set(secret_id as _),
329                used_by: Set(conn_obj.oid),
330                ..Default::default()
331            })
332            .exec(&txn)
333            .await?;
334        }
335
336        let updated_user_info = grant_default_privileges_automatically(&txn, conn_obj.oid).await?;
337
338        txn.commit().await?;
339
340        {
341            // call meta telemetry here to report the connection creation
342            report_event(
343                PbTelemetryEventStage::Unspecified,
344                "connection_create",
345                pb_connection.get_id() as _,
346                {
347                    pb_connection.info.as_ref().and_then(|info| match info {
348                        ConnectionInfo::ConnectionParams(params) => {
349                            Some(params.connection_type().as_str_name().to_owned())
350                        }
351                        _ => None,
352                    })
353                },
354                None,
355                None,
356            );
357        }
358
359        let mut version = self
360            .notify_frontend(
361                NotificationOperation::Add,
362                NotificationInfo::Connection(pb_connection),
363            )
364            .await;
365
366        // notify default privileges for connections
367        if !updated_user_info.is_empty() {
368            version = self.notify_users_update(updated_user_info).await;
369        }
370
371        Ok(version)
372    }
373
374    pub async fn create_secret(
375        &self,
376        mut pb_secret: PbSecret,
377        secret_plain_payload: Vec<u8>,
378    ) -> MetaResult<NotificationVersion> {
379        let inner = self.inner.write().await;
380        let owner_id = pb_secret.owner as _;
381        let txn = inner.db.begin().await?;
382        ensure_user_id(owner_id, &txn).await?;
383        ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
384        ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
385        check_secret_name_duplicate(&pb_secret, &txn).await?;
386
387        let secret_obj = Self::create_object(
388            &txn,
389            ObjectType::Secret,
390            owner_id,
391            Some(pb_secret.database_id.into()),
392            Some(pb_secret.schema_id.into()),
393        )
394        .await?;
395        pb_secret.id = secret_obj.oid as _;
396        let secret: secret::ActiveModel = pb_secret.clone().into();
397        Secret::insert(secret).exec(&txn).await?;
398
399        let updated_user_info =
400            grant_default_privileges_automatically(&txn, secret_obj.oid).await?;
401
402        txn.commit().await?;
403
404        // Notify the compute and frontend node plain secret
405        let mut secret_plain = pb_secret;
406        secret_plain.value.clone_from(&secret_plain_payload);
407
408        LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
409        self.env
410            .notification_manager()
411            .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
412
413        let mut version = self
414            .notify_frontend(
415                NotificationOperation::Add,
416                NotificationInfo::Secret(secret_plain),
417            )
418            .await;
419
420        // notify default privileges for secrets
421        if !updated_user_info.is_empty() {
422            version = self.notify_users_update(updated_user_info).await;
423        }
424
425        Ok(version)
426    }
427
428    pub async fn create_view(
429        &self,
430        mut pb_view: PbView,
431        dependencies: HashSet<ObjectId>,
432    ) -> MetaResult<NotificationVersion> {
433        let inner = self.inner.write().await;
434        let owner_id = pb_view.owner as _;
435        let txn = inner.db.begin().await?;
436        ensure_user_id(owner_id, &txn).await?;
437        ensure_object_id(ObjectType::Database, pb_view.database_id as _, &txn).await?;
438        ensure_object_id(ObjectType::Schema, pb_view.schema_id as _, &txn).await?;
439        check_relation_name_duplicate(
440            &pb_view.name,
441            pb_view.database_id.into(),
442            pb_view.schema_id.into(),
443            &txn,
444        )
445        .await?;
446
447        let view_obj = Self::create_object(
448            &txn,
449            ObjectType::View,
450            owner_id,
451            Some(pb_view.database_id.into()),
452            Some(pb_view.schema_id.into()),
453        )
454        .await?;
455        pb_view.id = view_obj.oid as _;
456        let view: view::ActiveModel = pb_view.clone().into();
457        View::insert(view).exec(&txn).await?;
458
459        for obj_id in dependencies {
460            ObjectDependency::insert(object_dependency::ActiveModel {
461                oid: Set(obj_id),
462                used_by: Set(view_obj.oid),
463                ..Default::default()
464            })
465            .exec(&txn)
466            .await?;
467        }
468        let updated_user_info = grant_default_privileges_automatically(&txn, view_obj.oid).await?;
469
470        txn.commit().await?;
471
472        let mut version = self
473            .notify_frontend_relation_info(NotificationOperation::Add, PbObjectInfo::View(pb_view))
474            .await;
475
476        // notify default privileges for views
477        if !updated_user_info.is_empty() {
478            version = self.notify_users_update(updated_user_info).await;
479        }
480
481        Ok(version)
482    }
483
484    pub async fn validate_cross_db_snapshot_backfill(
485        &self,
486        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
487    ) -> MetaResult<()> {
488        if cross_db_snapshot_backfill_info
489            .upstream_mv_table_id_to_backfill_epoch
490            .is_empty()
491        {
492            return Ok(());
493        }
494
495        let inner = self.inner.read().await;
496        let table_ids = cross_db_snapshot_backfill_info
497            .upstream_mv_table_id_to_backfill_epoch
498            .keys()
499            .map(|t| t.as_raw_id() as ObjectId)
500            .collect_vec();
501        let cnt = Subscription::find()
502            .select_only()
503            .column(subscription::Column::DependentTableId)
504            .distinct()
505            .filter(subscription::Column::DependentTableId.is_in(table_ids))
506            .count(&inner.db)
507            .await? as usize;
508
509        if cnt
510            < cross_db_snapshot_backfill_info
511                .upstream_mv_table_id_to_backfill_epoch
512                .keys()
513                .count()
514        {
515            return Err(MetaError::permission_denied(
516                "Some upstream tables are not subscribed".to_owned(),
517            ));
518        }
519
520        Ok(())
521    }
522}