1use risingwave_common::system_param::{OverrideValidate, Validate};
16use risingwave_common::util::epoch::Epoch;
17
18use super::*;
19use crate::barrier::SnapshotBackfillInfo;
20
21impl CatalogController {
22 pub(crate) async fn create_object(
23 txn: &DatabaseTransaction,
24 obj_type: ObjectType,
25 owner_id: UserId,
26 database_id: Option<DatabaseId>,
27 schema_id: Option<SchemaId>,
28 ) -> MetaResult<object::Model> {
29 let active_db = object::ActiveModel {
30 oid: Default::default(),
31 obj_type: Set(obj_type),
32 owner_id: Set(owner_id),
33 schema_id: Set(schema_id),
34 database_id: Set(database_id),
35 initialized_at: Default::default(),
36 created_at: Default::default(),
37 initialized_at_cluster_version: Set(Some(current_cluster_version())),
38 created_at_cluster_version: Set(Some(current_cluster_version())),
39 };
40 Ok(active_db.insert(txn).await?)
41 }
42
43 pub async fn create_database(
44 &self,
45 db: PbDatabase,
46 ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
47 if let Some(ref interval) = db.barrier_interval_ms {
49 OverrideValidate::barrier_interval_ms(interval).map_err(|e| anyhow::anyhow!(e))?;
50 }
51 if let Some(ref frequency) = db.checkpoint_frequency {
52 OverrideValidate::checkpoint_frequency(frequency).map_err(|e| anyhow::anyhow!(e))?;
53 }
54
55 let inner = self.inner.write().await;
56 let owner_id = db.owner as _;
57 let txn = inner.db.begin().await?;
58 ensure_user_id(owner_id, &txn).await?;
59 check_database_name_duplicate(&db.name, &txn).await?;
60
61 let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
62 let mut db: database::ActiveModel = db.into();
63 db.database_id = Set(db_obj.oid);
64 let db = db.insert(&txn).await?;
65
66 let mut schemas = vec![];
67 for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
68 let schema_obj =
69 Self::create_object(&txn, ObjectType::Schema, owner_id, Some(db_obj.oid), None)
70 .await?;
71 let schema = schema::ActiveModel {
72 schema_id: Set(schema_obj.oid),
73 name: Set(schema_name.into()),
74 };
75 let schema = schema.insert(&txn).await?;
76 schemas.push(ObjectModel(schema, schema_obj).into());
77 }
78 txn.commit().await?;
79
80 let mut version = self
81 .notify_frontend(
82 NotificationOperation::Add,
83 NotificationInfo::Database(ObjectModel(db.clone(), db_obj).into()),
84 )
85 .await;
86 for schema in schemas {
87 version = self
88 .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
89 .await;
90 }
91
92 Ok((version, db))
93 }
94
95 pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
96 let inner = self.inner.write().await;
97 let owner_id = schema.owner as _;
98 let txn = inner.db.begin().await?;
99 ensure_user_id(owner_id, &txn).await?;
100 ensure_object_id(ObjectType::Database, schema.database_id as _, &txn).await?;
101 check_schema_name_duplicate(&schema.name, schema.database_id as _, &txn).await?;
102
103 let schema_obj = Self::create_object(
104 &txn,
105 ObjectType::Schema,
106 owner_id,
107 Some(schema.database_id as _),
108 None,
109 )
110 .await?;
111 let mut schema: schema::ActiveModel = schema.into();
112 schema.schema_id = Set(schema_obj.oid);
113 let schema = schema.insert(&txn).await?;
114
115 let updated_user_info =
116 grant_default_privileges_automatically(&txn, schema_obj.oid).await?;
117
118 txn.commit().await?;
119
120 let mut version = self
121 .notify_frontend(
122 NotificationOperation::Add,
123 NotificationInfo::Schema(ObjectModel(schema, schema_obj).into()),
124 )
125 .await;
126
127 if !updated_user_info.is_empty() {
129 version = self.notify_users_update(updated_user_info).await;
130 }
131
132 Ok(version)
133 }
134
135 pub async fn create_subscription_catalog(
136 &self,
137 pb_subscription: &mut PbSubscription,
138 ) -> MetaResult<()> {
139 let inner = self.inner.write().await;
140 let txn = inner.db.begin().await?;
141
142 ensure_user_id(pb_subscription.owner as _, &txn).await?;
143 ensure_object_id(ObjectType::Database, pb_subscription.database_id as _, &txn).await?;
144 ensure_object_id(ObjectType::Schema, pb_subscription.schema_id as _, &txn).await?;
145 check_subscription_name_duplicate(pb_subscription, &txn).await?;
146
147 let obj = Self::create_object(
148 &txn,
149 ObjectType::Subscription,
150 pb_subscription.owner as _,
151 Some(pb_subscription.database_id as _),
152 Some(pb_subscription.schema_id as _),
153 )
154 .await?;
155 pb_subscription.id = obj.oid as _;
156 let subscription: subscription::ActiveModel = pb_subscription.clone().into();
157 Subscription::insert(subscription).exec(&txn).await?;
158
159 ObjectDependency::insert(object_dependency::ActiveModel {
161 oid: Set(pb_subscription.dependent_table_id as _),
162 used_by: Set(pb_subscription.id as _),
163 ..Default::default()
164 })
165 .exec(&txn)
166 .await?;
167 txn.commit().await?;
168 Ok(())
169 }
170
171 pub async fn create_source(
172 &self,
173 mut pb_source: PbSource,
174 ) -> MetaResult<(SourceId, NotificationVersion)> {
175 let inner = self.inner.write().await;
176 let owner_id = pb_source.owner as _;
177 let txn = inner.db.begin().await?;
178 ensure_user_id(owner_id, &txn).await?;
179 ensure_object_id(ObjectType::Database, pb_source.database_id as _, &txn).await?;
180 ensure_object_id(ObjectType::Schema, pb_source.schema_id as _, &txn).await?;
181 check_relation_name_duplicate(
182 &pb_source.name,
183 pb_source.database_id as _,
184 pb_source.schema_id as _,
185 &txn,
186 )
187 .await?;
188
189 let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
191 let connection_ids = get_referred_connection_ids_from_source(&pb_source);
192
193 let source_obj = Self::create_object(
194 &txn,
195 ObjectType::Source,
196 owner_id,
197 Some(pb_source.database_id as _),
198 Some(pb_source.schema_id as _),
199 )
200 .await?;
201 let source_id = source_obj.oid;
202 pb_source.id = source_id as _;
203 let source: source::ActiveModel = pb_source.clone().into();
204 Source::insert(source).exec(&txn).await?;
205
206 let dep_relation_ids = secret_ids.iter().chain(connection_ids.iter());
208 if !secret_ids.is_empty() || !connection_ids.is_empty() {
209 ObjectDependency::insert_many(dep_relation_ids.map(|id| {
210 object_dependency::ActiveModel {
211 oid: Set(*id as _),
212 used_by: Set(source_id as _),
213 ..Default::default()
214 }
215 }))
216 .exec(&txn)
217 .await?;
218 }
219
220 let updated_user_info = grant_default_privileges_automatically(&txn, source_id).await?;
221
222 txn.commit().await?;
223
224 let mut version = self
225 .notify_frontend_relation_info(
226 NotificationOperation::Add,
227 PbObjectInfo::Source(pb_source),
228 )
229 .await;
230
231 if !updated_user_info.is_empty() {
233 version = self.notify_users_update(updated_user_info).await;
234 }
235
236 Ok((source_id, version))
237 }
238
239 pub async fn create_function(
240 &self,
241 mut pb_function: PbFunction,
242 ) -> MetaResult<NotificationVersion> {
243 let inner = self.inner.write().await;
244 let owner_id = pb_function.owner as _;
245 let txn = inner.db.begin().await?;
246 ensure_user_id(owner_id, &txn).await?;
247 ensure_object_id(ObjectType::Database, pb_function.database_id as _, &txn).await?;
248 ensure_object_id(ObjectType::Schema, pb_function.schema_id as _, &txn).await?;
249 check_function_signature_duplicate(&pb_function, &txn).await?;
250
251 let function_obj = Self::create_object(
252 &txn,
253 ObjectType::Function,
254 owner_id,
255 Some(pb_function.database_id as _),
256 Some(pb_function.schema_id as _),
257 )
258 .await?;
259 pb_function.id = function_obj.oid as _;
260 pb_function.created_at_epoch = Some(
261 Epoch::from_unix_millis(function_obj.created_at.and_utc().timestamp_millis() as _).0,
262 );
263 pb_function.created_at_cluster_version = function_obj.created_at_cluster_version;
264 let function: function::ActiveModel = pb_function.clone().into();
265 Function::insert(function).exec(&txn).await?;
266
267 let updated_user_info =
268 grant_default_privileges_automatically(&txn, function_obj.oid).await?;
269
270 txn.commit().await?;
271
272 let mut version = self
273 .notify_frontend(
274 NotificationOperation::Add,
275 NotificationInfo::Function(pb_function),
276 )
277 .await;
278
279 if !updated_user_info.is_empty() {
281 version = self.notify_users_update(updated_user_info).await;
282 }
283
284 Ok(version)
285 }
286
287 pub async fn create_connection(
288 &self,
289 mut pb_connection: PbConnection,
290 ) -> MetaResult<NotificationVersion> {
291 let inner = self.inner.write().await;
292 let owner_id = pb_connection.owner as _;
293 let txn = inner.db.begin().await?;
294 ensure_user_id(owner_id, &txn).await?;
295 ensure_object_id(ObjectType::Database, pb_connection.database_id as _, &txn).await?;
296 ensure_object_id(ObjectType::Schema, pb_connection.schema_id as _, &txn).await?;
297 check_connection_name_duplicate(&pb_connection, &txn).await?;
298
299 let mut dep_secrets = HashSet::new();
300 if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
301 dep_secrets.extend(
302 params
303 .secret_refs
304 .values()
305 .map(|secret_ref| secret_ref.secret_id),
306 );
307 }
308
309 let conn_obj = Self::create_object(
310 &txn,
311 ObjectType::Connection,
312 owner_id,
313 Some(pb_connection.database_id as _),
314 Some(pb_connection.schema_id as _),
315 )
316 .await?;
317 pb_connection.id = conn_obj.oid as _;
318 let connection: connection::ActiveModel = pb_connection.clone().into();
319 Connection::insert(connection).exec(&txn).await?;
320
321 for secret_id in dep_secrets {
322 ObjectDependency::insert(object_dependency::ActiveModel {
323 oid: Set(secret_id as _),
324 used_by: Set(conn_obj.oid),
325 ..Default::default()
326 })
327 .exec(&txn)
328 .await?;
329 }
330
331 let updated_user_info = grant_default_privileges_automatically(&txn, conn_obj.oid).await?;
332
333 txn.commit().await?;
334
335 {
336 report_event(
338 PbTelemetryEventStage::Unspecified,
339 "connection_create",
340 pb_connection.get_id() as _,
341 {
342 pb_connection.info.as_ref().and_then(|info| match info {
343 ConnectionInfo::ConnectionParams(params) => {
344 Some(params.connection_type().as_str_name().to_owned())
345 }
346 _ => None,
347 })
348 },
349 None,
350 None,
351 );
352 }
353
354 let mut version = self
355 .notify_frontend(
356 NotificationOperation::Add,
357 NotificationInfo::Connection(pb_connection),
358 )
359 .await;
360
361 if !updated_user_info.is_empty() {
363 version = self.notify_users_update(updated_user_info).await;
364 }
365
366 Ok(version)
367 }
368
369 pub async fn create_secret(
370 &self,
371 mut pb_secret: PbSecret,
372 secret_plain_payload: Vec<u8>,
373 ) -> MetaResult<NotificationVersion> {
374 let inner = self.inner.write().await;
375 let owner_id = pb_secret.owner as _;
376 let txn = inner.db.begin().await?;
377 ensure_user_id(owner_id, &txn).await?;
378 ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
379 ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
380 check_secret_name_duplicate(&pb_secret, &txn).await?;
381
382 let secret_obj = Self::create_object(
383 &txn,
384 ObjectType::Secret,
385 owner_id,
386 Some(pb_secret.database_id as _),
387 Some(pb_secret.schema_id as _),
388 )
389 .await?;
390 pb_secret.id = secret_obj.oid as _;
391 let secret: secret::ActiveModel = pb_secret.clone().into();
392 Secret::insert(secret).exec(&txn).await?;
393
394 let updated_user_info =
395 grant_default_privileges_automatically(&txn, secret_obj.oid).await?;
396
397 txn.commit().await?;
398
399 let mut secret_plain = pb_secret;
401 secret_plain.value.clone_from(&secret_plain_payload);
402
403 LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
404 self.env
405 .notification_manager()
406 .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
407
408 let mut version = self
409 .notify_frontend(
410 NotificationOperation::Add,
411 NotificationInfo::Secret(secret_plain),
412 )
413 .await;
414
415 if !updated_user_info.is_empty() {
417 version = self.notify_users_update(updated_user_info).await;
418 }
419
420 Ok(version)
421 }
422
423 pub async fn create_view(
424 &self,
425 mut pb_view: PbView,
426 dependencies: HashSet<ObjectId>,
427 ) -> MetaResult<NotificationVersion> {
428 let inner = self.inner.write().await;
429 let owner_id = pb_view.owner as _;
430 let txn = inner.db.begin().await?;
431 ensure_user_id(owner_id, &txn).await?;
432 ensure_object_id(ObjectType::Database, pb_view.database_id as _, &txn).await?;
433 ensure_object_id(ObjectType::Schema, pb_view.schema_id as _, &txn).await?;
434 check_relation_name_duplicate(
435 &pb_view.name,
436 pb_view.database_id as _,
437 pb_view.schema_id as _,
438 &txn,
439 )
440 .await?;
441
442 let view_obj = Self::create_object(
443 &txn,
444 ObjectType::View,
445 owner_id,
446 Some(pb_view.database_id as _),
447 Some(pb_view.schema_id as _),
448 )
449 .await?;
450 pb_view.id = view_obj.oid as _;
451 let view: view::ActiveModel = pb_view.clone().into();
452 View::insert(view).exec(&txn).await?;
453
454 for obj_id in dependencies {
455 ObjectDependency::insert(object_dependency::ActiveModel {
456 oid: Set(obj_id),
457 used_by: Set(view_obj.oid),
458 ..Default::default()
459 })
460 .exec(&txn)
461 .await?;
462 }
463 let updated_user_info = grant_default_privileges_automatically(&txn, view_obj.oid).await?;
464
465 txn.commit().await?;
466
467 let mut version = self
468 .notify_frontend_relation_info(NotificationOperation::Add, PbObjectInfo::View(pb_view))
469 .await;
470
471 if !updated_user_info.is_empty() {
473 version = self.notify_users_update(updated_user_info).await;
474 }
475
476 Ok(version)
477 }
478
479 pub async fn validate_cross_db_snapshot_backfill(
480 &self,
481 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
482 ) -> MetaResult<()> {
483 if cross_db_snapshot_backfill_info
484 .upstream_mv_table_id_to_backfill_epoch
485 .is_empty()
486 {
487 return Ok(());
488 }
489
490 let inner = self.inner.read().await;
491 let table_ids = cross_db_snapshot_backfill_info
492 .upstream_mv_table_id_to_backfill_epoch
493 .keys()
494 .map(|t| t.table_id as ObjectId)
495 .collect_vec();
496 let cnt = Subscription::find()
497 .select_only()
498 .column(subscription::Column::DependentTableId)
499 .distinct()
500 .filter(subscription::Column::DependentTableId.is_in(table_ids))
501 .count(&inner.db)
502 .await? as usize;
503
504 if cnt
505 < cross_db_snapshot_backfill_info
506 .upstream_mv_table_id_to_backfill_epoch
507 .keys()
508 .count()
509 {
510 return Err(MetaError::permission_denied(
511 "Some upstream tables are not subscribed".to_owned(),
512 ));
513 }
514
515 Ok(())
516 }
517}