1use risingwave_common::cast::datetime_to_timestamp_millis;
16use risingwave_common::catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
17use risingwave_common::system_param::{OverrideValidate, Validate};
18use risingwave_common::util::epoch::Epoch;
19
20use super::*;
21use crate::barrier::SnapshotBackfillInfo;
22
23impl CatalogController {
24 pub(crate) async fn create_object(
25 txn: &DatabaseTransaction,
26 obj_type: ObjectType,
27 owner_id: UserId,
28 database_id: Option<DatabaseId>,
29 schema_id: Option<SchemaId>,
30 ) -> MetaResult<object::Model> {
31 let active_db = object::ActiveModel {
32 oid: Default::default(),
33 obj_type: Set(obj_type),
34 owner_id: Set(owner_id),
35 schema_id: Set(schema_id),
36 database_id: Set(database_id),
37 initialized_at: Default::default(),
38 created_at: Default::default(),
39 initialized_at_cluster_version: Set(Some(current_cluster_version())),
40 created_at_cluster_version: Set(Some(current_cluster_version())),
41 };
42 Ok(active_db.insert(txn).await?)
43 }
44
45 pub async fn create_database(
46 &self,
47 db: PbDatabase,
48 ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
49 if let Some(ref interval) = db.barrier_interval_ms {
51 OverrideValidate::barrier_interval_ms(interval).map_err(|e| anyhow::anyhow!(e))?;
52 }
53 if let Some(ref frequency) = db.checkpoint_frequency {
54 OverrideValidate::checkpoint_frequency(frequency).map_err(|e| anyhow::anyhow!(e))?;
55 }
56
57 let inner = self.inner.write().await;
58 let owner_id = db.owner as _;
59 let txn = inner.db.begin().await?;
60 ensure_user_id(owner_id, &txn).await?;
61 check_database_name_duplicate(&db.name, &txn).await?;
62
63 let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
64 let mut db: database::ActiveModel = db.into();
65 db.database_id = Set(db_obj.oid.as_database_id());
66 let db = db.insert(&txn).await?;
67
68 let mut schemas = vec![];
69 for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
70 let schema_obj = Self::create_object(
71 &txn,
72 ObjectType::Schema,
73 owner_id,
74 Some(db_obj.oid.as_database_id()),
75 None,
76 )
77 .await?;
78 let schema = schema::ActiveModel {
79 schema_id: Set(schema_obj.oid.as_schema_id()),
80 name: Set(schema_name.into()),
81 };
82 let schema = schema.insert(&txn).await?;
83 schemas.push(ObjectModel(schema, schema_obj).into());
84 }
85 txn.commit().await?;
86
87 let mut version = self
88 .notify_frontend(
89 NotificationOperation::Add,
90 NotificationInfo::Database(ObjectModel(db.clone(), db_obj).into()),
91 )
92 .await;
93 for schema in schemas {
94 version = self
95 .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
96 .await;
97 }
98
99 Ok((version, db))
100 }
101
102 pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
103 let inner = self.inner.write().await;
104 let owner_id = schema.owner as _;
105 let txn = inner.db.begin().await?;
106 ensure_user_id(owner_id, &txn).await?;
107 ensure_object_id(ObjectType::Database, schema.database_id, &txn).await?;
108 check_schema_name_duplicate(&schema.name, schema.database_id, &txn).await?;
109
110 let schema_obj = Self::create_object(
111 &txn,
112 ObjectType::Schema,
113 owner_id,
114 Some(schema.database_id),
115 None,
116 )
117 .await?;
118 let mut schema: schema::ActiveModel = schema.into();
119 schema.schema_id = Set(schema_obj.oid.as_schema_id());
120 let schema = schema.insert(&txn).await?;
121
122 let updated_user_info =
123 grant_default_privileges_automatically(&txn, schema_obj.oid).await?;
124
125 txn.commit().await?;
126
127 let mut version = self
128 .notify_frontend(
129 NotificationOperation::Add,
130 NotificationInfo::Schema(ObjectModel(schema, schema_obj).into()),
131 )
132 .await;
133
134 if !updated_user_info.is_empty() {
136 version = self.notify_users_update(updated_user_info).await;
137 }
138
139 Ok(version)
140 }
141
142 pub async fn create_subscription_catalog(
143 &self,
144 pb_subscription: &mut PbSubscription,
145 ) -> MetaResult<()> {
146 let inner = self.inner.write().await;
147 let txn = inner.db.begin().await?;
148
149 ensure_user_id(pb_subscription.owner as _, &txn).await?;
150 ensure_object_id(ObjectType::Database, pb_subscription.database_id, &txn).await?;
151 ensure_object_id(ObjectType::Schema, pb_subscription.schema_id, &txn).await?;
152 check_subscription_name_duplicate(pb_subscription, &txn).await?;
153
154 let obj = Self::create_object(
155 &txn,
156 ObjectType::Subscription,
157 pb_subscription.owner as _,
158 Some(pb_subscription.database_id),
159 Some(pb_subscription.schema_id),
160 )
161 .await?;
162 pb_subscription.id = obj.oid.as_subscription_id();
163 let subscription: subscription::ActiveModel = pb_subscription.clone().into();
164 Subscription::insert(subscription).exec(&txn).await?;
165
166 ObjectDependency::insert(object_dependency::ActiveModel {
168 oid: Set(pb_subscription.dependent_table_id.into()),
169 used_by: Set(pb_subscription.id.into()),
170 ..Default::default()
171 })
172 .exec(&txn)
173 .await?;
174 txn.commit().await?;
175 Ok(())
176 }
177
178 pub async fn create_source(
179 &self,
180 mut pb_source: PbSource,
181 ) -> MetaResult<(SourceId, NotificationVersion)> {
182 let mut inner = self.inner.write().await;
183 let owner_id = pb_source.owner as _;
184 let txn = inner.db.begin().await?;
185 ensure_user_id(owner_id, &txn).await?;
186 ensure_object_id(ObjectType::Database, pb_source.database_id, &txn).await?;
187 ensure_object_id(ObjectType::Schema, pb_source.schema_id, &txn).await?;
188 check_relation_name_duplicate(
189 &pb_source.name,
190 pb_source.database_id,
191 pb_source.schema_id,
192 &txn,
193 )
194 .await?;
195
196 let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
198 let connection_ids = get_referred_connection_ids_from_source(&pb_source);
199
200 let source_obj = Self::create_object(
201 &txn,
202 ObjectType::Source,
203 owner_id,
204 Some(pb_source.database_id),
205 Some(pb_source.schema_id),
206 )
207 .await?;
208 let source_id = source_obj.oid.as_source_id();
209 pb_source.id = source_id;
210 let source: source::ActiveModel = pb_source.clone().into();
211 Source::insert(source).exec(&txn).await?;
212
213 let dep_relation_ids = secret_ids
215 .iter()
216 .copied()
217 .map_into()
218 .chain(connection_ids.iter().copied().map_into());
219 if !secret_ids.is_empty() || !connection_ids.is_empty() {
220 ObjectDependency::insert_many(dep_relation_ids.map(|id| {
221 object_dependency::ActiveModel {
222 oid: Set(id),
223 used_by: Set(source_id.as_object_id()),
224 ..Default::default()
225 }
226 }))
227 .exec(&txn)
228 .await?;
229 }
230
231 let mut job_notifications = vec![];
232 let mut updated_user_info = vec![];
233 if pb_source.name.starts_with(ICEBERG_SOURCE_PREFIX) {
235 let table_name = pb_source.name.trim_start_matches(ICEBERG_SOURCE_PREFIX);
237 let table_id = Table::find()
238 .select_only()
239 .column(table::Column::TableId)
240 .join(JoinType::InnerJoin, table::Relation::Object1.def())
241 .filter(
242 object::Column::DatabaseId
243 .eq(pb_source.database_id)
244 .and(object::Column::SchemaId.eq(pb_source.schema_id))
245 .and(table::Column::Name.eq(table_name)),
246 )
247 .into_tuple::<TableId>()
248 .one(&txn)
249 .await?
250 .ok_or(MetaError::from(anyhow!("table {} not found", table_name)))?;
251 let table_notifications =
252 Self::finish_streaming_job_inner(&txn, table_id.as_job_id()).await?;
253 job_notifications.push((table_id.as_job_id(), table_notifications));
254
255 let sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table_name);
257 let sink_id = Sink::find()
258 .select_only()
259 .column(sink::Column::SinkId)
260 .join(JoinType::InnerJoin, sink::Relation::Object.def())
261 .filter(
262 object::Column::DatabaseId
263 .eq(pb_source.database_id)
264 .and(object::Column::SchemaId.eq(pb_source.schema_id))
265 .and(sink::Column::Name.eq(&sink_name)),
266 )
267 .into_tuple::<SinkId>()
268 .one(&txn)
269 .await?
270 .ok_or(MetaError::from(anyhow!("sink {} not found", sink_name)))?;
271 let sink_job_id = sink_id.as_job_id();
272 let sink_notifications = Self::finish_streaming_job_inner(&txn, sink_job_id).await?;
273 job_notifications.push((sink_job_id, sink_notifications));
274 } else {
275 updated_user_info = grant_default_privileges_automatically(&txn, source_id).await?;
276 }
277
278 txn.commit().await?;
279
280 for (job_id, (op, objects, user_info)) in job_notifications {
281 let mut version = self
282 .notify_frontend(op, NotificationInfo::ObjectGroup(PbObjectGroup { objects }))
283 .await;
284 if !user_info.is_empty() {
285 version = self.notify_users_update(user_info).await;
286 }
287 inner
288 .creating_table_finish_notifier
289 .values_mut()
290 .for_each(|creating_tables| {
291 if let Some(txs) = creating_tables.remove(&job_id) {
292 for tx in txs {
293 let _ = tx.send(Ok(version));
294 }
295 }
296 });
297 }
298
299 let mut version = self
300 .notify_frontend_relation_info(
301 NotificationOperation::Add,
302 PbObjectInfo::Source(pb_source),
303 )
304 .await;
305
306 if !updated_user_info.is_empty() {
308 version = self.notify_users_update(updated_user_info).await;
309 }
310
311 Ok((source_id, version))
312 }
313
314 pub async fn create_function(
315 &self,
316 mut pb_function: PbFunction,
317 ) -> MetaResult<NotificationVersion> {
318 let inner = self.inner.write().await;
319 let owner_id = pb_function.owner as _;
320 let txn = inner.db.begin().await?;
321 ensure_user_id(owner_id, &txn).await?;
322 ensure_object_id(ObjectType::Database, pb_function.database_id, &txn).await?;
323 ensure_object_id(ObjectType::Schema, pb_function.schema_id, &txn).await?;
324 check_function_signature_duplicate(&pb_function, &txn).await?;
325
326 let function_obj = Self::create_object(
327 &txn,
328 ObjectType::Function,
329 owner_id,
330 Some(pb_function.database_id),
331 Some(pb_function.schema_id),
332 )
333 .await?;
334 pb_function.id = function_obj.oid.as_function_id();
335 pb_function.created_at_epoch = Some(
336 Epoch::from_unix_millis(datetime_to_timestamp_millis(function_obj.created_at) as _).0,
337 );
338 pb_function.created_at_cluster_version = function_obj.created_at_cluster_version;
339 let function: function::ActiveModel = pb_function.clone().into();
340 Function::insert(function).exec(&txn).await?;
341
342 let updated_user_info =
343 grant_default_privileges_automatically(&txn, function_obj.oid).await?;
344
345 txn.commit().await?;
346
347 let mut version = self
348 .notify_frontend(
349 NotificationOperation::Add,
350 NotificationInfo::Function(pb_function),
351 )
352 .await;
353
354 if !updated_user_info.is_empty() {
356 version = self.notify_users_update(updated_user_info).await;
357 }
358
359 Ok(version)
360 }
361
362 pub async fn create_connection(
363 &self,
364 mut pb_connection: PbConnection,
365 ) -> MetaResult<NotificationVersion> {
366 let inner = self.inner.write().await;
367 let owner_id = pb_connection.owner as _;
368 let txn = inner.db.begin().await?;
369 ensure_user_id(owner_id, &txn).await?;
370 ensure_object_id(ObjectType::Database, pb_connection.database_id, &txn).await?;
371 ensure_object_id(ObjectType::Schema, pb_connection.schema_id, &txn).await?;
372 check_connection_name_duplicate(&pb_connection, &txn).await?;
373
374 let mut dep_secrets: HashSet<SecretId> = HashSet::new();
375 if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
376 dep_secrets.extend(
377 params
378 .secret_refs
379 .values()
380 .map(|secret_ref| secret_ref.secret_id),
381 );
382 }
383
384 let conn_obj = Self::create_object(
385 &txn,
386 ObjectType::Connection,
387 owner_id,
388 Some(pb_connection.database_id),
389 Some(pb_connection.schema_id),
390 )
391 .await?;
392 pb_connection.id = conn_obj.oid.as_connection_id();
393 let connection: connection::ActiveModel = pb_connection.clone().into();
394 Connection::insert(connection).exec(&txn).await?;
395
396 for secret_id in dep_secrets {
397 ObjectDependency::insert(object_dependency::ActiveModel {
398 oid: Set(secret_id.as_object_id()),
399 used_by: Set(conn_obj.oid),
400 ..Default::default()
401 })
402 .exec(&txn)
403 .await?;
404 }
405
406 let updated_user_info = grant_default_privileges_automatically(&txn, conn_obj.oid).await?;
407
408 txn.commit().await?;
409
410 {
411 report_event(
413 PbTelemetryEventStage::Unspecified,
414 "connection_create",
415 pb_connection.get_id().as_raw_id() as i64,
416 {
417 pb_connection.info.as_ref().and_then(|info| match info {
418 ConnectionInfo::ConnectionParams(params) => {
419 Some(params.connection_type().as_str_name().to_owned())
420 }
421 _ => None,
422 })
423 },
424 None,
425 None,
426 );
427 }
428
429 let mut version = self
430 .notify_frontend(
431 NotificationOperation::Add,
432 NotificationInfo::Connection(pb_connection),
433 )
434 .await;
435
436 if !updated_user_info.is_empty() {
438 version = self.notify_users_update(updated_user_info).await;
439 }
440
441 Ok(version)
442 }
443
444 pub async fn create_secret(
445 &self,
446 mut pb_secret: PbSecret,
447 secret_plain_payload: Vec<u8>,
448 ) -> MetaResult<NotificationVersion> {
449 let inner = self.inner.write().await;
450 let owner_id = pb_secret.owner as _;
451 let txn = inner.db.begin().await?;
452 ensure_user_id(owner_id, &txn).await?;
453 ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
454 ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
455 check_secret_name_duplicate(&pb_secret, &txn).await?;
456
457 let secret_obj = Self::create_object(
458 &txn,
459 ObjectType::Secret,
460 owner_id,
461 Some(pb_secret.database_id),
462 Some(pb_secret.schema_id),
463 )
464 .await?;
465 pb_secret.id = secret_obj.oid.as_secret_id();
466 let secret: secret::ActiveModel = pb_secret.clone().into();
467 Secret::insert(secret).exec(&txn).await?;
468
469 let updated_user_info =
470 grant_default_privileges_automatically(&txn, secret_obj.oid).await?;
471
472 txn.commit().await?;
473
474 let mut secret_plain = pb_secret;
476 secret_plain.value.clone_from(&secret_plain_payload);
477
478 LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
479 self.env
480 .notification_manager()
481 .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
482
483 let mut version = self
484 .notify_frontend(
485 NotificationOperation::Add,
486 NotificationInfo::Secret(secret_plain),
487 )
488 .await;
489
490 if !updated_user_info.is_empty() {
492 version = self.notify_users_update(updated_user_info).await;
493 }
494
495 Ok(version)
496 }
497
498 pub async fn create_view(
499 &self,
500 mut pb_view: PbView,
501 dependencies: HashSet<ObjectId>,
502 ) -> MetaResult<NotificationVersion> {
503 let inner = self.inner.write().await;
504 let owner_id = pb_view.owner as _;
505 let txn = inner.db.begin().await?;
506 ensure_user_id(owner_id, &txn).await?;
507 ensure_object_id(ObjectType::Database, pb_view.database_id, &txn).await?;
508 ensure_object_id(ObjectType::Schema, pb_view.schema_id, &txn).await?;
509 check_relation_name_duplicate(&pb_view.name, pb_view.database_id, pb_view.schema_id, &txn)
510 .await?;
511 ensure_object_id(ObjectType::Schema, pb_view.schema_id, &txn).await?;
512 check_relation_name_duplicate(&pb_view.name, pb_view.database_id, pb_view.schema_id, &txn)
513 .await?;
514
515 let view_obj = Self::create_object(
516 &txn,
517 ObjectType::View,
518 owner_id,
519 Some(pb_view.database_id),
520 Some(pb_view.schema_id),
521 )
522 .await?;
523 pb_view.id = view_obj.oid.as_view_id();
524 pb_view.created_at_epoch =
525 Some(Epoch::from_unix_millis(datetime_to_timestamp_millis(view_obj.created_at) as _).0);
526 pb_view.created_at_cluster_version = view_obj.created_at_cluster_version;
527
528 let view: view::ActiveModel = pb_view.clone().into();
529 View::insert(view).exec(&txn).await?;
530
531 for obj_id in dependencies {
532 ObjectDependency::insert(object_dependency::ActiveModel {
533 oid: Set(obj_id),
534 used_by: Set(view_obj.oid),
535 ..Default::default()
536 })
537 .exec(&txn)
538 .await?;
539 }
540
541 let updated_user_info = grant_default_privileges_automatically(&txn, view_obj.oid).await?;
542
543 txn.commit().await?;
544
545 let mut version = self
546 .notify_frontend_relation_info(NotificationOperation::Add, PbObjectInfo::View(pb_view))
547 .await;
548
549 if !updated_user_info.is_empty() {
551 version = self.notify_users_update(updated_user_info).await;
552 }
553
554 Ok(version)
555 }
556
557 pub async fn validate_cross_db_snapshot_backfill(
558 &self,
559 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
560 ) -> MetaResult<()> {
561 if cross_db_snapshot_backfill_info
562 .upstream_mv_table_id_to_backfill_epoch
563 .is_empty()
564 {
565 return Ok(());
566 }
567
568 let inner = self.inner.read().await;
569 let table_ids = cross_db_snapshot_backfill_info
570 .upstream_mv_table_id_to_backfill_epoch
571 .keys()
572 .copied()
573 .map_into()
574 .collect_vec();
575 let cnt = Subscription::find()
576 .select_only()
577 .column(subscription::Column::DependentTableId)
578 .distinct()
579 .filter(subscription::Column::DependentTableId.is_in::<TableId, _>(table_ids))
580 .count(&inner.db)
581 .await? as usize;
582
583 if cnt
584 < cross_db_snapshot_backfill_info
585 .upstream_mv_table_id_to_backfill_epoch
586 .keys()
587 .count()
588 {
589 return Err(MetaError::permission_denied(
590 "Some upstream tables are not subscribed".to_owned(),
591 ));
592 }
593
594 Ok(())
595 }
596}