// risingwave_meta/controller/catalog/create_op.rs
use super::*;
16use crate::barrier::SnapshotBackfillInfo;
17
18impl CatalogController {
19 pub(crate) async fn create_object(
20 txn: &DatabaseTransaction,
21 obj_type: ObjectType,
22 owner_id: UserId,
23 database_id: Option<DatabaseId>,
24 schema_id: Option<SchemaId>,
25 ) -> MetaResult<object::Model> {
26 let active_db = object::ActiveModel {
27 oid: Default::default(),
28 obj_type: Set(obj_type),
29 owner_id: Set(owner_id),
30 schema_id: Set(schema_id),
31 database_id: Set(database_id),
32 initialized_at: Default::default(),
33 created_at: Default::default(),
34 initialized_at_cluster_version: Set(Some(current_cluster_version())),
35 created_at_cluster_version: Set(Some(current_cluster_version())),
36 };
37 Ok(active_db.insert(txn).await?)
38 }
39
40 pub async fn create_database(&self, db: PbDatabase) -> MetaResult<NotificationVersion> {
41 let inner = self.inner.write().await;
42 let owner_id = db.owner as _;
43 let txn = inner.db.begin().await?;
44 ensure_user_id(owner_id, &txn).await?;
45 check_database_name_duplicate(&db.name, &txn).await?;
46
47 let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?;
48 let mut db: database::ActiveModel = db.into();
49 db.database_id = Set(db_obj.oid);
50 let db = db.insert(&txn).await?;
51
52 let mut schemas = vec![];
53 for schema_name in iter::once(DEFAULT_SCHEMA_NAME).chain(SYSTEM_SCHEMAS) {
54 let schema_obj =
55 Self::create_object(&txn, ObjectType::Schema, owner_id, Some(db_obj.oid), None)
56 .await?;
57 let schema = schema::ActiveModel {
58 schema_id: Set(schema_obj.oid),
59 name: Set(schema_name.into()),
60 };
61 let schema = schema.insert(&txn).await?;
62 schemas.push(ObjectModel(schema, schema_obj).into());
63 }
64 txn.commit().await?;
65
66 let mut version = self
67 .notify_frontend(
68 NotificationOperation::Add,
69 NotificationInfo::Database(ObjectModel(db, db_obj).into()),
70 )
71 .await;
72 for schema in schemas {
73 version = self
74 .notify_frontend(NotificationOperation::Add, NotificationInfo::Schema(schema))
75 .await;
76 }
77
78 Ok(version)
79 }
80
81 pub async fn create_schema(&self, schema: PbSchema) -> MetaResult<NotificationVersion> {
82 let inner = self.inner.write().await;
83 let owner_id = schema.owner as _;
84 let txn = inner.db.begin().await?;
85 ensure_user_id(owner_id, &txn).await?;
86 ensure_object_id(ObjectType::Database, schema.database_id as _, &txn).await?;
87 check_schema_name_duplicate(&schema.name, schema.database_id as _, &txn).await?;
88
89 let schema_obj = Self::create_object(
90 &txn,
91 ObjectType::Schema,
92 owner_id,
93 Some(schema.database_id as _),
94 None,
95 )
96 .await?;
97 let mut schema: schema::ActiveModel = schema.into();
98 schema.schema_id = Set(schema_obj.oid);
99 let schema = schema.insert(&txn).await?;
100 txn.commit().await?;
101
102 let version = self
103 .notify_frontend(
104 NotificationOperation::Add,
105 NotificationInfo::Schema(ObjectModel(schema, schema_obj).into()),
106 )
107 .await;
108 Ok(version)
109 }
110
111 pub async fn create_subscription_catalog(
112 &self,
113 pb_subscription: &mut PbSubscription,
114 ) -> MetaResult<()> {
115 let inner = self.inner.write().await;
116 let txn = inner.db.begin().await?;
117
118 ensure_user_id(pb_subscription.owner as _, &txn).await?;
119 ensure_object_id(ObjectType::Database, pb_subscription.database_id as _, &txn).await?;
120 ensure_object_id(ObjectType::Schema, pb_subscription.schema_id as _, &txn).await?;
121 check_subscription_name_duplicate(pb_subscription, &txn).await?;
122
123 let obj = Self::create_object(
124 &txn,
125 ObjectType::Subscription,
126 pb_subscription.owner as _,
127 Some(pb_subscription.database_id as _),
128 Some(pb_subscription.schema_id as _),
129 )
130 .await?;
131 pb_subscription.id = obj.oid as _;
132 let subscription: subscription::ActiveModel = pb_subscription.clone().into();
133 Subscription::insert(subscription).exec(&txn).await?;
134
135 ObjectDependency::insert(object_dependency::ActiveModel {
137 oid: Set(pb_subscription.dependent_table_id as _),
138 used_by: Set(pb_subscription.id as _),
139 ..Default::default()
140 })
141 .exec(&txn)
142 .await?;
143 txn.commit().await?;
144 Ok(())
145 }
146
147 pub async fn create_source(
148 &self,
149 mut pb_source: PbSource,
150 ) -> MetaResult<(SourceId, NotificationVersion)> {
151 let inner = self.inner.write().await;
152 let owner_id = pb_source.owner as _;
153 let txn = inner.db.begin().await?;
154 ensure_user_id(owner_id, &txn).await?;
155 ensure_object_id(ObjectType::Database, pb_source.database_id as _, &txn).await?;
156 ensure_object_id(ObjectType::Schema, pb_source.schema_id as _, &txn).await?;
157 check_relation_name_duplicate(
158 &pb_source.name,
159 pb_source.database_id as _,
160 pb_source.schema_id as _,
161 &txn,
162 )
163 .await?;
164
165 let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;
167 let connection_ids = get_referred_connection_ids_from_source(&pb_source);
168
169 let source_obj = Self::create_object(
170 &txn,
171 ObjectType::Source,
172 owner_id,
173 Some(pb_source.database_id as _),
174 Some(pb_source.schema_id as _),
175 )
176 .await?;
177 let source_id = source_obj.oid;
178 pb_source.id = source_id as _;
179 let source: source::ActiveModel = pb_source.clone().into();
180 Source::insert(source).exec(&txn).await?;
181
182 let dep_relation_ids = secret_ids.iter().chain(connection_ids.iter());
184 if !secret_ids.is_empty() || !connection_ids.is_empty() {
185 ObjectDependency::insert_many(dep_relation_ids.map(|id| {
186 object_dependency::ActiveModel {
187 oid: Set(*id as _),
188 used_by: Set(source_id as _),
189 ..Default::default()
190 }
191 }))
192 .exec(&txn)
193 .await?;
194 }
195
196 txn.commit().await?;
197
198 let version = self
199 .notify_frontend_relation_info(
200 NotificationOperation::Add,
201 PbObjectInfo::Source(pb_source),
202 )
203 .await;
204 Ok((source_id, version))
205 }
206
207 pub async fn create_function(
208 &self,
209 mut pb_function: PbFunction,
210 ) -> MetaResult<NotificationVersion> {
211 let inner = self.inner.write().await;
212 let owner_id = pb_function.owner as _;
213 let txn = inner.db.begin().await?;
214 ensure_user_id(owner_id, &txn).await?;
215 ensure_object_id(ObjectType::Database, pb_function.database_id as _, &txn).await?;
216 ensure_object_id(ObjectType::Schema, pb_function.schema_id as _, &txn).await?;
217 check_function_signature_duplicate(&pb_function, &txn).await?;
218
219 let function_obj = Self::create_object(
220 &txn,
221 ObjectType::Function,
222 owner_id,
223 Some(pb_function.database_id as _),
224 Some(pb_function.schema_id as _),
225 )
226 .await?;
227 pb_function.id = function_obj.oid as _;
228 let function: function::ActiveModel = pb_function.clone().into();
229 Function::insert(function).exec(&txn).await?;
230 txn.commit().await?;
231
232 let version = self
233 .notify_frontend(
234 NotificationOperation::Add,
235 NotificationInfo::Function(pb_function),
236 )
237 .await;
238 Ok(version)
239 }
240
241 pub async fn create_connection(
242 &self,
243 mut pb_connection: PbConnection,
244 ) -> MetaResult<NotificationVersion> {
245 let inner = self.inner.write().await;
246 let owner_id = pb_connection.owner as _;
247 let txn = inner.db.begin().await?;
248 ensure_user_id(owner_id, &txn).await?;
249 ensure_object_id(ObjectType::Database, pb_connection.database_id as _, &txn).await?;
250 ensure_object_id(ObjectType::Schema, pb_connection.schema_id as _, &txn).await?;
251 check_connection_name_duplicate(&pb_connection, &txn).await?;
252
253 let mut dep_secrets = HashSet::new();
254 if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info {
255 dep_secrets.extend(
256 params
257 .secret_refs
258 .values()
259 .map(|secret_ref| secret_ref.secret_id),
260 );
261 }
262
263 let conn_obj = Self::create_object(
264 &txn,
265 ObjectType::Connection,
266 owner_id,
267 Some(pb_connection.database_id as _),
268 Some(pb_connection.schema_id as _),
269 )
270 .await?;
271 pb_connection.id = conn_obj.oid as _;
272 let connection: connection::ActiveModel = pb_connection.clone().into();
273 Connection::insert(connection).exec(&txn).await?;
274
275 for secret_id in dep_secrets {
276 ObjectDependency::insert(object_dependency::ActiveModel {
277 oid: Set(secret_id as _),
278 used_by: Set(conn_obj.oid),
279 ..Default::default()
280 })
281 .exec(&txn)
282 .await?;
283 }
284
285 txn.commit().await?;
286
287 {
288 report_event(
290 PbTelemetryEventStage::Unspecified,
291 "connection_create",
292 pb_connection.get_id() as _,
293 {
294 pb_connection.info.as_ref().and_then(|info| match info {
295 ConnectionInfo::ConnectionParams(params) => {
296 Some(params.connection_type().as_str_name().to_owned())
297 }
298 _ => None,
299 })
300 },
301 None,
302 None,
303 );
304 }
305
306 let version = self
307 .notify_frontend(
308 NotificationOperation::Add,
309 NotificationInfo::Connection(pb_connection),
310 )
311 .await;
312 Ok(version)
313 }
314
315 pub async fn create_secret(
316 &self,
317 mut pb_secret: PbSecret,
318 secret_plain_payload: Vec<u8>,
319 ) -> MetaResult<NotificationVersion> {
320 let inner = self.inner.write().await;
321 let owner_id = pb_secret.owner as _;
322 let txn = inner.db.begin().await?;
323 ensure_user_id(owner_id, &txn).await?;
324 ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
325 ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
326 check_secret_name_duplicate(&pb_secret, &txn).await?;
327
328 let secret_obj = Self::create_object(
329 &txn,
330 ObjectType::Secret,
331 owner_id,
332 Some(pb_secret.database_id as _),
333 Some(pb_secret.schema_id as _),
334 )
335 .await?;
336 pb_secret.id = secret_obj.oid as _;
337 let secret: secret::ActiveModel = pb_secret.clone().into();
338 Secret::insert(secret).exec(&txn).await?;
339
340 txn.commit().await?;
341
342 let mut secret_plain = pb_secret;
344 secret_plain.value.clone_from(&secret_plain_payload);
345
346 LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
347 self.env
348 .notification_manager()
349 .notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
350
351 let version = self
352 .notify_frontend(
353 NotificationOperation::Add,
354 NotificationInfo::Secret(secret_plain),
355 )
356 .await;
357
358 Ok(version)
359 }
360
361 pub async fn create_view(&self, mut pb_view: PbView) -> MetaResult<NotificationVersion> {
362 let inner = self.inner.write().await;
363 let owner_id = pb_view.owner as _;
364 let txn = inner.db.begin().await?;
365 ensure_user_id(owner_id, &txn).await?;
366 ensure_object_id(ObjectType::Database, pb_view.database_id as _, &txn).await?;
367 ensure_object_id(ObjectType::Schema, pb_view.schema_id as _, &txn).await?;
368 check_relation_name_duplicate(
369 &pb_view.name,
370 pb_view.database_id as _,
371 pb_view.schema_id as _,
372 &txn,
373 )
374 .await?;
375
376 let view_obj = Self::create_object(
377 &txn,
378 ObjectType::View,
379 owner_id,
380 Some(pb_view.database_id as _),
381 Some(pb_view.schema_id as _),
382 )
383 .await?;
384 pb_view.id = view_obj.oid as _;
385 let view: view::ActiveModel = pb_view.clone().into();
386 View::insert(view).exec(&txn).await?;
387
388 for obj_id in &pb_view.dependent_relations {
391 ObjectDependency::insert(object_dependency::ActiveModel {
392 oid: Set(*obj_id as _),
393 used_by: Set(view_obj.oid),
394 ..Default::default()
395 })
396 .exec(&txn)
397 .await?;
398 }
399
400 txn.commit().await?;
401
402 let version = self
403 .notify_frontend_relation_info(NotificationOperation::Add, PbObjectInfo::View(pb_view))
404 .await;
405 Ok(version)
406 }
407
408 pub async fn validate_cross_db_snapshot_backfill(
409 &self,
410 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
411 ) -> MetaResult<()> {
412 if cross_db_snapshot_backfill_info
413 .upstream_mv_table_id_to_backfill_epoch
414 .is_empty()
415 {
416 return Ok(());
417 }
418
419 let inner = self.inner.read().await;
420 let table_ids = cross_db_snapshot_backfill_info
421 .upstream_mv_table_id_to_backfill_epoch
422 .keys()
423 .map(|t| t.table_id as ObjectId)
424 .collect_vec();
425 let cnt = Subscription::find()
426 .select_only()
427 .column(subscription::Column::DependentTableId)
428 .distinct()
429 .filter(subscription::Column::DependentTableId.is_in(table_ids))
430 .count(&inner.db)
431 .await? as usize;
432
433 if cnt
434 < cross_db_snapshot_backfill_info
435 .upstream_mv_table_id_to_backfill_epoch
436 .keys()
437 .count()
438 {
439 return Err(MetaError::permission_denied(
440 "Some upstream tables are not subscribed".to_owned(),
441 ));
442 }
443
444 Ok(())
445 }
446}