1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
23use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
24use risingwave_common::{bail, hash};
25use risingwave_meta_model::actor::ActorStatus;
26use risingwave_meta_model::fragment::DistributionType;
27use risingwave_meta_model::object::ObjectType;
28use risingwave_meta_model::prelude::*;
29use risingwave_meta_model::table::TableType;
30use risingwave_meta_model::{
31 ActorId, DataTypeArray, DatabaseId, DispatcherType, FragmentId, I32Array, JobStatus, ObjectId,
32 PrivilegeId, SchemaId, SourceId, StreamNode, StreamSourceInfo, TableId, UserId, VnodeBitmap,
33 WorkerId, actor, connection, database, fragment, fragment_relation, function, index, object,
34 object_dependency, schema, secret, sink, source, streaming_job, subscription, table, user,
35 user_privilege, view,
36};
37use risingwave_meta_model_migration::WithQuery;
38use risingwave_pb::catalog::{
39 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
40 PbSubscription, PbTable, PbView,
41};
42use risingwave_pb::common::WorkerNode;
43use risingwave_pb::meta::object::PbObjectInfo;
44use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
45use risingwave_pb::meta::{
46 FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
47};
48use risingwave_pb::stream_plan::{PbDispatcher, PbDispatcherType, PbFragmentTypeFlag};
49use risingwave_pb::user::grant_privilege::{
50 PbAction, PbActionWithGrantOption, PbObject as PbGrantObject,
51};
52use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
53use risingwave_sqlparser::ast::Statement as SqlStatement;
54use risingwave_sqlparser::parser::Parser;
55use sea_orm::sea_query::{
56 Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
57 WithClause,
58};
59use sea_orm::{
60 ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
61 FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
62 RelationTrait, Set, Statement,
63};
64use thiserror_ext::AsReport;
65
66use crate::controller::ObjectModel;
67use crate::model::{FragmentActorDispatchers, FragmentDownstreamRelation};
68use crate::{MetaError, MetaResult};
69
70pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
95 let cte_alias = Alias::new("used_by_object_ids");
96 let cte_return_alias = Alias::new("used_by");
97
98 let mut base_query = SelectStatement::new()
99 .column(object_dependency::Column::UsedBy)
100 .from(ObjectDependency)
101 .and_where(object_dependency::Column::Oid.eq(obj_id))
102 .to_owned();
103
104 let belonged_obj_query = SelectStatement::new()
105 .column(object::Column::Oid)
106 .from(Object)
107 .and_where(
108 object::Column::DatabaseId
109 .eq(obj_id)
110 .or(object::Column::SchemaId.eq(obj_id)),
111 )
112 .to_owned();
113
114 let cte_referencing = Query::select()
115 .column((ObjectDependency, object_dependency::Column::UsedBy))
116 .from(ObjectDependency)
117 .inner_join(
118 cte_alias.clone(),
119 Expr::col((cte_alias.clone(), cte_return_alias.clone()))
120 .equals(object_dependency::Column::Oid),
121 )
122 .to_owned();
123
124 let common_table_expr = CommonTableExpression::new()
125 .query(
126 base_query
127 .union(UnionType::All, belonged_obj_query)
128 .union(UnionType::All, cte_referencing)
129 .to_owned(),
130 )
131 .column(cte_return_alias.clone())
132 .table_name(cte_alias.clone())
133 .to_owned();
134
135 SelectStatement::new()
136 .distinct()
137 .columns([
138 object::Column::Oid,
139 object::Column::ObjType,
140 object::Column::SchemaId,
141 object::Column::DatabaseId,
142 ])
143 .from(cte_alias.clone())
144 .inner_join(
145 Object,
146 Expr::col((cte_alias, cte_return_alias.clone())).equals(object::Column::Oid),
147 )
148 .order_by(object::Column::Oid, Order::Desc)
149 .to_owned()
150 .with(
151 WithClause::new()
152 .recursive(true)
153 .cte(common_table_expr)
154 .to_owned(),
155 )
156 .to_owned()
157}
158
159pub fn construct_sink_cycle_check_query(
184 target_table: ObjectId,
185 dependent_objects: Vec<ObjectId>,
186) -> WithQuery {
187 let cte_alias = Alias::new("used_by_object_ids_with_sink");
188 let depend_alias = Alias::new("obj_dependency_with_sink");
189
190 let mut base_query = SelectStatement::new()
191 .columns([
192 object_dependency::Column::Oid,
193 object_dependency::Column::UsedBy,
194 ])
195 .from(ObjectDependency)
196 .and_where(object_dependency::Column::Oid.eq(target_table))
197 .to_owned();
198
199 let query_sink_deps = SelectStatement::new()
200 .columns([sink::Column::SinkId, sink::Column::TargetTable])
201 .from(Sink)
202 .and_where(sink::Column::TargetTable.is_not_null())
203 .to_owned();
204
205 let cte_referencing = Query::select()
206 .column((depend_alias.clone(), object_dependency::Column::Oid))
207 .column((depend_alias.clone(), object_dependency::Column::UsedBy))
208 .from_subquery(
209 SelectStatement::new()
210 .columns([
211 object_dependency::Column::Oid,
212 object_dependency::Column::UsedBy,
213 ])
214 .from(ObjectDependency)
215 .union(UnionType::All, query_sink_deps)
216 .to_owned(),
217 depend_alias.clone(),
218 )
219 .inner_join(
220 cte_alias.clone(),
221 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).eq(Expr::col((
222 depend_alias.clone(),
223 object_dependency::Column::Oid,
224 ))),
225 )
226 .and_where(
227 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
228 cte_alias.clone(),
229 object_dependency::Column::Oid,
230 ))),
231 )
232 .to_owned();
233
234 let common_table_expr = CommonTableExpression::new()
235 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
236 .columns([
237 object_dependency::Column::Oid,
238 object_dependency::Column::UsedBy,
239 ])
240 .table_name(cte_alias.clone())
241 .to_owned();
242
243 SelectStatement::new()
244 .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
245 .from(cte_alias.clone())
246 .and_where(
247 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
248 .is_in(dependent_objects),
249 )
250 .to_owned()
251 .with(
252 WithClause::new()
253 .recursive(true)
254 .cte(common_table_expr)
255 .to_owned(),
256 )
257 .to_owned()
258}
259
260#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
261#[sea_orm(entity = "Object")]
262pub struct PartialObject {
263 pub oid: ObjectId,
264 pub obj_type: ObjectType,
265 pub schema_id: Option<SchemaId>,
266 pub database_id: Option<DatabaseId>,
267}
268
269#[derive(Clone, DerivePartialModel, FromQueryResult)]
270#[sea_orm(entity = "Fragment")]
271pub struct PartialFragmentStateTables {
272 pub fragment_id: FragmentId,
273 pub job_id: ObjectId,
274 pub state_table_ids: I32Array,
275}
276
277#[derive(Clone, DerivePartialModel, FromQueryResult)]
278#[sea_orm(entity = "Actor")]
279pub struct PartialActorLocation {
280 pub actor_id: ActorId,
281 pub fragment_id: FragmentId,
282 pub worker_id: WorkerId,
283 pub status: ActorStatus,
284}
285
286#[derive(FromQueryResult)]
287pub struct FragmentDesc {
288 pub fragment_id: FragmentId,
289 pub job_id: ObjectId,
290 pub fragment_type_mask: i32,
291 pub distribution_type: DistributionType,
292 pub state_table_ids: I32Array,
293 pub parallelism: i64,
294 pub vnode_count: i32,
295 pub stream_node: StreamNode,
296}
297
298pub async fn get_referring_objects_cascade<C>(
300 obj_id: ObjectId,
301 db: &C,
302) -> MetaResult<Vec<PartialObject>>
303where
304 C: ConnectionTrait,
305{
306 let query = construct_obj_dependency_query(obj_id);
307 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
308 let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
309 db.get_database_backend(),
310 sql,
311 values,
312 ))
313 .all(db)
314 .await?;
315 Ok(objects)
316}
317
318pub async fn check_sink_into_table_cycle<C>(
320 target_table: ObjectId,
321 dependent_objs: Vec<ObjectId>,
322 db: &C,
323) -> MetaResult<bool>
324where
325 C: ConnectionTrait,
326{
327 if dependent_objs.is_empty() {
328 return Ok(false);
329 }
330
331 if dependent_objs.contains(&target_table) {
333 return Ok(true);
334 }
335
336 let query = construct_sink_cycle_check_query(target_table, dependent_objs);
337 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
338
339 let res = db
340 .query_one(Statement::from_sql_and_values(
341 db.get_database_backend(),
342 sql,
343 values,
344 ))
345 .await?
346 .unwrap();
347
348 let cnt: i64 = res.try_get_by(0)?;
349
350 Ok(cnt != 0)
351}
352
353pub async fn ensure_object_id<C>(
355 object_type: ObjectType,
356 obj_id: ObjectId,
357 db: &C,
358) -> MetaResult<()>
359where
360 C: ConnectionTrait,
361{
362 let count = Object::find_by_id(obj_id).count(db).await?;
363 if count == 0 {
364 return Err(MetaError::catalog_id_not_found(
365 object_type.as_str(),
366 obj_id,
367 ));
368 }
369 Ok(())
370}
371
372pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
374where
375 C: ConnectionTrait,
376{
377 let count = User::find_by_id(user_id).count(db).await?;
378 if count == 0 {
379 return Err(anyhow!("user {} was concurrently dropped", user_id).into());
380 }
381 Ok(())
382}
383
384pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
386where
387 C: ConnectionTrait,
388{
389 let count = Database::find()
390 .filter(database::Column::Name.eq(name))
391 .count(db)
392 .await?;
393 if count > 0 {
394 assert_eq!(count, 1);
395 return Err(MetaError::catalog_duplicated("database", name));
396 }
397 Ok(())
398}
399
400pub async fn check_function_signature_duplicate<C>(
402 pb_function: &PbFunction,
403 db: &C,
404) -> MetaResult<()>
405where
406 C: ConnectionTrait,
407{
408 let count = Function::find()
409 .inner_join(Object)
410 .filter(
411 object::Column::DatabaseId
412 .eq(pb_function.database_id as DatabaseId)
413 .and(object::Column::SchemaId.eq(pb_function.schema_id as SchemaId))
414 .and(function::Column::Name.eq(&pb_function.name))
415 .and(
416 function::Column::ArgTypes
417 .eq(DataTypeArray::from(pb_function.arg_types.clone())),
418 ),
419 )
420 .count(db)
421 .await?;
422 if count > 0 {
423 assert_eq!(count, 1);
424 return Err(MetaError::catalog_duplicated("function", &pb_function.name));
425 }
426 Ok(())
427}
428
429pub async fn check_connection_name_duplicate<C>(
431 pb_connection: &PbConnection,
432 db: &C,
433) -> MetaResult<()>
434where
435 C: ConnectionTrait,
436{
437 let count = Connection::find()
438 .inner_join(Object)
439 .filter(
440 object::Column::DatabaseId
441 .eq(pb_connection.database_id as DatabaseId)
442 .and(object::Column::SchemaId.eq(pb_connection.schema_id as SchemaId))
443 .and(connection::Column::Name.eq(&pb_connection.name)),
444 )
445 .count(db)
446 .await?;
447 if count > 0 {
448 assert_eq!(count, 1);
449 return Err(MetaError::catalog_duplicated(
450 "connection",
451 &pb_connection.name,
452 ));
453 }
454 Ok(())
455}
456
457pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
458where
459 C: ConnectionTrait,
460{
461 let count = Secret::find()
462 .inner_join(Object)
463 .filter(
464 object::Column::DatabaseId
465 .eq(pb_secret.database_id as DatabaseId)
466 .and(object::Column::SchemaId.eq(pb_secret.schema_id as SchemaId))
467 .and(secret::Column::Name.eq(&pb_secret.name)),
468 )
469 .count(db)
470 .await?;
471 if count > 0 {
472 assert_eq!(count, 1);
473 return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
474 }
475 Ok(())
476}
477
478pub async fn check_subscription_name_duplicate<C>(
479 pb_subscription: &PbSubscription,
480 db: &C,
481) -> MetaResult<()>
482where
483 C: ConnectionTrait,
484{
485 let count = Subscription::find()
486 .inner_join(Object)
487 .filter(
488 object::Column::DatabaseId
489 .eq(pb_subscription.database_id as DatabaseId)
490 .and(object::Column::SchemaId.eq(pb_subscription.schema_id as SchemaId))
491 .and(subscription::Column::Name.eq(&pb_subscription.name)),
492 )
493 .count(db)
494 .await?;
495 if count > 0 {
496 assert_eq!(count, 1);
497 return Err(MetaError::catalog_duplicated(
498 "subscription",
499 &pb_subscription.name,
500 ));
501 }
502 Ok(())
503}
504
505pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
507where
508 C: ConnectionTrait,
509{
510 let count = User::find()
511 .filter(user::Column::Name.eq(name))
512 .count(db)
513 .await?;
514 if count > 0 {
515 assert_eq!(count, 1);
516 return Err(MetaError::catalog_duplicated("user", name));
517 }
518 Ok(())
519}
520
521pub async fn check_relation_name_duplicate<C>(
523 name: &str,
524 database_id: DatabaseId,
525 schema_id: SchemaId,
526 db: &C,
527) -> MetaResult<()>
528where
529 C: ConnectionTrait,
530{
531 macro_rules! check_duplicated {
532 ($obj_type:expr, $entity:ident, $table:ident) => {
533 let object_id = Object::find()
534 .select_only()
535 .column(object::Column::Oid)
536 .inner_join($entity)
537 .filter(
538 object::Column::DatabaseId
539 .eq(Some(database_id))
540 .and(object::Column::SchemaId.eq(Some(schema_id)))
541 .and($table::Column::Name.eq(name)),
542 )
543 .into_tuple::<ObjectId>()
544 .one(db)
545 .await?;
546 if let Some(oid) = object_id {
547 let check_creation = if $obj_type == ObjectType::View {
548 false
549 } else if $obj_type == ObjectType::Source {
550 let source_info = Source::find_by_id(oid)
551 .select_only()
552 .column(source::Column::SourceInfo)
553 .into_tuple::<Option<StreamSourceInfo>>()
554 .one(db)
555 .await?
556 .unwrap();
557 source_info.map_or(false, |info| info.to_protobuf().is_shared())
558 } else {
559 true
560 };
561 return if check_creation
562 && !matches!(
563 StreamingJob::find_by_id(oid)
564 .select_only()
565 .column(streaming_job::Column::JobStatus)
566 .into_tuple::<JobStatus>()
567 .one(db)
568 .await?,
569 Some(JobStatus::Created)
570 ) {
571 Err(MetaError::catalog_under_creation($obj_type.as_str(), name))
572 } else {
573 Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
574 };
575 }
576 };
577 }
578 check_duplicated!(ObjectType::Table, Table, table);
579 check_duplicated!(ObjectType::Source, Source, source);
580 check_duplicated!(ObjectType::Sink, Sink, sink);
581 check_duplicated!(ObjectType::Index, Index, index);
582 check_duplicated!(ObjectType::View, View, view);
583
584 Ok(())
585}
586
587pub async fn check_schema_name_duplicate<C>(
589 name: &str,
590 database_id: DatabaseId,
591 db: &C,
592) -> MetaResult<()>
593where
594 C: ConnectionTrait,
595{
596 let count = Object::find()
597 .inner_join(Schema)
598 .filter(
599 object::Column::ObjType
600 .eq(ObjectType::Schema)
601 .and(object::Column::DatabaseId.eq(Some(database_id)))
602 .and(schema::Column::Name.eq(name)),
603 )
604 .count(db)
605 .await?;
606 if count != 0 {
607 return Err(MetaError::catalog_duplicated("schema", name));
608 }
609
610 Ok(())
611}
612
613pub async fn ensure_object_not_refer<C>(
615 object_type: ObjectType,
616 object_id: ObjectId,
617 db: &C,
618) -> MetaResult<()>
619where
620 C: ConnectionTrait,
621{
622 let count = if object_type == ObjectType::Table {
624 ObjectDependency::find()
625 .join(
626 JoinType::InnerJoin,
627 object_dependency::Relation::Object1.def(),
628 )
629 .filter(
630 object_dependency::Column::Oid
631 .eq(object_id)
632 .and(object::Column::ObjType.ne(ObjectType::Index)),
633 )
634 .count(db)
635 .await?
636 } else {
637 ObjectDependency::find()
638 .filter(object_dependency::Column::Oid.eq(object_id))
639 .count(db)
640 .await?
641 };
642 if count != 0 {
643 return Err(MetaError::permission_denied(format!(
644 "{} used by {} other objects.",
645 object_type.as_str(),
646 count
647 )));
648 }
649 Ok(())
650}
651
652pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
654where
655 C: ConnectionTrait,
656{
657 let objs = ObjectDependency::find()
658 .filter(object_dependency::Column::Oid.eq(object_id))
659 .join(
660 JoinType::InnerJoin,
661 object_dependency::Relation::Object1.def(),
662 )
663 .into_partial_model()
664 .all(db)
665 .await?;
666
667 Ok(objs)
668}
669
670pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
672where
673 C: ConnectionTrait,
674{
675 let count = Object::find()
676 .filter(object::Column::SchemaId.eq(Some(schema_id)))
677 .count(db)
678 .await?;
679 if count != 0 {
680 return Err(MetaError::permission_denied("schema is not empty"));
681 }
682
683 Ok(())
684}
685
686pub async fn list_user_info_by_ids<C>(user_ids: Vec<UserId>, db: &C) -> MetaResult<Vec<PbUserInfo>>
688where
689 C: ConnectionTrait,
690{
691 let mut user_infos = vec![];
692 for user_id in user_ids {
693 let user = User::find_by_id(user_id)
694 .one(db)
695 .await?
696 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
697 let mut user_info: PbUserInfo = user.into();
698 user_info.grant_privileges = get_user_privilege(user_id, db).await?;
699 user_infos.push(user_info);
700 }
701 Ok(user_infos)
702}
703
704pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
706where
707 C: ConnectionTrait,
708{
709 let obj_owner: UserId = Object::find_by_id(object_id)
710 .select_only()
711 .column(object::Column::OwnerId)
712 .into_tuple()
713 .one(db)
714 .await?
715 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
716 Ok(obj_owner)
717}
718
719pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
744 let cte_alias = Alias::new("granted_privilege_ids");
745 let cte_return_privilege_alias = Alias::new("id");
746 let cte_return_user_alias = Alias::new("user_id");
747
748 let mut base_query = SelectStatement::new()
749 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
750 .from(UserPrivilege)
751 .and_where(user_privilege::Column::Id.is_in(ids))
752 .to_owned();
753
754 let cte_referencing = Query::select()
755 .columns([
756 (UserPrivilege, user_privilege::Column::Id),
757 (UserPrivilege, user_privilege::Column::UserId),
758 ])
759 .from(UserPrivilege)
760 .inner_join(
761 cte_alias.clone(),
762 Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
763 .equals(user_privilege::Column::DependentId),
764 )
765 .to_owned();
766
767 let common_table_expr = CommonTableExpression::new()
768 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
769 .columns([
770 cte_return_privilege_alias.clone(),
771 cte_return_user_alias.clone(),
772 ])
773 .table_name(cte_alias.clone())
774 .to_owned();
775
776 SelectStatement::new()
777 .columns([cte_return_privilege_alias, cte_return_user_alias])
778 .from(cte_alias.clone())
779 .to_owned()
780 .with(
781 WithClause::new()
782 .recursive(true)
783 .cte(common_table_expr)
784 .to_owned(),
785 )
786 .to_owned()
787}
788
789pub async fn get_internal_tables_by_id<C>(job_id: ObjectId, db: &C) -> MetaResult<Vec<TableId>>
790where
791 C: ConnectionTrait,
792{
793 let table_ids: Vec<TableId> = Table::find()
794 .select_only()
795 .column(table::Column::TableId)
796 .filter(
797 table::Column::TableType
798 .eq(TableType::Internal)
799 .and(table::Column::BelongsToJobId.eq(job_id)),
800 )
801 .into_tuple()
802 .all(db)
803 .await?;
804 Ok(table_ids)
805}
806
807pub async fn get_index_state_tables_by_table_id<C>(
808 table_id: TableId,
809 db: &C,
810) -> MetaResult<Vec<TableId>>
811where
812 C: ConnectionTrait,
813{
814 let mut index_table_ids: Vec<TableId> = Index::find()
815 .select_only()
816 .column(index::Column::IndexTableId)
817 .filter(index::Column::PrimaryTableId.eq(table_id))
818 .into_tuple()
819 .all(db)
820 .await?;
821
822 if !index_table_ids.is_empty() {
823 let internal_table_ids: Vec<TableId> = Table::find()
824 .select_only()
825 .column(table::Column::TableId)
826 .filter(
827 table::Column::TableType
828 .eq(TableType::Internal)
829 .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
830 )
831 .into_tuple()
832 .all(db)
833 .await?;
834
835 index_table_ids.extend(internal_table_ids.into_iter());
836 }
837
838 Ok(index_table_ids)
839}
840
841#[derive(Clone, DerivePartialModel, FromQueryResult)]
842#[sea_orm(entity = "UserPrivilege")]
843pub struct PartialUserPrivilege {
844 pub id: PrivilegeId,
845 pub user_id: UserId,
846}
847
848pub async fn get_referring_privileges_cascade<C>(
849 ids: Vec<PrivilegeId>,
850 db: &C,
851) -> MetaResult<Vec<PartialUserPrivilege>>
852where
853 C: ConnectionTrait,
854{
855 let query = construct_privilege_dependency_query(ids);
856 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
857 let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
858 db.get_database_backend(),
859 sql,
860 values,
861 ))
862 .all(db)
863 .await?;
864
865 Ok(privileges)
866}
867
868pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
870where
871 C: ConnectionTrait,
872{
873 let count = UserPrivilege::find()
874 .filter(user_privilege::Column::DependentId.is_in(ids))
875 .count(db)
876 .await?;
877 if count != 0 {
878 return Err(MetaError::permission_denied(format!(
879 "privileges granted to {} other ones.",
880 count
881 )));
882 }
883 Ok(())
884}
885
886pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
888where
889 C: ConnectionTrait,
890{
891 let user_privileges = UserPrivilege::find()
892 .find_also_related(Object)
893 .filter(user_privilege::Column::UserId.eq(user_id))
894 .all(db)
895 .await?;
896 Ok(user_privileges
897 .into_iter()
898 .map(|(privilege, object)| {
899 let object = object.unwrap();
900 let oid = object.oid as _;
901 let obj = match object.obj_type {
902 ObjectType::Database => PbGrantObject::DatabaseId(oid),
903 ObjectType::Schema => PbGrantObject::SchemaId(oid),
904 ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
905 ObjectType::Source => PbGrantObject::SourceId(oid),
906 ObjectType::Sink => PbGrantObject::SinkId(oid),
907 ObjectType::View => PbGrantObject::ViewId(oid),
908 ObjectType::Function => PbGrantObject::FunctionId(oid),
909 ObjectType::Connection => unreachable!("connection is not supported yet"),
910 ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
911 ObjectType::Secret => unreachable!("secret is not supported yet"),
912 };
913 PbGrantPrivilege {
914 action_with_opts: vec![PbActionWithGrantOption {
915 action: PbAction::from(privilege.action) as _,
916 with_grant_option: privilege.with_grant_option,
917 granted_by: privilege.granted_by as _,
918 }],
919 object: Some(obj),
920 }
921 })
922 .collect())
923}
924
925pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
927 match object {
928 PbGrantObject::DatabaseId(id)
929 | PbGrantObject::SchemaId(id)
930 | PbGrantObject::TableId(id)
931 | PbGrantObject::SourceId(id)
932 | PbGrantObject::SinkId(id)
933 | PbGrantObject::ViewId(id)
934 | PbGrantObject::FunctionId(id)
935 | PbGrantObject::SubscriptionId(id)
936 | PbGrantObject::ConnectionId(id)
937 | PbGrantObject::SecretId(id) => *id as _,
938 }
939}
940
941pub async fn insert_fragment_relations(
942 db: &impl ConnectionTrait,
943 downstream_fragment_relations: &FragmentDownstreamRelation,
944) -> MetaResult<()> {
945 for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
946 for downstream in downstreams {
947 let relation = fragment_relation::Model {
948 source_fragment_id: *upstream_fragment_id as _,
949 target_fragment_id: downstream.downstream_fragment_id as _,
950 dispatcher_type: downstream.dispatcher_type,
951 dist_key_indices: downstream
952 .dist_key_indices
953 .iter()
954 .map(|idx| *idx as i32)
955 .collect_vec()
956 .into(),
957 output_indices: downstream
958 .output_indices
959 .iter()
960 .map(|idx| *idx as i32)
961 .collect_vec()
962 .into(),
963 };
964 FragmentRelation::insert(relation.into_active_model())
965 .exec(db)
966 .await?;
967 }
968 }
969 Ok(())
970}
971
972pub async fn get_fragment_actor_dispatchers<C>(
973 db: &C,
974 fragment_ids: Vec<FragmentId>,
975) -> MetaResult<FragmentActorDispatchers>
976where
977 C: ConnectionTrait,
978{
979 type FragmentActorInfo = (
980 DistributionType,
981 Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
982 );
983 let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
984 let get_fragment_actors = |fragment_id: FragmentId| async move {
985 let result: MetaResult<FragmentActorInfo> = try {
986 let mut fragment_actors = Fragment::find_by_id(fragment_id)
987 .find_with_related(Actor)
988 .filter(actor::Column::Status.eq(ActorStatus::Running))
989 .all(db)
990 .await?;
991 if fragment_actors.is_empty() {
992 return Err(anyhow!("failed to find fragment: {}", fragment_id).into());
993 }
994 assert_eq!(
995 fragment_actors.len(),
996 1,
997 "find multiple fragment {:?}",
998 fragment_actors
999 );
1000 let (fragment, actors) = fragment_actors.pop().unwrap();
1001 (
1002 fragment.distribution_type,
1003 Arc::new(
1004 actors
1005 .into_iter()
1006 .map(|actor| {
1007 (
1008 actor.actor_id as _,
1009 actor
1010 .vnode_bitmap
1011 .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
1012 )
1013 })
1014 .collect(),
1015 ),
1016 )
1017 };
1018 result
1019 };
1020 let fragment_relations = FragmentRelation::find()
1021 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
1022 .all(db)
1023 .await?;
1024
1025 let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
1026 for fragment_relation::Model {
1027 source_fragment_id,
1028 target_fragment_id,
1029 dispatcher_type,
1030 dist_key_indices,
1031 output_indices,
1032 } in fragment_relations
1033 {
1034 let (source_fragment_distribution, source_fragment_actors) = {
1035 let (distribution, actors) = {
1036 match fragment_actor_cache.entry(source_fragment_id) {
1037 Entry::Occupied(entry) => entry.into_mut(),
1038 Entry::Vacant(entry) => {
1039 entry.insert(get_fragment_actors(source_fragment_id).await?)
1040 }
1041 }
1042 };
1043 (*distribution, actors.clone())
1044 };
1045 let (target_fragment_distribution, target_fragment_actors) = {
1046 let (distribution, actors) = {
1047 match fragment_actor_cache.entry(target_fragment_id) {
1048 Entry::Occupied(entry) => entry.into_mut(),
1049 Entry::Vacant(entry) => {
1050 entry.insert(get_fragment_actors(target_fragment_id).await?)
1051 }
1052 }
1053 };
1054 (*distribution, actors.clone())
1055 };
1056 let dispatchers = compose_dispatchers(
1057 source_fragment_distribution,
1058 &source_fragment_actors,
1059 target_fragment_id as _,
1060 target_fragment_distribution,
1061 &target_fragment_actors,
1062 dispatcher_type,
1063 dist_key_indices.into_u32_array(),
1064 output_indices.into_u32_array(),
1065 );
1066 let actor_dispatchers_map = actor_dispatchers_map
1067 .entry(source_fragment_id as _)
1068 .or_default();
1069 for (actor_id, dispatchers) in dispatchers {
1070 actor_dispatchers_map
1071 .entry(actor_id as _)
1072 .or_default()
1073 .push(dispatchers);
1074 }
1075 }
1076 Ok(actor_dispatchers_map)
1077}
1078
1079pub fn compose_dispatchers(
1080 source_fragment_distribution: DistributionType,
1081 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1082 target_fragment_id: crate::model::FragmentId,
1083 target_fragment_distribution: DistributionType,
1084 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1085 dispatcher_type: DispatcherType,
1086 dist_key_indices: Vec<u32>,
1087 output_indices: Vec<u32>,
1088) -> HashMap<crate::model::ActorId, PbDispatcher> {
1089 match dispatcher_type {
1090 DispatcherType::Hash => {
1091 let dispatcher = PbDispatcher {
1092 r#type: PbDispatcherType::from(dispatcher_type) as _,
1093 dist_key_indices: dist_key_indices.clone(),
1094 output_indices: output_indices.clone(),
1095 hash_mapping: Some(
1096 ActorMapping::from_bitmaps(
1097 &target_fragment_actors
1098 .iter()
1099 .map(|(actor_id, bitmap)| {
1100 (
1101 *actor_id as _,
1102 bitmap
1103 .clone()
1104 .expect("downstream hash dispatch must have distribution"),
1105 )
1106 })
1107 .collect(),
1108 )
1109 .to_protobuf(),
1110 ),
1111 dispatcher_id: target_fragment_id as _,
1112 downstream_actor_id: target_fragment_actors
1113 .keys()
1114 .map(|actor_id| *actor_id as _)
1115 .collect(),
1116 };
1117 source_fragment_actors
1118 .keys()
1119 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1120 .collect()
1121 }
1122 DispatcherType::Broadcast | DispatcherType::Simple => {
1123 let dispatcher = PbDispatcher {
1124 r#type: PbDispatcherType::from(dispatcher_type) as _,
1125 dist_key_indices: dist_key_indices.clone(),
1126 output_indices: output_indices.clone(),
1127 hash_mapping: None,
1128 dispatcher_id: target_fragment_id as _,
1129 downstream_actor_id: target_fragment_actors
1130 .keys()
1131 .map(|actor_id| *actor_id as _)
1132 .collect(),
1133 };
1134 source_fragment_actors
1135 .keys()
1136 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1137 .collect()
1138 }
1139 DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1140 source_fragment_distribution,
1141 source_fragment_actors,
1142 target_fragment_distribution,
1143 target_fragment_actors,
1144 )
1145 .into_iter()
1146 .map(|(upstream_actor_id, downstream_actor_id)| {
1147 (
1148 upstream_actor_id,
1149 PbDispatcher {
1150 r#type: PbDispatcherType::NoShuffle as _,
1151 dist_key_indices: dist_key_indices.clone(),
1152 output_indices: output_indices.clone(),
1153 hash_mapping: None,
1154 dispatcher_id: target_fragment_id as _,
1155 downstream_actor_id: vec![downstream_actor_id as _],
1156 },
1157 )
1158 })
1159 .collect(),
1160 }
1161}
1162
1163pub fn resolve_no_shuffle_actor_dispatcher(
1165 source_fragment_distribution: DistributionType,
1166 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1167 target_fragment_distribution: DistributionType,
1168 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1169) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1170 assert_eq!(source_fragment_distribution, target_fragment_distribution);
1171 assert_eq!(
1172 source_fragment_actors.len(),
1173 target_fragment_actors.len(),
1174 "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1175 source_fragment_actors,
1176 target_fragment_actors
1177 );
1178 match source_fragment_distribution {
1179 DistributionType::Single => {
1180 let assert_singleton = |bitmap: &Option<Bitmap>| {
1181 assert!(
1182 bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1183 "not singleton: {:?}",
1184 bitmap
1185 );
1186 };
1187 assert_eq!(
1188 source_fragment_actors.len(),
1189 1,
1190 "singleton distribution actor count not 1: {:?}",
1191 source_fragment_distribution
1192 );
1193 assert_eq!(
1194 target_fragment_actors.len(),
1195 1,
1196 "singleton distribution actor count not 1: {:?}",
1197 target_fragment_distribution
1198 );
1199 let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1200 assert_singleton(bitmap);
1201 let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1202 assert_singleton(bitmap);
1203 vec![(*source_actor_id, *target_actor_id)]
1204 }
1205 DistributionType::Hash => {
1206 let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1207 .iter()
1208 .map(|(actor_id, bitmap)| {
1209 let bitmap = bitmap
1210 .as_ref()
1211 .expect("hash distribution should have bitmap");
1212 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1213 (first_vnode, (*actor_id, bitmap))
1214 })
1215 .collect();
1216 source_fragment_actors
1217 .iter()
1218 .map(|(source_actor_id, bitmap)| {
1219 let bitmap = bitmap
1220 .as_ref()
1221 .expect("hash distribution should have bitmap");
1222 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1223 let (target_actor_id, target_bitmap) =
1224 target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1225 panic!(
1226 "cannot find matched target actor: {} {:?} {:?} {:?}",
1227 source_actor_id,
1228 first_vnode,
1229 source_fragment_actors,
1230 target_fragment_actors
1231 );
1232 });
1233 assert_eq!(
1234 bitmap,
1235 target_bitmap,
1236 "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1237 source_actor_id,
1238 first_vnode,
1239 source_fragment_actors,
1240 target_fragment_actors
1241 );
1242 (*source_actor_id, target_actor_id)
1243 }).collect()
1244 }
1245 }
1246}
1247
1248pub async fn get_fragment_mappings<C>(
1250 db: &C,
1251 job_id: ObjectId,
1252) -> MetaResult<Vec<PbFragmentWorkerSlotMapping>>
1253where
1254 C: ConnectionTrait,
1255{
1256 let job_actors: Vec<(
1257 FragmentId,
1258 DistributionType,
1259 ActorId,
1260 Option<VnodeBitmap>,
1261 WorkerId,
1262 ActorStatus,
1263 )> = Actor::find()
1264 .select_only()
1265 .columns([
1266 fragment::Column::FragmentId,
1267 fragment::Column::DistributionType,
1268 ])
1269 .columns([
1270 actor::Column::ActorId,
1271 actor::Column::VnodeBitmap,
1272 actor::Column::WorkerId,
1273 actor::Column::Status,
1274 ])
1275 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1276 .filter(fragment::Column::JobId.eq(job_id))
1277 .into_tuple()
1278 .all(db)
1279 .await?;
1280
1281 Ok(rebuild_fragment_mapping_from_actors(job_actors))
1282}
1283
1284pub fn rebuild_fragment_mapping_from_actors(
1285 job_actors: Vec<(
1286 FragmentId,
1287 DistributionType,
1288 ActorId,
1289 Option<VnodeBitmap>,
1290 WorkerId,
1291 ActorStatus,
1292 )>,
1293) -> Vec<FragmentWorkerSlotMapping> {
1294 let mut all_actor_locations = HashMap::new();
1295 let mut actor_bitmaps = HashMap::new();
1296 let mut fragment_actors = HashMap::new();
1297 let mut fragment_dist = HashMap::new();
1298
1299 for (fragment_id, dist, actor_id, bitmap, worker_id, actor_status) in job_actors {
1300 if actor_status == ActorStatus::Inactive {
1301 continue;
1302 }
1303
1304 all_actor_locations
1305 .entry(fragment_id)
1306 .or_insert(HashMap::new())
1307 .insert(actor_id as hash::ActorId, worker_id as u32);
1308 actor_bitmaps.insert(actor_id, bitmap);
1309 fragment_actors
1310 .entry(fragment_id)
1311 .or_insert_with(Vec::new)
1312 .push(actor_id);
1313 fragment_dist.insert(fragment_id, dist);
1314 }
1315
1316 let mut result = vec![];
1317 for (fragment_id, dist) in fragment_dist {
1318 let mut actor_locations = all_actor_locations.remove(&fragment_id).unwrap();
1319 let fragment_worker_slot_mapping = match dist {
1320 DistributionType::Single => {
1321 let actor = fragment_actors
1322 .remove(&fragment_id)
1323 .unwrap()
1324 .into_iter()
1325 .exactly_one()
1326 .unwrap() as hash::ActorId;
1327 let actor_location = actor_locations.remove(&actor).unwrap();
1328
1329 WorkerSlotMapping::new_single(WorkerSlotId::new(actor_location, 0))
1330 }
1331 DistributionType::Hash => {
1332 let actors = fragment_actors.remove(&fragment_id).unwrap();
1333
1334 let all_actor_bitmaps: HashMap<_, _> = actors
1335 .iter()
1336 .map(|actor_id| {
1337 let vnode_bitmap = actor_bitmaps
1338 .remove(actor_id)
1339 .flatten()
1340 .expect("actor bitmap shouldn't be none in hash fragment");
1341
1342 let bitmap = Bitmap::from(&vnode_bitmap.to_protobuf());
1343 (*actor_id as hash::ActorId, bitmap)
1344 })
1345 .collect();
1346
1347 let actor_mapping = ActorMapping::from_bitmaps(&all_actor_bitmaps);
1348
1349 actor_mapping.to_worker_slot(&actor_locations)
1350 }
1351 };
1352
1353 result.push(PbFragmentWorkerSlotMapping {
1354 fragment_id: fragment_id as u32,
1355 mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1356 })
1357 }
1358 result
1359}
1360
1361pub async fn get_fragment_actor_ids<C>(
1363 db: &C,
1364 fragment_ids: Vec<FragmentId>,
1365) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>>
1366where
1367 C: ConnectionTrait,
1368{
1369 let fragment_actors: Vec<(FragmentId, ActorId)> = Actor::find()
1370 .select_only()
1371 .columns([actor::Column::FragmentId, actor::Column::ActorId])
1372 .filter(actor::Column::FragmentId.is_in(fragment_ids))
1373 .into_tuple()
1374 .all(db)
1375 .await?;
1376
1377 Ok(fragment_actors.into_iter().into_group_map())
1378}
1379
1380pub async fn get_fragments_for_jobs<C>(
1385 db: &C,
1386 streaming_jobs: Vec<ObjectId>,
1387) -> MetaResult<(
1388 HashMap<SourceId, BTreeSet<FragmentId>>,
1389 HashSet<ActorId>,
1390 HashSet<FragmentId>,
1391)>
1392where
1393 C: ConnectionTrait,
1394{
1395 if streaming_jobs.is_empty() {
1396 return Ok((HashMap::default(), HashSet::default(), HashSet::default()));
1397 }
1398
1399 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1400 .select_only()
1401 .columns([
1402 fragment::Column::FragmentId,
1403 fragment::Column::FragmentTypeMask,
1404 fragment::Column::StreamNode,
1405 ])
1406 .filter(fragment::Column::JobId.is_in(streaming_jobs))
1407 .into_tuple()
1408 .all(db)
1409 .await?;
1410 let actors: Vec<ActorId> = Actor::find()
1411 .select_only()
1412 .column(actor::Column::ActorId)
1413 .filter(
1414 actor::Column::FragmentId.is_in(fragments.iter().map(|(id, _, _)| *id).collect_vec()),
1415 )
1416 .into_tuple()
1417 .all(db)
1418 .await?;
1419
1420 let fragment_ids = fragments
1421 .iter()
1422 .map(|(fragment_id, _, _)| *fragment_id)
1423 .collect();
1424
1425 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1426 for (fragment_id, mask, stream_node) in fragments {
1427 if mask & PbFragmentTypeFlag::Source as i32 == 0 {
1428 continue;
1429 }
1430 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1431 source_fragment_ids
1432 .entry(source_id as _)
1433 .or_default()
1434 .insert(fragment_id);
1435 }
1436 }
1437
1438 Ok((
1439 source_fragment_ids,
1440 actors.into_iter().collect(),
1441 fragment_ids,
1442 ))
1443}
1444
1445pub(crate) fn build_object_group_for_delete(
1450 partial_objects: Vec<PartialObject>,
1451) -> NotificationInfo {
1452 let mut objects = vec![];
1453 for obj in partial_objects {
1454 match obj.obj_type {
1455 ObjectType::Database => objects.push(PbObject {
1456 object_info: Some(PbObjectInfo::Database(PbDatabase {
1457 id: obj.oid as _,
1458 ..Default::default()
1459 })),
1460 }),
1461 ObjectType::Schema => objects.push(PbObject {
1462 object_info: Some(PbObjectInfo::Schema(PbSchema {
1463 id: obj.oid as _,
1464 database_id: obj.database_id.unwrap() as _,
1465 ..Default::default()
1466 })),
1467 }),
1468 ObjectType::Table => objects.push(PbObject {
1469 object_info: Some(PbObjectInfo::Table(PbTable {
1470 id: obj.oid as _,
1471 schema_id: obj.schema_id.unwrap() as _,
1472 database_id: obj.database_id.unwrap() as _,
1473 ..Default::default()
1474 })),
1475 }),
1476 ObjectType::Source => objects.push(PbObject {
1477 object_info: Some(PbObjectInfo::Source(PbSource {
1478 id: obj.oid as _,
1479 schema_id: obj.schema_id.unwrap() as _,
1480 database_id: obj.database_id.unwrap() as _,
1481 ..Default::default()
1482 })),
1483 }),
1484 ObjectType::Sink => objects.push(PbObject {
1485 object_info: Some(PbObjectInfo::Sink(PbSink {
1486 id: obj.oid as _,
1487 schema_id: obj.schema_id.unwrap() as _,
1488 database_id: obj.database_id.unwrap() as _,
1489 ..Default::default()
1490 })),
1491 }),
1492 ObjectType::Subscription => objects.push(PbObject {
1493 object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1494 id: obj.oid as _,
1495 schema_id: obj.schema_id.unwrap() as _,
1496 database_id: obj.database_id.unwrap() as _,
1497 ..Default::default()
1498 })),
1499 }),
1500 ObjectType::View => objects.push(PbObject {
1501 object_info: Some(PbObjectInfo::View(PbView {
1502 id: obj.oid as _,
1503 schema_id: obj.schema_id.unwrap() as _,
1504 database_id: obj.database_id.unwrap() as _,
1505 ..Default::default()
1506 })),
1507 }),
1508 ObjectType::Index => {
1509 objects.push(PbObject {
1510 object_info: Some(PbObjectInfo::Index(PbIndex {
1511 id: obj.oid as _,
1512 schema_id: obj.schema_id.unwrap() as _,
1513 database_id: obj.database_id.unwrap() as _,
1514 ..Default::default()
1515 })),
1516 });
1517 objects.push(PbObject {
1518 object_info: Some(PbObjectInfo::Table(PbTable {
1519 id: obj.oid as _,
1520 schema_id: obj.schema_id.unwrap() as _,
1521 database_id: obj.database_id.unwrap() as _,
1522 ..Default::default()
1523 })),
1524 });
1525 }
1526 ObjectType::Function => objects.push(PbObject {
1527 object_info: Some(PbObjectInfo::Function(PbFunction {
1528 id: obj.oid as _,
1529 schema_id: obj.schema_id.unwrap() as _,
1530 database_id: obj.database_id.unwrap() as _,
1531 ..Default::default()
1532 })),
1533 }),
1534 ObjectType::Connection => objects.push(PbObject {
1535 object_info: Some(PbObjectInfo::Connection(PbConnection {
1536 id: obj.oid as _,
1537 schema_id: obj.schema_id.unwrap() as _,
1538 database_id: obj.database_id.unwrap() as _,
1539 ..Default::default()
1540 })),
1541 }),
1542 ObjectType::Secret => objects.push(PbObject {
1543 object_info: Some(PbObjectInfo::Secret(PbSecret {
1544 id: obj.oid as _,
1545 schema_id: obj.schema_id.unwrap() as _,
1546 database_id: obj.database_id.unwrap() as _,
1547 ..Default::default()
1548 })),
1549 }),
1550 }
1551 }
1552 NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1553}
1554
1555pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1556 let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1557 .context("unable to parse table definition")
1558 .inspect_err(|e| {
1559 tracing::error!(
1560 target: "auto_schema_change",
1561 error = %e.as_report(),
1562 "failed to parse table definition")
1563 })
1564 .unwrap()
1565 .try_into()
1566 .unwrap();
1567 if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1568 cdc_table_info
1569 .clone()
1570 .map(|cdc_table_info| cdc_table_info.external_table_name)
1571 } else {
1572 None
1573 }
1574}
1575
1576pub async fn rename_relation(
1579 txn: &DatabaseTransaction,
1580 object_type: ObjectType,
1581 object_id: ObjectId,
1582 object_name: &str,
1583) -> MetaResult<(Vec<PbObject>, String)> {
1584 use sea_orm::ActiveModelTrait;
1585
1586 use crate::controller::rename::alter_relation_rename;
1587
1588 let mut to_update_relations = vec![];
1589 macro_rules! rename_relation {
1591 ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1592 let (mut relation, obj) = $entity::find_by_id($object_id)
1593 .find_also_related(Object)
1594 .one(txn)
1595 .await?
1596 .unwrap();
1597 let obj = obj.unwrap();
1598 let old_name = relation.name.clone();
1599 relation.name = object_name.into();
1600 if obj.obj_type != ObjectType::View {
1601 relation.definition = alter_relation_rename(&relation.definition, object_name);
1602 }
1603 let active_model = $table::ActiveModel {
1604 $identity: Set(relation.$identity),
1605 name: Set(object_name.into()),
1606 definition: Set(relation.definition.clone()),
1607 ..Default::default()
1608 };
1609 active_model.update(txn).await?;
1610 to_update_relations.push(PbObject {
1611 object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
1612 });
1613 old_name
1614 }};
1615 }
1616 let old_name = match object_type {
1618 ObjectType::Table => {
1619 let associated_source_id: Option<SourceId> = Source::find()
1620 .select_only()
1621 .column(source::Column::SourceId)
1622 .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
1623 .into_tuple()
1624 .one(txn)
1625 .await?;
1626 if let Some(source_id) = associated_source_id {
1627 rename_relation!(Source, source, source_id, source_id);
1628 }
1629 rename_relation!(Table, table, table_id, object_id)
1630 }
1631 ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
1632 ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id),
1633 ObjectType::Subscription => {
1634 rename_relation!(Subscription, subscription, subscription_id, object_id)
1635 }
1636 ObjectType::View => rename_relation!(View, view, view_id, object_id),
1637 ObjectType::Index => {
1638 let (mut index, obj) = Index::find_by_id(object_id)
1639 .find_also_related(Object)
1640 .one(txn)
1641 .await?
1642 .unwrap();
1643 index.name = object_name.into();
1644 let index_table_id = index.index_table_id;
1645 let old_name = rename_relation!(Table, table, table_id, index_table_id);
1646
1647 let active_model = index::ActiveModel {
1649 index_id: sea_orm::ActiveValue::Set(index.index_id),
1650 name: sea_orm::ActiveValue::Set(object_name.into()),
1651 ..Default::default()
1652 };
1653 active_model.update(txn).await?;
1654 to_update_relations.push(PbObject {
1655 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1656 });
1657 old_name
1658 }
1659 _ => unreachable!("only relation name can be altered."),
1660 };
1661
1662 Ok((to_update_relations, old_name))
1663}
1664
1665pub async fn get_database_resource_group<C>(txn: &C, database_id: ObjectId) -> MetaResult<String>
1666where
1667 C: ConnectionTrait,
1668{
1669 let database_resource_group: Option<String> = Database::find_by_id(database_id)
1670 .select_only()
1671 .column(database::Column::ResourceGroup)
1672 .into_tuple()
1673 .one(txn)
1674 .await?
1675 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1676
1677 Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1678}
1679
1680pub async fn get_existing_job_resource_group<C>(
1681 txn: &C,
1682 streaming_job_id: ObjectId,
1683) -> MetaResult<String>
1684where
1685 C: ConnectionTrait,
1686{
1687 let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1688 StreamingJob::find_by_id(streaming_job_id)
1689 .select_only()
1690 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1691 .join(JoinType::InnerJoin, object::Relation::Database2.def())
1692 .column(streaming_job::Column::SpecificResourceGroup)
1693 .column(database::Column::ResourceGroup)
1694 .into_tuple()
1695 .one(txn)
1696 .await?
1697 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1698
1699 Ok(job_specific_resource_group.unwrap_or_else(|| {
1700 database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1701 }))
1702}
1703
1704pub fn filter_workers_by_resource_group(
1705 workers: &HashMap<u32, WorkerNode>,
1706 resource_group: &str,
1707) -> BTreeSet<WorkerId> {
1708 workers
1709 .iter()
1710 .filter(|&(_, worker)| {
1711 worker
1712 .resource_group()
1713 .map(|node_label| node_label.as_str() == resource_group)
1714 .unwrap_or(false)
1715 })
1716 .map(|(id, _)| (*id as WorkerId))
1717 .collect()
1718}
1719
1720pub async fn rename_relation_refer(
1723 txn: &DatabaseTransaction,
1724 object_type: ObjectType,
1725 object_id: ObjectId,
1726 object_name: &str,
1727 old_name: &str,
1728) -> MetaResult<Vec<PbObject>> {
1729 use sea_orm::ActiveModelTrait;
1730
1731 use crate::controller::rename::alter_relation_rename_refs;
1732
1733 let mut to_update_relations = vec![];
1734 macro_rules! rename_relation_ref {
1735 ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1736 let (mut relation, obj) = $entity::find_by_id($object_id)
1737 .find_also_related(Object)
1738 .one(txn)
1739 .await?
1740 .unwrap();
1741 relation.definition =
1742 alter_relation_rename_refs(&relation.definition, old_name, object_name);
1743 let active_model = $table::ActiveModel {
1744 $identity: Set(relation.$identity),
1745 definition: Set(relation.definition.clone()),
1746 ..Default::default()
1747 };
1748 active_model.update(txn).await?;
1749 to_update_relations.push(PbObject {
1750 object_info: Some(PbObjectInfo::$entity(
1751 ObjectModel(relation, obj.unwrap()).into(),
1752 )),
1753 });
1754 }};
1755 }
1756 let mut objs = get_referring_objects(object_id, txn).await?;
1757 if object_type == ObjectType::Table {
1758 let incoming_sinks: I32Array = Table::find_by_id(object_id)
1759 .select_only()
1760 .column(table::Column::IncomingSinks)
1761 .into_tuple()
1762 .one(txn)
1763 .await?
1764 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1765
1766 objs.extend(
1767 incoming_sinks
1768 .into_inner()
1769 .into_iter()
1770 .map(|id| PartialObject {
1771 oid: id,
1772 obj_type: ObjectType::Sink,
1773 schema_id: None,
1774 database_id: None,
1775 }),
1776 );
1777 }
1778
1779 for obj in objs {
1780 match obj.obj_type {
1781 ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid),
1782 ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid),
1783 ObjectType::Subscription => {
1784 rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid)
1785 }
1786 ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid),
1787 ObjectType::Index => {
1788 let index_table_id: Option<TableId> = Index::find_by_id(obj.oid)
1789 .select_only()
1790 .column(index::Column::IndexTableId)
1791 .into_tuple()
1792 .one(txn)
1793 .await?;
1794 rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
1795 }
1796 _ => {
1797 bail!("only table, sink, subscription, view and index depend on other objects.")
1798 }
1799 }
1800 }
1801
1802 Ok(to_update_relations)
1803}
1804
1805pub async fn validate_subscription_deletion<C>(txn: &C, subscription_id: ObjectId) -> MetaResult<()>
1809where
1810 C: ConnectionTrait,
1811{
1812 let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1813 .select_only()
1814 .column(subscription::Column::DependentTableId)
1815 .into_tuple()
1816 .one(txn)
1817 .await?
1818 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1819
1820 let cnt = Subscription::find()
1821 .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1822 .count(txn)
1823 .await?;
1824 if cnt > 1 {
1825 return Ok(());
1828 }
1829
1830 let obj_alias = Alias::new("o1");
1832 let used_by_alias = Alias::new("o2");
1833 let count = ObjectDependency::find()
1834 .join_as(
1835 JoinType::InnerJoin,
1836 object_dependency::Relation::Object2.def(),
1837 obj_alias.clone(),
1838 )
1839 .join_as(
1840 JoinType::InnerJoin,
1841 object_dependency::Relation::Object1.def(),
1842 used_by_alias.clone(),
1843 )
1844 .filter(
1845 object_dependency::Column::Oid
1846 .eq(upstream_table_id)
1847 .and(object_dependency::Column::UsedBy.ne(subscription_id))
1848 .and(
1849 Expr::col((obj_alias, object::Column::DatabaseId))
1850 .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1851 ),
1852 )
1853 .count(txn)
1854 .await?;
1855
1856 if count != 0 {
1857 return Err(MetaError::permission_denied(format!(
1858 "Referenced by {} cross-db objects.",
1859 count
1860 )));
1861 }
1862
1863 Ok(())
1864}
1865
1866#[cfg(test)]
1867mod tests {
1868 use super::*;
1869
1870 #[test]
1871 fn test_extract_cdc_table_name() {
1872 let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
1873 let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
1874 assert_eq!(
1875 extract_external_table_name_from_definition(ddl1),
1876 Some("public.t1".into())
1877 );
1878 assert_eq!(
1879 extract_external_table_name_from_definition(ddl2),
1880 Some("mydb.t2".into())
1881 );
1882 }
1883}