1use risingwave_common::cast::datetime_to_timestamp_millis;
16use risingwave_common::catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
17use risingwave_common::system_param::{OverrideValidate, Validate};
18use risingwave_common::util::epoch::Epoch;
19use risingwave_pb::common::PbObjectType;
20use risingwave_pb::meta::PbObjectDependency;
21
22use super::*;
23use crate::barrier::SnapshotBackfillInfo;
24
25impl CatalogController {
26 pub(crate) async fn create_object(
27 txn: &DatabaseTransaction,
28 obj_type: ObjectType,
29 owner_id: UserId,
30 database_id: Option<DatabaseId>,
31 schema_id: Option<SchemaId>,
32 ) -> MetaResult<object::Model> {
33 let active_db = object::ActiveModel {
34 oid: Default::default(),
35 obj_type: Set(obj_type),
36 owner_id: Set(owner_id),
37 schema_id: Set(schema_id),
38 database_id: Set(database_id),
39 initialized_at: Default::default(),
40 created_at: Default::default(),
41 initialized_at_cluster_version: Set(Some(current_cluster_version())),
42 created_at_cluster_version: Set(Some(current_cluster_version())),
43 };
44 Ok(active_db.insert(txn).await?)
45 }
46
47 pub async fn create_database(
48 &self,
49 db: PbDatabase,
50 ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
51 if let Some(ref interval) = db.barrier_interval_ms {
53 OverrideValidate::barrier_interval_ms(interval).map_err(|e| anyhow::anyhow!(e))?;
54 }
55 if let Some(ref frequency) = db.checkpoint_frequency {
56 OverrideValidate::checkpoint_frequency(frequency).map_err(|e| anyhow::anyhow!(e))?;
57 }
58
59 let inner = self.inner.write().await;
60 let owner_id = db.owner as _;
61 let txn = inner.db.begin().await?;
62 ensure_user_id(owner_id, &txn).await?;
63 check_database_name_duplicate(&db.name, &txn).await?;
64
65 let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
66 let mut db: database::ActiveModel = db.into();
67 db.database_id = Set(db_obj.oid.as_database_id());
68 let db = db.insert(&txn).await?;
69
70 let mut schemas = vec![];
71 for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
72 let schema_obj = Self::create_object(
73 &txn,
74 ObjectType::Schema,
75 owner_id,
76 Some(db_obj.oid.as_database_id()),
77 None,
78 )
79 .await?;
80 let schema = schema::ActiveModel {
81 schema_id: Set(schema_obj.oid.as_schema_id()),
82 name: Set(schema_name.into()),
83 };
84 let schema = schema.insert(&txn).await?;
85 schemas.push(ObjectModel(schema, schema_obj, None).into());
86 }
87 txn.commit().await?;
88
89 let mut version = self
90 .notify_frontend(
91 NotificationOperation::Add,
92 NotificationInfo::Database(ObjectModel(db.clone(), db_obj, None).into()),
93 )
94 .await;
95 for schema in schemas {
96 version = self
97 .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
98 .await;
99 }
100
101 Ok((version, db))
102 }
103
104 pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
105 let inner = self.inner.write().await;
106 let owner_id = schema.owner as _;
107 let txn = inner.db.begin().await?;
108 ensure_user_id(owner_id, &txn).await?;
109 ensure_object_id(ObjectType::Database, schema.database_id, &txn).await?;
110 check_schema_name_duplicate(&schema.name, schema.database_id, &txn).await?;
111
112 let schema_obj = Self::create_object(
113 &txn,
114 ObjectType::Schema,
115 owner_id,
116 Some(schema.database_id),
117 None,
118 )
119 .await?;
120 let mut schema: schema::ActiveModel = schema.into();
121 schema.schema_id = Set(schema_obj.oid.as_schema_id());
122 let schema = schema.insert(&txn).await?;
123
124 let updated_user_info =
125 grant_default_privileges_automatically(&txn, schema_obj.oid).await?;
126
127 txn.commit().await?;
128
129 let mut version = self
130 .notify_frontend(
131 NotificationOperation::Add,
132 NotificationInfo::Schema(ObjectModel(schema, schema_obj, None).into()),
133 )
134 .await;
135
136 if !updated_user_info.is_empty() {
138 version = self.notify_users_update(updated_user_info).await;
139 }
140
141 Ok(version)
142 }
143
144 pub async fn create_subscription_catalog(
145 &self,
146 pb_subscription: &mut PbSubscription,
147 ) -> MetaResult<()> {
148 let inner = self.inner.write().await;
149 let txn = inner.db.begin().await?;
150
151 ensure_user_id(pb_subscription.owner as _, &txn).await?;
152 ensure_object_id(ObjectType::Database, pb_subscription.database_id, &txn).await?;
153 ensure_object_id(ObjectType::Schema, pb_subscription.schema_id, &txn).await?;
154 check_subscription_name_duplicate(pb_subscription, &txn).await?;
155
156 let obj = Self::create_object(
157 &txn,
158 ObjectType::Subscription,
159 pb_subscription.owner as _,
160 Some(pb_subscription.database_id),
161 Some(pb_subscription.schema_id),
162 )
163 .await?;
164 pb_subscription.id = obj.oid.as_subscription_id();
165 let subscription: subscription::ActiveModel = pb_subscription.clone().into();
166 Subscription::insert(subscription).exec(&txn).await?;
167
168 ObjectDependency::insert(object_dependency::ActiveModel {
170 oid: Set(pb_subscription.dependent_table_id.into()),
171 used_by: Set(pb_subscription.id.into()),
172 ..Default::default()
173 })
174 .exec(&txn)
175 .await?;
176 txn.commit().await?;
177 Ok(())
178 }
179
180 pub async fn create_source(
181 &self,
182 mut pb_source: PbSource,
183 ) -> MetaResult<(SourceId, NotificationVersion)> {
184 let mut inner = self.inner.write().await;
185 let owner_id = pb_source.owner as _;
186 let txn = inner.db.begin().await?;
187 ensure_user_id(owner_id, &txn).await?;
188 ensure_object_id(ObjectType::Database, pb_source.database_id, &txn).await?;
189 ensure_object_id(ObjectType::Schema, pb_source.schema_id, &txn).await?;
190 check_relation_name_duplicate(
191 &pb_source.name,
192 pb_source.database_id,
193 pb_source.schema_id,
194 &txn,
195 )
196 .await?;
197
198 let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
200 let connection_ids = get_referred_connection_ids_from_source(&pb_source);
201
202 let source_obj = Self::create_object(
203 &txn,
204 ObjectType::Source,
205 owner_id,
206 Some(pb_source.database_id),
207 Some(pb_source.schema_id),
208 )
209 .await?;
210 let source_id = source_obj.oid.as_source_id();
211 pb_source.id = source_id;
212 let source: source::ActiveModel = pb_source.clone().into();
213 Source::insert(source).exec(&txn).await?;
214
215 let dep_relation_ids = secret_ids
217 .iter()
218 .copied()
219 .map_into()
220 .chain(connection_ids.iter().copied().map_into());
221 let dependencies = if !secret_ids.is_empty() || !connection_ids.is_empty() {
222 ObjectDependency::insert_many(dep_relation_ids.map(|id| {
223 object_dependency::ActiveModel {
224 oid: Set(id),
225 used_by: Set(source_id.as_object_id()),
226 ..Default::default()
227 }
228 }))
229 .exec(&txn)
230 .await?;
231 secret_ids
232 .iter()
233 .map(|id| PbObjectDependency {
234 object_id: source_id.as_object_id(),
235 referenced_object_id: id.as_object_id(),
236 referenced_object_type: PbObjectType::Secret as _,
237 })
238 .chain(connection_ids.iter().map(|id| PbObjectDependency {
239 object_id: source_id.as_object_id(),
240 referenced_object_id: id.as_object_id(),
241 referenced_object_type: PbObjectType::Connection as _,
242 }))
243 .collect_vec()
244 } else {
245 vec![]
246 };
247
248 let mut job_notifications = vec![];
249 let mut updated_user_info = vec![];
250 if pb_source.name.starts_with(ICEBERG_SOURCE_PREFIX) {
252 let table_name = pb_source.name.trim_start_matches(ICEBERG_SOURCE_PREFIX);
254 let table_id = Table::find()
255 .select_only()
256 .column(table::Column::TableId)
257 .join(JoinType::InnerJoin, table::Relation::Object1.def())
258 .filter(
259 object::Column::DatabaseId
260 .eq(pb_source.database_id)
261 .and(object::Column::SchemaId.eq(pb_source.schema_id))
262 .and(table::Column::Name.eq(table_name)),
263 )
264 .into_tuple::<TableId>()
265 .one(&txn)
266 .await?
267 .ok_or(MetaError::from(anyhow!("table {} not found", table_name)))?;
268 let table_notifications = self
269 .finish_streaming_job_inner(&txn, table_id.as_job_id())
270 .await?;
271 job_notifications.push((table_id.as_job_id(), table_notifications));
272
273 let sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table_name);
275 let sink_id = Sink::find()
276 .select_only()
277 .column(sink::Column::SinkId)
278 .join(JoinType::InnerJoin, sink::Relation::Object.def())
279 .filter(
280 object::Column::DatabaseId
281 .eq(pb_source.database_id)
282 .and(object::Column::SchemaId.eq(pb_source.schema_id))
283 .and(sink::Column::Name.eq(&sink_name)),
284 )
285 .into_tuple::<SinkId>()
286 .one(&txn)
287 .await?
288 .ok_or(MetaError::from(anyhow!("sink {} not found", sink_name)))?;
289 let sink_job_id = sink_id.as_job_id();
290 let sink_notifications = self.finish_streaming_job_inner(&txn, sink_job_id).await?;
291 job_notifications.push((sink_job_id, sink_notifications));
292 } else {
293 updated_user_info = grant_default_privileges_automatically(&txn, source_id).await?;
294 }
295
296 txn.commit().await?;
297
298 for (job_id, (op, objects, user_info, dependencies)) in job_notifications {
299 let mut version = self
300 .notify_frontend(
301 op,
302 NotificationInfo::ObjectGroup(PbObjectGroup {
303 objects,
304 dependencies,
305 }),
306 )
307 .await;
308 if !user_info.is_empty() {
309 version = self.notify_users_update(user_info).await;
310 }
311 inner
312 .creating_table_finish_notifier
313 .values_mut()
314 .for_each(|creating_tables| {
315 if let Some(txs) = creating_tables.remove(&job_id) {
316 for tx in txs {
317 let _ = tx.send(Ok(version));
318 }
319 }
320 });
321 }
322
323 let mut version = self
324 .notify_frontend(
325 NotificationOperation::Add,
326 NotificationInfo::ObjectGroup(PbObjectGroup {
327 objects: vec![PbObject {
328 object_info: Some(PbObjectInfo::Source(pb_source)),
329 }],
330 dependencies,
331 }),
332 )
333 .await;
334
335 if !updated_user_info.is_empty() {
337 version = self.notify_users_update(updated_user_info).await;
338 }
339
340 Ok((source_id, version))
341 }
342
343 pub async fn create_function(
344 &self,
345 mut pb_function: PbFunction,
346 ) -> MetaResult<NotificationVersion> {
347 let inner = self.inner.write().await;
348 let owner_id = pb_function.owner as _;
349 let txn = inner.db.begin().await?;
350 ensure_user_id(owner_id, &txn).await?;
351 ensure_object_id(ObjectType::Database, pb_function.database_id, &txn).await?;
352 ensure_object_id(ObjectType::Schema, pb_function.schema_id, &txn).await?;
353 check_function_signature_duplicate(&pb_function, &txn).await?;
354
355 let function_obj = Self::create_object(
356 &txn,
357 ObjectType::Function,
358 owner_id,
359 Some(pb_function.database_id),
360 Some(pb_function.schema_id),
361 )
362 .await?;
363 pb_function.id = function_obj.oid.as_function_id();
364 pb_function.created_at_epoch = Some(
365 Epoch::from_unix_millis(datetime_to_timestamp_millis(function_obj.created_at) as _).0,
366 );
367 pb_function.created_at_cluster_version = function_obj.created_at_cluster_version;
368 let function: function::ActiveModel = pb_function.clone().into();
369 Function::insert(function).exec(&txn).await?;
370
371 let updated_user_info =
372 grant_default_privileges_automatically(&txn, function_obj.oid).await?;
373
374 txn.commit().await?;
375
376 let mut version = self
377 .notify_frontend(
378 NotificationOperation::Add,
379 NotificationInfo::Function(pb_function),
380 )
381 .await;
382
383 if !updated_user_info.is_empty() {
385 version = self.notify_users_update(updated_user_info).await;
386 }
387
388 Ok(version)
389 }
390
391 pub async fn create_connection(
392 &self,
393 mut pb_connection: PbConnection,
394 ) -> MetaResult<NotificationVersion> {
395 let inner = self.inner.write().await;
396 let owner_id = pb_connection.owner as _;
397 let txn = inner.db.begin().await?;
398 ensure_user_id(owner_id, &txn).await?;
399 ensure_object_id(ObjectType::Database, pb_connection.database_id, &txn).await?;
400 ensure_object_id(ObjectType::Schema, pb_connection.schema_id, &txn).await?;
401 check_connection_name_duplicate(&pb_connection, &txn).await?;
402
403 let mut dep_secrets: HashSet<SecretId> = HashSet::new();
404 if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
405 dep_secrets.extend(
406 params
407 .secret_refs
408 .values()
409 .map(|secret_ref| secret_ref.secret_id),
410 );
411 }
412
413 let conn_obj = Self::create_object(
414 &txn,
415 ObjectType::Connection,
416 owner_id,
417 Some(pb_connection.database_id),
418 Some(pb_connection.schema_id),
419 )
420 .await?;
421 pb_connection.id = conn_obj.oid.as_connection_id();
422 let connection: connection::ActiveModel = pb_connection.clone().into();
423 Connection::insert(connection).exec(&txn).await?;
424
425 for secret_id in &dep_secrets {
426 ObjectDependency::insert(object_dependency::ActiveModel {
427 oid: Set(secret_id.as_object_id()),
428 used_by: Set(conn_obj.oid),
429 ..Default::default()
430 })
431 .exec(&txn)
432 .await?;
433 }
434
435 let updated_user_info = grant_default_privileges_automatically(&txn, conn_obj.oid).await?;
436
437 txn.commit().await?;
438
439 {
440 report_event(
442 PbTelemetryEventStage::Unspecified,
443 "connection_create",
444 pb_connection.get_id().as_raw_id() as i64,
445 {
446 pb_connection.info.as_ref().and_then(|info| match info {
447 ConnectionInfo::ConnectionParams(params) => {
448 Some(params.connection_type().as_str_name().to_owned())
449 }
450 _ => None,
451 })
452 },
453 None,
454 None,
455 );
456 }
457
458 let mut version = self
459 .notify_frontend(
460 NotificationOperation::Add,
461 NotificationInfo::ObjectGroup(PbObjectGroup {
462 objects: vec![PbObject {
463 object_info: Some(PbObjectInfo::Connection(pb_connection)),
464 }],
465 dependencies: dep_secrets
466 .iter()
467 .map(|secret_id| PbObjectDependency {
468 object_id: conn_obj.oid,
469 referenced_object_id: secret_id.as_object_id(),
470 referenced_object_type: PbObjectType::Secret as _,
471 })
472 .collect(),
473 }),
474 )
475 .await;
476
477 if !updated_user_info.is_empty() {
479 version = self.notify_users_update(updated_user_info).await;
480 }
481
482 Ok(version)
483 }
484
485 pub async fn create_secret(
486 &self,
487 mut pb_secret: PbSecret,
488 secret_plain_payload: Vec<u8>,
489 ) -> MetaResult<NotificationVersion> {
490 let inner = self.inner.write().await;
491 let owner_id = pb_secret.owner as _;
492 let txn = inner.db.begin().await?;
493 ensure_user_id(owner_id, &txn).await?;
494 ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
495 ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
496 check_secret_name_duplicate(&pb_secret, &txn).await?;
497
498 let secret_obj = Self::create_object(
499 &txn,
500 ObjectType::Secret,
501 owner_id,
502 Some(pb_secret.database_id),
503 Some(pb_secret.schema_id),
504 )
505 .await?;
506 pb_secret.id = secret_obj.oid.as_secret_id();
507 let secret: secret::ActiveModel = pb_secret.clone().into();
508 Secret::insert(secret).exec(&txn).await?;
509
510 let updated_user_info =
511 grant_default_privileges_automatically(&txn, secret_obj.oid).await?;
512
513 txn.commit().await?;
514
515 let mut secret_plain = pb_secret;
517 secret_plain.value.clone_from(&secret_plain_payload);
518
519 LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
520 self.env
521 .notification_manager()
522 .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
523
524 let mut version = self
525 .notify_frontend(
526 NotificationOperation::Add,
527 NotificationInfo::Secret(secret_plain),
528 )
529 .await;
530
531 if !updated_user_info.is_empty() {
533 version = self.notify_users_update(updated_user_info).await;
534 }
535
536 Ok(version)
537 }
538
539 pub async fn create_view(
540 &self,
541 mut pb_view: PbView,
542 dependencies: HashSet<ObjectId>,
543 ) -> MetaResult<NotificationVersion> {
544 let inner = self.inner.write().await;
545 let owner_id = pb_view.owner as _;
546 let txn = inner.db.begin().await?;
547 ensure_user_id(owner_id, &txn).await?;
548 ensure_object_id(ObjectType::Database, pb_view.database_id, &txn).await?;
549 ensure_object_id(ObjectType::Schema, pb_view.schema_id, &txn).await?;
550 check_relation_name_duplicate(&pb_view.name, pb_view.database_id, pb_view.schema_id, &txn)
551 .await?;
552 ensure_object_id(ObjectType::Schema, pb_view.schema_id, &txn).await?;
553 check_relation_name_duplicate(&pb_view.name, pb_view.database_id, pb_view.schema_id, &txn)
554 .await?;
555
556 let view_obj = Self::create_object(
557 &txn,
558 ObjectType::View,
559 owner_id,
560 Some(pb_view.database_id),
561 Some(pb_view.schema_id),
562 )
563 .await?;
564 pb_view.id = view_obj.oid.as_view_id();
565 pb_view.created_at_epoch =
566 Some(Epoch::from_unix_millis(datetime_to_timestamp_millis(view_obj.created_at) as _).0);
567 pb_view.created_at_cluster_version = view_obj.created_at_cluster_version;
568
569 let view: view::ActiveModel = pb_view.clone().into();
570 View::insert(view).exec(&txn).await?;
571
572 let dependencies = if !dependencies.is_empty() {
573 ObjectDependency::insert_many(dependencies.into_iter().map(|obj_id| {
574 object_dependency::ActiveModel {
575 oid: Set(obj_id),
576 used_by: Set(view_obj.oid),
577 ..Default::default()
578 }
579 }))
580 .exec(&txn)
581 .await?;
582 list_object_dependencies_by_object_id(&txn, view_obj.oid).await?
583 } else {
584 vec![]
585 };
586
587 let updated_user_info = grant_default_privileges_automatically(&txn, view_obj.oid).await?;
588
589 txn.commit().await?;
590 let mut version = self
591 .notify_frontend(
592 NotificationOperation::Add,
593 NotificationInfo::ObjectGroup(PbObjectGroup {
594 objects: vec![PbObject {
595 object_info: Some(PbObjectInfo::View(pb_view)),
596 }],
597 dependencies,
598 }),
599 )
600 .await;
601
602 if !updated_user_info.is_empty() {
604 version = self.notify_users_update(updated_user_info).await;
605 }
606
607 Ok(version)
608 }
609
610 pub async fn validate_cross_db_snapshot_backfill(
611 &self,
612 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
613 ) -> MetaResult<()> {
614 if cross_db_snapshot_backfill_info
615 .upstream_mv_table_id_to_backfill_epoch
616 .is_empty()
617 {
618 return Ok(());
619 }
620
621 let inner = self.inner.read().await;
622 let table_ids = cross_db_snapshot_backfill_info
623 .upstream_mv_table_id_to_backfill_epoch
624 .keys()
625 .copied()
626 .map_into()
627 .collect_vec();
628 let cnt = Subscription::find()
629 .select_only()
630 .column(subscription::Column::DependentTableId)
631 .distinct()
632 .filter(subscription::Column::DependentTableId.is_in::<TableId, _>(table_ids))
633 .count(&inner.db)
634 .await? as usize;
635
636 if cnt
637 < cross_db_snapshot_backfill_info
638 .upstream_mv_table_id_to_backfill_epoch
639 .keys()
640 .count()
641 {
642 return Err(MetaError::permission_denied(
643 "Some upstream tables are not subscribed".to_owned(),
644 ));
645 }
646
647 Ok(())
648 }
649}