risingwave_meta/controller/catalog/
create_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
16use risingwave_common::system_param::{OverrideValidate, Validate};
17use risingwave_common::util::epoch::Epoch;
18
19use super::*;
20use crate::barrier::SnapshotBackfillInfo;
21
22impl CatalogController {
23    pub(crate) async fn create_object(
24        txn: &DatabaseTransaction,
25        obj_type: ObjectType,
26        owner_id: UserId,
27        database_id: Option<DatabaseId>,
28        schema_id: Option<SchemaId>,
29    ) -> MetaResult<object::Model> {
30        let active_db = object::ActiveModel {
31            oid: Default::default(),
32            obj_type: Set(obj_type),
33            owner_id: Set(owner_id),
34            schema_id: Set(schema_id),
35            database_id: Set(database_id),
36            initialized_at: Default::default(),
37            created_at: Default::default(),
38            initialized_at_cluster_version: Set(Some(current_cluster_version())),
39            created_at_cluster_version: Set(Some(current_cluster_version())),
40        };
41        Ok(active_db.insert(txn).await?)
42    }
43
44    pub async fn create_database(
45        &self,
46        db: PbDatabase,
47    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
48        // validate first
49        if let Some(ref interval) = db.barrier_interval_ms {
50            OverrideValidate::barrier_interval_ms(interval).map_err(|e| anyhow::anyhow!(e))?;
51        }
52        if let Some(ref frequency) = db.checkpoint_frequency {
53            OverrideValidate::checkpoint_frequency(frequency).map_err(|e| anyhow::anyhow!(e))?;
54        }
55
56        let inner = self.inner.write().await;
57        let owner_id = db.owner as _;
58        let txn = inner.db.begin().await?;
59        ensure_user_id(owner_id, &txn).await?;
60        check_database_name_duplicate(&db.name, &txn).await?;
61
62        let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
63        let mut db: database::ActiveModel = db.into();
64        db.database_id = Set(db_obj.oid.as_database_id());
65        let db = db.insert(&txn).await?;
66
67        let mut schemas = vec![];
68        for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
69            let schema_obj = Self::create_object(
70                &txn,
71                ObjectType::Schema,
72                owner_id,
73                Some(db_obj.oid.as_database_id()),
74                None,
75            )
76            .await?;
77            let schema = schema::ActiveModel {
78                schema_id: Set(schema_obj.oid.as_schema_id()),
79                name: Set(schema_name.into()),
80            };
81            let schema = schema.insert(&txn).await?;
82            schemas.push(ObjectModel(schema, schema_obj).into());
83        }
84        txn.commit().await?;
85
86        let mut version = self
87            .notify_frontend(
88                NotificationOperation::Add,
89                NotificationInfo::Database(ObjectModel(db.clone(), db_obj).into()),
90            )
91            .await;
92        for schema in schemas {
93            version = self
94                .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
95                .await;
96        }
97
98        Ok((version, db))
99    }
100
101    pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
102        let inner = self.inner.write().await;
103        let owner_id = schema.owner as _;
104        let txn = inner.db.begin().await?;
105        ensure_user_id(owner_id, &txn).await?;
106        ensure_object_id(ObjectType::Database, schema.database_id, &txn).await?;
107        check_schema_name_duplicate(&schema.name, schema.database_id, &txn).await?;
108
109        let schema_obj = Self::create_object(
110            &txn,
111            ObjectType::Schema,
112            owner_id,
113            Some(schema.database_id),
114            None,
115        )
116        .await?;
117        let mut schema: schema::ActiveModel = schema.into();
118        schema.schema_id = Set(schema_obj.oid.as_schema_id());
119        let schema = schema.insert(&txn).await?;
120
121        let updated_user_info =
122            grant_default_privileges_automatically(&txn, schema_obj.oid).await?;
123
124        txn.commit().await?;
125
126        let mut version = self
127            .notify_frontend(
128                NotificationOperation::Add,
129                NotificationInfo::Schema(ObjectModel(schema, schema_obj).into()),
130            )
131            .await;
132
133        // notify default privileges for schemas
134        if !updated_user_info.is_empty() {
135            version = self.notify_users_update(updated_user_info).await;
136        }
137
138        Ok(version)
139    }
140
141    pub async fn create_subscription_catalog(
142        &self,
143        pb_subscription: &mut PbSubscription,
144    ) -> MetaResult<()> {
145        let inner = self.inner.write().await;
146        let txn = inner.db.begin().await?;
147
148        ensure_user_id(pb_subscription.owner as _, &txn).await?;
149        ensure_object_id(ObjectType::Database, pb_subscription.database_id, &txn).await?;
150        ensure_object_id(ObjectType::Schema, pb_subscription.schema_id, &txn).await?;
151        check_subscription_name_duplicate(pb_subscription, &txn).await?;
152
153        let obj = Self::create_object(
154            &txn,
155            ObjectType::Subscription,
156            pb_subscription.owner as _,
157            Some(pb_subscription.database_id),
158            Some(pb_subscription.schema_id),
159        )
160        .await?;
161        pb_subscription.id = obj.oid.as_subscription_id();
162        let subscription: subscription::ActiveModel = pb_subscription.clone().into();
163        Subscription::insert(subscription).exec(&txn).await?;
164
165        // record object dependency.
166        ObjectDependency::insert(object_dependency::ActiveModel {
167            oid: Set(pb_subscription.dependent_table_id.into()),
168            used_by: Set(pb_subscription.id.into()),
169            ..Default::default()
170        })
171        .exec(&txn)
172        .await?;
173        txn.commit().await?;
174        Ok(())
175    }
176
177    pub async fn create_source(
178        &self,
179        mut pb_source: PbSource,
180    ) -> MetaResult<(SourceId, NotificationVersion)> {
181        let mut inner = self.inner.write().await;
182        let owner_id = pb_source.owner as _;
183        let txn = inner.db.begin().await?;
184        ensure_user_id(owner_id, &txn).await?;
185        ensure_object_id(ObjectType::Database, pb_source.database_id, &txn).await?;
186        ensure_object_id(ObjectType::Schema, pb_source.schema_id, &txn).await?;
187        check_relation_name_duplicate(
188            &pb_source.name,
189            pb_source.database_id,
190            pb_source.schema_id,
191            &txn,
192        )
193        .await?;
194
195        // handle secret ref
196        let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
197        let connection_ids = get_referred_connection_ids_from_source(&pb_source);
198
199        let source_obj = Self::create_object(
200            &txn,
201            ObjectType::Source,
202            owner_id,
203            Some(pb_source.database_id),
204            Some(pb_source.schema_id),
205        )
206        .await?;
207        let source_id = source_obj.oid.as_source_id();
208        pb_source.id = source_id;
209        let source: source::ActiveModel = pb_source.clone().into();
210        Source::insert(source).exec(&txn).await?;
211
212        // add secret and connection dependency
213        let dep_relation_ids = secret_ids
214            .iter()
215            .copied()
216            .map_into()
217            .chain(connection_ids.iter().copied().map_into());
218        if !secret_ids.is_empty() || !connection_ids.is_empty() {
219            ObjectDependency::insert_many(dep_relation_ids.map(|id| {
220                object_dependency::ActiveModel {
221                    oid: Set(id),
222                    used_by: Set(source_id.as_object_id()),
223                    ..Default::default()
224                }
225            }))
226            .exec(&txn)
227            .await?;
228        }
229
230        let mut job_notifications = vec![];
231        let mut updated_user_info = vec![];
232        // check if it belongs to iceberg table
233        if pb_source.name.starts_with(ICEBERG_SOURCE_PREFIX) {
234            // 1. finish iceberg table job.
235            let table_name = pb_source.name.trim_start_matches(ICEBERG_SOURCE_PREFIX);
236            let table_id = Table::find()
237                .select_only()
238                .column(table::Column::TableId)
239                .join(JoinType::InnerJoin, table::Relation::Object1.def())
240                .filter(
241                    object::Column::DatabaseId
242                        .eq(pb_source.database_id)
243                        .and(object::Column::SchemaId.eq(pb_source.schema_id))
244                        .and(table::Column::Name.eq(table_name)),
245                )
246                .into_tuple::<TableId>()
247                .one(&txn)
248                .await?
249                .ok_or(MetaError::from(anyhow!("table {} not found", table_name)))?;
250            let table_notifications =
251                Self::finish_streaming_job_inner(&txn, table_id.as_job_id()).await?;
252            job_notifications.push((table_id.as_job_id(), table_notifications));
253
254            // 2. finish iceberg sink job.
255            let sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table_name);
256            let sink_id = Sink::find()
257                .select_only()
258                .column(sink::Column::SinkId)
259                .join(JoinType::InnerJoin, sink::Relation::Object.def())
260                .filter(
261                    object::Column::DatabaseId
262                        .eq(pb_source.database_id)
263                        .and(object::Column::SchemaId.eq(pb_source.schema_id))
264                        .and(sink::Column::Name.eq(&sink_name)),
265                )
266                .into_tuple::<SinkId>()
267                .one(&txn)
268                .await?
269                .ok_or(MetaError::from(anyhow!("sink {} not found", sink_name)))?;
270            let sink_job_id = sink_id.as_job_id();
271            let sink_notifications = Self::finish_streaming_job_inner(&txn, sink_job_id).await?;
272            job_notifications.push((sink_job_id, sink_notifications));
273        } else {
274            updated_user_info = grant_default_privileges_automatically(&txn, source_id).await?;
275        }
276
277        txn.commit().await?;
278
279        for (job_id, (op, objects, user_info)) in job_notifications {
280            let mut version = self
281                .notify_frontend(op, NotificationInfo::ObjectGroup(PbObjectGroup { objects }))
282                .await;
283            if !user_info.is_empty() {
284                version = self.notify_users_update(user_info).await;
285            }
286            inner
287                .creating_table_finish_notifier
288                .values_mut()
289                .for_each(|creating_tables| {
290                    if let Some(txs) = creating_tables.remove(&job_id) {
291                        for tx in txs {
292                            let _ = tx.send(Ok(version));
293                        }
294                    }
295                });
296        }
297
298        let mut version = self
299            .notify_frontend_relation_info(
300                NotificationOperation::Add,
301                PbObjectInfo::Source(pb_source),
302            )
303            .await;
304
305        // notify default privileges for source
306        if !updated_user_info.is_empty() {
307            version = self.notify_users_update(updated_user_info).await;
308        }
309
310        Ok((source_id, version))
311    }
312
313    pub async fn create_function(
314        &self,
315        mut pb_function: PbFunction,
316    ) -> MetaResult<NotificationVersion> {
317        let inner = self.inner.write().await;
318        let owner_id = pb_function.owner as _;
319        let txn = inner.db.begin().await?;
320        ensure_user_id(owner_id, &txn).await?;
321        ensure_object_id(ObjectType::Database, pb_function.database_id, &txn).await?;
322        ensure_object_id(ObjectType::Schema, pb_function.schema_id, &txn).await?;
323        check_function_signature_duplicate(&pb_function, &txn).await?;
324
325        let function_obj = Self::create_object(
326            &txn,
327            ObjectType::Function,
328            owner_id,
329            Some(pb_function.database_id),
330            Some(pb_function.schema_id),
331        )
332        .await?;
333        pb_function.id = function_obj.oid.as_function_id();
334        pb_function.created_at_epoch = Some(
335            Epoch::from_unix_millis(function_obj.created_at.and_utc().timestamp_millis() as _).0,
336        );
337        pb_function.created_at_cluster_version = function_obj.created_at_cluster_version;
338        let function: function::ActiveModel = pb_function.clone().into();
339        Function::insert(function).exec(&txn).await?;
340
341        let updated_user_info =
342            grant_default_privileges_automatically(&txn, function_obj.oid).await?;
343
344        txn.commit().await?;
345
346        let mut version = self
347            .notify_frontend(
348                NotificationOperation::Add,
349                NotificationInfo::Function(pb_function),
350            )
351            .await;
352
353        // notify default privileges for functions
354        if !updated_user_info.is_empty() {
355            version = self.notify_users_update(updated_user_info).await;
356        }
357
358        Ok(version)
359    }
360
361    pub async fn create_connection(
362        &self,
363        mut pb_connection: PbConnection,
364    ) -> MetaResult<NotificationVersion> {
365        let inner = self.inner.write().await;
366        let owner_id = pb_connection.owner as _;
367        let txn = inner.db.begin().await?;
368        ensure_user_id(owner_id, &txn).await?;
369        ensure_object_id(ObjectType::Database, pb_connection.database_id, &txn).await?;
370        ensure_object_id(ObjectType::Schema, pb_connection.schema_id, &txn).await?;
371        check_connection_name_duplicate(&pb_connection, &txn).await?;
372
373        let mut dep_secrets: HashSet<SecretId> = HashSet::new();
374        if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
375            dep_secrets.extend(
376                params
377                    .secret_refs
378                    .values()
379                    .map(|secret_ref| secret_ref.secret_id),
380            );
381        }
382
383        let conn_obj = Self::create_object(
384            &txn,
385            ObjectType::Connection,
386            owner_id,
387            Some(pb_connection.database_id),
388            Some(pb_connection.schema_id),
389        )
390        .await?;
391        pb_connection.id = conn_obj.oid.as_connection_id();
392        let connection: connection::ActiveModel = pb_connection.clone().into();
393        Connection::insert(connection).exec(&txn).await?;
394
395        for secret_id in dep_secrets {
396            ObjectDependency::insert(object_dependency::ActiveModel {
397                oid: Set(secret_id.as_object_id()),
398                used_by: Set(conn_obj.oid),
399                ..Default::default()
400            })
401            .exec(&txn)
402            .await?;
403        }
404
405        let updated_user_info = grant_default_privileges_automatically(&txn, conn_obj.oid).await?;
406
407        txn.commit().await?;
408
409        {
410            // call meta telemetry here to report the connection creation
411            report_event(
412                PbTelemetryEventStage::Unspecified,
413                "connection_create",
414                pb_connection.get_id().as_raw_id() as i64,
415                {
416                    pb_connection.info.as_ref().and_then(|info| match info {
417                        ConnectionInfo::ConnectionParams(params) => {
418                            Some(params.connection_type().as_str_name().to_owned())
419                        }
420                        _ => None,
421                    })
422                },
423                None,
424                None,
425            );
426        }
427
428        let mut version = self
429            .notify_frontend(
430                NotificationOperation::Add,
431                NotificationInfo::Connection(pb_connection),
432            )
433            .await;
434
435        // notify default privileges for connections
436        if !updated_user_info.is_empty() {
437            version = self.notify_users_update(updated_user_info).await;
438        }
439
440        Ok(version)
441    }
442
443    pub async fn create_secret(
444        &self,
445        mut pb_secret: PbSecret,
446        secret_plain_payload: Vec<u8>,
447    ) -> MetaResult<NotificationVersion> {
448        let inner = self.inner.write().await;
449        let owner_id = pb_secret.owner as _;
450        let txn = inner.db.begin().await?;
451        ensure_user_id(owner_id, &txn).await?;
452        ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
453        ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
454        check_secret_name_duplicate(&pb_secret, &txn).await?;
455
456        let secret_obj = Self::create_object(
457            &txn,
458            ObjectType::Secret,
459            owner_id,
460            Some(pb_secret.database_id),
461            Some(pb_secret.schema_id),
462        )
463        .await?;
464        pb_secret.id = secret_obj.oid.as_secret_id();
465        let secret: secret::ActiveModel = pb_secret.clone().into();
466        Secret::insert(secret).exec(&txn).await?;
467
468        let updated_user_info =
469            grant_default_privileges_automatically(&txn, secret_obj.oid).await?;
470
471        txn.commit().await?;
472
473        // Notify the compute and frontend node plain secret
474        let mut secret_plain = pb_secret;
475        secret_plain.value.clone_from(&secret_plain_payload);
476
477        LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
478        self.env
479            .notification_manager()
480            .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
481
482        let mut version = self
483            .notify_frontend(
484                NotificationOperation::Add,
485                NotificationInfo::Secret(secret_plain),
486            )
487            .await;
488
489        // notify default privileges for secrets
490        if !updated_user_info.is_empty() {
491            version = self.notify_users_update(updated_user_info).await;
492        }
493
494        Ok(version)
495    }
496
497    pub async fn create_view(
498        &self,
499        mut pb_view: PbView,
500        dependencies: HashSet<ObjectId>,
501    ) -> MetaResult<NotificationVersion> {
502        let inner = self.inner.write().await;
503        let owner_id = pb_view.owner as _;
504        let txn = inner.db.begin().await?;
505        ensure_user_id(owner_id, &txn).await?;
506        ensure_object_id(ObjectType::Database, pb_view.database_id, &txn).await?;
507        ensure_object_id(ObjectType::Schema, pb_view.schema_id, &txn).await?;
508        check_relation_name_duplicate(&pb_view.name, pb_view.database_id, pb_view.schema_id, &txn)
509            .await?;
510        ensure_object_id(ObjectType::Schema, pb_view.schema_id, &txn).await?;
511        check_relation_name_duplicate(&pb_view.name, pb_view.database_id, pb_view.schema_id, &txn)
512            .await?;
513
514        let view_obj = Self::create_object(
515            &txn,
516            ObjectType::View,
517            owner_id,
518            Some(pb_view.database_id),
519            Some(pb_view.schema_id),
520        )
521        .await?;
522        pb_view.id = view_obj.oid.as_view_id();
523        pb_view.created_at_epoch =
524            Some(Epoch::from_unix_millis(view_obj.created_at.and_utc().timestamp_millis() as _).0);
525        pb_view.created_at_cluster_version = view_obj.created_at_cluster_version;
526
527        let view: view::ActiveModel = pb_view.clone().into();
528        View::insert(view).exec(&txn).await?;
529
530        for obj_id in dependencies {
531            ObjectDependency::insert(object_dependency::ActiveModel {
532                oid: Set(obj_id),
533                used_by: Set(view_obj.oid),
534                ..Default::default()
535            })
536            .exec(&txn)
537            .await?;
538        }
539
540        let updated_user_info = grant_default_privileges_automatically(&txn, view_obj.oid).await?;
541
542        txn.commit().await?;
543
544        let mut version = self
545            .notify_frontend_relation_info(NotificationOperation::Add, PbObjectInfo::View(pb_view))
546            .await;
547
548        // notify default privileges for views
549        if !updated_user_info.is_empty() {
550            version = self.notify_users_update(updated_user_info).await;
551        }
552
553        Ok(version)
554    }
555
556    pub async fn validate_cross_db_snapshot_backfill(
557        &self,
558        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
559    ) -> MetaResult<()> {
560        if cross_db_snapshot_backfill_info
561            .upstream_mv_table_id_to_backfill_epoch
562            .is_empty()
563        {
564            return Ok(());
565        }
566
567        let inner = self.inner.read().await;
568        let table_ids = cross_db_snapshot_backfill_info
569            .upstream_mv_table_id_to_backfill_epoch
570            .keys()
571            .copied()
572            .map_into()
573            .collect_vec();
574        let cnt = Subscription::find()
575            .select_only()
576            .column(subscription::Column::DependentTableId)
577            .distinct()
578            .filter(subscription::Column::DependentTableId.is_in::<TableId, _>(table_ids))
579            .count(&inner.db)
580            .await? as usize;
581
582        if cnt
583            < cross_db_snapshot_backfill_info
584                .upstream_mv_table_id_to_backfill_epoch
585                .keys()
586                .count()
587        {
588            return Err(MetaError::permission_denied(
589                "Some upstream tables are not subscribed".to_owned(),
590            ));
591        }
592
593        Ok(())
594    }
595}