1use risingwave_common::catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
16use risingwave_common::system_param::{OverrideValidate, Validate};
17use risingwave_common::util::epoch::Epoch;
18
19use super::*;
20use crate::barrier::SnapshotBackfillInfo;
21
22impl CatalogController {
23 pub(crate) async fn create_object(
24 txn: &DatabaseTransaction,
25 obj_type: ObjectType,
26 owner_id: UserId,
27 database_id: Option<DatabaseId>,
28 schema_id: Option<SchemaId>,
29 ) -> MetaResult<object::Model> {
30 let active_db = object::ActiveModel {
31 oid: Default::default(),
32 obj_type: Set(obj_type),
33 owner_id: Set(owner_id),
34 schema_id: Set(schema_id),
35 database_id: Set(database_id),
36 initialized_at: Default::default(),
37 created_at: Default::default(),
38 initialized_at_cluster_version: Set(Some(current_cluster_version())),
39 created_at_cluster_version: Set(Some(current_cluster_version())),
40 };
41 Ok(active_db.insert(txn).await?)
42 }
43
44 pub async fn create_database(
45 &self,
46 db: PbDatabase,
47 ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
48 if let Some(ref interval) = db.barrier_interval_ms {
50 OverrideValidate::barrier_interval_ms(interval).map_err(|e| anyhow::anyhow!(e))?;
51 }
52 if let Some(ref frequency) = db.checkpoint_frequency {
53 OverrideValidate::checkpoint_frequency(frequency).map_err(|e| anyhow::anyhow!(e))?;
54 }
55
56 let inner = self.inner.write().await;
57 let owner_id = db.owner as _;
58 let txn = inner.db.begin().await?;
59 ensure_user_id(owner_id, &txn).await?;
60 check_database_name_duplicate(&db.name, &txn).await?;
61
62 let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
63 let mut db: database::ActiveModel = db.into();
64 db.database_id = Set(db_obj.oid.as_database_id());
65 let db = db.insert(&txn).await?;
66
67 let mut schemas = vec![];
68 for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
69 let schema_obj = Self::create_object(
70 &txn,
71 ObjectType::Schema,
72 owner_id,
73 Some(db_obj.oid.as_database_id()),
74 None,
75 )
76 .await?;
77 let schema = schema::ActiveModel {
78 schema_id: Set(schema_obj.oid.as_schema_id()),
79 name: Set(schema_name.into()),
80 };
81 let schema = schema.insert(&txn).await?;
82 schemas.push(ObjectModel(schema, schema_obj).into());
83 }
84 txn.commit().await?;
85
86 let mut version = self
87 .notify_frontend(
88 NotificationOperation::Add,
89 NotificationInfo::Database(ObjectModel(db.clone(), db_obj).into()),
90 )
91 .await;
92 for schema in schemas {
93 version = self
94 .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
95 .await;
96 }
97
98 Ok((version, db))
99 }
100
101 pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
102 let inner = self.inner.write().await;
103 let owner_id = schema.owner as _;
104 let txn = inner.db.begin().await?;
105 ensure_user_id(owner_id, &txn).await?;
106 ensure_object_id(ObjectType::Database, schema.database_id, &txn).await?;
107 check_schema_name_duplicate(&schema.name, schema.database_id, &txn).await?;
108
109 let schema_obj = Self::create_object(
110 &txn,
111 ObjectType::Schema,
112 owner_id,
113 Some(schema.database_id),
114 None,
115 )
116 .await?;
117 let mut schema: schema::ActiveModel = schema.into();
118 schema.schema_id = Set(schema_obj.oid.as_schema_id());
119 let schema = schema.insert(&txn).await?;
120
121 let updated_user_info =
122 grant_default_privileges_automatically(&txn, schema_obj.oid).await?;
123
124 txn.commit().await?;
125
126 let mut version = self
127 .notify_frontend(
128 NotificationOperation::Add,
129 NotificationInfo::Schema(ObjectModel(schema, schema_obj).into()),
130 )
131 .await;
132
133 if !updated_user_info.is_empty() {
135 version = self.notify_users_update(updated_user_info).await;
136 }
137
138 Ok(version)
139 }
140
141 pub async fn create_subscription_catalog(
142 &self,
143 pb_subscription: &mut PbSubscription,
144 ) -> MetaResult<()> {
145 let inner = self.inner.write().await;
146 let txn = inner.db.begin().await?;
147
148 ensure_user_id(pb_subscription.owner as _, &txn).await?;
149 ensure_object_id(ObjectType::Database, pb_subscription.database_id, &txn).await?;
150 ensure_object_id(ObjectType::Schema, pb_subscription.schema_id, &txn).await?;
151 check_subscription_name_duplicate(pb_subscription, &txn).await?;
152
153 let obj = Self::create_object(
154 &txn,
155 ObjectType::Subscription,
156 pb_subscription.owner as _,
157 Some(pb_subscription.database_id),
158 Some(pb_subscription.schema_id),
159 )
160 .await?;
161 pb_subscription.id = obj.oid.as_subscription_id();
162 let subscription: subscription::ActiveModel = pb_subscription.clone().into();
163 Subscription::insert(subscription).exec(&txn).await?;
164
165 ObjectDependency::insert(object_dependency::ActiveModel {
167 oid: Set(pb_subscription.dependent_table_id.into()),
168 used_by: Set(pb_subscription.id.into()),
169 ..Default::default()
170 })
171 .exec(&txn)
172 .await?;
173 txn.commit().await?;
174 Ok(())
175 }
176
177 pub async fn create_source(
178 &self,
179 mut pb_source: PbSource,
180 ) -> MetaResult<(SourceId, NotificationVersion)> {
181 let mut inner = self.inner.write().await;
182 let owner_id = pb_source.owner as _;
183 let txn = inner.db.begin().await?;
184 ensure_user_id(owner_id, &txn).await?;
185 ensure_object_id(ObjectType::Database, pb_source.database_id, &txn).await?;
186 ensure_object_id(ObjectType::Schema, pb_source.schema_id, &txn).await?;
187 check_relation_name_duplicate(
188 &pb_source.name,
189 pb_source.database_id,
190 pb_source.schema_id,
191 &txn,
192 )
193 .await?;
194
195 let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
197 let connection_ids = get_referred_connection_ids_from_source(&pb_source);
198
199 let source_obj = Self::create_object(
200 &txn,
201 ObjectType::Source,
202 owner_id,
203 Some(pb_source.database_id),
204 Some(pb_source.schema_id),
205 )
206 .await?;
207 let source_id = source_obj.oid.as_source_id();
208 pb_source.id = source_id;
209 let source: source::ActiveModel = pb_source.clone().into();
210 Source::insert(source).exec(&txn).await?;
211
212 let dep_relation_ids = secret_ids
214 .iter()
215 .copied()
216 .map_into()
217 .chain(connection_ids.iter().copied().map_into());
218 if !secret_ids.is_empty() || !connection_ids.is_empty() {
219 ObjectDependency::insert_many(dep_relation_ids.map(|id| {
220 object_dependency::ActiveModel {
221 oid: Set(id),
222 used_by: Set(source_id.as_object_id()),
223 ..Default::default()
224 }
225 }))
226 .exec(&txn)
227 .await?;
228 }
229
230 let mut job_notifications = vec![];
231 let mut updated_user_info = vec![];
232 if pb_source.name.starts_with(ICEBERG_SOURCE_PREFIX) {
234 let table_name = pb_source.name.trim_start_matches(ICEBERG_SOURCE_PREFIX);
236 let table_id = Table::find()
237 .select_only()
238 .column(table::Column::TableId)
239 .join(JoinType::InnerJoin, table::Relation::Object1.def())
240 .filter(
241 object::Column::DatabaseId
242 .eq(pb_source.database_id)
243 .and(object::Column::SchemaId.eq(pb_source.schema_id))
244 .and(table::Column::Name.eq(table_name)),
245 )
246 .into_tuple::<TableId>()
247 .one(&txn)
248 .await?
249 .ok_or(MetaError::from(anyhow!("table {} not found", table_name)))?;
250 let table_notifications =
251 Self::finish_streaming_job_inner(&txn, table_id.as_job_id()).await?;
252 job_notifications.push((table_id.as_job_id(), table_notifications));
253
254 let sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table_name);
256 let sink_id = Sink::find()
257 .select_only()
258 .column(sink::Column::SinkId)
259 .join(JoinType::InnerJoin, sink::Relation::Object.def())
260 .filter(
261 object::Column::DatabaseId
262 .eq(pb_source.database_id)
263 .and(object::Column::SchemaId.eq(pb_source.schema_id))
264 .and(sink::Column::Name.eq(&sink_name)),
265 )
266 .into_tuple::<SinkId>()
267 .one(&txn)
268 .await?
269 .ok_or(MetaError::from(anyhow!("sink {} not found", sink_name)))?;
270 let sink_job_id = sink_id.as_job_id();
271 let sink_notifications = Self::finish_streaming_job_inner(&txn, sink_job_id).await?;
272 job_notifications.push((sink_job_id, sink_notifications));
273 } else {
274 updated_user_info = grant_default_privileges_automatically(&txn, source_id).await?;
275 }
276
277 txn.commit().await?;
278
279 for (job_id, (op, objects, user_info)) in job_notifications {
280 let mut version = self
281 .notify_frontend(op, NotificationInfo::ObjectGroup(PbObjectGroup { objects }))
282 .await;
283 if !user_info.is_empty() {
284 version = self.notify_users_update(user_info).await;
285 }
286 inner
287 .creating_table_finish_notifier
288 .values_mut()
289 .for_each(|creating_tables| {
290 if let Some(txs) = creating_tables.remove(&job_id) {
291 for tx in txs {
292 let _ = tx.send(Ok(version));
293 }
294 }
295 });
296 }
297
298 let mut version = self
299 .notify_frontend_relation_info(
300 NotificationOperation::Add,
301 PbObjectInfo::Source(pb_source),
302 )
303 .await;
304
305 if !updated_user_info.is_empty() {
307 version = self.notify_users_update(updated_user_info).await;
308 }
309
310 Ok((source_id, version))
311 }
312
313 pub async fn create_function(
314 &self,
315 mut pb_function: PbFunction,
316 ) -> MetaResult<NotificationVersion> {
317 let inner = self.inner.write().await;
318 let owner_id = pb_function.owner as _;
319 let txn = inner.db.begin().await?;
320 ensure_user_id(owner_id, &txn).await?;
321 ensure_object_id(ObjectType::Database, pb_function.database_id, &txn).await?;
322 ensure_object_id(ObjectType::Schema, pb_function.schema_id, &txn).await?;
323 check_function_signature_duplicate(&pb_function, &txn).await?;
324
325 let function_obj = Self::create_object(
326 &txn,
327 ObjectType::Function,
328 owner_id,
329 Some(pb_function.database_id),
330 Some(pb_function.schema_id),
331 )
332 .await?;
333 pb_function.id = function_obj.oid.as_function_id();
334 pb_function.created_at_epoch = Some(
335 Epoch::from_unix_millis(function_obj.created_at.and_utc().timestamp_millis() as _).0,
336 );
337 pb_function.created_at_cluster_version = function_obj.created_at_cluster_version;
338 let function: function::ActiveModel = pb_function.clone().into();
339 Function::insert(function).exec(&txn).await?;
340
341 let updated_user_info =
342 grant_default_privileges_automatically(&txn, function_obj.oid).await?;
343
344 txn.commit().await?;
345
346 let mut version = self
347 .notify_frontend(
348 NotificationOperation::Add,
349 NotificationInfo::Function(pb_function),
350 )
351 .await;
352
353 if !updated_user_info.is_empty() {
355 version = self.notify_users_update(updated_user_info).await;
356 }
357
358 Ok(version)
359 }
360
361 pub async fn create_connection(
362 &self,
363 mut pb_connection: PbConnection,
364 ) -> MetaResult<NotificationVersion> {
365 let inner = self.inner.write().await;
366 let owner_id = pb_connection.owner as _;
367 let txn = inner.db.begin().await?;
368 ensure_user_id(owner_id, &txn).await?;
369 ensure_object_id(ObjectType::Database, pb_connection.database_id, &txn).await?;
370 ensure_object_id(ObjectType::Schema, pb_connection.schema_id, &txn).await?;
371 check_connection_name_duplicate(&pb_connection, &txn).await?;
372
373 let mut dep_secrets: HashSet<SecretId> = HashSet::new();
374 if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
375 dep_secrets.extend(
376 params
377 .secret_refs
378 .values()
379 .map(|secret_ref| secret_ref.secret_id),
380 );
381 }
382
383 let conn_obj = Self::create_object(
384 &txn,
385 ObjectType::Connection,
386 owner_id,
387 Some(pb_connection.database_id),
388 Some(pb_connection.schema_id),
389 )
390 .await?;
391 pb_connection.id = conn_obj.oid.as_connection_id();
392 let connection: connection::ActiveModel = pb_connection.clone().into();
393 Connection::insert(connection).exec(&txn).await?;
394
395 for secret_id in dep_secrets {
396 ObjectDependency::insert(object_dependency::ActiveModel {
397 oid: Set(secret_id.as_object_id()),
398 used_by: Set(conn_obj.oid),
399 ..Default::default()
400 })
401 .exec(&txn)
402 .await?;
403 }
404
405 let updated_user_info = grant_default_privileges_automatically(&txn, conn_obj.oid).await?;
406
407 txn.commit().await?;
408
409 {
410 report_event(
412 PbTelemetryEventStage::Unspecified,
413 "connection_create",
414 pb_connection.get_id().as_raw_id() as i64,
415 {
416 pb_connection.info.as_ref().and_then(|info| match info {
417 ConnectionInfo::ConnectionParams(params) => {
418 Some(params.connection_type().as_str_name().to_owned())
419 }
420 _ => None,
421 })
422 },
423 None,
424 None,
425 );
426 }
427
428 let mut version = self
429 .notify_frontend(
430 NotificationOperation::Add,
431 NotificationInfo::Connection(pb_connection),
432 )
433 .await;
434
435 if !updated_user_info.is_empty() {
437 version = self.notify_users_update(updated_user_info).await;
438 }
439
440 Ok(version)
441 }
442
443 pub async fn create_secret(
444 &self,
445 mut pb_secret: PbSecret,
446 secret_plain_payload: Vec<u8>,
447 ) -> MetaResult<NotificationVersion> {
448 let inner = self.inner.write().await;
449 let owner_id = pb_secret.owner as _;
450 let txn = inner.db.begin().await?;
451 ensure_user_id(owner_id, &txn).await?;
452 ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
453 ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
454 check_secret_name_duplicate(&pb_secret, &txn).await?;
455
456 let secret_obj = Self::create_object(
457 &txn,
458 ObjectType::Secret,
459 owner_id,
460 Some(pb_secret.database_id),
461 Some(pb_secret.schema_id),
462 )
463 .await?;
464 pb_secret.id = secret_obj.oid.as_secret_id();
465 let secret: secret::ActiveModel = pb_secret.clone().into();
466 Secret::insert(secret).exec(&txn).await?;
467
468 let updated_user_info =
469 grant_default_privileges_automatically(&txn, secret_obj.oid).await?;
470
471 txn.commit().await?;
472
473 let mut secret_plain = pb_secret;
475 secret_plain.value.clone_from(&secret_plain_payload);
476
477 LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
478 self.env
479 .notification_manager()
480 .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
481
482 let mut version = self
483 .notify_frontend(
484 NotificationOperation::Add,
485 NotificationInfo::Secret(secret_plain),
486 )
487 .await;
488
489 if !updated_user_info.is_empty() {
491 version = self.notify_users_update(updated_user_info).await;
492 }
493
494 Ok(version)
495 }
496
497 pub async fn create_view(
498 &self,
499 mut pb_view: PbView,
500 dependencies: HashSet<ObjectId>,
501 ) -> MetaResult<NotificationVersion> {
502 let inner = self.inner.write().await;
503 let owner_id = pb_view.owner as _;
504 let txn = inner.db.begin().await?;
505 ensure_user_id(owner_id, &txn).await?;
506 ensure_object_id(ObjectType::Database, pb_view.database_id, &txn).await?;
507 ensure_object_id(ObjectType::Schema, pb_view.schema_id, &txn).await?;
508 check_relation_name_duplicate(&pb_view.name, pb_view.database_id, pb_view.schema_id, &txn)
509 .await?;
510 ensure_object_id(ObjectType::Schema, pb_view.schema_id, &txn).await?;
511 check_relation_name_duplicate(&pb_view.name, pb_view.database_id, pb_view.schema_id, &txn)
512 .await?;
513
514 let view_obj = Self::create_object(
515 &txn,
516 ObjectType::View,
517 owner_id,
518 Some(pb_view.database_id),
519 Some(pb_view.schema_id),
520 )
521 .await?;
522 pb_view.id = view_obj.oid.as_view_id();
523 pb_view.created_at_epoch =
524 Some(Epoch::from_unix_millis(view_obj.created_at.and_utc().timestamp_millis() as _).0);
525 pb_view.created_at_cluster_version = view_obj.created_at_cluster_version;
526
527 let view: view::ActiveModel = pb_view.clone().into();
528 View::insert(view).exec(&txn).await?;
529
530 for obj_id in dependencies {
531 ObjectDependency::insert(object_dependency::ActiveModel {
532 oid: Set(obj_id),
533 used_by: Set(view_obj.oid),
534 ..Default::default()
535 })
536 .exec(&txn)
537 .await?;
538 }
539
540 let updated_user_info = grant_default_privileges_automatically(&txn, view_obj.oid).await?;
541
542 txn.commit().await?;
543
544 let mut version = self
545 .notify_frontend_relation_info(NotificationOperation::Add, PbObjectInfo::View(pb_view))
546 .await;
547
548 if !updated_user_info.is_empty() {
550 version = self.notify_users_update(updated_user_info).await;
551 }
552
553 Ok(version)
554 }
555
556 pub async fn validate_cross_db_snapshot_backfill(
557 &self,
558 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
559 ) -> MetaResult<()> {
560 if cross_db_snapshot_backfill_info
561 .upstream_mv_table_id_to_backfill_epoch
562 .is_empty()
563 {
564 return Ok(());
565 }
566
567 let inner = self.inner.read().await;
568 let table_ids = cross_db_snapshot_backfill_info
569 .upstream_mv_table_id_to_backfill_epoch
570 .keys()
571 .copied()
572 .map_into()
573 .collect_vec();
574 let cnt = Subscription::find()
575 .select_only()
576 .column(subscription::Column::DependentTableId)
577 .distinct()
578 .filter(subscription::Column::DependentTableId.is_in::<TableId, _>(table_ids))
579 .count(&inner.db)
580 .await? as usize;
581
582 if cnt
583 < cross_db_snapshot_backfill_info
584 .upstream_mv_table_id_to_backfill_epoch
585 .keys()
586 .count()
587 {
588 return Err(MetaError::permission_denied(
589 "Some upstream tables are not subscribed".to_owned(),
590 ));
591 }
592
593 Ok(())
594 }
595}