1use risingwave_common::system_param::{OverrideValidate, Validate};
16
17use super::*;
18use crate::barrier::SnapshotBackfillInfo;
19
20impl CatalogController {
21 pub(crate) async fn create_object(
22 txn: &DatabaseTransaction,
23 obj_type: ObjectType,
24 owner_id: UserId,
25 database_id: Option<DatabaseId>,
26 schema_id: Option<SchemaId>,
27 ) -> MetaResult<object::Model> {
28 let active_db = object::ActiveModel {
29 oid: Default::default(),
30 obj_type: Set(obj_type),
31 owner_id: Set(owner_id),
32 schema_id: Set(schema_id),
33 database_id: Set(database_id),
34 initialized_at: Default::default(),
35 created_at: Default::default(),
36 initialized_at_cluster_version: Set(Some(current_cluster_version())),
37 created_at_cluster_version: Set(Some(current_cluster_version())),
38 };
39 Ok(active_db.insert(txn).await?)
40 }
41
42 pub async fn create_database(
43 &self,
44 db: PbDatabase,
45 ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
46 if let Some(ref interval) = db.barrier_interval_ms {
48 OverrideValidate::barrier_interval_ms(interval).map_err(|e| anyhow::anyhow!(e))?;
49 }
50 if let Some(ref frequency) = db.checkpoint_frequency {
51 OverrideValidate::checkpoint_frequency(frequency).map_err(|e| anyhow::anyhow!(e))?;
52 }
53
54 let inner = self.inner.write().await;
55 let owner_id = db.owner as _;
56 let txn = inner.db.begin().await?;
57 ensure_user_id(owner_id, &txn).await?;
58 check_database_name_duplicate(&db.name, &txn).await?;
59
60 let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
61 let mut db: database::ActiveModel = db.into();
62 db.database_id = Set(db_obj.oid);
63 let db = db.insert(&txn).await?;
64
65 let mut schemas = vec![];
66 for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
67 let schema_obj =
68 Self::create_object(&txn, ObjectType::Schema, owner_id, Some(db_obj.oid), None)
69 .await?;
70 let schema = schema::ActiveModel {
71 schema_id: Set(schema_obj.oid),
72 name: Set(schema_name.into()),
73 };
74 let schema = schema.insert(&txn).await?;
75 schemas.push(ObjectModel(schema, schema_obj).into());
76 }
77 txn.commit().await?;
78
79 let mut version = self
80 .notify_frontend(
81 NotificationOperation::Add,
82 NotificationInfo::Database(ObjectModel(db.clone(), db_obj).into()),
83 )
84 .await;
85 for schema in schemas {
86 version = self
87 .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
88 .await;
89 }
90
91 Ok((version, db))
92 }
93
94 pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
95 let inner = self.inner.write().await;
96 let owner_id = schema.owner as _;
97 let txn = inner.db.begin().await?;
98 ensure_user_id(owner_id, &txn).await?;
99 ensure_object_id(ObjectType::Database, schema.database_id as _, &txn).await?;
100 check_schema_name_duplicate(&schema.name, schema.database_id as _, &txn).await?;
101
102 let schema_obj = Self::create_object(
103 &txn,
104 ObjectType::Schema,
105 owner_id,
106 Some(schema.database_id as _),
107 None,
108 )
109 .await?;
110 let mut schema: schema::ActiveModel = schema.into();
111 schema.schema_id = Set(schema_obj.oid);
112 let schema = schema.insert(&txn).await?;
113
114 let updated_user_info =
115 grant_default_privileges_automatically(&txn, schema_obj.oid).await?;
116
117 txn.commit().await?;
118
119 let mut version = self
120 .notify_frontend(
121 NotificationOperation::Add,
122 NotificationInfo::Schema(ObjectModel(schema, schema_obj).into()),
123 )
124 .await;
125
126 if !updated_user_info.is_empty() {
128 version = self.notify_users_update(updated_user_info).await;
129 }
130
131 Ok(version)
132 }
133
134 pub async fn create_subscription_catalog(
135 &self,
136 pb_subscription: &mut PbSubscription,
137 ) -> MetaResult<()> {
138 let inner = self.inner.write().await;
139 let txn = inner.db.begin().await?;
140
141 ensure_user_id(pb_subscription.owner as _, &txn).await?;
142 ensure_object_id(ObjectType::Database, pb_subscription.database_id as _, &txn).await?;
143 ensure_object_id(ObjectType::Schema, pb_subscription.schema_id as _, &txn).await?;
144 check_subscription_name_duplicate(pb_subscription, &txn).await?;
145
146 let obj = Self::create_object(
147 &txn,
148 ObjectType::Subscription,
149 pb_subscription.owner as _,
150 Some(pb_subscription.database_id as _),
151 Some(pb_subscription.schema_id as _),
152 )
153 .await?;
154 pb_subscription.id = obj.oid as _;
155 let subscription: subscription::ActiveModel = pb_subscription.clone().into();
156 Subscription::insert(subscription).exec(&txn).await?;
157
158 ObjectDependency::insert(object_dependency::ActiveModel {
160 oid: Set(pb_subscription.dependent_table_id as _),
161 used_by: Set(pb_subscription.id as _),
162 ..Default::default()
163 })
164 .exec(&txn)
165 .await?;
166 txn.commit().await?;
167 Ok(())
168 }
169
170 pub async fn create_source(
171 &self,
172 mut pb_source: PbSource,
173 ) -> MetaResult<(SourceId, NotificationVersion)> {
174 let inner = self.inner.write().await;
175 let owner_id = pb_source.owner as _;
176 let txn = inner.db.begin().await?;
177 ensure_user_id(owner_id, &txn).await?;
178 ensure_object_id(ObjectType::Database, pb_source.database_id as _, &txn).await?;
179 ensure_object_id(ObjectType::Schema, pb_source.schema_id as _, &txn).await?;
180 check_relation_name_duplicate(
181 &pb_source.name,
182 pb_source.database_id as _,
183 pb_source.schema_id as _,
184 &txn,
185 )
186 .await?;
187
188 let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
190 let connection_ids = get_referred_connection_ids_from_source(&pb_source);
191
192 let source_obj = Self::create_object(
193 &txn,
194 ObjectType::Source,
195 owner_id,
196 Some(pb_source.database_id as _),
197 Some(pb_source.schema_id as _),
198 )
199 .await?;
200 let source_id = source_obj.oid;
201 pb_source.id = source_id as _;
202 let source: source::ActiveModel = pb_source.clone().into();
203 Source::insert(source).exec(&txn).await?;
204
205 let dep_relation_ids = secret_ids.iter().chain(connection_ids.iter());
207 if !secret_ids.is_empty() || !connection_ids.is_empty() {
208 ObjectDependency::insert_many(dep_relation_ids.map(|id| {
209 object_dependency::ActiveModel {
210 oid: Set(*id as _),
211 used_by: Set(source_id as _),
212 ..Default::default()
213 }
214 }))
215 .exec(&txn)
216 .await?;
217 }
218
219 let updated_user_info = grant_default_privileges_automatically(&txn, source_id).await?;
220
221 txn.commit().await?;
222
223 let mut version = self
224 .notify_frontend_relation_info(
225 NotificationOperation::Add,
226 PbObjectInfo::Source(pb_source),
227 )
228 .await;
229
230 if !updated_user_info.is_empty() {
232 version = self.notify_users_update(updated_user_info).await;
233 }
234
235 Ok((source_id, version))
236 }
237
238 pub async fn create_function(
239 &self,
240 mut pb_function: PbFunction,
241 ) -> MetaResult<NotificationVersion> {
242 let inner = self.inner.write().await;
243 let owner_id = pb_function.owner as _;
244 let txn = inner.db.begin().await?;
245 ensure_user_id(owner_id, &txn).await?;
246 ensure_object_id(ObjectType::Database, pb_function.database_id as _, &txn).await?;
247 ensure_object_id(ObjectType::Schema, pb_function.schema_id as _, &txn).await?;
248 check_function_signature_duplicate(&pb_function, &txn).await?;
249
250 let function_obj = Self::create_object(
251 &txn,
252 ObjectType::Function,
253 owner_id,
254 Some(pb_function.database_id as _),
255 Some(pb_function.schema_id as _),
256 )
257 .await?;
258 pb_function.id = function_obj.oid as _;
259 let function: function::ActiveModel = pb_function.clone().into();
260 Function::insert(function).exec(&txn).await?;
261
262 let updated_user_info =
263 grant_default_privileges_automatically(&txn, function_obj.oid).await?;
264
265 txn.commit().await?;
266
267 let mut version = self
268 .notify_frontend(
269 NotificationOperation::Add,
270 NotificationInfo::Function(pb_function),
271 )
272 .await;
273
274 if !updated_user_info.is_empty() {
276 version = self.notify_users_update(updated_user_info).await;
277 }
278
279 Ok(version)
280 }
281
282 pub async fn create_connection(
283 &self,
284 mut pb_connection: PbConnection,
285 ) -> MetaResult<NotificationVersion> {
286 let inner = self.inner.write().await;
287 let owner_id = pb_connection.owner as _;
288 let txn = inner.db.begin().await?;
289 ensure_user_id(owner_id, &txn).await?;
290 ensure_object_id(ObjectType::Database, pb_connection.database_id as _, &txn).await?;
291 ensure_object_id(ObjectType::Schema, pb_connection.schema_id as _, &txn).await?;
292 check_connection_name_duplicate(&pb_connection, &txn).await?;
293
294 let mut dep_secrets = HashSet::new();
295 if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
296 dep_secrets.extend(
297 params
298 .secret_refs
299 .values()
300 .map(|secret_ref| secret_ref.secret_id),
301 );
302 }
303
304 let conn_obj = Self::create_object(
305 &txn,
306 ObjectType::Connection,
307 owner_id,
308 Some(pb_connection.database_id as _),
309 Some(pb_connection.schema_id as _),
310 )
311 .await?;
312 pb_connection.id = conn_obj.oid as _;
313 let connection: connection::ActiveModel = pb_connection.clone().into();
314 Connection::insert(connection).exec(&txn).await?;
315
316 for secret_id in dep_secrets {
317 ObjectDependency::insert(object_dependency::ActiveModel {
318 oid: Set(secret_id as _),
319 used_by: Set(conn_obj.oid),
320 ..Default::default()
321 })
322 .exec(&txn)
323 .await?;
324 }
325
326 let updated_user_info = grant_default_privileges_automatically(&txn, conn_obj.oid).await?;
327
328 txn.commit().await?;
329
330 {
331 report_event(
333 PbTelemetryEventStage::Unspecified,
334 "connection_create",
335 pb_connection.get_id() as _,
336 {
337 pb_connection.info.as_ref().and_then(|info| match info {
338 ConnectionInfo::ConnectionParams(params) => {
339 Some(params.connection_type().as_str_name().to_owned())
340 }
341 _ => None,
342 })
343 },
344 None,
345 None,
346 );
347 }
348
349 let mut version = self
350 .notify_frontend(
351 NotificationOperation::Add,
352 NotificationInfo::Connection(pb_connection),
353 )
354 .await;
355
356 if !updated_user_info.is_empty() {
358 version = self.notify_users_update(updated_user_info).await;
359 }
360
361 Ok(version)
362 }
363
364 pub async fn create_secret(
365 &self,
366 mut pb_secret: PbSecret,
367 secret_plain_payload: Vec<u8>,
368 ) -> MetaResult<NotificationVersion> {
369 let inner = self.inner.write().await;
370 let owner_id = pb_secret.owner as _;
371 let txn = inner.db.begin().await?;
372 ensure_user_id(owner_id, &txn).await?;
373 ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
374 ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
375 check_secret_name_duplicate(&pb_secret, &txn).await?;
376
377 let secret_obj = Self::create_object(
378 &txn,
379 ObjectType::Secret,
380 owner_id,
381 Some(pb_secret.database_id as _),
382 Some(pb_secret.schema_id as _),
383 )
384 .await?;
385 pb_secret.id = secret_obj.oid as _;
386 let secret: secret::ActiveModel = pb_secret.clone().into();
387 Secret::insert(secret).exec(&txn).await?;
388
389 let updated_user_info =
390 grant_default_privileges_automatically(&txn, secret_obj.oid).await?;
391
392 txn.commit().await?;
393
394 let mut secret_plain = pb_secret;
396 secret_plain.value.clone_from(&secret_plain_payload);
397
398 LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
399 self.env
400 .notification_manager()
401 .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
402
403 let mut version = self
404 .notify_frontend(
405 NotificationOperation::Add,
406 NotificationInfo::Secret(secret_plain),
407 )
408 .await;
409
410 if !updated_user_info.is_empty() {
412 version = self.notify_users_update(updated_user_info).await;
413 }
414
415 Ok(version)
416 }
417
418 pub async fn create_view(
419 &self,
420 mut pb_view: PbView,
421 dependencies: HashSet<ObjectId>,
422 ) -> MetaResult<NotificationVersion> {
423 let inner = self.inner.write().await;
424 let owner_id = pb_view.owner as _;
425 let txn = inner.db.begin().await?;
426 ensure_user_id(owner_id, &txn).await?;
427 ensure_object_id(ObjectType::Database, pb_view.database_id as _, &txn).await?;
428 ensure_object_id(ObjectType::Schema, pb_view.schema_id as _, &txn).await?;
429 check_relation_name_duplicate(
430 &pb_view.name,
431 pb_view.database_id as _,
432 pb_view.schema_id as _,
433 &txn,
434 )
435 .await?;
436
437 let view_obj = Self::create_object(
438 &txn,
439 ObjectType::View,
440 owner_id,
441 Some(pb_view.database_id as _),
442 Some(pb_view.schema_id as _),
443 )
444 .await?;
445 pb_view.id = view_obj.oid as _;
446 let view: view::ActiveModel = pb_view.clone().into();
447 View::insert(view).exec(&txn).await?;
448
449 for obj_id in dependencies {
450 ObjectDependency::insert(object_dependency::ActiveModel {
451 oid: Set(obj_id),
452 used_by: Set(view_obj.oid),
453 ..Default::default()
454 })
455 .exec(&txn)
456 .await?;
457 }
458 let updated_user_info = grant_default_privileges_automatically(&txn, view_obj.oid).await?;
459
460 txn.commit().await?;
461
462 let mut version = self
463 .notify_frontend_relation_info(NotificationOperation::Add, PbObjectInfo::View(pb_view))
464 .await;
465
466 if !updated_user_info.is_empty() {
468 version = self.notify_users_update(updated_user_info).await;
469 }
470
471 Ok(version)
472 }
473
474 pub async fn validate_cross_db_snapshot_backfill(
475 &self,
476 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
477 ) -> MetaResult<()> {
478 if cross_db_snapshot_backfill_info
479 .upstream_mv_table_id_to_backfill_epoch
480 .is_empty()
481 {
482 return Ok(());
483 }
484
485 let inner = self.inner.read().await;
486 let table_ids = cross_db_snapshot_backfill_info
487 .upstream_mv_table_id_to_backfill_epoch
488 .keys()
489 .map(|t| t.table_id as ObjectId)
490 .collect_vec();
491 let cnt = Subscription::find()
492 .select_only()
493 .column(subscription::Column::DependentTableId)
494 .distinct()
495 .filter(subscription::Column::DependentTableId.is_in(table_ids))
496 .count(&inner.db)
497 .await? as usize;
498
499 if cnt
500 < cross_db_snapshot_backfill_info
501 .upstream_mv_table_id_to_backfill_epoch
502 .keys()
503 .count()
504 {
505 return Err(MetaError::permission_denied(
506 "Some upstream tables are not subscribed".to_owned(),
507 ));
508 }
509
510 Ok(())
511 }
512}