1use sea_orm::DatabaseTransaction;
16
17use super::*;
18
19impl CatalogController {
20 async fn alter_database_name(
21 &self,
22 database_id: DatabaseId,
23 name: &str,
24 ) -> MetaResult<NotificationVersion> {
25 let inner = self.inner.write().await;
26 let txn = inner.db.begin().await?;
27 check_database_name_duplicate(name, &txn).await?;
28
29 let active_model = database::ActiveModel {
30 database_id: Set(database_id),
31 name: Set(name.to_owned()),
32 ..Default::default()
33 };
34 let database = active_model.update(&txn).await?;
35
36 let obj = Object::find_by_id(database_id)
37 .one(&txn)
38 .await?
39 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
40
41 txn.commit().await?;
42
43 let version = self
44 .notify_frontend(
45 NotificationOperation::Update,
46 NotificationInfo::Database(ObjectModel(database, obj).into()),
47 )
48 .await;
49 Ok(version)
50 }
51
52 async fn alter_schema_name(
53 &self,
54 schema_id: SchemaId,
55 name: &str,
56 ) -> MetaResult<NotificationVersion> {
57 let inner = self.inner.write().await;
58 let txn = inner.db.begin().await?;
59
60 let obj = Object::find_by_id(schema_id)
61 .one(&txn)
62 .await?
63 .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
64 check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
65
66 let active_model = schema::ActiveModel {
67 schema_id: Set(schema_id),
68 name: Set(name.to_owned()),
69 };
70 let schema = active_model.update(&txn).await?;
71
72 txn.commit().await?;
73
74 let version = self
75 .notify_frontend(
76 NotificationOperation::Update,
77 NotificationInfo::Schema(ObjectModel(schema, obj).into()),
78 )
79 .await;
80 Ok(version)
81 }
82
83 pub async fn alter_name(
84 &self,
85 object_type: ObjectType,
86 object_id: ObjectId,
87 object_name: &str,
88 ) -> MetaResult<NotificationVersion> {
89 if object_type == ObjectType::Database {
90 return self.alter_database_name(object_id as _, object_name).await;
91 } else if object_type == ObjectType::Schema {
92 return self.alter_schema_name(object_id as _, object_name).await;
93 }
94
95 let inner = self.inner.write().await;
96 let txn = inner.db.begin().await?;
97 let obj: PartialObject = Object::find_by_id(object_id)
98 .into_partial_model()
99 .one(&txn)
100 .await?
101 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
102 assert_eq!(obj.obj_type, object_type);
103 check_relation_name_duplicate(
104 object_name,
105 obj.database_id.unwrap(),
106 obj.schema_id.unwrap(),
107 &txn,
108 )
109 .await?;
110
111 let (mut to_update_relations, old_name) =
113 rename_relation(&txn, object_type, object_id, object_name).await?;
114 to_update_relations.extend(
116 rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
117 );
118
119 txn.commit().await?;
120
121 let version = self
122 .notify_frontend(
123 NotificationOperation::Update,
124 NotificationInfo::ObjectGroup(PbObjectGroup {
125 objects: to_update_relations,
126 }),
127 )
128 .await;
129
130 Ok(version)
131 }
132
133 pub async fn alter_swap_rename(
134 &self,
135 object_type: ObjectType,
136 object_id: ObjectId,
137 dst_object_id: ObjectId,
138 ) -> MetaResult<NotificationVersion> {
139 let inner = self.inner.write().await;
140 let txn = inner.db.begin().await?;
141 let dst_name: String = match object_type {
142 ObjectType::Table => Table::find_by_id(dst_object_id)
143 .select_only()
144 .column(table::Column::Name)
145 .into_tuple()
146 .one(&txn)
147 .await?
148 .ok_or_else(|| {
149 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
150 })?,
151 ObjectType::Source => Source::find_by_id(dst_object_id)
152 .select_only()
153 .column(source::Column::Name)
154 .into_tuple()
155 .one(&txn)
156 .await?
157 .ok_or_else(|| {
158 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
159 })?,
160 ObjectType::Sink => Sink::find_by_id(dst_object_id)
161 .select_only()
162 .column(sink::Column::Name)
163 .into_tuple()
164 .one(&txn)
165 .await?
166 .ok_or_else(|| {
167 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
168 })?,
169 ObjectType::View => View::find_by_id(dst_object_id)
170 .select_only()
171 .column(view::Column::Name)
172 .into_tuple()
173 .one(&txn)
174 .await?
175 .ok_or_else(|| {
176 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
177 })?,
178 ObjectType::Subscription => Subscription::find_by_id(dst_object_id)
179 .select_only()
180 .column(subscription::Column::Name)
181 .into_tuple()
182 .one(&txn)
183 .await?
184 .ok_or_else(|| {
185 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
186 })?,
187 _ => {
188 return Err(MetaError::permission_denied(format!(
189 "swap rename not supported for object type: {:?}",
190 object_type
191 )));
192 }
193 };
194
195 let (mut to_update_relations, src_name) =
197 rename_relation(&txn, object_type, object_id, &dst_name).await?;
198 let (to_update_relations2, _) =
199 rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
200 to_update_relations.extend(to_update_relations2);
201 to_update_relations.extend(
203 rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
204 );
205 to_update_relations.extend(
206 rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
207 );
208
209 txn.commit().await?;
210
211 let version = self
212 .notify_frontend(
213 NotificationOperation::Update,
214 NotificationInfo::ObjectGroup(PbObjectGroup {
215 objects: to_update_relations,
216 }),
217 )
218 .await;
219
220 Ok(version)
221 }
222
223 pub async fn alter_non_shared_source(
224 &self,
225 pb_source: PbSource,
226 ) -> MetaResult<NotificationVersion> {
227 let source_id = pb_source.id as SourceId;
228 let inner = self.inner.write().await;
229 let txn = inner.db.begin().await?;
230
231 let original_version: i64 = Source::find_by_id(source_id)
232 .select_only()
233 .column(source::Column::Version)
234 .into_tuple()
235 .one(&txn)
236 .await?
237 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
238 if original_version + 1 != pb_source.version as i64 {
239 return Err(MetaError::permission_denied(
240 "source version is stale".to_owned(),
241 ));
242 }
243
244 let source: source::ActiveModel = pb_source.clone().into();
245 source.update(&txn).await?;
246 txn.commit().await?;
247
248 let version = self
249 .notify_frontend_relation_info(
250 NotificationOperation::Update,
251 PbObjectInfo::Source(pb_source),
252 )
253 .await;
254 Ok(version)
255 }
256
257 pub async fn alter_owner(
258 &self,
259 object_type: ObjectType,
260 object_id: ObjectId,
261 new_owner: UserId,
262 ) -> MetaResult<NotificationVersion> {
263 let inner = self.inner.write().await;
264 let txn = inner.db.begin().await?;
265 ensure_user_id(new_owner, &txn).await?;
266
267 let obj = Object::find_by_id(object_id)
268 .one(&txn)
269 .await?
270 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
271 if obj.owner_id == new_owner {
272 return Ok(IGNORED_NOTIFICATION_VERSION);
273 }
274 let mut obj = obj.into_active_model();
275 obj.owner_id = Set(new_owner);
276 let obj = obj.update(&txn).await?;
277
278 let mut objects = vec![];
279 match object_type {
280 ObjectType::Database => {
281 let db = Database::find_by_id(object_id)
282 .one(&txn)
283 .await?
284 .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
285
286 txn.commit().await?;
287
288 let version = self
289 .notify_frontend(
290 NotificationOperation::Update,
291 NotificationInfo::Database(ObjectModel(db, obj).into()),
292 )
293 .await;
294 return Ok(version);
295 }
296 ObjectType::Schema => {
297 let schema = Schema::find_by_id(object_id)
298 .one(&txn)
299 .await?
300 .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
301
302 txn.commit().await?;
303
304 let version = self
305 .notify_frontend(
306 NotificationOperation::Update,
307 NotificationInfo::Schema(ObjectModel(schema, obj).into()),
308 )
309 .await;
310 return Ok(version);
311 }
312 ObjectType::Table => {
313 let table = Table::find_by_id(object_id)
314 .one(&txn)
315 .await?
316 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
317
318 if let Some(associated_source_id) = table.optional_associated_source_id {
320 let src_obj = object::ActiveModel {
321 oid: Set(associated_source_id as _),
322 owner_id: Set(new_owner),
323 ..Default::default()
324 }
325 .update(&txn)
326 .await?;
327 let source = Source::find_by_id(associated_source_id)
328 .one(&txn)
329 .await?
330 .ok_or_else(|| {
331 MetaError::catalog_id_not_found("source", associated_source_id)
332 })?;
333 objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
334 }
335
336 let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
338 .select_only()
339 .columns([index::Column::IndexId, index::Column::IndexTableId])
340 .filter(index::Column::PrimaryTableId.eq(object_id))
341 .into_tuple::<(IndexId, TableId)>()
342 .all(&txn)
343 .await?
344 .into_iter()
345 .unzip();
346 objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
347
348 let internal_tables: Vec<TableId> = Table::find()
350 .select_only()
351 .column(table::Column::TableId)
352 .filter(
353 table::Column::BelongsToJobId
354 .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
355 )
356 .into_tuple()
357 .all(&txn)
358 .await?;
359 table_ids.extend(internal_tables);
360
361 if !index_ids.is_empty() || !table_ids.is_empty() {
362 Object::update_many()
363 .col_expr(
364 object::Column::OwnerId,
365 SimpleExpr::Value(Value::Int(Some(new_owner))),
366 )
367 .filter(
368 object::Column::Oid
369 .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
370 )
371 .exec(&txn)
372 .await?;
373 }
374
375 if !table_ids.is_empty() {
376 let table_objs = Table::find()
377 .find_also_related(Object)
378 .filter(table::Column::TableId.is_in(table_ids))
379 .all(&txn)
380 .await?;
381 for (table, table_obj) in table_objs {
382 objects.push(PbObjectInfo::Table(
383 ObjectModel(table, table_obj.unwrap()).into(),
384 ));
385 }
386 }
387 if !index_ids.is_empty() {
389 let index_objs = Index::find()
390 .find_also_related(Object)
391 .filter(index::Column::IndexId.is_in(index_ids))
392 .all(&txn)
393 .await?;
394 for (index, index_obj) in index_objs {
395 objects.push(PbObjectInfo::Index(
396 ObjectModel(index, index_obj.unwrap()).into(),
397 ));
398 }
399 }
400 }
401 ObjectType::Source => {
402 let source = Source::find_by_id(object_id)
403 .one(&txn)
404 .await?
405 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
406 let is_shared = source.is_shared();
407 objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
408
409 if is_shared {
412 update_internal_tables(
413 &txn,
414 object_id,
415 object::Column::OwnerId,
416 Value::Int(Some(new_owner)),
417 &mut objects,
418 )
419 .await?;
420 }
421 }
422 ObjectType::Sink => {
423 let sink = Sink::find_by_id(object_id)
424 .one(&txn)
425 .await?
426 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
427 objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
428
429 update_internal_tables(
430 &txn,
431 object_id,
432 object::Column::OwnerId,
433 Value::Int(Some(new_owner)),
434 &mut objects,
435 )
436 .await?;
437 }
438 ObjectType::Subscription => {
439 let subscription = Subscription::find_by_id(object_id)
440 .one(&txn)
441 .await?
442 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
443 objects.push(PbObjectInfo::Subscription(
444 ObjectModel(subscription, obj).into(),
445 ));
446 }
447 ObjectType::View => {
448 let view = View::find_by_id(object_id)
449 .one(&txn)
450 .await?
451 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
452 objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
453 }
454 ObjectType::Connection => {
455 let connection = Connection::find_by_id(object_id)
456 .one(&txn)
457 .await?
458 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
459 objects.push(PbObjectInfo::Connection(
460 ObjectModel(connection, obj).into(),
461 ));
462 }
463 _ => unreachable!("not supported object type: {:?}", object_type),
464 };
465
466 txn.commit().await?;
467
468 let version = self
469 .notify_frontend(
470 NotificationOperation::Update,
471 NotificationInfo::ObjectGroup(PbObjectGroup {
472 objects: objects
473 .into_iter()
474 .map(|object| PbObject {
475 object_info: Some(object),
476 })
477 .collect(),
478 }),
479 )
480 .await;
481 Ok(version)
482 }
483
484 pub async fn alter_schema(
485 &self,
486 object_type: ObjectType,
487 object_id: ObjectId,
488 new_schema: SchemaId,
489 ) -> MetaResult<NotificationVersion> {
490 let inner = self.inner.write().await;
491 let txn = inner.db.begin().await?;
492 ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
493
494 let obj = Object::find_by_id(object_id)
495 .one(&txn)
496 .await?
497 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
498 if obj.schema_id == Some(new_schema) {
499 return Ok(IGNORED_NOTIFICATION_VERSION);
500 }
501 let database_id = obj.database_id.unwrap();
502
503 let mut objects = vec![];
504 match object_type {
505 ObjectType::Table => {
506 let table = Table::find_by_id(object_id)
507 .one(&txn)
508 .await?
509 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
510 check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
511 let associated_src_id = table.optional_associated_source_id;
512
513 let mut obj = obj.into_active_model();
514 obj.schema_id = Set(Some(new_schema));
515 let obj = obj.update(&txn).await?;
516 objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
517
518 if let Some(associated_source_id) = associated_src_id {
520 let src_obj = object::ActiveModel {
521 oid: Set(associated_source_id as _),
522 schema_id: Set(Some(new_schema)),
523 ..Default::default()
524 }
525 .update(&txn)
526 .await?;
527 let source = Source::find_by_id(associated_source_id)
528 .one(&txn)
529 .await?
530 .ok_or_else(|| {
531 MetaError::catalog_id_not_found("source", associated_source_id)
532 })?;
533 objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
534 }
535
536 let (index_ids, (index_names, mut table_ids)): (
538 Vec<IndexId>,
539 (Vec<String>, Vec<TableId>),
540 ) = Index::find()
541 .select_only()
542 .columns([
543 index::Column::IndexId,
544 index::Column::Name,
545 index::Column::IndexTableId,
546 ])
547 .filter(index::Column::PrimaryTableId.eq(object_id))
548 .into_tuple::<(IndexId, String, TableId)>()
549 .all(&txn)
550 .await?
551 .into_iter()
552 .map(|(id, name, t_id)| (id, (name, t_id)))
553 .unzip();
554
555 let internal_tables: Vec<TableId> = Table::find()
557 .select_only()
558 .column(table::Column::TableId)
559 .filter(
560 table::Column::BelongsToJobId
561 .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
562 )
563 .into_tuple()
564 .all(&txn)
565 .await?;
566 table_ids.extend(internal_tables);
567
568 if !index_ids.is_empty() || !table_ids.is_empty() {
569 for index_name in index_names {
570 check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
571 .await?;
572 }
573
574 Object::update_many()
575 .col_expr(
576 object::Column::SchemaId,
577 SimpleExpr::Value(Value::Int(Some(new_schema))),
578 )
579 .filter(
580 object::Column::Oid
581 .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
582 )
583 .exec(&txn)
584 .await?;
585 }
586
587 if !table_ids.is_empty() {
588 let table_objs = Table::find()
589 .find_also_related(Object)
590 .filter(table::Column::TableId.is_in(table_ids))
591 .all(&txn)
592 .await?;
593 for (table, table_obj) in table_objs {
594 objects.push(PbObjectInfo::Table(
595 ObjectModel(table, table_obj.unwrap()).into(),
596 ));
597 }
598 }
599 if !index_ids.is_empty() {
600 let index_objs = Index::find()
601 .find_also_related(Object)
602 .filter(index::Column::IndexId.is_in(index_ids))
603 .all(&txn)
604 .await?;
605 for (index, index_obj) in index_objs {
606 objects.push(PbObjectInfo::Index(
607 ObjectModel(index, index_obj.unwrap()).into(),
608 ));
609 }
610 }
611 }
612 ObjectType::Source => {
613 let source = Source::find_by_id(object_id)
614 .one(&txn)
615 .await?
616 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
617 check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
618 let is_shared = source.is_shared();
619
620 let mut obj = obj.into_active_model();
621 obj.schema_id = Set(Some(new_schema));
622 let obj = obj.update(&txn).await?;
623 objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
624
625 if is_shared {
628 update_internal_tables(
629 &txn,
630 object_id,
631 object::Column::SchemaId,
632 Value::Int(Some(new_schema)),
633 &mut objects,
634 )
635 .await?;
636 }
637 }
638 ObjectType::Sink => {
639 let sink = Sink::find_by_id(object_id)
640 .one(&txn)
641 .await?
642 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
643 check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
644
645 let mut obj = obj.into_active_model();
646 obj.schema_id = Set(Some(new_schema));
647 let obj = obj.update(&txn).await?;
648 objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
649
650 update_internal_tables(
651 &txn,
652 object_id,
653 object::Column::SchemaId,
654 Value::Int(Some(new_schema)),
655 &mut objects,
656 )
657 .await?;
658 }
659 ObjectType::Subscription => {
660 let subscription = Subscription::find_by_id(object_id)
661 .one(&txn)
662 .await?
663 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
664 check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
665 .await?;
666
667 let mut obj = obj.into_active_model();
668 obj.schema_id = Set(Some(new_schema));
669 let obj = obj.update(&txn).await?;
670 objects.push(PbObjectInfo::Subscription(
671 ObjectModel(subscription, obj).into(),
672 ));
673 }
674 ObjectType::View => {
675 let view = View::find_by_id(object_id)
676 .one(&txn)
677 .await?
678 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
679 check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
680
681 let mut obj = obj.into_active_model();
682 obj.schema_id = Set(Some(new_schema));
683 let obj = obj.update(&txn).await?;
684 objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
685 }
686 ObjectType::Function => {
687 let function = Function::find_by_id(object_id)
688 .one(&txn)
689 .await?
690 .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
691
692 let mut pb_function: PbFunction = ObjectModel(function, obj).into();
693 pb_function.schema_id = new_schema as _;
694 check_function_signature_duplicate(&pb_function, &txn).await?;
695
696 object::ActiveModel {
697 oid: Set(object_id),
698 schema_id: Set(Some(new_schema)),
699 ..Default::default()
700 }
701 .update(&txn)
702 .await?;
703
704 txn.commit().await?;
705 let version = self
706 .notify_frontend(
707 NotificationOperation::Update,
708 NotificationInfo::Function(pb_function),
709 )
710 .await;
711 return Ok(version);
712 }
713 ObjectType::Connection => {
714 let connection = Connection::find_by_id(object_id)
715 .one(&txn)
716 .await?
717 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
718
719 let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
720 pb_connection.schema_id = new_schema as _;
721 check_connection_name_duplicate(&pb_connection, &txn).await?;
722
723 object::ActiveModel {
724 oid: Set(object_id),
725 schema_id: Set(Some(new_schema)),
726 ..Default::default()
727 }
728 .update(&txn)
729 .await?;
730
731 txn.commit().await?;
732 let version = self
733 .notify_frontend(
734 NotificationOperation::Update,
735 NotificationInfo::Connection(pb_connection),
736 )
737 .await;
738 return Ok(version);
739 }
740 _ => unreachable!("not supported object type: {:?}", object_type),
741 }
742
743 txn.commit().await?;
744 let version = self
745 .notify_frontend(
746 Operation::Update,
747 Info::ObjectGroup(PbObjectGroup {
748 objects: objects
749 .into_iter()
750 .map(|relation_info| PbObject {
751 object_info: Some(relation_info),
752 })
753 .collect_vec(),
754 }),
755 )
756 .await;
757 Ok(version)
758 }
759
760 pub async fn alter_secret(
761 &self,
762 pb_secret: PbSecret,
763 secret_plain_payload: Vec<u8>,
764 ) -> MetaResult<NotificationVersion> {
765 let inner = self.inner.write().await;
766 let owner_id = pb_secret.owner as _;
767 let txn = inner.db.begin().await?;
768 ensure_user_id(owner_id, &txn).await?;
769 ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
770 ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
771
772 ensure_object_id(ObjectType::Secret, pb_secret.id as _, &txn).await?;
773 let secret: secret::ActiveModel = pb_secret.clone().into();
774 Secret::update(secret).exec(&txn).await?;
775
776 txn.commit().await?;
777
778 let mut secret_plain = pb_secret;
780 secret_plain.value.clone_from(&secret_plain_payload);
781
782 LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
783 self.env
784 .notification_manager()
785 .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
786
787 let version = self
788 .notify_frontend(
789 NotificationOperation::Update,
790 NotificationInfo::Secret(secret_plain),
791 )
792 .await;
793
794 Ok(version)
795 }
796
797 pub async fn drop_table_associated_source(
799 txn: &DatabaseTransaction,
800 drop_table_connector_ctx: &DropTableConnectorContext,
801 ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
802 let to_drop_source_objects: Vec<PartialObject> = Object::find()
803 .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
804 .into_partial_model()
805 .all(txn)
806 .await?;
807 let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
808 .select_only()
809 .filter(
810 object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
811 )
812 .into_partial_model()
813 .all(txn)
814 .await?;
815 let to_drop_objects = to_drop_source_objects
816 .into_iter()
817 .chain(to_drop_internal_table_objs.into_iter())
818 .collect_vec();
819 let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
821 .select_only()
822 .distinct()
823 .column(user_privilege::Column::UserId)
824 .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
825 .into_tuple()
826 .all(txn)
827 .await?;
828
829 tracing::debug!(
830 "drop_table_associated_source: to_drop_objects: {:?}",
831 to_drop_objects
832 );
833
834 let res = Object::delete_many()
836 .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
837 .exec(txn)
838 .await?;
839 if res.rows_affected == 0 {
840 return Err(MetaError::catalog_id_not_found(
841 ObjectType::Source.as_str(),
842 drop_table_connector_ctx.to_remove_source_id,
843 ));
844 }
845 let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
846
847 Ok((user_infos, to_drop_objects))
848 }
849}