1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
23use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
24use risingwave_common::{bail, hash};
25use risingwave_meta_model::actor::ActorStatus;
26use risingwave_meta_model::actor_dispatcher::DispatcherType;
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::object::ObjectType;
29use risingwave_meta_model::prelude::*;
30use risingwave_meta_model::table::TableType;
31use risingwave_meta_model::{
32 ActorId, DataTypeArray, DatabaseId, FragmentId, I32Array, JobStatus, ObjectId, PrivilegeId,
33 SchemaId, SourceId, StreamNode, StreamSourceInfo, TableId, UserId, VnodeBitmap, WorkerId,
34 actor, connection, database, fragment, fragment_relation, function, index, object,
35 object_dependency, schema, secret, sink, source, streaming_job, subscription, table, user,
36 user_privilege, view,
37};
38use risingwave_meta_model_migration::WithQuery;
39use risingwave_pb::catalog::{
40 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
41 PbSubscription, PbTable, PbView,
42};
43use risingwave_pb::common::WorkerNode;
44use risingwave_pb::meta::object::PbObjectInfo;
45use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
46use risingwave_pb::meta::{
47 FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
48};
49use risingwave_pb::stream_plan::{PbDispatcher, PbDispatcherType, PbFragmentTypeFlag};
50use risingwave_pb::user::grant_privilege::{
51 PbAction, PbActionWithGrantOption, PbObject as PbGrantObject,
52};
53use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
54use risingwave_sqlparser::ast::Statement as SqlStatement;
55use risingwave_sqlparser::parser::Parser;
56use sea_orm::sea_query::{
57 Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
58 WithClause,
59};
60use sea_orm::{
61 ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
62 FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
63 RelationTrait, Set, Statement,
64};
65use thiserror_ext::AsReport;
66
67use crate::controller::ObjectModel;
68use crate::model::{FragmentActorDispatchers, FragmentDownstreamRelation};
69use crate::{MetaError, MetaResult};
70
71pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
96 let cte_alias = Alias::new("used_by_object_ids");
97 let cte_return_alias = Alias::new("used_by");
98
99 let mut base_query = SelectStatement::new()
100 .column(object_dependency::Column::UsedBy)
101 .from(ObjectDependency)
102 .and_where(object_dependency::Column::Oid.eq(obj_id))
103 .to_owned();
104
105 let belonged_obj_query = SelectStatement::new()
106 .column(object::Column::Oid)
107 .from(Object)
108 .and_where(
109 object::Column::DatabaseId
110 .eq(obj_id)
111 .or(object::Column::SchemaId.eq(obj_id)),
112 )
113 .to_owned();
114
115 let cte_referencing = Query::select()
116 .column((ObjectDependency, object_dependency::Column::UsedBy))
117 .from(ObjectDependency)
118 .inner_join(
119 cte_alias.clone(),
120 Expr::col((cte_alias.clone(), cte_return_alias.clone()))
121 .equals(object_dependency::Column::Oid),
122 )
123 .to_owned();
124
125 let common_table_expr = CommonTableExpression::new()
126 .query(
127 base_query
128 .union(UnionType::All, belonged_obj_query)
129 .union(UnionType::All, cte_referencing)
130 .to_owned(),
131 )
132 .column(cte_return_alias.clone())
133 .table_name(cte_alias.clone())
134 .to_owned();
135
136 SelectStatement::new()
137 .distinct()
138 .columns([
139 object::Column::Oid,
140 object::Column::ObjType,
141 object::Column::SchemaId,
142 object::Column::DatabaseId,
143 ])
144 .from(cte_alias.clone())
145 .inner_join(
146 Object,
147 Expr::col((cte_alias, cte_return_alias.clone())).equals(object::Column::Oid),
148 )
149 .order_by(object::Column::Oid, Order::Desc)
150 .to_owned()
151 .with(
152 WithClause::new()
153 .recursive(true)
154 .cte(common_table_expr)
155 .to_owned(),
156 )
157 .to_owned()
158}
159
160pub fn construct_sink_cycle_check_query(
185 target_table: ObjectId,
186 dependent_objects: Vec<ObjectId>,
187) -> WithQuery {
188 let cte_alias = Alias::new("used_by_object_ids_with_sink");
189 let depend_alias = Alias::new("obj_dependency_with_sink");
190
191 let mut base_query = SelectStatement::new()
192 .columns([
193 object_dependency::Column::Oid,
194 object_dependency::Column::UsedBy,
195 ])
196 .from(ObjectDependency)
197 .and_where(object_dependency::Column::Oid.eq(target_table))
198 .to_owned();
199
200 let query_sink_deps = SelectStatement::new()
201 .columns([sink::Column::SinkId, sink::Column::TargetTable])
202 .from(Sink)
203 .and_where(sink::Column::TargetTable.is_not_null())
204 .to_owned();
205
206 let cte_referencing = Query::select()
207 .column((depend_alias.clone(), object_dependency::Column::Oid))
208 .column((depend_alias.clone(), object_dependency::Column::UsedBy))
209 .from_subquery(
210 SelectStatement::new()
211 .columns([
212 object_dependency::Column::Oid,
213 object_dependency::Column::UsedBy,
214 ])
215 .from(ObjectDependency)
216 .union(UnionType::All, query_sink_deps)
217 .to_owned(),
218 depend_alias.clone(),
219 )
220 .inner_join(
221 cte_alias.clone(),
222 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).eq(Expr::col((
223 depend_alias.clone(),
224 object_dependency::Column::Oid,
225 ))),
226 )
227 .and_where(
228 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
229 cte_alias.clone(),
230 object_dependency::Column::Oid,
231 ))),
232 )
233 .to_owned();
234
235 let common_table_expr = CommonTableExpression::new()
236 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
237 .columns([
238 object_dependency::Column::Oid,
239 object_dependency::Column::UsedBy,
240 ])
241 .table_name(cte_alias.clone())
242 .to_owned();
243
244 SelectStatement::new()
245 .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
246 .from(cte_alias.clone())
247 .and_where(
248 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
249 .is_in(dependent_objects),
250 )
251 .to_owned()
252 .with(
253 WithClause::new()
254 .recursive(true)
255 .cte(common_table_expr)
256 .to_owned(),
257 )
258 .to_owned()
259}
260
261#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
262#[sea_orm(entity = "Object")]
263pub struct PartialObject {
264 pub oid: ObjectId,
265 pub obj_type: ObjectType,
266 pub schema_id: Option<SchemaId>,
267 pub database_id: Option<DatabaseId>,
268}
269
270#[derive(Clone, DerivePartialModel, FromQueryResult)]
271#[sea_orm(entity = "Fragment")]
272pub struct PartialFragmentStateTables {
273 pub fragment_id: FragmentId,
274 pub job_id: ObjectId,
275 pub state_table_ids: I32Array,
276}
277
278#[derive(Clone, DerivePartialModel, FromQueryResult)]
279#[sea_orm(entity = "Actor")]
280pub struct PartialActorLocation {
281 pub actor_id: ActorId,
282 pub fragment_id: FragmentId,
283 pub worker_id: WorkerId,
284 pub status: ActorStatus,
285}
286
287#[derive(FromQueryResult)]
288pub struct FragmentDesc {
289 pub fragment_id: FragmentId,
290 pub job_id: ObjectId,
291 pub fragment_type_mask: i32,
292 pub distribution_type: DistributionType,
293 pub state_table_ids: I32Array,
294 pub parallelism: i64,
295 pub vnode_count: i32,
296 pub stream_node: StreamNode,
297}
298
299pub async fn get_referring_objects_cascade<C>(
301 obj_id: ObjectId,
302 db: &C,
303) -> MetaResult<Vec<PartialObject>>
304where
305 C: ConnectionTrait,
306{
307 let query = construct_obj_dependency_query(obj_id);
308 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
309 let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
310 db.get_database_backend(),
311 sql,
312 values,
313 ))
314 .all(db)
315 .await?;
316 Ok(objects)
317}
318
319pub async fn check_sink_into_table_cycle<C>(
321 target_table: ObjectId,
322 dependent_objs: Vec<ObjectId>,
323 db: &C,
324) -> MetaResult<bool>
325where
326 C: ConnectionTrait,
327{
328 if dependent_objs.is_empty() {
329 return Ok(false);
330 }
331
332 if dependent_objs.contains(&target_table) {
334 return Ok(true);
335 }
336
337 let query = construct_sink_cycle_check_query(target_table, dependent_objs);
338 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
339
340 let res = db
341 .query_one(Statement::from_sql_and_values(
342 db.get_database_backend(),
343 sql,
344 values,
345 ))
346 .await?
347 .unwrap();
348
349 let cnt: i64 = res.try_get_by(0)?;
350
351 Ok(cnt != 0)
352}
353
354pub async fn ensure_object_id<C>(
356 object_type: ObjectType,
357 obj_id: ObjectId,
358 db: &C,
359) -> MetaResult<()>
360where
361 C: ConnectionTrait,
362{
363 let count = Object::find_by_id(obj_id).count(db).await?;
364 if count == 0 {
365 return Err(MetaError::catalog_id_not_found(
366 object_type.as_str(),
367 obj_id,
368 ));
369 }
370 Ok(())
371}
372
373pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
375where
376 C: ConnectionTrait,
377{
378 let count = User::find_by_id(user_id).count(db).await?;
379 if count == 0 {
380 return Err(anyhow!("user {} was concurrently dropped", user_id).into());
381 }
382 Ok(())
383}
384
385pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
387where
388 C: ConnectionTrait,
389{
390 let count = Database::find()
391 .filter(database::Column::Name.eq(name))
392 .count(db)
393 .await?;
394 if count > 0 {
395 assert_eq!(count, 1);
396 return Err(MetaError::catalog_duplicated("database", name));
397 }
398 Ok(())
399}
400
401pub async fn check_function_signature_duplicate<C>(
403 pb_function: &PbFunction,
404 db: &C,
405) -> MetaResult<()>
406where
407 C: ConnectionTrait,
408{
409 let count = Function::find()
410 .inner_join(Object)
411 .filter(
412 object::Column::DatabaseId
413 .eq(pb_function.database_id as DatabaseId)
414 .and(object::Column::SchemaId.eq(pb_function.schema_id as SchemaId))
415 .and(function::Column::Name.eq(&pb_function.name))
416 .and(
417 function::Column::ArgTypes
418 .eq(DataTypeArray::from(pb_function.arg_types.clone())),
419 ),
420 )
421 .count(db)
422 .await?;
423 if count > 0 {
424 assert_eq!(count, 1);
425 return Err(MetaError::catalog_duplicated("function", &pb_function.name));
426 }
427 Ok(())
428}
429
430pub async fn check_connection_name_duplicate<C>(
432 pb_connection: &PbConnection,
433 db: &C,
434) -> MetaResult<()>
435where
436 C: ConnectionTrait,
437{
438 let count = Connection::find()
439 .inner_join(Object)
440 .filter(
441 object::Column::DatabaseId
442 .eq(pb_connection.database_id as DatabaseId)
443 .and(object::Column::SchemaId.eq(pb_connection.schema_id as SchemaId))
444 .and(connection::Column::Name.eq(&pb_connection.name)),
445 )
446 .count(db)
447 .await?;
448 if count > 0 {
449 assert_eq!(count, 1);
450 return Err(MetaError::catalog_duplicated(
451 "connection",
452 &pb_connection.name,
453 ));
454 }
455 Ok(())
456}
457
458pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
459where
460 C: ConnectionTrait,
461{
462 let count = Secret::find()
463 .inner_join(Object)
464 .filter(
465 object::Column::DatabaseId
466 .eq(pb_secret.database_id as DatabaseId)
467 .and(object::Column::SchemaId.eq(pb_secret.schema_id as SchemaId))
468 .and(secret::Column::Name.eq(&pb_secret.name)),
469 )
470 .count(db)
471 .await?;
472 if count > 0 {
473 assert_eq!(count, 1);
474 return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
475 }
476 Ok(())
477}
478
479pub async fn check_subscription_name_duplicate<C>(
480 pb_subscription: &PbSubscription,
481 db: &C,
482) -> MetaResult<()>
483where
484 C: ConnectionTrait,
485{
486 let count = Subscription::find()
487 .inner_join(Object)
488 .filter(
489 object::Column::DatabaseId
490 .eq(pb_subscription.database_id as DatabaseId)
491 .and(object::Column::SchemaId.eq(pb_subscription.schema_id as SchemaId))
492 .and(subscription::Column::Name.eq(&pb_subscription.name)),
493 )
494 .count(db)
495 .await?;
496 if count > 0 {
497 assert_eq!(count, 1);
498 return Err(MetaError::catalog_duplicated(
499 "subscription",
500 &pb_subscription.name,
501 ));
502 }
503 Ok(())
504}
505
506pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
508where
509 C: ConnectionTrait,
510{
511 let count = User::find()
512 .filter(user::Column::Name.eq(name))
513 .count(db)
514 .await?;
515 if count > 0 {
516 assert_eq!(count, 1);
517 return Err(MetaError::catalog_duplicated("user", name));
518 }
519 Ok(())
520}
521
522pub async fn check_relation_name_duplicate<C>(
524 name: &str,
525 database_id: DatabaseId,
526 schema_id: SchemaId,
527 db: &C,
528) -> MetaResult<()>
529where
530 C: ConnectionTrait,
531{
532 macro_rules! check_duplicated {
533 ($obj_type:expr, $entity:ident, $table:ident) => {
534 let object_id = Object::find()
535 .select_only()
536 .column(object::Column::Oid)
537 .inner_join($entity)
538 .filter(
539 object::Column::DatabaseId
540 .eq(Some(database_id))
541 .and(object::Column::SchemaId.eq(Some(schema_id)))
542 .and($table::Column::Name.eq(name)),
543 )
544 .into_tuple::<ObjectId>()
545 .one(db)
546 .await?;
547 if let Some(oid) = object_id {
548 let check_creation = if $obj_type == ObjectType::View {
549 false
550 } else if $obj_type == ObjectType::Source {
551 let source_info = Source::find_by_id(oid)
552 .select_only()
553 .column(source::Column::SourceInfo)
554 .into_tuple::<Option<StreamSourceInfo>>()
555 .one(db)
556 .await?
557 .unwrap();
558 source_info.map_or(false, |info| info.to_protobuf().is_shared())
559 } else {
560 true
561 };
562 return if check_creation
563 && !matches!(
564 StreamingJob::find_by_id(oid)
565 .select_only()
566 .column(streaming_job::Column::JobStatus)
567 .into_tuple::<JobStatus>()
568 .one(db)
569 .await?,
570 Some(JobStatus::Created)
571 ) {
572 Err(MetaError::catalog_under_creation($obj_type.as_str(), name))
573 } else {
574 Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
575 };
576 }
577 };
578 }
579 check_duplicated!(ObjectType::Table, Table, table);
580 check_duplicated!(ObjectType::Source, Source, source);
581 check_duplicated!(ObjectType::Sink, Sink, sink);
582 check_duplicated!(ObjectType::Index, Index, index);
583 check_duplicated!(ObjectType::View, View, view);
584
585 Ok(())
586}
587
588pub async fn check_schema_name_duplicate<C>(
590 name: &str,
591 database_id: DatabaseId,
592 db: &C,
593) -> MetaResult<()>
594where
595 C: ConnectionTrait,
596{
597 let count = Object::find()
598 .inner_join(Schema)
599 .filter(
600 object::Column::ObjType
601 .eq(ObjectType::Schema)
602 .and(object::Column::DatabaseId.eq(Some(database_id)))
603 .and(schema::Column::Name.eq(name)),
604 )
605 .count(db)
606 .await?;
607 if count != 0 {
608 return Err(MetaError::catalog_duplicated("schema", name));
609 }
610
611 Ok(())
612}
613
614pub async fn ensure_object_not_refer<C>(
616 object_type: ObjectType,
617 object_id: ObjectId,
618 db: &C,
619) -> MetaResult<()>
620where
621 C: ConnectionTrait,
622{
623 let count = if object_type == ObjectType::Table {
625 ObjectDependency::find()
626 .join(
627 JoinType::InnerJoin,
628 object_dependency::Relation::Object1.def(),
629 )
630 .filter(
631 object_dependency::Column::Oid
632 .eq(object_id)
633 .and(object::Column::ObjType.ne(ObjectType::Index)),
634 )
635 .count(db)
636 .await?
637 } else {
638 ObjectDependency::find()
639 .filter(object_dependency::Column::Oid.eq(object_id))
640 .count(db)
641 .await?
642 };
643 if count != 0 {
644 return Err(MetaError::permission_denied(format!(
645 "{} used by {} other objects.",
646 object_type.as_str(),
647 count
648 )));
649 }
650 Ok(())
651}
652
653pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
655where
656 C: ConnectionTrait,
657{
658 let objs = ObjectDependency::find()
659 .filter(object_dependency::Column::Oid.eq(object_id))
660 .join(
661 JoinType::InnerJoin,
662 object_dependency::Relation::Object1.def(),
663 )
664 .into_partial_model()
665 .all(db)
666 .await?;
667
668 Ok(objs)
669}
670
671pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
673where
674 C: ConnectionTrait,
675{
676 let count = Object::find()
677 .filter(object::Column::SchemaId.eq(Some(schema_id)))
678 .count(db)
679 .await?;
680 if count != 0 {
681 return Err(MetaError::permission_denied("schema is not empty"));
682 }
683
684 Ok(())
685}
686
687pub async fn list_user_info_by_ids<C>(user_ids: Vec<UserId>, db: &C) -> MetaResult<Vec<PbUserInfo>>
689where
690 C: ConnectionTrait,
691{
692 let mut user_infos = vec![];
693 for user_id in user_ids {
694 let user = User::find_by_id(user_id)
695 .one(db)
696 .await?
697 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
698 let mut user_info: PbUserInfo = user.into();
699 user_info.grant_privileges = get_user_privilege(user_id, db).await?;
700 user_infos.push(user_info);
701 }
702 Ok(user_infos)
703}
704
705pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
707where
708 C: ConnectionTrait,
709{
710 let obj_owner: UserId = Object::find_by_id(object_id)
711 .select_only()
712 .column(object::Column::OwnerId)
713 .into_tuple()
714 .one(db)
715 .await?
716 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
717 Ok(obj_owner)
718}
719
720pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
745 let cte_alias = Alias::new("granted_privilege_ids");
746 let cte_return_privilege_alias = Alias::new("id");
747 let cte_return_user_alias = Alias::new("user_id");
748
749 let mut base_query = SelectStatement::new()
750 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
751 .from(UserPrivilege)
752 .and_where(user_privilege::Column::Id.is_in(ids))
753 .to_owned();
754
755 let cte_referencing = Query::select()
756 .columns([
757 (UserPrivilege, user_privilege::Column::Id),
758 (UserPrivilege, user_privilege::Column::UserId),
759 ])
760 .from(UserPrivilege)
761 .inner_join(
762 cte_alias.clone(),
763 Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
764 .equals(user_privilege::Column::DependentId),
765 )
766 .to_owned();
767
768 let common_table_expr = CommonTableExpression::new()
769 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
770 .columns([
771 cte_return_privilege_alias.clone(),
772 cte_return_user_alias.clone(),
773 ])
774 .table_name(cte_alias.clone())
775 .to_owned();
776
777 SelectStatement::new()
778 .columns([cte_return_privilege_alias, cte_return_user_alias])
779 .from(cte_alias.clone())
780 .to_owned()
781 .with(
782 WithClause::new()
783 .recursive(true)
784 .cte(common_table_expr)
785 .to_owned(),
786 )
787 .to_owned()
788}
789
790pub async fn get_internal_tables_by_id<C>(job_id: ObjectId, db: &C) -> MetaResult<Vec<TableId>>
791where
792 C: ConnectionTrait,
793{
794 let table_ids: Vec<TableId> = Table::find()
795 .select_only()
796 .column(table::Column::TableId)
797 .filter(
798 table::Column::TableType
799 .eq(TableType::Internal)
800 .and(table::Column::BelongsToJobId.eq(job_id)),
801 )
802 .into_tuple()
803 .all(db)
804 .await?;
805 Ok(table_ids)
806}
807
808pub async fn get_index_state_tables_by_table_id<C>(
809 table_id: TableId,
810 db: &C,
811) -> MetaResult<Vec<TableId>>
812where
813 C: ConnectionTrait,
814{
815 let mut index_table_ids: Vec<TableId> = Index::find()
816 .select_only()
817 .column(index::Column::IndexTableId)
818 .filter(index::Column::PrimaryTableId.eq(table_id))
819 .into_tuple()
820 .all(db)
821 .await?;
822
823 if !index_table_ids.is_empty() {
824 let internal_table_ids: Vec<TableId> = Table::find()
825 .select_only()
826 .column(table::Column::TableId)
827 .filter(
828 table::Column::TableType
829 .eq(TableType::Internal)
830 .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
831 )
832 .into_tuple()
833 .all(db)
834 .await?;
835
836 index_table_ids.extend(internal_table_ids.into_iter());
837 }
838
839 Ok(index_table_ids)
840}
841
842#[derive(Clone, DerivePartialModel, FromQueryResult)]
843#[sea_orm(entity = "UserPrivilege")]
844pub struct PartialUserPrivilege {
845 pub id: PrivilegeId,
846 pub user_id: UserId,
847}
848
849pub async fn get_referring_privileges_cascade<C>(
850 ids: Vec<PrivilegeId>,
851 db: &C,
852) -> MetaResult<Vec<PartialUserPrivilege>>
853where
854 C: ConnectionTrait,
855{
856 let query = construct_privilege_dependency_query(ids);
857 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
858 let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
859 db.get_database_backend(),
860 sql,
861 values,
862 ))
863 .all(db)
864 .await?;
865
866 Ok(privileges)
867}
868
869pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
871where
872 C: ConnectionTrait,
873{
874 let count = UserPrivilege::find()
875 .filter(user_privilege::Column::DependentId.is_in(ids))
876 .count(db)
877 .await?;
878 if count != 0 {
879 return Err(MetaError::permission_denied(format!(
880 "privileges granted to {} other ones.",
881 count
882 )));
883 }
884 Ok(())
885}
886
887pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
889where
890 C: ConnectionTrait,
891{
892 let user_privileges = UserPrivilege::find()
893 .find_also_related(Object)
894 .filter(user_privilege::Column::UserId.eq(user_id))
895 .all(db)
896 .await?;
897 Ok(user_privileges
898 .into_iter()
899 .map(|(privilege, object)| {
900 let object = object.unwrap();
901 let oid = object.oid as _;
902 let obj = match object.obj_type {
903 ObjectType::Database => PbGrantObject::DatabaseId(oid),
904 ObjectType::Schema => PbGrantObject::SchemaId(oid),
905 ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
906 ObjectType::Source => PbGrantObject::SourceId(oid),
907 ObjectType::Sink => PbGrantObject::SinkId(oid),
908 ObjectType::View => PbGrantObject::ViewId(oid),
909 ObjectType::Function => PbGrantObject::FunctionId(oid),
910 ObjectType::Connection => unreachable!("connection is not supported yet"),
911 ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
912 ObjectType::Secret => unreachable!("secret is not supported yet"),
913 };
914 PbGrantPrivilege {
915 action_with_opts: vec![PbActionWithGrantOption {
916 action: PbAction::from(privilege.action) as _,
917 with_grant_option: privilege.with_grant_option,
918 granted_by: privilege.granted_by as _,
919 }],
920 object: Some(obj),
921 }
922 })
923 .collect())
924}
925
926pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
928 match object {
929 PbGrantObject::DatabaseId(id)
930 | PbGrantObject::SchemaId(id)
931 | PbGrantObject::TableId(id)
932 | PbGrantObject::SourceId(id)
933 | PbGrantObject::SinkId(id)
934 | PbGrantObject::ViewId(id)
935 | PbGrantObject::FunctionId(id)
936 | PbGrantObject::SubscriptionId(id)
937 | PbGrantObject::ConnectionId(id)
938 | PbGrantObject::SecretId(id) => *id as _,
939 }
940}
941
942pub async fn insert_fragment_relations(
943 db: &impl ConnectionTrait,
944 downstream_fragment_relations: &FragmentDownstreamRelation,
945) -> MetaResult<()> {
946 for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
947 for downstream in downstreams {
948 let relation = fragment_relation::Model {
949 source_fragment_id: *upstream_fragment_id as _,
950 target_fragment_id: downstream.downstream_fragment_id as _,
951 dispatcher_type: downstream.dispatcher_type,
952 dist_key_indices: downstream
953 .dist_key_indices
954 .iter()
955 .map(|idx| *idx as i32)
956 .collect_vec()
957 .into(),
958 output_indices: downstream
959 .output_indices
960 .iter()
961 .map(|idx| *idx as i32)
962 .collect_vec()
963 .into(),
964 };
965 FragmentRelation::insert(relation.into_active_model())
966 .exec(db)
967 .await?;
968 }
969 }
970 Ok(())
971}
972
973pub async fn get_fragment_actor_dispatchers<C>(
974 db: &C,
975 fragment_ids: Vec<FragmentId>,
976) -> MetaResult<FragmentActorDispatchers>
977where
978 C: ConnectionTrait,
979{
980 type FragmentActorInfo = (
981 DistributionType,
982 Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
983 );
984 let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
985 let get_fragment_actors = |fragment_id: FragmentId| async move {
986 let result: MetaResult<FragmentActorInfo> = try {
987 let mut fragment_actors = Fragment::find_by_id(fragment_id)
988 .find_with_related(Actor)
989 .filter(actor::Column::Status.eq(ActorStatus::Running))
990 .all(db)
991 .await?;
992 if fragment_actors.is_empty() {
993 return Err(anyhow!("failed to find fragment: {}", fragment_id).into());
994 }
995 assert_eq!(
996 fragment_actors.len(),
997 1,
998 "find multiple fragment {:?}",
999 fragment_actors
1000 );
1001 let (fragment, actors) = fragment_actors.pop().unwrap();
1002 (
1003 fragment.distribution_type,
1004 Arc::new(
1005 actors
1006 .into_iter()
1007 .map(|actor| {
1008 (
1009 actor.actor_id as _,
1010 actor
1011 .vnode_bitmap
1012 .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
1013 )
1014 })
1015 .collect(),
1016 ),
1017 )
1018 };
1019 result
1020 };
1021 let fragment_relations = FragmentRelation::find()
1022 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
1023 .all(db)
1024 .await?;
1025
1026 let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
1027 for fragment_relation::Model {
1028 source_fragment_id,
1029 target_fragment_id,
1030 dispatcher_type,
1031 dist_key_indices,
1032 output_indices,
1033 } in fragment_relations
1034 {
1035 let (source_fragment_distribution, source_fragment_actors) = {
1036 let (distribution, actors) = {
1037 match fragment_actor_cache.entry(source_fragment_id) {
1038 Entry::Occupied(entry) => entry.into_mut(),
1039 Entry::Vacant(entry) => {
1040 entry.insert(get_fragment_actors(source_fragment_id).await?)
1041 }
1042 }
1043 };
1044 (*distribution, actors.clone())
1045 };
1046 let (target_fragment_distribution, target_fragment_actors) = {
1047 let (distribution, actors) = {
1048 match fragment_actor_cache.entry(target_fragment_id) {
1049 Entry::Occupied(entry) => entry.into_mut(),
1050 Entry::Vacant(entry) => {
1051 entry.insert(get_fragment_actors(target_fragment_id).await?)
1052 }
1053 }
1054 };
1055 (*distribution, actors.clone())
1056 };
1057 let dispatchers = compose_dispatchers(
1058 source_fragment_distribution,
1059 &source_fragment_actors,
1060 target_fragment_id as _,
1061 target_fragment_distribution,
1062 &target_fragment_actors,
1063 dispatcher_type,
1064 dist_key_indices.into_u32_array(),
1065 output_indices.into_u32_array(),
1066 );
1067 let actor_dispatchers_map = actor_dispatchers_map
1068 .entry(source_fragment_id as _)
1069 .or_default();
1070 for (actor_id, dispatchers) in dispatchers {
1071 actor_dispatchers_map
1072 .entry(actor_id as _)
1073 .or_default()
1074 .push(dispatchers);
1075 }
1076 }
1077 Ok(actor_dispatchers_map)
1078}
1079
1080pub fn compose_dispatchers(
1081 source_fragment_distribution: DistributionType,
1082 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1083 target_fragment_id: crate::model::FragmentId,
1084 target_fragment_distribution: DistributionType,
1085 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1086 dispatcher_type: DispatcherType,
1087 dist_key_indices: Vec<u32>,
1088 output_indices: Vec<u32>,
1089) -> HashMap<crate::model::ActorId, PbDispatcher> {
1090 match dispatcher_type {
1091 DispatcherType::Hash => {
1092 let dispatcher = PbDispatcher {
1093 r#type: PbDispatcherType::from(dispatcher_type) as _,
1094 dist_key_indices: dist_key_indices.clone(),
1095 output_indices: output_indices.clone(),
1096 hash_mapping: Some(
1097 ActorMapping::from_bitmaps(
1098 &target_fragment_actors
1099 .iter()
1100 .map(|(actor_id, bitmap)| {
1101 (
1102 *actor_id as _,
1103 bitmap
1104 .clone()
1105 .expect("downstream hash dispatch must have distribution"),
1106 )
1107 })
1108 .collect(),
1109 )
1110 .to_protobuf(),
1111 ),
1112 dispatcher_id: target_fragment_id as _,
1113 downstream_actor_id: target_fragment_actors
1114 .keys()
1115 .map(|actor_id| *actor_id as _)
1116 .collect(),
1117 };
1118 source_fragment_actors
1119 .keys()
1120 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1121 .collect()
1122 }
1123 DispatcherType::Broadcast | DispatcherType::Simple => {
1124 let dispatcher = PbDispatcher {
1125 r#type: PbDispatcherType::from(dispatcher_type) as _,
1126 dist_key_indices: dist_key_indices.clone(),
1127 output_indices: output_indices.clone(),
1128 hash_mapping: None,
1129 dispatcher_id: target_fragment_id as _,
1130 downstream_actor_id: target_fragment_actors
1131 .keys()
1132 .map(|actor_id| *actor_id as _)
1133 .collect(),
1134 };
1135 source_fragment_actors
1136 .keys()
1137 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1138 .collect()
1139 }
1140 DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1141 source_fragment_distribution,
1142 source_fragment_actors,
1143 target_fragment_distribution,
1144 target_fragment_actors,
1145 )
1146 .into_iter()
1147 .map(|(upstream_actor_id, downstream_actor_id)| {
1148 (
1149 upstream_actor_id,
1150 PbDispatcher {
1151 r#type: PbDispatcherType::NoShuffle as _,
1152 dist_key_indices: dist_key_indices.clone(),
1153 output_indices: output_indices.clone(),
1154 hash_mapping: None,
1155 dispatcher_id: target_fragment_id as _,
1156 downstream_actor_id: vec![downstream_actor_id as _],
1157 },
1158 )
1159 })
1160 .collect(),
1161 }
1162}
1163
1164pub fn resolve_no_shuffle_actor_dispatcher(
1166 source_fragment_distribution: DistributionType,
1167 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1168 target_fragment_distribution: DistributionType,
1169 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1170) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1171 assert_eq!(source_fragment_distribution, target_fragment_distribution);
1172 assert_eq!(
1173 source_fragment_actors.len(),
1174 target_fragment_actors.len(),
1175 "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1176 source_fragment_actors,
1177 target_fragment_actors
1178 );
1179 match source_fragment_distribution {
1180 DistributionType::Single => {
1181 let assert_singleton = |bitmap: &Option<Bitmap>| {
1182 assert!(
1183 bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1184 "not singleton: {:?}",
1185 bitmap
1186 );
1187 };
1188 assert_eq!(
1189 source_fragment_actors.len(),
1190 1,
1191 "singleton distribution actor count not 1: {:?}",
1192 source_fragment_distribution
1193 );
1194 assert_eq!(
1195 target_fragment_actors.len(),
1196 1,
1197 "singleton distribution actor count not 1: {:?}",
1198 target_fragment_distribution
1199 );
1200 let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1201 assert_singleton(bitmap);
1202 let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1203 assert_singleton(bitmap);
1204 vec![(*source_actor_id, *target_actor_id)]
1205 }
1206 DistributionType::Hash => {
1207 let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1208 .iter()
1209 .map(|(actor_id, bitmap)| {
1210 let bitmap = bitmap
1211 .as_ref()
1212 .expect("hash distribution should have bitmap");
1213 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1214 (first_vnode, (*actor_id, bitmap))
1215 })
1216 .collect();
1217 source_fragment_actors
1218 .iter()
1219 .map(|(source_actor_id, bitmap)| {
1220 let bitmap = bitmap
1221 .as_ref()
1222 .expect("hash distribution should have bitmap");
1223 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1224 let (target_actor_id, target_bitmap) =
1225 target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1226 panic!(
1227 "cannot find matched target actor: {} {:?} {:?} {:?}",
1228 source_actor_id,
1229 first_vnode,
1230 source_fragment_actors,
1231 target_fragment_actors
1232 );
1233 });
1234 assert_eq!(
1235 bitmap,
1236 target_bitmap,
1237 "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1238 source_actor_id,
1239 first_vnode,
1240 source_fragment_actors,
1241 target_fragment_actors
1242 );
1243 (*source_actor_id, target_actor_id)
1244 }).collect()
1245 }
1246 }
1247}
1248
1249pub async fn get_fragment_mappings<C>(
1251 db: &C,
1252 job_id: ObjectId,
1253) -> MetaResult<Vec<PbFragmentWorkerSlotMapping>>
1254where
1255 C: ConnectionTrait,
1256{
1257 let job_actors: Vec<(
1258 FragmentId,
1259 DistributionType,
1260 ActorId,
1261 Option<VnodeBitmap>,
1262 WorkerId,
1263 ActorStatus,
1264 )> = Actor::find()
1265 .select_only()
1266 .columns([
1267 fragment::Column::FragmentId,
1268 fragment::Column::DistributionType,
1269 ])
1270 .columns([
1271 actor::Column::ActorId,
1272 actor::Column::VnodeBitmap,
1273 actor::Column::WorkerId,
1274 actor::Column::Status,
1275 ])
1276 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1277 .filter(fragment::Column::JobId.eq(job_id))
1278 .into_tuple()
1279 .all(db)
1280 .await?;
1281
1282 Ok(rebuild_fragment_mapping_from_actors(job_actors))
1283}
1284
1285pub fn rebuild_fragment_mapping_from_actors(
1286 job_actors: Vec<(
1287 FragmentId,
1288 DistributionType,
1289 ActorId,
1290 Option<VnodeBitmap>,
1291 WorkerId,
1292 ActorStatus,
1293 )>,
1294) -> Vec<FragmentWorkerSlotMapping> {
1295 let mut all_actor_locations = HashMap::new();
1296 let mut actor_bitmaps = HashMap::new();
1297 let mut fragment_actors = HashMap::new();
1298 let mut fragment_dist = HashMap::new();
1299
1300 for (fragment_id, dist, actor_id, bitmap, worker_id, actor_status) in job_actors {
1301 if actor_status == ActorStatus::Inactive {
1302 continue;
1303 }
1304
1305 all_actor_locations
1306 .entry(fragment_id)
1307 .or_insert(HashMap::new())
1308 .insert(actor_id as hash::ActorId, worker_id as u32);
1309 actor_bitmaps.insert(actor_id, bitmap);
1310 fragment_actors
1311 .entry(fragment_id)
1312 .or_insert_with(Vec::new)
1313 .push(actor_id);
1314 fragment_dist.insert(fragment_id, dist);
1315 }
1316
1317 let mut result = vec![];
1318 for (fragment_id, dist) in fragment_dist {
1319 let mut actor_locations = all_actor_locations.remove(&fragment_id).unwrap();
1320 let fragment_worker_slot_mapping = match dist {
1321 DistributionType::Single => {
1322 let actor = fragment_actors
1323 .remove(&fragment_id)
1324 .unwrap()
1325 .into_iter()
1326 .exactly_one()
1327 .unwrap() as hash::ActorId;
1328 let actor_location = actor_locations.remove(&actor).unwrap();
1329
1330 WorkerSlotMapping::new_single(WorkerSlotId::new(actor_location, 0))
1331 }
1332 DistributionType::Hash => {
1333 let actors = fragment_actors.remove(&fragment_id).unwrap();
1334
1335 let all_actor_bitmaps: HashMap<_, _> = actors
1336 .iter()
1337 .map(|actor_id| {
1338 let vnode_bitmap = actor_bitmaps
1339 .remove(actor_id)
1340 .flatten()
1341 .expect("actor bitmap shouldn't be none in hash fragment");
1342
1343 let bitmap = Bitmap::from(&vnode_bitmap.to_protobuf());
1344 (*actor_id as hash::ActorId, bitmap)
1345 })
1346 .collect();
1347
1348 let actor_mapping = ActorMapping::from_bitmaps(&all_actor_bitmaps);
1349
1350 actor_mapping.to_worker_slot(&actor_locations)
1351 }
1352 };
1353
1354 result.push(PbFragmentWorkerSlotMapping {
1355 fragment_id: fragment_id as u32,
1356 mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1357 })
1358 }
1359 result
1360}
1361
1362pub async fn get_fragment_actor_ids<C>(
1364 db: &C,
1365 fragment_ids: Vec<FragmentId>,
1366) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>>
1367where
1368 C: ConnectionTrait,
1369{
1370 let fragment_actors: Vec<(FragmentId, ActorId)> = Actor::find()
1371 .select_only()
1372 .columns([actor::Column::FragmentId, actor::Column::ActorId])
1373 .filter(actor::Column::FragmentId.is_in(fragment_ids))
1374 .into_tuple()
1375 .all(db)
1376 .await?;
1377
1378 Ok(fragment_actors.into_iter().into_group_map())
1379}
1380
1381pub async fn get_fragments_for_jobs<C>(
1386 db: &C,
1387 streaming_jobs: Vec<ObjectId>,
1388) -> MetaResult<(
1389 HashMap<SourceId, BTreeSet<FragmentId>>,
1390 HashSet<ActorId>,
1391 HashSet<FragmentId>,
1392)>
1393where
1394 C: ConnectionTrait,
1395{
1396 if streaming_jobs.is_empty() {
1397 return Ok((HashMap::default(), HashSet::default(), HashSet::default()));
1398 }
1399
1400 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1401 .select_only()
1402 .columns([
1403 fragment::Column::FragmentId,
1404 fragment::Column::FragmentTypeMask,
1405 fragment::Column::StreamNode,
1406 ])
1407 .filter(fragment::Column::JobId.is_in(streaming_jobs))
1408 .into_tuple()
1409 .all(db)
1410 .await?;
1411 let actors: Vec<ActorId> = Actor::find()
1412 .select_only()
1413 .column(actor::Column::ActorId)
1414 .filter(
1415 actor::Column::FragmentId.is_in(fragments.iter().map(|(id, _, _)| *id).collect_vec()),
1416 )
1417 .into_tuple()
1418 .all(db)
1419 .await?;
1420
1421 let fragment_ids = fragments
1422 .iter()
1423 .map(|(fragment_id, _, _)| *fragment_id)
1424 .collect();
1425
1426 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1427 for (fragment_id, mask, stream_node) in fragments {
1428 if mask & PbFragmentTypeFlag::Source as i32 == 0 {
1429 continue;
1430 }
1431 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1432 source_fragment_ids
1433 .entry(source_id as _)
1434 .or_default()
1435 .insert(fragment_id);
1436 }
1437 }
1438
1439 Ok((
1440 source_fragment_ids,
1441 actors.into_iter().collect(),
1442 fragment_ids,
1443 ))
1444}
1445
1446pub(crate) fn build_object_group_for_delete(
1451 partial_objects: Vec<PartialObject>,
1452) -> NotificationInfo {
1453 let mut objects = vec![];
1454 for obj in partial_objects {
1455 match obj.obj_type {
1456 ObjectType::Database => objects.push(PbObject {
1457 object_info: Some(PbObjectInfo::Database(PbDatabase {
1458 id: obj.oid as _,
1459 ..Default::default()
1460 })),
1461 }),
1462 ObjectType::Schema => objects.push(PbObject {
1463 object_info: Some(PbObjectInfo::Schema(PbSchema {
1464 id: obj.oid as _,
1465 database_id: obj.database_id.unwrap() as _,
1466 ..Default::default()
1467 })),
1468 }),
1469 ObjectType::Table => objects.push(PbObject {
1470 object_info: Some(PbObjectInfo::Table(PbTable {
1471 id: obj.oid as _,
1472 schema_id: obj.schema_id.unwrap() as _,
1473 database_id: obj.database_id.unwrap() as _,
1474 ..Default::default()
1475 })),
1476 }),
1477 ObjectType::Source => objects.push(PbObject {
1478 object_info: Some(PbObjectInfo::Source(PbSource {
1479 id: obj.oid as _,
1480 schema_id: obj.schema_id.unwrap() as _,
1481 database_id: obj.database_id.unwrap() as _,
1482 ..Default::default()
1483 })),
1484 }),
1485 ObjectType::Sink => objects.push(PbObject {
1486 object_info: Some(PbObjectInfo::Sink(PbSink {
1487 id: obj.oid as _,
1488 schema_id: obj.schema_id.unwrap() as _,
1489 database_id: obj.database_id.unwrap() as _,
1490 ..Default::default()
1491 })),
1492 }),
1493 ObjectType::Subscription => objects.push(PbObject {
1494 object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1495 id: obj.oid as _,
1496 schema_id: obj.schema_id.unwrap() as _,
1497 database_id: obj.database_id.unwrap() as _,
1498 ..Default::default()
1499 })),
1500 }),
1501 ObjectType::View => objects.push(PbObject {
1502 object_info: Some(PbObjectInfo::View(PbView {
1503 id: obj.oid as _,
1504 schema_id: obj.schema_id.unwrap() as _,
1505 database_id: obj.database_id.unwrap() as _,
1506 ..Default::default()
1507 })),
1508 }),
1509 ObjectType::Index => {
1510 objects.push(PbObject {
1511 object_info: Some(PbObjectInfo::Index(PbIndex {
1512 id: obj.oid as _,
1513 schema_id: obj.schema_id.unwrap() as _,
1514 database_id: obj.database_id.unwrap() as _,
1515 ..Default::default()
1516 })),
1517 });
1518 objects.push(PbObject {
1519 object_info: Some(PbObjectInfo::Table(PbTable {
1520 id: obj.oid as _,
1521 schema_id: obj.schema_id.unwrap() as _,
1522 database_id: obj.database_id.unwrap() as _,
1523 ..Default::default()
1524 })),
1525 });
1526 }
1527 ObjectType::Function => objects.push(PbObject {
1528 object_info: Some(PbObjectInfo::Function(PbFunction {
1529 id: obj.oid as _,
1530 schema_id: obj.schema_id.unwrap() as _,
1531 database_id: obj.database_id.unwrap() as _,
1532 ..Default::default()
1533 })),
1534 }),
1535 ObjectType::Connection => objects.push(PbObject {
1536 object_info: Some(PbObjectInfo::Connection(PbConnection {
1537 id: obj.oid as _,
1538 schema_id: obj.schema_id.unwrap() as _,
1539 database_id: obj.database_id.unwrap() as _,
1540 ..Default::default()
1541 })),
1542 }),
1543 ObjectType::Secret => objects.push(PbObject {
1544 object_info: Some(PbObjectInfo::Secret(PbSecret {
1545 id: obj.oid as _,
1546 schema_id: obj.schema_id.unwrap() as _,
1547 database_id: obj.database_id.unwrap() as _,
1548 ..Default::default()
1549 })),
1550 }),
1551 }
1552 }
1553 NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1554}
1555
1556pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1557 let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1558 .context("unable to parse table definition")
1559 .inspect_err(|e| {
1560 tracing::error!(
1561 target: "auto_schema_change",
1562 error = %e.as_report(),
1563 "failed to parse table definition")
1564 })
1565 .unwrap()
1566 .try_into()
1567 .unwrap();
1568 if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1569 cdc_table_info
1570 .clone()
1571 .map(|cdc_table_info| cdc_table_info.external_table_name)
1572 } else {
1573 None
1574 }
1575}
1576
1577pub async fn rename_relation(
1580 txn: &DatabaseTransaction,
1581 object_type: ObjectType,
1582 object_id: ObjectId,
1583 object_name: &str,
1584) -> MetaResult<(Vec<PbObject>, String)> {
1585 use sea_orm::ActiveModelTrait;
1586
1587 use crate::controller::rename::alter_relation_rename;
1588
1589 let mut to_update_relations = vec![];
1590 macro_rules! rename_relation {
1592 ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1593 let (mut relation, obj) = $entity::find_by_id($object_id)
1594 .find_also_related(Object)
1595 .one(txn)
1596 .await?
1597 .unwrap();
1598 let obj = obj.unwrap();
1599 let old_name = relation.name.clone();
1600 relation.name = object_name.into();
1601 if obj.obj_type != ObjectType::View {
1602 relation.definition = alter_relation_rename(&relation.definition, object_name);
1603 }
1604 let active_model = $table::ActiveModel {
1605 $identity: Set(relation.$identity),
1606 name: Set(object_name.into()),
1607 definition: Set(relation.definition.clone()),
1608 ..Default::default()
1609 };
1610 active_model.update(txn).await?;
1611 to_update_relations.push(PbObject {
1612 object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
1613 });
1614 old_name
1615 }};
1616 }
1617 let old_name = match object_type {
1619 ObjectType::Table => rename_relation!(Table, table, table_id, object_id),
1620 ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
1621 ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id),
1622 ObjectType::Subscription => {
1623 rename_relation!(Subscription, subscription, subscription_id, object_id)
1624 }
1625 ObjectType::View => rename_relation!(View, view, view_id, object_id),
1626 ObjectType::Index => {
1627 let (mut index, obj) = Index::find_by_id(object_id)
1628 .find_also_related(Object)
1629 .one(txn)
1630 .await?
1631 .unwrap();
1632 index.name = object_name.into();
1633 let index_table_id = index.index_table_id;
1634 let old_name = rename_relation!(Table, table, table_id, index_table_id);
1635
1636 let active_model = index::ActiveModel {
1638 index_id: sea_orm::ActiveValue::Set(index.index_id),
1639 name: sea_orm::ActiveValue::Set(object_name.into()),
1640 ..Default::default()
1641 };
1642 active_model.update(txn).await?;
1643 to_update_relations.push(PbObject {
1644 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1645 });
1646 old_name
1647 }
1648 _ => unreachable!("only relation name can be altered."),
1649 };
1650
1651 Ok((to_update_relations, old_name))
1652}
1653
1654pub async fn get_database_resource_group<C>(txn: &C, database_id: ObjectId) -> MetaResult<String>
1655where
1656 C: ConnectionTrait,
1657{
1658 let database_resource_group: Option<String> = Database::find_by_id(database_id)
1659 .select_only()
1660 .column(database::Column::ResourceGroup)
1661 .into_tuple()
1662 .one(txn)
1663 .await?
1664 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1665
1666 Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1667}
1668
1669pub async fn get_existing_job_resource_group<C>(
1670 txn: &C,
1671 streaming_job_id: ObjectId,
1672) -> MetaResult<String>
1673where
1674 C: ConnectionTrait,
1675{
1676 let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1677 StreamingJob::find_by_id(streaming_job_id)
1678 .select_only()
1679 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1680 .join(JoinType::InnerJoin, object::Relation::Database2.def())
1681 .column(streaming_job::Column::SpecificResourceGroup)
1682 .column(database::Column::ResourceGroup)
1683 .into_tuple()
1684 .one(txn)
1685 .await?
1686 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1687
1688 Ok(job_specific_resource_group.unwrap_or_else(|| {
1689 database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1690 }))
1691}
1692
1693pub fn filter_workers_by_resource_group(
1694 workers: &HashMap<u32, WorkerNode>,
1695 resource_group: &str,
1696) -> BTreeSet<WorkerId> {
1697 workers
1698 .iter()
1699 .filter(|&(_, worker)| {
1700 worker
1701 .resource_group()
1702 .map(|node_label| node_label.as_str() == resource_group)
1703 .unwrap_or(false)
1704 })
1705 .map(|(id, _)| (*id as WorkerId))
1706 .collect()
1707}
1708
1709pub async fn rename_relation_refer(
1712 txn: &DatabaseTransaction,
1713 object_type: ObjectType,
1714 object_id: ObjectId,
1715 object_name: &str,
1716 old_name: &str,
1717) -> MetaResult<Vec<PbObject>> {
1718 use sea_orm::ActiveModelTrait;
1719
1720 use crate::controller::rename::alter_relation_rename_refs;
1721
1722 let mut to_update_relations = vec![];
1723 macro_rules! rename_relation_ref {
1724 ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1725 let (mut relation, obj) = $entity::find_by_id($object_id)
1726 .find_also_related(Object)
1727 .one(txn)
1728 .await?
1729 .unwrap();
1730 relation.definition =
1731 alter_relation_rename_refs(&relation.definition, old_name, object_name);
1732 let active_model = $table::ActiveModel {
1733 $identity: Set(relation.$identity),
1734 definition: Set(relation.definition.clone()),
1735 ..Default::default()
1736 };
1737 active_model.update(txn).await?;
1738 to_update_relations.push(PbObject {
1739 object_info: Some(PbObjectInfo::$entity(
1740 ObjectModel(relation, obj.unwrap()).into(),
1741 )),
1742 });
1743 }};
1744 }
1745 let mut objs = get_referring_objects(object_id, txn).await?;
1746 if object_type == ObjectType::Table {
1747 let incoming_sinks: I32Array = Table::find_by_id(object_id)
1748 .select_only()
1749 .column(table::Column::IncomingSinks)
1750 .into_tuple()
1751 .one(txn)
1752 .await?
1753 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1754
1755 objs.extend(
1756 incoming_sinks
1757 .into_inner()
1758 .into_iter()
1759 .map(|id| PartialObject {
1760 oid: id,
1761 obj_type: ObjectType::Sink,
1762 schema_id: None,
1763 database_id: None,
1764 }),
1765 );
1766 }
1767
1768 for obj in objs {
1769 match obj.obj_type {
1770 ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid),
1771 ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid),
1772 ObjectType::Subscription => {
1773 rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid)
1774 }
1775 ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid),
1776 ObjectType::Index => {
1777 let index_table_id: Option<TableId> = Index::find_by_id(obj.oid)
1778 .select_only()
1779 .column(index::Column::IndexTableId)
1780 .into_tuple()
1781 .one(txn)
1782 .await?;
1783 rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
1784 }
1785 _ => {
1786 bail!("only table, sink, subscription, view and index depend on other objects.")
1787 }
1788 }
1789 }
1790
1791 Ok(to_update_relations)
1792}
1793
1794pub async fn validate_subscription_deletion<C>(txn: &C, subscription_id: ObjectId) -> MetaResult<()>
1798where
1799 C: ConnectionTrait,
1800{
1801 let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1802 .select_only()
1803 .column(subscription::Column::DependentTableId)
1804 .into_tuple()
1805 .one(txn)
1806 .await?
1807 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1808
1809 let cnt = Subscription::find()
1810 .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1811 .count(txn)
1812 .await?;
1813 if cnt > 1 {
1814 return Ok(());
1817 }
1818
1819 let obj_alias = Alias::new("o1");
1821 let used_by_alias = Alias::new("o2");
1822 let count = ObjectDependency::find()
1823 .join_as(
1824 JoinType::InnerJoin,
1825 object_dependency::Relation::Object2.def(),
1826 obj_alias.clone(),
1827 )
1828 .join_as(
1829 JoinType::InnerJoin,
1830 object_dependency::Relation::Object1.def(),
1831 used_by_alias.clone(),
1832 )
1833 .filter(
1834 object_dependency::Column::Oid
1835 .eq(upstream_table_id)
1836 .and(object_dependency::Column::UsedBy.ne(subscription_id))
1837 .and(
1838 Expr::col((obj_alias, object::Column::DatabaseId))
1839 .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1840 ),
1841 )
1842 .count(txn)
1843 .await?;
1844
1845 if count != 0 {
1846 return Err(MetaError::permission_denied(format!(
1847 "Referenced by {} cross-db objects.",
1848 count
1849 )));
1850 }
1851
1852 Ok(())
1853}
1854
1855#[cfg(test)]
1856mod tests {
1857 use super::*;
1858
1859 #[test]
1860 fn test_extract_cdc_table_name() {
1861 let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
1862 let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
1863 assert_eq!(
1864 extract_external_table_name_from_definition(ddl1),
1865 Some("public.t1".into())
1866 );
1867 assert_eq!(
1868 extract_external_table_name_from_definition(ddl2),
1869 Some("mydb.t2".into())
1870 );
1871 }
1872}