1use risingwave_common::system_param::{OverrideValidate, Validate};
16use risingwave_common::util::epoch::Epoch;
17
18use super::*;
19use crate::barrier::SnapshotBackfillInfo;
20
21impl CatalogController {
22 pub(crate) async fn create_object(
23 txn: &DatabaseTransaction,
24 obj_type: ObjectType,
25 owner_id: UserId,
26 database_id: Option<DatabaseId>,
27 schema_id: Option<SchemaId>,
28 ) -> MetaResult<object::Model> {
29 let active_db = object::ActiveModel {
30 oid: Default::default(),
31 obj_type: Set(obj_type),
32 owner_id: Set(owner_id),
33 schema_id: Set(schema_id),
34 database_id: Set(database_id),
35 initialized_at: Default::default(),
36 created_at: Default::default(),
37 initialized_at_cluster_version: Set(Some(current_cluster_version())),
38 created_at_cluster_version: Set(Some(current_cluster_version())),
39 };
40 Ok(active_db.insert(txn).await?)
41 }
42
43 pub async fn create_database(
44 &self,
45 db: PbDatabase,
46 ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
47 if let Some(ref interval) = db.barrier_interval_ms {
49 OverrideValidate::barrier_interval_ms(interval).map_err(|e| anyhow::anyhow!(e))?;
50 }
51 if let Some(ref frequency) = db.checkpoint_frequency {
52 OverrideValidate::checkpoint_frequency(frequency).map_err(|e| anyhow::anyhow!(e))?;
53 }
54
55 let inner = self.inner.write().await;
56 let owner_id = db.owner as _;
57 let txn = inner.db.begin().await?;
58 ensure_user_id(owner_id, &txn).await?;
59 check_database_name_duplicate(&db.name, &txn).await?;
60
61 let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
62 let mut db: database::ActiveModel = db.into();
63 db.database_id = Set(DatabaseId::new(db_obj.oid as _));
64 let db = db.insert(&txn).await?;
65
66 let mut schemas = vec![];
67 for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
68 let schema_obj = Self::create_object(
69 &txn,
70 ObjectType::Schema,
71 owner_id,
72 Some(DatabaseId::new(db_obj.oid as _)),
73 None,
74 )
75 .await?;
76 let schema = schema::ActiveModel {
77 schema_id: Set(SchemaId::new(schema_obj.oid as _)),
78 name: Set(schema_name.into()),
79 };
80 let schema = schema.insert(&txn).await?;
81 schemas.push(ObjectModel(schema, schema_obj).into());
82 }
83 txn.commit().await?;
84
85 let mut version = self
86 .notify_frontend(
87 NotificationOperation::Add,
88 NotificationInfo::Database(ObjectModel(db.clone(), db_obj).into()),
89 )
90 .await;
91 for schema in schemas {
92 version = self
93 .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
94 .await;
95 }
96
97 Ok((version, db))
98 }
99
100 pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
101 let inner = self.inner.write().await;
102 let owner_id = schema.owner as _;
103 let txn = inner.db.begin().await?;
104 ensure_user_id(owner_id, &txn).await?;
105 ensure_object_id(ObjectType::Database, schema.database_id as _, &txn).await?;
106 check_schema_name_duplicate(&schema.name, schema.database_id.into(), &txn).await?;
107
108 let schema_obj = Self::create_object(
109 &txn,
110 ObjectType::Schema,
111 owner_id,
112 Some(schema.database_id.into()),
113 None,
114 )
115 .await?;
116 let mut schema: schema::ActiveModel = schema.into();
117 schema.schema_id = Set(SchemaId::new(schema_obj.oid as _));
118 let schema = schema.insert(&txn).await?;
119
120 let updated_user_info =
121 grant_default_privileges_automatically(&txn, schema_obj.oid).await?;
122
123 txn.commit().await?;
124
125 let mut version = self
126 .notify_frontend(
127 NotificationOperation::Add,
128 NotificationInfo::Schema(ObjectModel(schema, schema_obj).into()),
129 )
130 .await;
131
132 if !updated_user_info.is_empty() {
134 version = self.notify_users_update(updated_user_info).await;
135 }
136
137 Ok(version)
138 }
139
140 pub async fn create_subscription_catalog(
141 &self,
142 pb_subscription: &mut PbSubscription,
143 ) -> MetaResult<()> {
144 let inner = self.inner.write().await;
145 let txn = inner.db.begin().await?;
146
147 ensure_user_id(pb_subscription.owner as _, &txn).await?;
148 ensure_object_id(ObjectType::Database, pb_subscription.database_id as _, &txn).await?;
149 ensure_object_id(ObjectType::Schema, pb_subscription.schema_id as _, &txn).await?;
150 check_subscription_name_duplicate(pb_subscription, &txn).await?;
151
152 let obj = Self::create_object(
153 &txn,
154 ObjectType::Subscription,
155 pb_subscription.owner as _,
156 Some(pb_subscription.database_id.into()),
157 Some(pb_subscription.schema_id.into()),
158 )
159 .await?;
160 pb_subscription.id = obj.oid as _;
161 let subscription: subscription::ActiveModel = pb_subscription.clone().into();
162 Subscription::insert(subscription).exec(&txn).await?;
163
164 ObjectDependency::insert(object_dependency::ActiveModel {
166 oid: Set(pb_subscription.dependent_table_id as _),
167 used_by: Set(pb_subscription.id as _),
168 ..Default::default()
169 })
170 .exec(&txn)
171 .await?;
172 txn.commit().await?;
173 Ok(())
174 }
175
176 pub async fn create_source(
177 &self,
178 mut pb_source: PbSource,
179 ) -> MetaResult<(SourceId, NotificationVersion)> {
180 let inner = self.inner.write().await;
181 let owner_id = pb_source.owner as _;
182 let txn = inner.db.begin().await?;
183 ensure_user_id(owner_id, &txn).await?;
184 ensure_object_id(ObjectType::Database, pb_source.database_id as _, &txn).await?;
185 ensure_object_id(ObjectType::Schema, pb_source.schema_id as _, &txn).await?;
186 check_relation_name_duplicate(
187 &pb_source.name,
188 pb_source.database_id.into(),
189 pb_source.schema_id.into(),
190 &txn,
191 )
192 .await?;
193
194 let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
196 let connection_ids = get_referred_connection_ids_from_source(&pb_source);
197
198 let source_obj = Self::create_object(
199 &txn,
200 ObjectType::Source,
201 owner_id,
202 Some(pb_source.database_id.into()),
203 Some(pb_source.schema_id.into()),
204 )
205 .await?;
206 let source_id = source_obj.oid;
207 pb_source.id = source_id as _;
208 let source: source::ActiveModel = pb_source.clone().into();
209 Source::insert(source).exec(&txn).await?;
210
211 let dep_relation_ids = secret_ids.iter().chain(connection_ids.iter());
213 if !secret_ids.is_empty() || !connection_ids.is_empty() {
214 ObjectDependency::insert_many(dep_relation_ids.map(|id| {
215 object_dependency::ActiveModel {
216 oid: Set(*id as _),
217 used_by: Set(source_id as _),
218 ..Default::default()
219 }
220 }))
221 .exec(&txn)
222 .await?;
223 }
224
225 let updated_user_info = grant_default_privileges_automatically(&txn, source_id).await?;
226
227 txn.commit().await?;
228
229 let mut version = self
230 .notify_frontend_relation_info(
231 NotificationOperation::Add,
232 PbObjectInfo::Source(pb_source),
233 )
234 .await;
235
236 if !updated_user_info.is_empty() {
238 version = self.notify_users_update(updated_user_info).await;
239 }
240
241 Ok((source_id, version))
242 }
243
244 pub async fn create_function(
245 &self,
246 mut pb_function: PbFunction,
247 ) -> MetaResult<NotificationVersion> {
248 let inner = self.inner.write().await;
249 let owner_id = pb_function.owner as _;
250 let txn = inner.db.begin().await?;
251 ensure_user_id(owner_id, &txn).await?;
252 ensure_object_id(ObjectType::Database, pb_function.database_id as _, &txn).await?;
253 ensure_object_id(ObjectType::Schema, pb_function.schema_id as _, &txn).await?;
254 check_function_signature_duplicate(&pb_function, &txn).await?;
255
256 let function_obj = Self::create_object(
257 &txn,
258 ObjectType::Function,
259 owner_id,
260 Some(pb_function.database_id.into()),
261 Some(pb_function.schema_id.into()),
262 )
263 .await?;
264 pb_function.id = function_obj.oid as _;
265 pb_function.created_at_epoch = Some(
266 Epoch::from_unix_millis(function_obj.created_at.and_utc().timestamp_millis() as _).0,
267 );
268 pb_function.created_at_cluster_version = function_obj.created_at_cluster_version;
269 let function: function::ActiveModel = pb_function.clone().into();
270 Function::insert(function).exec(&txn).await?;
271
272 let updated_user_info =
273 grant_default_privileges_automatically(&txn, function_obj.oid).await?;
274
275 txn.commit().await?;
276
277 let mut version = self
278 .notify_frontend(
279 NotificationOperation::Add,
280 NotificationInfo::Function(pb_function),
281 )
282 .await;
283
284 if !updated_user_info.is_empty() {
286 version = self.notify_users_update(updated_user_info).await;
287 }
288
289 Ok(version)
290 }
291
292 pub async fn create_connection(
293 &self,
294 mut pb_connection: PbConnection,
295 ) -> MetaResult<NotificationVersion> {
296 let inner = self.inner.write().await;
297 let owner_id = pb_connection.owner as _;
298 let txn = inner.db.begin().await?;
299 ensure_user_id(owner_id, &txn).await?;
300 ensure_object_id(ObjectType::Database, pb_connection.database_id as _, &txn).await?;
301 ensure_object_id(ObjectType::Schema, pb_connection.schema_id as _, &txn).await?;
302 check_connection_name_duplicate(&pb_connection, &txn).await?;
303
304 let mut dep_secrets = HashSet::new();
305 if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
306 dep_secrets.extend(
307 params
308 .secret_refs
309 .values()
310 .map(|secret_ref| secret_ref.secret_id),
311 );
312 }
313
314 let conn_obj = Self::create_object(
315 &txn,
316 ObjectType::Connection,
317 owner_id,
318 Some(pb_connection.database_id.into()),
319 Some(pb_connection.schema_id.into()),
320 )
321 .await?;
322 pb_connection.id = conn_obj.oid as _;
323 let connection: connection::ActiveModel = pb_connection.clone().into();
324 Connection::insert(connection).exec(&txn).await?;
325
326 for secret_id in dep_secrets {
327 ObjectDependency::insert(object_dependency::ActiveModel {
328 oid: Set(secret_id as _),
329 used_by: Set(conn_obj.oid),
330 ..Default::default()
331 })
332 .exec(&txn)
333 .await?;
334 }
335
336 let updated_user_info = grant_default_privileges_automatically(&txn, conn_obj.oid).await?;
337
338 txn.commit().await?;
339
340 {
341 report_event(
343 PbTelemetryEventStage::Unspecified,
344 "connection_create",
345 pb_connection.get_id() as _,
346 {
347 pb_connection.info.as_ref().and_then(|info| match info {
348 ConnectionInfo::ConnectionParams(params) => {
349 Some(params.connection_type().as_str_name().to_owned())
350 }
351 _ => None,
352 })
353 },
354 None,
355 None,
356 );
357 }
358
359 let mut version = self
360 .notify_frontend(
361 NotificationOperation::Add,
362 NotificationInfo::Connection(pb_connection),
363 )
364 .await;
365
366 if !updated_user_info.is_empty() {
368 version = self.notify_users_update(updated_user_info).await;
369 }
370
371 Ok(version)
372 }
373
374 pub async fn create_secret(
375 &self,
376 mut pb_secret: PbSecret,
377 secret_plain_payload: Vec<u8>,
378 ) -> MetaResult<NotificationVersion> {
379 let inner = self.inner.write().await;
380 let owner_id = pb_secret.owner as _;
381 let txn = inner.db.begin().await?;
382 ensure_user_id(owner_id, &txn).await?;
383 ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
384 ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
385 check_secret_name_duplicate(&pb_secret, &txn).await?;
386
387 let secret_obj = Self::create_object(
388 &txn,
389 ObjectType::Secret,
390 owner_id,
391 Some(pb_secret.database_id.into()),
392 Some(pb_secret.schema_id.into()),
393 )
394 .await?;
395 pb_secret.id = secret_obj.oid as _;
396 let secret: secret::ActiveModel = pb_secret.clone().into();
397 Secret::insert(secret).exec(&txn).await?;
398
399 let updated_user_info =
400 grant_default_privileges_automatically(&txn, secret_obj.oid).await?;
401
402 txn.commit().await?;
403
404 let mut secret_plain = pb_secret;
406 secret_plain.value.clone_from(&secret_plain_payload);
407
408 LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
409 self.env
410 .notification_manager()
411 .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
412
413 let mut version = self
414 .notify_frontend(
415 NotificationOperation::Add,
416 NotificationInfo::Secret(secret_plain),
417 )
418 .await;
419
420 if !updated_user_info.is_empty() {
422 version = self.notify_users_update(updated_user_info).await;
423 }
424
425 Ok(version)
426 }
427
428 pub async fn create_view(
429 &self,
430 mut pb_view: PbView,
431 dependencies: HashSet<ObjectId>,
432 ) -> MetaResult<NotificationVersion> {
433 let inner = self.inner.write().await;
434 let owner_id = pb_view.owner as _;
435 let txn = inner.db.begin().await?;
436 ensure_user_id(owner_id, &txn).await?;
437 ensure_object_id(ObjectType::Database, pb_view.database_id as _, &txn).await?;
438 ensure_object_id(ObjectType::Schema, pb_view.schema_id as _, &txn).await?;
439 check_relation_name_duplicate(
440 &pb_view.name,
441 pb_view.database_id.into(),
442 pb_view.schema_id.into(),
443 &txn,
444 )
445 .await?;
446
447 let view_obj = Self::create_object(
448 &txn,
449 ObjectType::View,
450 owner_id,
451 Some(pb_view.database_id.into()),
452 Some(pb_view.schema_id.into()),
453 )
454 .await?;
455 pb_view.id = view_obj.oid as _;
456 let view: view::ActiveModel = pb_view.clone().into();
457 View::insert(view).exec(&txn).await?;
458
459 for obj_id in dependencies {
460 ObjectDependency::insert(object_dependency::ActiveModel {
461 oid: Set(obj_id),
462 used_by: Set(view_obj.oid),
463 ..Default::default()
464 })
465 .exec(&txn)
466 .await?;
467 }
468 let updated_user_info = grant_default_privileges_automatically(&txn, view_obj.oid).await?;
469
470 txn.commit().await?;
471
472 let mut version = self
473 .notify_frontend_relation_info(NotificationOperation::Add, PbObjectInfo::View(pb_view))
474 .await;
475
476 if !updated_user_info.is_empty() {
478 version = self.notify_users_update(updated_user_info).await;
479 }
480
481 Ok(version)
482 }
483
484 pub async fn validate_cross_db_snapshot_backfill(
485 &self,
486 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
487 ) -> MetaResult<()> {
488 if cross_db_snapshot_backfill_info
489 .upstream_mv_table_id_to_backfill_epoch
490 .is_empty()
491 {
492 return Ok(());
493 }
494
495 let inner = self.inner.read().await;
496 let table_ids = cross_db_snapshot_backfill_info
497 .upstream_mv_table_id_to_backfill_epoch
498 .keys()
499 .map(|t| t.as_raw_id() as ObjectId)
500 .collect_vec();
501 let cnt = Subscription::find()
502 .select_only()
503 .column(subscription::Column::DependentTableId)
504 .distinct()
505 .filter(subscription::Column::DependentTableId.is_in(table_ids))
506 .count(&inner.db)
507 .await? as usize;
508
509 if cnt
510 < cross_db_snapshot_backfill_info
511 .upstream_mv_table_id_to_backfill_epoch
512 .keys()
513 .count()
514 {
515 return Err(MetaError::permission_denied(
516 "Some upstream tables are not subscribed".to_owned(),
517 ));
518 }
519
520 Ok(())
521 }
522}