risingwave_meta/controller/catalog/
create_op.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::cast::datetime_to_timestamp_millis;
16use risingwave_common::catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
17use risingwave_common::system_param::{OverrideValidate, Validate};
18use risingwave_common::util::epoch::Epoch;
19use risingwave_pb::common::PbObjectType;
20use risingwave_pb::meta::PbObjectDependency;
21
22use super::*;
23use crate::barrier::SnapshotBackfillInfo;
24
25impl CatalogController {
26    pub(crate) async fn create_object(
27        txn: &DatabaseTransaction,
28        obj_type: ObjectType,
29        owner_id: UserId,
30        database_id: Option<DatabaseId>,
31        schema_id: Option<SchemaId>,
32    ) -> MetaResult<object::Model> {
33        let active_db = object::ActiveModel {
34            oid: Default::default(),
35            obj_type: Set(obj_type),
36            owner_id: Set(owner_id),
37            schema_id: Set(schema_id),
38            database_id: Set(database_id),
39            initialized_at: Default::default(),
40            created_at: Default::default(),
41            initialized_at_cluster_version: Set(Some(current_cluster_version())),
42            created_at_cluster_version: Set(Some(current_cluster_version())),
43        };
44        Ok(active_db.insert(txn).await?)
45    }
46
47    pub async fn create_database(
48        &self,
49        db: PbDatabase,
50    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
51        // validate first
52        if let Some(ref interval) = db.barrier_interval_ms {
53            OverrideValidate::barrier_interval_ms(interval).map_err(|e| anyhow::anyhow!(e))?;
54        }
55        if let Some(ref frequency) = db.checkpoint_frequency {
56            OverrideValidate::checkpoint_frequency(frequency).map_err(|e| anyhow::anyhow!(e))?;
57        }
58
59        let inner = self.inner.write().await;
60        let owner_id = db.owner as _;
61        let txn = inner.db.begin().await?;
62        ensure_user_id(owner_id, &txn).await?;
63        check_database_name_duplicate(&db.name, &txn).await?;
64
65        let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
66        let mut db: database::ActiveModel = db.into();
67        db.database_id = Set(db_obj.oid.as_database_id());
68        let db = db.insert(&txn).await?;
69
70        let mut schemas = vec![];
71        for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
72            let schema_obj = Self::create_object(
73                &txn,
74                ObjectType::Schema,
75                owner_id,
76                Some(db_obj.oid.as_database_id()),
77                None,
78            )
79            .await?;
80            let schema = schema::ActiveModel {
81                schema_id: Set(schema_obj.oid.as_schema_id()),
82                name: Set(schema_name.into()),
83            };
84            let schema = schema.insert(&txn).await?;
85            schemas.push(ObjectModel(schema, schema_obj, None).into());
86        }
87        txn.commit().await?;
88
89        let mut version = self
90            .notify_frontend(
91                NotificationOperation::Add,
92                NotificationInfo::Database(ObjectModel(db.clone(), db_obj, None).into()),
93            )
94            .await;
95        for schema in schemas {
96            version = self
97                .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
98                .await;
99        }
100
101        Ok((version, db))
102    }
103
104    pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
105        let inner = self.inner.write().await;
106        let owner_id = schema.owner as _;
107        let txn = inner.db.begin().await?;
108        ensure_user_id(owner_id, &txn).await?;
109        ensure_object_id(ObjectType::Database, schema.database_id, &txn).await?;
110        check_schema_name_duplicate(&schema.name, schema.database_id, &txn).await?;
111
112        let schema_obj = Self::create_object(
113            &txn,
114            ObjectType::Schema,
115            owner_id,
116            Some(schema.database_id),
117            None,
118        )
119        .await?;
120        let mut schema: schema::ActiveModel = schema.into();
121        schema.schema_id = Set(schema_obj.oid.as_schema_id());
122        let schema = schema.insert(&txn).await?;
123
124        let updated_user_info =
125            grant_default_privileges_automatically(&txn, schema_obj.oid).await?;
126
127        txn.commit().await?;
128
129        let mut version = self
130            .notify_frontend(
131                NotificationOperation::Add,
132                NotificationInfo::Schema(ObjectModel(schema, schema_obj, None).into()),
133            )
134            .await;
135
136        // notify default privileges for schemas
137        if !updated_user_info.is_empty() {
138            version = self.notify_users_update(updated_user_info).await;
139        }
140
141        Ok(version)
142    }
143
144    pub async fn create_subscription_catalog(
145        &self,
146        pb_subscription: &mut PbSubscription,
147    ) -> MetaResult<()> {
148        let inner = self.inner.write().await;
149        let txn = inner.db.begin().await?;
150
151        ensure_user_id(pb_subscription.owner as _, &txn).await?;
152        ensure_object_id(ObjectType::Database, pb_subscription.database_id, &txn).await?;
153        ensure_object_id(ObjectType::Schema, pb_subscription.schema_id, &txn).await?;
154        check_subscription_name_duplicate(pb_subscription, &txn).await?;
155
156        let obj = Self::create_object(
157            &txn,
158            ObjectType::Subscription,
159            pb_subscription.owner as _,
160            Some(pb_subscription.database_id),
161            Some(pb_subscription.schema_id),
162        )
163        .await?;
164        pb_subscription.id = obj.oid.as_subscription_id();
165        let subscription: subscription::ActiveModel = pb_subscription.clone().into();
166        Subscription::insert(subscription).exec(&txn).await?;
167
168        // record object dependency.
169        ObjectDependency::insert(object_dependency::ActiveModel {
170            oid: Set(pb_subscription.dependent_table_id.into()),
171            used_by: Set(pb_subscription.id.into()),
172            ..Default::default()
173        })
174        .exec(&txn)
175        .await?;
176        txn.commit().await?;
177        Ok(())
178    }
179
180    pub async fn create_source(
181        &self,
182        mut pb_source: PbSource,
183    ) -> MetaResult<(SourceId, NotificationVersion)> {
184        let mut inner = self.inner.write().await;
185        let owner_id = pb_source.owner as _;
186        let txn = inner.db.begin().await?;
187        ensure_user_id(owner_id, &txn).await?;
188        ensure_object_id(ObjectType::Database, pb_source.database_id, &txn).await?;
189        ensure_object_id(ObjectType::Schema, pb_source.schema_id, &txn).await?;
190        check_relation_name_duplicate(
191            &pb_source.name,
192            pb_source.database_id,
193            pb_source.schema_id,
194            &txn,
195        )
196        .await?;
197
198        // handle secret ref
199        let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
200        let connection_ids = get_referred_connection_ids_from_source(&pb_source);
201
202        let source_obj = Self::create_object(
203            &txn,
204            ObjectType::Source,
205            owner_id,
206            Some(pb_source.database_id),
207            Some(pb_source.schema_id),
208        )
209        .await?;
210        let source_id = source_obj.oid.as_source_id();
211        pb_source.id = source_id;
212        let source: source::ActiveModel = pb_source.clone().into();
213        Source::insert(source).exec(&txn).await?;
214
215        // add secret and connection dependency
216        let dep_relation_ids = secret_ids
217            .iter()
218            .copied()
219            .map_into()
220            .chain(connection_ids.iter().copied().map_into());
221        let dependencies = if !secret_ids.is_empty() || !connection_ids.is_empty() {
222            ObjectDependency::insert_many(dep_relation_ids.map(|id| {
223                object_dependency::ActiveModel {
224                    oid: Set(id),
225                    used_by: Set(source_id.as_object_id()),
226                    ..Default::default()
227                }
228            }))
229            .exec(&txn)
230            .await?;
231            secret_ids
232                .iter()
233                .map(|id| PbObjectDependency {
234                    object_id: source_id.as_object_id(),
235                    referenced_object_id: id.as_object_id(),
236                    referenced_object_type: PbObjectType::Secret as _,
237                })
238                .chain(connection_ids.iter().map(|id| PbObjectDependency {
239                    object_id: source_id.as_object_id(),
240                    referenced_object_id: id.as_object_id(),
241                    referenced_object_type: PbObjectType::Connection as _,
242                }))
243                .collect_vec()
244        } else {
245            vec![]
246        };
247
248        let mut job_notifications = vec![];
249        let mut updated_user_info = vec![];
250        // check if it belongs to iceberg table
251        if pb_source.name.starts_with(ICEBERG_SOURCE_PREFIX) {
252            // 1. finish iceberg table job.
253            let table_name = pb_source.name.trim_start_matches(ICEBERG_SOURCE_PREFIX);
254            let table_id = Table::find()
255                .select_only()
256                .column(table::Column::TableId)
257                .join(JoinType::InnerJoin, table::Relation::Object1.def())
258                .filter(
259                    object::Column::DatabaseId
260                        .eq(pb_source.database_id)
261                        .and(object::Column::SchemaId.eq(pb_source.schema_id))
262                        .and(table::Column::Name.eq(table_name)),
263                )
264                .into_tuple::<TableId>()
265                .one(&txn)
266                .await?
267                .ok_or(MetaError::from(anyhow!("table {} not found", table_name)))?;
268            let table_notifications = self
269                .finish_streaming_job_inner(&txn, table_id.as_job_id())
270                .await?;
271            job_notifications.push((table_id.as_job_id(), table_notifications));
272
273            // 2. finish iceberg sink job.
274            let sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table_name);
275            let sink_id = Sink::find()
276                .select_only()
277                .column(sink::Column::SinkId)
278                .join(JoinType::InnerJoin, sink::Relation::Object.def())
279                .filter(
280                    object::Column::DatabaseId
281                        .eq(pb_source.database_id)
282                        .and(object::Column::SchemaId.eq(pb_source.schema_id))
283                        .and(sink::Column::Name.eq(&sink_name)),
284                )
285                .into_tuple::<SinkId>()
286                .one(&txn)
287                .await?
288                .ok_or(MetaError::from(anyhow!("sink {} not found", sink_name)))?;
289            let sink_job_id = sink_id.as_job_id();
290            let sink_notifications = self.finish_streaming_job_inner(&txn, sink_job_id).await?;
291            job_notifications.push((sink_job_id, sink_notifications));
292        } else {
293            updated_user_info = grant_default_privileges_automatically(&txn, source_id).await?;
294        }
295
296        txn.commit().await?;
297
298        for (job_id, (op, objects, user_info, dependencies)) in job_notifications {
299            let mut version = self
300                .notify_frontend(
301                    op,
302                    NotificationInfo::ObjectGroup(PbObjectGroup {
303                        objects,
304                        dependencies,
305                    }),
306                )
307                .await;
308            if !user_info.is_empty() {
309                version = self.notify_users_update(user_info).await;
310            }
311            inner
312                .creating_table_finish_notifier
313                .values_mut()
314                .for_each(|creating_tables| {
315                    if let Some(txs) = creating_tables.remove(&job_id) {
316                        for tx in txs {
317                            let _ = tx.send(Ok(version));
318                        }
319                    }
320                });
321        }
322
323        let mut version = self
324            .notify_frontend(
325                NotificationOperation::Add,
326                NotificationInfo::ObjectGroup(PbObjectGroup {
327                    objects: vec![PbObject {
328                        object_info: Some(PbObjectInfo::Source(pb_source)),
329                    }],
330                    dependencies,
331                }),
332            )
333            .await;
334
335        // notify default privileges for source
336        if !updated_user_info.is_empty() {
337            version = self.notify_users_update(updated_user_info).await;
338        }
339
340        Ok((source_id, version))
341    }
342
343    pub async fn create_function(
344        &self,
345        mut pb_function: PbFunction,
346    ) -> MetaResult<NotificationVersion> {
347        let inner = self.inner.write().await;
348        let owner_id = pb_function.owner as _;
349        let txn = inner.db.begin().await?;
350        ensure_user_id(owner_id, &txn).await?;
351        ensure_object_id(ObjectType::Database, pb_function.database_id, &txn).await?;
352        ensure_object_id(ObjectType::Schema, pb_function.schema_id, &txn).await?;
353        check_function_signature_duplicate(&pb_function, &txn).await?;
354
355        let function_obj = Self::create_object(
356            &txn,
357            ObjectType::Function,
358            owner_id,
359            Some(pb_function.database_id),
360            Some(pb_function.schema_id),
361        )
362        .await?;
363        pb_function.id = function_obj.oid.as_function_id();
364        pb_function.created_at_epoch = Some(
365            Epoch::from_unix_millis(datetime_to_timestamp_millis(function_obj.created_at) as _).0,
366        );
367        pb_function.created_at_cluster_version = function_obj.created_at_cluster_version;
368        let function: function::ActiveModel = pb_function.clone().into();
369        Function::insert(function).exec(&txn).await?;
370
371        let updated_user_info =
372            grant_default_privileges_automatically(&txn, function_obj.oid).await?;
373
374        txn.commit().await?;
375
376        let mut version = self
377            .notify_frontend(
378                NotificationOperation::Add,
379                NotificationInfo::Function(pb_function),
380            )
381            .await;
382
383        // notify default privileges for functions
384        if !updated_user_info.is_empty() {
385            version = self.notify_users_update(updated_user_info).await;
386        }
387
388        Ok(version)
389    }
390
391    pub async fn create_connection(
392        &self,
393        mut pb_connection: PbConnection,
394    ) -> MetaResult<NotificationVersion> {
395        let inner = self.inner.write().await;
396        let owner_id = pb_connection.owner as _;
397        let txn = inner.db.begin().await?;
398        ensure_user_id(owner_id, &txn).await?;
399        ensure_object_id(ObjectType::Database, pb_connection.database_id, &txn).await?;
400        ensure_object_id(ObjectType::Schema, pb_connection.schema_id, &txn).await?;
401        check_connection_name_duplicate(&pb_connection, &txn).await?;
402
403        let mut dep_secrets: HashSet<SecretId> = HashSet::new();
404        if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
405            dep_secrets.extend(
406                params
407                    .secret_refs
408                    .values()
409                    .map(|secret_ref| secret_ref.secret_id),
410            );
411        }
412
413        let conn_obj = Self::create_object(
414            &txn,
415            ObjectType::Connection,
416            owner_id,
417            Some(pb_connection.database_id),
418            Some(pb_connection.schema_id),
419        )
420        .await?;
421        pb_connection.id = conn_obj.oid.as_connection_id();
422        let connection: connection::ActiveModel = pb_connection.clone().into();
423        Connection::insert(connection).exec(&txn).await?;
424
425        for secret_id in &dep_secrets {
426            ObjectDependency::insert(object_dependency::ActiveModel {
427                oid: Set(secret_id.as_object_id()),
428                used_by: Set(conn_obj.oid),
429                ..Default::default()
430            })
431            .exec(&txn)
432            .await?;
433        }
434
435        let updated_user_info = grant_default_privileges_automatically(&txn, conn_obj.oid).await?;
436
437        txn.commit().await?;
438
439        {
440            // call meta telemetry here to report the connection creation
441            report_event(
442                PbTelemetryEventStage::Unspecified,
443                "connection_create",
444                pb_connection.get_id().as_raw_id() as i64,
445                {
446                    pb_connection.info.as_ref().and_then(|info| match info {
447                        ConnectionInfo::ConnectionParams(params) => {
448                            Some(params.connection_type().as_str_name().to_owned())
449                        }
450                        _ => None,
451                    })
452                },
453                None,
454                None,
455            );
456        }
457
458        let mut version = self
459            .notify_frontend(
460                NotificationOperation::Add,
461                NotificationInfo::ObjectGroup(PbObjectGroup {
462                    objects: vec![PbObject {
463                        object_info: Some(PbObjectInfo::Connection(pb_connection)),
464                    }],
465                    dependencies: dep_secrets
466                        .iter()
467                        .map(|secret_id| PbObjectDependency {
468                            object_id: conn_obj.oid,
469                            referenced_object_id: secret_id.as_object_id(),
470                            referenced_object_type: PbObjectType::Secret as _,
471                        })
472                        .collect(),
473                }),
474            )
475            .await;
476
477        // notify default privileges for connections
478        if !updated_user_info.is_empty() {
479            version = self.notify_users_update(updated_user_info).await;
480        }
481
482        Ok(version)
483    }
484
485    pub async fn create_secret(
486        &self,
487        mut pb_secret: PbSecret,
488        secret_plain_payload: Vec<u8>,
489    ) -> MetaResult<NotificationVersion> {
490        let inner = self.inner.write().await;
491        let owner_id = pb_secret.owner as _;
492        let txn = inner.db.begin().await?;
493        ensure_user_id(owner_id, &txn).await?;
494        ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
495        ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
496        check_secret_name_duplicate(&pb_secret, &txn).await?;
497
498        let secret_obj = Self::create_object(
499            &txn,
500            ObjectType::Secret,
501            owner_id,
502            Some(pb_secret.database_id),
503            Some(pb_secret.schema_id),
504        )
505        .await?;
506        pb_secret.id = secret_obj.oid.as_secret_id();
507        let secret: secret::ActiveModel = pb_secret.clone().into();
508        Secret::insert(secret).exec(&txn).await?;
509
510        let updated_user_info =
511            grant_default_privileges_automatically(&txn, secret_obj.oid).await?;
512
513        txn.commit().await?;
514
515        // Notify the compute and frontend node plain secret
516        let mut secret_plain = pb_secret;
517        secret_plain.value.clone_from(&secret_plain_payload);
518
519        LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
520        self.env
521            .notification_manager()
522            .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
523
524        let mut version = self
525            .notify_frontend(
526                NotificationOperation::Add,
527                NotificationInfo::Secret(secret_plain),
528            )
529            .await;
530
531        // notify default privileges for secrets
532        if !updated_user_info.is_empty() {
533            version = self.notify_users_update(updated_user_info).await;
534        }
535
536        Ok(version)
537    }
538
539    pub async fn create_view(
540        &self,
541        mut pb_view: PbView,
542        dependencies: HashSet<ObjectId>,
543    ) -> MetaResult<NotificationVersion> {
544        let inner = self.inner.write().await;
545        let owner_id = pb_view.owner as _;
546        let txn = inner.db.begin().await?;
547        ensure_user_id(owner_id, &txn).await?;
548        ensure_object_id(ObjectType::Database, pb_view.database_id, &txn).await?;
549        ensure_object_id(ObjectType::Schema, pb_view.schema_id, &txn).await?;
550        check_relation_name_duplicate(&pb_view.name, pb_view.database_id, pb_view.schema_id, &txn)
551            .await?;
552        ensure_object_id(ObjectType::Schema, pb_view.schema_id, &txn).await?;
553        check_relation_name_duplicate(&pb_view.name, pb_view.database_id, pb_view.schema_id, &txn)
554            .await?;
555
556        let view_obj = Self::create_object(
557            &txn,
558            ObjectType::View,
559            owner_id,
560            Some(pb_view.database_id),
561            Some(pb_view.schema_id),
562        )
563        .await?;
564        pb_view.id = view_obj.oid.as_view_id();
565        pb_view.created_at_epoch =
566            Some(Epoch::from_unix_millis(datetime_to_timestamp_millis(view_obj.created_at) as _).0);
567        pb_view.created_at_cluster_version = view_obj.created_at_cluster_version;
568
569        let view: view::ActiveModel = pb_view.clone().into();
570        View::insert(view).exec(&txn).await?;
571
572        let dependencies = if !dependencies.is_empty() {
573            ObjectDependency::insert_many(dependencies.into_iter().map(|obj_id| {
574                object_dependency::ActiveModel {
575                    oid: Set(obj_id),
576                    used_by: Set(view_obj.oid),
577                    ..Default::default()
578                }
579            }))
580            .exec(&txn)
581            .await?;
582            list_object_dependencies_by_object_id(&txn, view_obj.oid).await?
583        } else {
584            vec![]
585        };
586
587        let updated_user_info = grant_default_privileges_automatically(&txn, view_obj.oid).await?;
588
589        txn.commit().await?;
590        let mut version = self
591            .notify_frontend(
592                NotificationOperation::Add,
593                NotificationInfo::ObjectGroup(PbObjectGroup {
594                    objects: vec![PbObject {
595                        object_info: Some(PbObjectInfo::View(pb_view)),
596                    }],
597                    dependencies,
598                }),
599            )
600            .await;
601
602        // notify default privileges for views
603        if !updated_user_info.is_empty() {
604            version = self.notify_users_update(updated_user_info).await;
605        }
606
607        Ok(version)
608    }
609
610    pub async fn validate_cross_db_snapshot_backfill(
611        &self,
612        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
613    ) -> MetaResult<()> {
614        if cross_db_snapshot_backfill_info
615            .upstream_mv_table_id_to_backfill_epoch
616            .is_empty()
617        {
618            return Ok(());
619        }
620
621        let inner = self.inner.read().await;
622        let table_ids = cross_db_snapshot_backfill_info
623            .upstream_mv_table_id_to_backfill_epoch
624            .keys()
625            .copied()
626            .map_into()
627            .collect_vec();
628        let cnt = Subscription::find()
629            .select_only()
630            .column(subscription::Column::DependentTableId)
631            .distinct()
632            .filter(subscription::Column::DependentTableId.is_in::<TableId, _>(table_ids))
633            .count(&inner.db)
634            .await? as usize;
635
636        if cnt
637            < cross_db_snapshot_backfill_info
638                .upstream_mv_table_id_to_backfill_epoch
639                .keys()
640                .count()
641        {
642            return Err(MetaError::permission_denied(
643                "Some upstream tables are not subscribed".to_owned(),
644            ));
645        }
646
647        Ok(())
648    }
649}