risingwave_meta/controller/catalog/
create_op.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::cast::datetime_to_timestamp_millis;
16use risingwave_common::catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
17use risingwave_common::system_param::{OverrideValidate, Validate};
18use risingwave_common::util::epoch::Epoch;
19
20use super::*;
21use crate::barrier::SnapshotBackfillInfo;
22
23impl CatalogController {
24    pub(crate) async fn create_object(
25        txn: &DatabaseTransaction,
26        obj_type: ObjectType,
27        owner_id: UserId,
28        database_id: Option<DatabaseId>,
29        schema_id: Option<SchemaId>,
30    ) -> MetaResult<object::Model> {
31        let active_db = object::ActiveModel {
32            oid: Default::default(),
33            obj_type: Set(obj_type),
34            owner_id: Set(owner_id),
35            schema_id: Set(schema_id),
36            database_id: Set(database_id),
37            initialized_at: Default::default(),
38            created_at: Default::default(),
39            initialized_at_cluster_version: Set(Some(current_cluster_version())),
40            created_at_cluster_version: Set(Some(current_cluster_version())),
41        };
42        Ok(active_db.insert(txn).await?)
43    }
44
45    pub async fn create_database(
46        &self,
47        db: PbDatabase,
48    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
49        // validate first
50        if let Some(ref interval) = db.barrier_interval_ms {
51            OverrideValidate::barrier_interval_ms(interval).map_err(|e| anyhow::anyhow!(e))?;
52        }
53        if let Some(ref frequency) = db.checkpoint_frequency {
54            OverrideValidate::checkpoint_frequency(frequency).map_err(|e| anyhow::anyhow!(e))?;
55        }
56
57        let inner = self.inner.write().await;
58        let owner_id = db.owner as _;
59        let txn = inner.db.begin().await?;
60        ensure_user_id(owner_id, &txn).await?;
61        check_database_name_duplicate(&db.name, &txn).await?;
62
63        let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
64        let mut db: database::ActiveModel = db.into();
65        db.database_id = Set(db_obj.oid.as_database_id());
66        let db = db.insert(&txn).await?;
67
68        let mut schemas = vec![];
69        for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
70            let schema_obj = Self::create_object(
71                &txn,
72                ObjectType::Schema,
73                owner_id,
74                Some(db_obj.oid.as_database_id()),
75                None,
76            )
77            .await?;
78            let schema = schema::ActiveModel {
79                schema_id: Set(schema_obj.oid.as_schema_id()),
80                name: Set(schema_name.into()),
81            };
82            let schema = schema.insert(&txn).await?;
83            schemas.push(ObjectModel(schema, schema_obj).into());
84        }
85        txn.commit().await?;
86
87        let mut version = self
88            .notify_frontend(
89                NotificationOperation::Add,
90                NotificationInfo::Database(ObjectModel(db.clone(), db_obj).into()),
91            )
92            .await;
93        for schema in schemas {
94            version = self
95                .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
96                .await;
97        }
98
99        Ok((version, db))
100    }
101
102    pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
103        let inner = self.inner.write().await;
104        let owner_id = schema.owner as _;
105        let txn = inner.db.begin().await?;
106        ensure_user_id(owner_id, &txn).await?;
107        ensure_object_id(ObjectType::Database, schema.database_id, &txn).await?;
108        check_schema_name_duplicate(&schema.name, schema.database_id, &txn).await?;
109
110        let schema_obj = Self::create_object(
111            &txn,
112            ObjectType::Schema,
113            owner_id,
114            Some(schema.database_id),
115            None,
116        )
117        .await?;
118        let mut schema: schema::ActiveModel = schema.into();
119        schema.schema_id = Set(schema_obj.oid.as_schema_id());
120        let schema = schema.insert(&txn).await?;
121
122        let updated_user_info =
123            grant_default_privileges_automatically(&txn, schema_obj.oid).await?;
124
125        txn.commit().await?;
126
127        let mut version = self
128            .notify_frontend(
129                NotificationOperation::Add,
130                NotificationInfo::Schema(ObjectModel(schema, schema_obj).into()),
131            )
132            .await;
133
134        // notify default privileges for schemas
135        if !updated_user_info.is_empty() {
136            version = self.notify_users_update(updated_user_info).await;
137        }
138
139        Ok(version)
140    }
141
142    pub async fn create_subscription_catalog(
143        &self,
144        pb_subscription: &mut PbSubscription,
145    ) -> MetaResult<()> {
146        let inner = self.inner.write().await;
147        let txn = inner.db.begin().await?;
148
149        ensure_user_id(pb_subscription.owner as _, &txn).await?;
150        ensure_object_id(ObjectType::Database, pb_subscription.database_id, &txn).await?;
151        ensure_object_id(ObjectType::Schema, pb_subscription.schema_id, &txn).await?;
152        check_subscription_name_duplicate(pb_subscription, &txn).await?;
153
154        let obj = Self::create_object(
155            &txn,
156            ObjectType::Subscription,
157            pb_subscription.owner as _,
158            Some(pb_subscription.database_id),
159            Some(pb_subscription.schema_id),
160        )
161        .await?;
162        pb_subscription.id = obj.oid.as_subscription_id();
163        let subscription: subscription::ActiveModel = pb_subscription.clone().into();
164        Subscription::insert(subscription).exec(&txn).await?;
165
166        // record object dependency.
167        ObjectDependency::insert(object_dependency::ActiveModel {
168            oid: Set(pb_subscription.dependent_table_id.into()),
169            used_by: Set(pb_subscription.id.into()),
170            ..Default::default()
171        })
172        .exec(&txn)
173        .await?;
174        txn.commit().await?;
175        Ok(())
176    }
177
178    pub async fn create_source(
179        &self,
180        mut pb_source: PbSource,
181    ) -> MetaResult<(SourceId, NotificationVersion)> {
182        let mut inner = self.inner.write().await;
183        let owner_id = pb_source.owner as _;
184        let txn = inner.db.begin().await?;
185        ensure_user_id(owner_id, &txn).await?;
186        ensure_object_id(ObjectType::Database, pb_source.database_id, &txn).await?;
187        ensure_object_id(ObjectType::Schema, pb_source.schema_id, &txn).await?;
188        check_relation_name_duplicate(
189            &pb_source.name,
190            pb_source.database_id,
191            pb_source.schema_id,
192            &txn,
193        )
194        .await?;
195
196        // handle secret ref
197        let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
198        let connection_ids = get_referred_connection_ids_from_source(&pb_source);
199
200        let source_obj = Self::create_object(
201            &txn,
202            ObjectType::Source,
203            owner_id,
204            Some(pb_source.database_id),
205            Some(pb_source.schema_id),
206        )
207        .await?;
208        let source_id = source_obj.oid.as_source_id();
209        pb_source.id = source_id;
210        let source: source::ActiveModel = pb_source.clone().into();
211        Source::insert(source).exec(&txn).await?;
212
213        // add secret and connection dependency
214        let dep_relation_ids = secret_ids
215            .iter()
216            .copied()
217            .map_into()
218            .chain(connection_ids.iter().copied().map_into());
219        if !secret_ids.is_empty() || !connection_ids.is_empty() {
220            ObjectDependency::insert_many(dep_relation_ids.map(|id| {
221                object_dependency::ActiveModel {
222                    oid: Set(id),
223                    used_by: Set(source_id.as_object_id()),
224                    ..Default::default()
225                }
226            }))
227            .exec(&txn)
228            .await?;
229        }
230
231        let mut job_notifications = vec![];
232        let mut updated_user_info = vec![];
233        // check if it belongs to iceberg table
234        if pb_source.name.starts_with(ICEBERG_SOURCE_PREFIX) {
235            // 1. finish iceberg table job.
236            let table_name = pb_source.name.trim_start_matches(ICEBERG_SOURCE_PREFIX);
237            let table_id = Table::find()
238                .select_only()
239                .column(table::Column::TableId)
240                .join(JoinType::InnerJoin, table::Relation::Object1.def())
241                .filter(
242                    object::Column::DatabaseId
243                        .eq(pb_source.database_id)
244                        .and(object::Column::SchemaId.eq(pb_source.schema_id))
245                        .and(table::Column::Name.eq(table_name)),
246                )
247                .into_tuple::<TableId>()
248                .one(&txn)
249                .await?
250                .ok_or(MetaError::from(anyhow!("table {} not found", table_name)))?;
251            let table_notifications =
252                Self::finish_streaming_job_inner(&txn, table_id.as_job_id()).await?;
253            job_notifications.push((table_id.as_job_id(), table_notifications));
254
255            // 2. finish iceberg sink job.
256            let sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table_name);
257            let sink_id = Sink::find()
258                .select_only()
259                .column(sink::Column::SinkId)
260                .join(JoinType::InnerJoin, sink::Relation::Object.def())
261                .filter(
262                    object::Column::DatabaseId
263                        .eq(pb_source.database_id)
264                        .and(object::Column::SchemaId.eq(pb_source.schema_id))
265                        .and(sink::Column::Name.eq(&sink_name)),
266                )
267                .into_tuple::<SinkId>()
268                .one(&txn)
269                .await?
270                .ok_or(MetaError::from(anyhow!("sink {} not found", sink_name)))?;
271            let sink_job_id = sink_id.as_job_id();
272            let sink_notifications = Self::finish_streaming_job_inner(&txn, sink_job_id).await?;
273            job_notifications.push((sink_job_id, sink_notifications));
274        } else {
275            updated_user_info = grant_default_privileges_automatically(&txn, source_id).await?;
276        }
277
278        txn.commit().await?;
279
280        for (job_id, (op, objects, user_info)) in job_notifications {
281            let mut version = self
282                .notify_frontend(op, NotificationInfo::ObjectGroup(PbObjectGroup { objects }))
283                .await;
284            if !user_info.is_empty() {
285                version = self.notify_users_update(user_info).await;
286            }
287            inner
288                .creating_table_finish_notifier
289                .values_mut()
290                .for_each(|creating_tables| {
291                    if let Some(txs) = creating_tables.remove(&job_id) {
292                        for tx in txs {
293                            let _ = tx.send(Ok(version));
294                        }
295                    }
296                });
297        }
298
299        let mut version = self
300            .notify_frontend_relation_info(
301                NotificationOperation::Add,
302                PbObjectInfo::Source(pb_source),
303            )
304            .await;
305
306        // notify default privileges for source
307        if !updated_user_info.is_empty() {
308            version = self.notify_users_update(updated_user_info).await;
309        }
310
311        Ok((source_id, version))
312    }
313
314    pub async fn create_function(
315        &self,
316        mut pb_function: PbFunction,
317    ) -> MetaResult<NotificationVersion> {
318        let inner = self.inner.write().await;
319        let owner_id = pb_function.owner as _;
320        let txn = inner.db.begin().await?;
321        ensure_user_id(owner_id, &txn).await?;
322        ensure_object_id(ObjectType::Database, pb_function.database_id, &txn).await?;
323        ensure_object_id(ObjectType::Schema, pb_function.schema_id, &txn).await?;
324        check_function_signature_duplicate(&pb_function, &txn).await?;
325
326        let function_obj = Self::create_object(
327            &txn,
328            ObjectType::Function,
329            owner_id,
330            Some(pb_function.database_id),
331            Some(pb_function.schema_id),
332        )
333        .await?;
334        pb_function.id = function_obj.oid.as_function_id();
335        pb_function.created_at_epoch = Some(
336            Epoch::from_unix_millis(datetime_to_timestamp_millis(function_obj.created_at) as _).0,
337        );
338        pb_function.created_at_cluster_version = function_obj.created_at_cluster_version;
339        let function: function::ActiveModel = pb_function.clone().into();
340        Function::insert(function).exec(&txn).await?;
341
342        let updated_user_info =
343            grant_default_privileges_automatically(&txn, function_obj.oid).await?;
344
345        txn.commit().await?;
346
347        let mut version = self
348            .notify_frontend(
349                NotificationOperation::Add,
350                NotificationInfo::Function(pb_function),
351            )
352            .await;
353
354        // notify default privileges for functions
355        if !updated_user_info.is_empty() {
356            version = self.notify_users_update(updated_user_info).await;
357        }
358
359        Ok(version)
360    }
361
362    pub async fn create_connection(
363        &self,
364        mut pb_connection: PbConnection,
365    ) -> MetaResult<NotificationVersion> {
366        let inner = self.inner.write().await;
367        let owner_id = pb_connection.owner as _;
368        let txn = inner.db.begin().await?;
369        ensure_user_id(owner_id, &txn).await?;
370        ensure_object_id(ObjectType::Database, pb_connection.database_id, &txn).await?;
371        ensure_object_id(ObjectType::Schema, pb_connection.schema_id, &txn).await?;
372        check_connection_name_duplicate(&pb_connection, &txn).await?;
373
374        let mut dep_secrets: HashSet<SecretId> = HashSet::new();
375        if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
376            dep_secrets.extend(
377                params
378                    .secret_refs
379                    .values()
380                    .map(|secret_ref| secret_ref.secret_id),
381            );
382        }
383
384        let conn_obj = Self::create_object(
385            &txn,
386            ObjectType::Connection,
387            owner_id,
388            Some(pb_connection.database_id),
389            Some(pb_connection.schema_id),
390        )
391        .await?;
392        pb_connection.id = conn_obj.oid.as_connection_id();
393        let connection: connection::ActiveModel = pb_connection.clone().into();
394        Connection::insert(connection).exec(&txn).await?;
395
396        for secret_id in dep_secrets {
397            ObjectDependency::insert(object_dependency::ActiveModel {
398                oid: Set(secret_id.as_object_id()),
399                used_by: Set(conn_obj.oid),
400                ..Default::default()
401            })
402            .exec(&txn)
403            .await?;
404        }
405
406        let updated_user_info = grant_default_privileges_automatically(&txn, conn_obj.oid).await?;
407
408        txn.commit().await?;
409
410        {
411            // call meta telemetry here to report the connection creation
412            report_event(
413                PbTelemetryEventStage::Unspecified,
414                "connection_create",
415                pb_connection.get_id().as_raw_id() as i64,
416                {
417                    pb_connection.info.as_ref().and_then(|info| match info {
418                        ConnectionInfo::ConnectionParams(params) => {
419                            Some(params.connection_type().as_str_name().to_owned())
420                        }
421                        _ => None,
422                    })
423                },
424                None,
425                None,
426            );
427        }
428
429        let mut version = self
430            .notify_frontend(
431                NotificationOperation::Add,
432                NotificationInfo::Connection(pb_connection),
433            )
434            .await;
435
436        // notify default privileges for connections
437        if !updated_user_info.is_empty() {
438            version = self.notify_users_update(updated_user_info).await;
439        }
440
441        Ok(version)
442    }
443
444    pub async fn create_secret(
445        &self,
446        mut pb_secret: PbSecret,
447        secret_plain_payload: Vec<u8>,
448    ) -> MetaResult<NotificationVersion> {
449        let inner = self.inner.write().await;
450        let owner_id = pb_secret.owner as _;
451        let txn = inner.db.begin().await?;
452        ensure_user_id(owner_id, &txn).await?;
453        ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
454        ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
455        check_secret_name_duplicate(&pb_secret, &txn).await?;
456
457        let secret_obj = Self::create_object(
458            &txn,
459            ObjectType::Secret,
460            owner_id,
461            Some(pb_secret.database_id),
462            Some(pb_secret.schema_id),
463        )
464        .await?;
465        pb_secret.id = secret_obj.oid.as_secret_id();
466        let secret: secret::ActiveModel = pb_secret.clone().into();
467        Secret::insert(secret).exec(&txn).await?;
468
469        let updated_user_info =
470            grant_default_privileges_automatically(&txn, secret_obj.oid).await?;
471
472        txn.commit().await?;
473
474        // Notify the compute and frontend node plain secret
475        let mut secret_plain = pb_secret;
476        secret_plain.value.clone_from(&secret_plain_payload);
477
478        LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
479        self.env
480            .notification_manager()
481            .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
482
483        let mut version = self
484            .notify_frontend(
485                NotificationOperation::Add,
486                NotificationInfo::Secret(secret_plain),
487            )
488            .await;
489
490        // notify default privileges for secrets
491        if !updated_user_info.is_empty() {
492            version = self.notify_users_update(updated_user_info).await;
493        }
494
495        Ok(version)
496    }
497
498    pub async fn create_view(
499        &self,
500        mut pb_view: PbView,
501        dependencies: HashSet<ObjectId>,
502    ) -> MetaResult<NotificationVersion> {
503        let inner = self.inner.write().await;
504        let owner_id = pb_view.owner as _;
505        let txn = inner.db.begin().await?;
506        ensure_user_id(owner_id, &txn).await?;
507        ensure_object_id(ObjectType::Database, pb_view.database_id, &txn).await?;
508        ensure_object_id(ObjectType::Schema, pb_view.schema_id, &txn).await?;
509        check_relation_name_duplicate(&pb_view.name, pb_view.database_id, pb_view.schema_id, &txn)
510            .await?;
511        ensure_object_id(ObjectType::Schema, pb_view.schema_id, &txn).await?;
512        check_relation_name_duplicate(&pb_view.name, pb_view.database_id, pb_view.schema_id, &txn)
513            .await?;
514
515        let view_obj = Self::create_object(
516            &txn,
517            ObjectType::View,
518            owner_id,
519            Some(pb_view.database_id),
520            Some(pb_view.schema_id),
521        )
522        .await?;
523        pb_view.id = view_obj.oid.as_view_id();
524        pb_view.created_at_epoch =
525            Some(Epoch::from_unix_millis(datetime_to_timestamp_millis(view_obj.created_at) as _).0);
526        pb_view.created_at_cluster_version = view_obj.created_at_cluster_version;
527
528        let view: view::ActiveModel = pb_view.clone().into();
529        View::insert(view).exec(&txn).await?;
530
531        for obj_id in dependencies {
532            ObjectDependency::insert(object_dependency::ActiveModel {
533                oid: Set(obj_id),
534                used_by: Set(view_obj.oid),
535                ..Default::default()
536            })
537            .exec(&txn)
538            .await?;
539        }
540
541        let updated_user_info = grant_default_privileges_automatically(&txn, view_obj.oid).await?;
542
543        txn.commit().await?;
544
545        let mut version = self
546            .notify_frontend_relation_info(NotificationOperation::Add, PbObjectInfo::View(pb_view))
547            .await;
548
549        // notify default privileges for views
550        if !updated_user_info.is_empty() {
551            version = self.notify_users_update(updated_user_info).await;
552        }
553
554        Ok(version)
555    }
556
557    pub async fn validate_cross_db_snapshot_backfill(
558        &self,
559        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
560    ) -> MetaResult<()> {
561        if cross_db_snapshot_backfill_info
562            .upstream_mv_table_id_to_backfill_epoch
563            .is_empty()
564        {
565            return Ok(());
566        }
567
568        let inner = self.inner.read().await;
569        let table_ids = cross_db_snapshot_backfill_info
570            .upstream_mv_table_id_to_backfill_epoch
571            .keys()
572            .copied()
573            .map_into()
574            .collect_vec();
575        let cnt = Subscription::find()
576            .select_only()
577            .column(subscription::Column::DependentTableId)
578            .distinct()
579            .filter(subscription::Column::DependentTableId.is_in::<TableId, _>(table_ids))
580            .count(&inner.db)
581            .await? as usize;
582
583        if cnt
584            < cross_db_snapshot_backfill_info
585                .upstream_mv_table_id_to_backfill_epoch
586                .keys()
587                .count()
588        {
589            return Err(MetaError::permission_denied(
590                "Some upstream tables are not subscribed".to_owned(),
591            ));
592        }
593
594        Ok(())
595    }
596}