1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
23use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
24use risingwave_common::{bail, hash};
25use risingwave_meta_model::actor::ActorStatus;
26use risingwave_meta_model::actor_dispatcher::DispatcherType;
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::object::ObjectType;
29use risingwave_meta_model::prelude::*;
30use risingwave_meta_model::table::TableType;
31use risingwave_meta_model::{
32 ActorId, DataTypeArray, DatabaseId, FragmentId, I32Array, ObjectId, PrivilegeId, SchemaId,
33 SourceId, StreamNode, TableId, UserId, VnodeBitmap, WorkerId, actor, connection, database,
34 fragment, fragment_relation, function, index, object, object_dependency, schema, secret, sink,
35 source, streaming_job, subscription, table, user, user_privilege, view,
36};
37use risingwave_meta_model_migration::WithQuery;
38use risingwave_pb::catalog::{
39 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
40 PbSubscription, PbTable, PbView,
41};
42use risingwave_pb::common::WorkerNode;
43use risingwave_pb::meta::object::PbObjectInfo;
44use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
45use risingwave_pb::meta::{
46 FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
47};
48use risingwave_pb::stream_plan::{PbDispatcher, PbDispatcherType, PbFragmentTypeFlag};
49use risingwave_pb::user::grant_privilege::{
50 PbAction, PbActionWithGrantOption, PbObject as PbGrantObject,
51};
52use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
53use risingwave_sqlparser::ast::Statement as SqlStatement;
54use risingwave_sqlparser::parser::Parser;
55use sea_orm::sea_query::{
56 Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
57 WithClause,
58};
59use sea_orm::{
60 ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
61 FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
62 RelationTrait, Set, Statement,
63};
64use thiserror_ext::AsReport;
65
66use crate::controller::ObjectModel;
67use crate::model::{FragmentActorDispatchers, FragmentDownstreamRelation};
68use crate::{MetaError, MetaResult};
69
70pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
95 let cte_alias = Alias::new("used_by_object_ids");
96 let cte_return_alias = Alias::new("used_by");
97
98 let mut base_query = SelectStatement::new()
99 .column(object_dependency::Column::UsedBy)
100 .from(ObjectDependency)
101 .and_where(object_dependency::Column::Oid.eq(obj_id))
102 .to_owned();
103
104 let belonged_obj_query = SelectStatement::new()
105 .column(object::Column::Oid)
106 .from(Object)
107 .and_where(
108 object::Column::DatabaseId
109 .eq(obj_id)
110 .or(object::Column::SchemaId.eq(obj_id)),
111 )
112 .to_owned();
113
114 let cte_referencing = Query::select()
115 .column((ObjectDependency, object_dependency::Column::UsedBy))
116 .from(ObjectDependency)
117 .inner_join(
118 cte_alias.clone(),
119 Expr::col((cte_alias.clone(), cte_return_alias.clone()))
120 .equals(object_dependency::Column::Oid),
121 )
122 .to_owned();
123
124 let common_table_expr = CommonTableExpression::new()
125 .query(
126 base_query
127 .union(UnionType::All, belonged_obj_query)
128 .union(UnionType::All, cte_referencing)
129 .to_owned(),
130 )
131 .column(cte_return_alias.clone())
132 .table_name(cte_alias.clone())
133 .to_owned();
134
135 SelectStatement::new()
136 .distinct()
137 .columns([
138 object::Column::Oid,
139 object::Column::ObjType,
140 object::Column::SchemaId,
141 object::Column::DatabaseId,
142 ])
143 .from(cte_alias.clone())
144 .inner_join(
145 Object,
146 Expr::col((cte_alias, cte_return_alias.clone())).equals(object::Column::Oid),
147 )
148 .order_by(object::Column::Oid, Order::Desc)
149 .to_owned()
150 .with(
151 WithClause::new()
152 .recursive(true)
153 .cte(common_table_expr)
154 .to_owned(),
155 )
156 .to_owned()
157}
158
159pub fn construct_sink_cycle_check_query(
184 target_table: ObjectId,
185 dependent_objects: Vec<ObjectId>,
186) -> WithQuery {
187 let cte_alias = Alias::new("used_by_object_ids_with_sink");
188 let depend_alias = Alias::new("obj_dependency_with_sink");
189
190 let mut base_query = SelectStatement::new()
191 .columns([
192 object_dependency::Column::Oid,
193 object_dependency::Column::UsedBy,
194 ])
195 .from(ObjectDependency)
196 .and_where(object_dependency::Column::Oid.eq(target_table))
197 .to_owned();
198
199 let query_sink_deps = SelectStatement::new()
200 .columns([sink::Column::SinkId, sink::Column::TargetTable])
201 .from(Sink)
202 .and_where(sink::Column::TargetTable.is_not_null())
203 .to_owned();
204
205 let cte_referencing = Query::select()
206 .column((depend_alias.clone(), object_dependency::Column::Oid))
207 .column((depend_alias.clone(), object_dependency::Column::UsedBy))
208 .from_subquery(
209 SelectStatement::new()
210 .columns([
211 object_dependency::Column::Oid,
212 object_dependency::Column::UsedBy,
213 ])
214 .from(ObjectDependency)
215 .union(UnionType::All, query_sink_deps)
216 .to_owned(),
217 depend_alias.clone(),
218 )
219 .inner_join(
220 cte_alias.clone(),
221 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).eq(Expr::col((
222 depend_alias.clone(),
223 object_dependency::Column::Oid,
224 ))),
225 )
226 .and_where(
227 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
228 cte_alias.clone(),
229 object_dependency::Column::Oid,
230 ))),
231 )
232 .to_owned();
233
234 let common_table_expr = CommonTableExpression::new()
235 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
236 .columns([
237 object_dependency::Column::Oid,
238 object_dependency::Column::UsedBy,
239 ])
240 .table_name(cte_alias.clone())
241 .to_owned();
242
243 SelectStatement::new()
244 .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
245 .from(cte_alias.clone())
246 .and_where(
247 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
248 .is_in(dependent_objects),
249 )
250 .to_owned()
251 .with(
252 WithClause::new()
253 .recursive(true)
254 .cte(common_table_expr)
255 .to_owned(),
256 )
257 .to_owned()
258}
259
/// Partial projection of the `Object` entity.
///
/// Used in this file as the row type returned by `get_referring_objects_cascade` and
/// `get_referring_objects`.
#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
#[sea_orm(entity = "Object")]
pub struct PartialObject {
    /// Id of the object.
    pub oid: ObjectId,
    /// Kind of the object.
    pub obj_type: ObjectType,
    /// Schema id recorded on the object row, if any.
    pub schema_id: Option<SchemaId>,
    /// Database id recorded on the object row, if any.
    pub database_id: Option<DatabaseId>,
}
268
/// Partial projection of the `Fragment` entity carrying only the fragment id, its job id and
/// its state table ids.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Fragment")]
pub struct PartialFragmentStateTables {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Id of the job recorded on the fragment row.
    pub job_id: ObjectId,
    /// State table ids recorded on the fragment row.
    pub state_table_ids: I32Array,
}
276
/// Partial projection of the `Actor` entity carrying the actor's id, fragment, worker and
/// status.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Actor")]
pub struct PartialActorLocation {
    /// Id of the actor.
    pub actor_id: ActorId,
    /// Id of the fragment recorded on the actor row.
    pub fragment_id: FragmentId,
    /// Id of the worker recorded on the actor row.
    pub worker_id: WorkerId,
    /// Status recorded on the actor row.
    pub status: ActorStatus,
}
285
/// Row type for a fragment description query; columns are mapped onto the fields by name via
/// `FromQueryResult`.
///
/// NOTE(review): the query that produces these rows is not visible in this part of the file.
#[derive(FromQueryResult)]
pub struct FragmentDesc {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Id of the job associated with the fragment.
    pub job_id: ObjectId,
    /// Fragment type mask as a raw integer (presumably `PbFragmentTypeFlag` bits — TODO confirm).
    pub fragment_type_mask: i32,
    /// Distribution type of the fragment.
    pub distribution_type: DistributionType,
    /// State table ids of the fragment.
    pub state_table_ids: I32Array,
    /// Parallelism value (presumably an actor count computed by the query — TODO confirm).
    pub parallelism: i64,
    /// Vnode count of the fragment.
    pub vnode_count: i32,
    /// Stream node associated with the fragment.
    pub stream_node: StreamNode,
}
297
298pub async fn get_referring_objects_cascade<C>(
300 obj_id: ObjectId,
301 db: &C,
302) -> MetaResult<Vec<PartialObject>>
303where
304 C: ConnectionTrait,
305{
306 let query = construct_obj_dependency_query(obj_id);
307 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
308 let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
309 db.get_database_backend(),
310 sql,
311 values,
312 ))
313 .all(db)
314 .await?;
315 Ok(objects)
316}
317
318pub async fn check_sink_into_table_cycle<C>(
320 target_table: ObjectId,
321 dependent_objs: Vec<ObjectId>,
322 db: &C,
323) -> MetaResult<bool>
324where
325 C: ConnectionTrait,
326{
327 if dependent_objs.is_empty() {
328 return Ok(false);
329 }
330
331 if dependent_objs.contains(&target_table) {
333 return Ok(true);
334 }
335
336 let query = construct_sink_cycle_check_query(target_table, dependent_objs);
337 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
338
339 let res = db
340 .query_one(Statement::from_sql_and_values(
341 db.get_database_backend(),
342 sql,
343 values,
344 ))
345 .await?
346 .unwrap();
347
348 let cnt: i64 = res.try_get_by(0)?;
349
350 Ok(cnt != 0)
351}
352
353pub async fn ensure_object_id<C>(
355 object_type: ObjectType,
356 obj_id: ObjectId,
357 db: &C,
358) -> MetaResult<()>
359where
360 C: ConnectionTrait,
361{
362 let count = Object::find_by_id(obj_id).count(db).await?;
363 if count == 0 {
364 return Err(MetaError::catalog_id_not_found(
365 object_type.as_str(),
366 obj_id,
367 ));
368 }
369 Ok(())
370}
371
372pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
374where
375 C: ConnectionTrait,
376{
377 let count = User::find_by_id(user_id).count(db).await?;
378 if count == 0 {
379 return Err(anyhow!("user {} was concurrently dropped", user_id).into());
380 }
381 Ok(())
382}
383
384pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
386where
387 C: ConnectionTrait,
388{
389 let count = Database::find()
390 .filter(database::Column::Name.eq(name))
391 .count(db)
392 .await?;
393 if count > 0 {
394 assert_eq!(count, 1);
395 return Err(MetaError::catalog_duplicated("database", name));
396 }
397 Ok(())
398}
399
400pub async fn check_function_signature_duplicate<C>(
402 pb_function: &PbFunction,
403 db: &C,
404) -> MetaResult<()>
405where
406 C: ConnectionTrait,
407{
408 let count = Function::find()
409 .inner_join(Object)
410 .filter(
411 object::Column::DatabaseId
412 .eq(pb_function.database_id as DatabaseId)
413 .and(object::Column::SchemaId.eq(pb_function.schema_id as SchemaId))
414 .and(function::Column::Name.eq(&pb_function.name))
415 .and(
416 function::Column::ArgTypes
417 .eq(DataTypeArray::from(pb_function.arg_types.clone())),
418 ),
419 )
420 .count(db)
421 .await?;
422 if count > 0 {
423 assert_eq!(count, 1);
424 return Err(MetaError::catalog_duplicated("function", &pb_function.name));
425 }
426 Ok(())
427}
428
429pub async fn check_connection_name_duplicate<C>(
431 pb_connection: &PbConnection,
432 db: &C,
433) -> MetaResult<()>
434where
435 C: ConnectionTrait,
436{
437 let count = Connection::find()
438 .inner_join(Object)
439 .filter(
440 object::Column::DatabaseId
441 .eq(pb_connection.database_id as DatabaseId)
442 .and(object::Column::SchemaId.eq(pb_connection.schema_id as SchemaId))
443 .and(connection::Column::Name.eq(&pb_connection.name)),
444 )
445 .count(db)
446 .await?;
447 if count > 0 {
448 assert_eq!(count, 1);
449 return Err(MetaError::catalog_duplicated(
450 "connection",
451 &pb_connection.name,
452 ));
453 }
454 Ok(())
455}
456
457pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
458where
459 C: ConnectionTrait,
460{
461 let count = Secret::find()
462 .inner_join(Object)
463 .filter(
464 object::Column::DatabaseId
465 .eq(pb_secret.database_id as DatabaseId)
466 .and(object::Column::SchemaId.eq(pb_secret.schema_id as SchemaId))
467 .and(secret::Column::Name.eq(&pb_secret.name)),
468 )
469 .count(db)
470 .await?;
471 if count > 0 {
472 assert_eq!(count, 1);
473 return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
474 }
475 Ok(())
476}
477
478pub async fn check_subscription_name_duplicate<C>(
479 pb_subscription: &PbSubscription,
480 db: &C,
481) -> MetaResult<()>
482where
483 C: ConnectionTrait,
484{
485 let count = Subscription::find()
486 .inner_join(Object)
487 .filter(
488 object::Column::DatabaseId
489 .eq(pb_subscription.database_id as DatabaseId)
490 .and(object::Column::SchemaId.eq(pb_subscription.schema_id as SchemaId))
491 .and(subscription::Column::Name.eq(&pb_subscription.name)),
492 )
493 .count(db)
494 .await?;
495 if count > 0 {
496 assert_eq!(count, 1);
497 return Err(MetaError::catalog_duplicated(
498 "subscription",
499 &pb_subscription.name,
500 ));
501 }
502 Ok(())
503}
504
505pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
507where
508 C: ConnectionTrait,
509{
510 let count = User::find()
511 .filter(user::Column::Name.eq(name))
512 .count(db)
513 .await?;
514 if count > 0 {
515 assert_eq!(count, 1);
516 return Err(MetaError::catalog_duplicated("user", name));
517 }
518 Ok(())
519}
520
521pub async fn check_relation_name_duplicate<C>(
523 name: &str,
524 database_id: DatabaseId,
525 schema_id: SchemaId,
526 db: &C,
527) -> MetaResult<()>
528where
529 C: ConnectionTrait,
530{
531 macro_rules! check_duplicated {
532 ($obj_type:expr, $entity:ident, $table:ident) => {
533 let count = Object::find()
534 .inner_join($entity)
535 .filter(
536 object::Column::DatabaseId
537 .eq(Some(database_id))
538 .and(object::Column::SchemaId.eq(Some(schema_id)))
539 .and($table::Column::Name.eq(name)),
540 )
541 .count(db)
542 .await?;
543 if count != 0 {
544 return Err(MetaError::catalog_duplicated($obj_type.as_str(), name));
545 }
546 };
547 }
548 check_duplicated!(ObjectType::Table, Table, table);
549 check_duplicated!(ObjectType::Source, Source, source);
550 check_duplicated!(ObjectType::Sink, Sink, sink);
551 check_duplicated!(ObjectType::Index, Index, index);
552 check_duplicated!(ObjectType::View, View, view);
553
554 Ok(())
555}
556
557pub async fn check_schema_name_duplicate<C>(
559 name: &str,
560 database_id: DatabaseId,
561 db: &C,
562) -> MetaResult<()>
563where
564 C: ConnectionTrait,
565{
566 let count = Object::find()
567 .inner_join(Schema)
568 .filter(
569 object::Column::ObjType
570 .eq(ObjectType::Schema)
571 .and(object::Column::DatabaseId.eq(Some(database_id)))
572 .and(schema::Column::Name.eq(name)),
573 )
574 .count(db)
575 .await?;
576 if count != 0 {
577 return Err(MetaError::catalog_duplicated("schema", name));
578 }
579
580 Ok(())
581}
582
583pub async fn ensure_object_not_refer<C>(
585 object_type: ObjectType,
586 object_id: ObjectId,
587 db: &C,
588) -> MetaResult<()>
589where
590 C: ConnectionTrait,
591{
592 let count = if object_type == ObjectType::Table {
594 ObjectDependency::find()
595 .join(
596 JoinType::InnerJoin,
597 object_dependency::Relation::Object1.def(),
598 )
599 .filter(
600 object_dependency::Column::Oid
601 .eq(object_id)
602 .and(object::Column::ObjType.ne(ObjectType::Index)),
603 )
604 .count(db)
605 .await?
606 } else {
607 ObjectDependency::find()
608 .filter(object_dependency::Column::Oid.eq(object_id))
609 .count(db)
610 .await?
611 };
612 if count != 0 {
613 return Err(MetaError::permission_denied(format!(
614 "{} used by {} other objects.",
615 object_type.as_str(),
616 count
617 )));
618 }
619 Ok(())
620}
621
622pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
624where
625 C: ConnectionTrait,
626{
627 let objs = ObjectDependency::find()
628 .filter(object_dependency::Column::Oid.eq(object_id))
629 .join(
630 JoinType::InnerJoin,
631 object_dependency::Relation::Object1.def(),
632 )
633 .into_partial_model()
634 .all(db)
635 .await?;
636
637 Ok(objs)
638}
639
640pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
642where
643 C: ConnectionTrait,
644{
645 let count = Object::find()
646 .filter(object::Column::SchemaId.eq(Some(schema_id)))
647 .count(db)
648 .await?;
649 if count != 0 {
650 return Err(MetaError::permission_denied("schema is not empty"));
651 }
652
653 Ok(())
654}
655
656pub async fn list_user_info_by_ids<C>(user_ids: Vec<UserId>, db: &C) -> MetaResult<Vec<PbUserInfo>>
658where
659 C: ConnectionTrait,
660{
661 let mut user_infos = vec![];
662 for user_id in user_ids {
663 let user = User::find_by_id(user_id)
664 .one(db)
665 .await?
666 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
667 let mut user_info: PbUserInfo = user.into();
668 user_info.grant_privileges = get_user_privilege(user_id, db).await?;
669 user_infos.push(user_info);
670 }
671 Ok(user_infos)
672}
673
674pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
676where
677 C: ConnectionTrait,
678{
679 let obj_owner: UserId = Object::find_by_id(object_id)
680 .select_only()
681 .column(object::Column::OwnerId)
682 .into_tuple()
683 .one(db)
684 .await?
685 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
686 Ok(obj_owner)
687}
688
689pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
714 let cte_alias = Alias::new("granted_privilege_ids");
715 let cte_return_privilege_alias = Alias::new("id");
716 let cte_return_user_alias = Alias::new("user_id");
717
718 let mut base_query = SelectStatement::new()
719 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
720 .from(UserPrivilege)
721 .and_where(user_privilege::Column::Id.is_in(ids))
722 .to_owned();
723
724 let cte_referencing = Query::select()
725 .columns([
726 (UserPrivilege, user_privilege::Column::Id),
727 (UserPrivilege, user_privilege::Column::UserId),
728 ])
729 .from(UserPrivilege)
730 .inner_join(
731 cte_alias.clone(),
732 Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
733 .equals(user_privilege::Column::DependentId),
734 )
735 .to_owned();
736
737 let common_table_expr = CommonTableExpression::new()
738 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
739 .columns([
740 cte_return_privilege_alias.clone(),
741 cte_return_user_alias.clone(),
742 ])
743 .table_name(cte_alias.clone())
744 .to_owned();
745
746 SelectStatement::new()
747 .columns([cte_return_privilege_alias, cte_return_user_alias])
748 .from(cte_alias.clone())
749 .to_owned()
750 .with(
751 WithClause::new()
752 .recursive(true)
753 .cte(common_table_expr)
754 .to_owned(),
755 )
756 .to_owned()
757}
758
759pub async fn get_internal_tables_by_id<C>(job_id: ObjectId, db: &C) -> MetaResult<Vec<TableId>>
760where
761 C: ConnectionTrait,
762{
763 let table_ids: Vec<TableId> = Table::find()
764 .select_only()
765 .column(table::Column::TableId)
766 .filter(
767 table::Column::TableType
768 .eq(TableType::Internal)
769 .and(table::Column::BelongsToJobId.eq(job_id)),
770 )
771 .into_tuple()
772 .all(db)
773 .await?;
774 Ok(table_ids)
775}
776
777pub async fn get_index_state_tables_by_table_id<C>(
778 table_id: TableId,
779 db: &C,
780) -> MetaResult<Vec<TableId>>
781where
782 C: ConnectionTrait,
783{
784 let mut index_table_ids: Vec<TableId> = Index::find()
785 .select_only()
786 .column(index::Column::IndexTableId)
787 .filter(index::Column::PrimaryTableId.eq(table_id))
788 .into_tuple()
789 .all(db)
790 .await?;
791
792 if !index_table_ids.is_empty() {
793 let internal_table_ids: Vec<TableId> = Table::find()
794 .select_only()
795 .column(table::Column::TableId)
796 .filter(
797 table::Column::TableType
798 .eq(TableType::Internal)
799 .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
800 )
801 .into_tuple()
802 .all(db)
803 .await?;
804
805 index_table_ids.extend(internal_table_ids.into_iter());
806 }
807
808 Ok(index_table_ids)
809}
810
/// Partial projection of the `UserPrivilege` entity.
///
/// Used in this file as the row type returned by `get_referring_privileges_cascade`.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "UserPrivilege")]
pub struct PartialUserPrivilege {
    /// Id of the privilege.
    pub id: PrivilegeId,
    /// Id of the user recorded on the privilege row.
    pub user_id: UserId,
}
817
818pub async fn get_referring_privileges_cascade<C>(
819 ids: Vec<PrivilegeId>,
820 db: &C,
821) -> MetaResult<Vec<PartialUserPrivilege>>
822where
823 C: ConnectionTrait,
824{
825 let query = construct_privilege_dependency_query(ids);
826 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
827 let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
828 db.get_database_backend(),
829 sql,
830 values,
831 ))
832 .all(db)
833 .await?;
834
835 Ok(privileges)
836}
837
838pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
840where
841 C: ConnectionTrait,
842{
843 let count = UserPrivilege::find()
844 .filter(user_privilege::Column::DependentId.is_in(ids))
845 .count(db)
846 .await?;
847 if count != 0 {
848 return Err(MetaError::permission_denied(format!(
849 "privileges granted to {} other ones.",
850 count
851 )));
852 }
853 Ok(())
854}
855
856pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
858where
859 C: ConnectionTrait,
860{
861 let user_privileges = UserPrivilege::find()
862 .find_also_related(Object)
863 .filter(user_privilege::Column::UserId.eq(user_id))
864 .all(db)
865 .await?;
866 Ok(user_privileges
867 .into_iter()
868 .map(|(privilege, object)| {
869 let object = object.unwrap();
870 let oid = object.oid as _;
871 let obj = match object.obj_type {
872 ObjectType::Database => PbGrantObject::DatabaseId(oid),
873 ObjectType::Schema => PbGrantObject::SchemaId(oid),
874 ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
875 ObjectType::Source => PbGrantObject::SourceId(oid),
876 ObjectType::Sink => PbGrantObject::SinkId(oid),
877 ObjectType::View => PbGrantObject::ViewId(oid),
878 ObjectType::Function => PbGrantObject::FunctionId(oid),
879 ObjectType::Connection => unreachable!("connection is not supported yet"),
880 ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
881 ObjectType::Secret => unreachable!("secret is not supported yet"),
882 };
883 PbGrantPrivilege {
884 action_with_opts: vec![PbActionWithGrantOption {
885 action: PbAction::from(privilege.action) as _,
886 with_grant_option: privilege.with_grant_option,
887 granted_by: privilege.granted_by as _,
888 }],
889 object: Some(obj),
890 }
891 })
892 .collect())
893}
894
895pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
897 match object {
898 PbGrantObject::DatabaseId(id)
899 | PbGrantObject::SchemaId(id)
900 | PbGrantObject::TableId(id)
901 | PbGrantObject::SourceId(id)
902 | PbGrantObject::SinkId(id)
903 | PbGrantObject::ViewId(id)
904 | PbGrantObject::FunctionId(id)
905 | PbGrantObject::SubscriptionId(id)
906 | PbGrantObject::ConnectionId(id)
907 | PbGrantObject::SecretId(id) => *id as _,
908 }
909}
910
911pub async fn insert_fragment_relations(
912 db: &impl ConnectionTrait,
913 downstream_fragment_relations: &FragmentDownstreamRelation,
914) -> MetaResult<()> {
915 for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
916 for downstream in downstreams {
917 let relation = fragment_relation::Model {
918 source_fragment_id: *upstream_fragment_id as _,
919 target_fragment_id: downstream.downstream_fragment_id as _,
920 dispatcher_type: downstream.dispatcher_type,
921 dist_key_indices: downstream
922 .dist_key_indices
923 .iter()
924 .map(|idx| *idx as i32)
925 .collect_vec()
926 .into(),
927 output_indices: downstream
928 .output_indices
929 .iter()
930 .map(|idx| *idx as i32)
931 .collect_vec()
932 .into(),
933 };
934 FragmentRelation::insert(relation.into_active_model())
935 .exec(db)
936 .await?;
937 }
938 }
939 Ok(())
940}
941
942pub async fn get_fragment_actor_dispatchers<C>(
943 db: &C,
944 fragment_ids: Vec<FragmentId>,
945) -> MetaResult<FragmentActorDispatchers>
946where
947 C: ConnectionTrait,
948{
949 type FragmentActorInfo = (
950 DistributionType,
951 Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
952 );
953 let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
954 let get_fragment_actors = |fragment_id: FragmentId| async move {
955 let result: MetaResult<FragmentActorInfo> = try {
956 let mut fragment_actors = Fragment::find_by_id(fragment_id)
957 .find_with_related(Actor)
958 .filter(actor::Column::Status.eq(ActorStatus::Running))
959 .all(db)
960 .await?;
961 if fragment_actors.is_empty() {
962 return Err(anyhow!("failed to find fragment: {}", fragment_id).into());
963 }
964 assert_eq!(
965 fragment_actors.len(),
966 1,
967 "find multiple fragment {:?}",
968 fragment_actors
969 );
970 let (fragment, actors) = fragment_actors.pop().unwrap();
971 (
972 fragment.distribution_type,
973 Arc::new(
974 actors
975 .into_iter()
976 .map(|actor| {
977 (
978 actor.actor_id as _,
979 actor
980 .vnode_bitmap
981 .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
982 )
983 })
984 .collect(),
985 ),
986 )
987 };
988 result
989 };
990 let fragment_relations = FragmentRelation::find()
991 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
992 .all(db)
993 .await?;
994
995 let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
996 for fragment_relation::Model {
997 source_fragment_id,
998 target_fragment_id,
999 dispatcher_type,
1000 dist_key_indices,
1001 output_indices,
1002 } in fragment_relations
1003 {
1004 let (source_fragment_distribution, source_fragment_actors) = {
1005 let (distribution, actors) = {
1006 match fragment_actor_cache.entry(source_fragment_id) {
1007 Entry::Occupied(entry) => entry.into_mut(),
1008 Entry::Vacant(entry) => {
1009 entry.insert(get_fragment_actors(source_fragment_id).await?)
1010 }
1011 }
1012 };
1013 (*distribution, actors.clone())
1014 };
1015 let (target_fragment_distribution, target_fragment_actors) = {
1016 let (distribution, actors) = {
1017 match fragment_actor_cache.entry(target_fragment_id) {
1018 Entry::Occupied(entry) => entry.into_mut(),
1019 Entry::Vacant(entry) => {
1020 entry.insert(get_fragment_actors(target_fragment_id).await?)
1021 }
1022 }
1023 };
1024 (*distribution, actors.clone())
1025 };
1026 let dispatchers = compose_dispatchers(
1027 source_fragment_distribution,
1028 &source_fragment_actors,
1029 target_fragment_id as _,
1030 target_fragment_distribution,
1031 &target_fragment_actors,
1032 dispatcher_type,
1033 dist_key_indices.into_u32_array(),
1034 output_indices.into_u32_array(),
1035 );
1036 let actor_dispatchers_map = actor_dispatchers_map
1037 .entry(source_fragment_id as _)
1038 .or_default();
1039 for (actor_id, dispatchers) in dispatchers {
1040 actor_dispatchers_map
1041 .entry(actor_id as _)
1042 .or_default()
1043 .push(dispatchers);
1044 }
1045 }
1046 Ok(actor_dispatchers_map)
1047}
1048
1049pub fn compose_dispatchers(
1050 source_fragment_distribution: DistributionType,
1051 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1052 target_fragment_id: crate::model::FragmentId,
1053 target_fragment_distribution: DistributionType,
1054 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1055 dispatcher_type: DispatcherType,
1056 dist_key_indices: Vec<u32>,
1057 output_indices: Vec<u32>,
1058) -> HashMap<crate::model::ActorId, PbDispatcher> {
1059 match dispatcher_type {
1060 DispatcherType::Hash => {
1061 let dispatcher = PbDispatcher {
1062 r#type: PbDispatcherType::from(dispatcher_type) as _,
1063 dist_key_indices: dist_key_indices.clone(),
1064 output_indices: output_indices.clone(),
1065 hash_mapping: Some(
1066 ActorMapping::from_bitmaps(
1067 &target_fragment_actors
1068 .iter()
1069 .map(|(actor_id, bitmap)| {
1070 (
1071 *actor_id as _,
1072 bitmap
1073 .clone()
1074 .expect("downstream hash dispatch must have distribution"),
1075 )
1076 })
1077 .collect(),
1078 )
1079 .to_protobuf(),
1080 ),
1081 dispatcher_id: target_fragment_id as _,
1082 downstream_actor_id: target_fragment_actors
1083 .keys()
1084 .map(|actor_id| *actor_id as _)
1085 .collect(),
1086 };
1087 source_fragment_actors
1088 .keys()
1089 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1090 .collect()
1091 }
1092 DispatcherType::Broadcast | DispatcherType::Simple => {
1093 let dispatcher = PbDispatcher {
1094 r#type: PbDispatcherType::from(dispatcher_type) as _,
1095 dist_key_indices: dist_key_indices.clone(),
1096 output_indices: output_indices.clone(),
1097 hash_mapping: None,
1098 dispatcher_id: target_fragment_id as _,
1099 downstream_actor_id: target_fragment_actors
1100 .keys()
1101 .map(|actor_id| *actor_id as _)
1102 .collect(),
1103 };
1104 source_fragment_actors
1105 .keys()
1106 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1107 .collect()
1108 }
1109 DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1110 source_fragment_distribution,
1111 source_fragment_actors,
1112 target_fragment_distribution,
1113 target_fragment_actors,
1114 )
1115 .into_iter()
1116 .map(|(upstream_actor_id, downstream_actor_id)| {
1117 (
1118 upstream_actor_id,
1119 PbDispatcher {
1120 r#type: PbDispatcherType::NoShuffle as _,
1121 dist_key_indices: dist_key_indices.clone(),
1122 output_indices: output_indices.clone(),
1123 hash_mapping: None,
1124 dispatcher_id: target_fragment_id as _,
1125 downstream_actor_id: vec![downstream_actor_id as _],
1126 },
1127 )
1128 })
1129 .collect(),
1130 }
1131}
1132
1133pub fn resolve_no_shuffle_actor_dispatcher(
1135 source_fragment_distribution: DistributionType,
1136 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1137 target_fragment_distribution: DistributionType,
1138 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1139) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1140 assert_eq!(source_fragment_distribution, target_fragment_distribution);
1141 assert_eq!(
1142 source_fragment_actors.len(),
1143 target_fragment_actors.len(),
1144 "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1145 source_fragment_actors,
1146 target_fragment_actors
1147 );
1148 match source_fragment_distribution {
1149 DistributionType::Single => {
1150 let assert_singleton = |bitmap: &Option<Bitmap>| {
1151 assert!(
1152 bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1153 "not singleton: {:?}",
1154 bitmap
1155 );
1156 };
1157 assert_eq!(
1158 source_fragment_actors.len(),
1159 1,
1160 "singleton distribution actor count not 1: {:?}",
1161 source_fragment_distribution
1162 );
1163 assert_eq!(
1164 target_fragment_actors.len(),
1165 1,
1166 "singleton distribution actor count not 1: {:?}",
1167 target_fragment_distribution
1168 );
1169 let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1170 assert_singleton(bitmap);
1171 let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1172 assert_singleton(bitmap);
1173 vec![(*source_actor_id, *target_actor_id)]
1174 }
1175 DistributionType::Hash => {
1176 let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1177 .iter()
1178 .map(|(actor_id, bitmap)| {
1179 let bitmap = bitmap
1180 .as_ref()
1181 .expect("hash distribution should have bitmap");
1182 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1183 (first_vnode, (*actor_id, bitmap))
1184 })
1185 .collect();
1186 source_fragment_actors
1187 .iter()
1188 .map(|(source_actor_id, bitmap)| {
1189 let bitmap = bitmap
1190 .as_ref()
1191 .expect("hash distribution should have bitmap");
1192 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1193 let (target_actor_id, target_bitmap) =
1194 target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1195 panic!(
1196 "cannot find matched target actor: {} {:?} {:?} {:?}",
1197 source_actor_id,
1198 first_vnode,
1199 source_fragment_actors,
1200 target_fragment_actors
1201 );
1202 });
1203 assert_eq!(
1204 bitmap,
1205 target_bitmap,
1206 "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1207 source_actor_id,
1208 first_vnode,
1209 source_fragment_actors,
1210 target_fragment_actors
1211 );
1212 (*source_actor_id, target_actor_id)
1213 }).collect()
1214 }
1215 }
1216}
1217
1218pub async fn get_fragment_mappings<C>(
1220 db: &C,
1221 job_id: ObjectId,
1222) -> MetaResult<Vec<PbFragmentWorkerSlotMapping>>
1223where
1224 C: ConnectionTrait,
1225{
1226 let job_actors: Vec<(
1227 FragmentId,
1228 DistributionType,
1229 ActorId,
1230 Option<VnodeBitmap>,
1231 WorkerId,
1232 ActorStatus,
1233 )> = Actor::find()
1234 .select_only()
1235 .columns([
1236 fragment::Column::FragmentId,
1237 fragment::Column::DistributionType,
1238 ])
1239 .columns([
1240 actor::Column::ActorId,
1241 actor::Column::VnodeBitmap,
1242 actor::Column::WorkerId,
1243 actor::Column::Status,
1244 ])
1245 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1246 .filter(fragment::Column::JobId.eq(job_id))
1247 .into_tuple()
1248 .all(db)
1249 .await?;
1250
1251 Ok(rebuild_fragment_mapping_from_actors(job_actors))
1252}
1253
1254pub fn rebuild_fragment_mapping_from_actors(
1255 job_actors: Vec<(
1256 FragmentId,
1257 DistributionType,
1258 ActorId,
1259 Option<VnodeBitmap>,
1260 WorkerId,
1261 ActorStatus,
1262 )>,
1263) -> Vec<FragmentWorkerSlotMapping> {
1264 let mut all_actor_locations = HashMap::new();
1265 let mut actor_bitmaps = HashMap::new();
1266 let mut fragment_actors = HashMap::new();
1267 let mut fragment_dist = HashMap::new();
1268
1269 for (fragment_id, dist, actor_id, bitmap, worker_id, actor_status) in job_actors {
1270 if actor_status == ActorStatus::Inactive {
1271 continue;
1272 }
1273
1274 all_actor_locations
1275 .entry(fragment_id)
1276 .or_insert(HashMap::new())
1277 .insert(actor_id as hash::ActorId, worker_id as u32);
1278 actor_bitmaps.insert(actor_id, bitmap);
1279 fragment_actors
1280 .entry(fragment_id)
1281 .or_insert_with(Vec::new)
1282 .push(actor_id);
1283 fragment_dist.insert(fragment_id, dist);
1284 }
1285
1286 let mut result = vec![];
1287 for (fragment_id, dist) in fragment_dist {
1288 let mut actor_locations = all_actor_locations.remove(&fragment_id).unwrap();
1289 let fragment_worker_slot_mapping = match dist {
1290 DistributionType::Single => {
1291 let actor = fragment_actors
1292 .remove(&fragment_id)
1293 .unwrap()
1294 .into_iter()
1295 .exactly_one()
1296 .unwrap() as hash::ActorId;
1297 let actor_location = actor_locations.remove(&actor).unwrap();
1298
1299 WorkerSlotMapping::new_single(WorkerSlotId::new(actor_location, 0))
1300 }
1301 DistributionType::Hash => {
1302 let actors = fragment_actors.remove(&fragment_id).unwrap();
1303
1304 let all_actor_bitmaps: HashMap<_, _> = actors
1305 .iter()
1306 .map(|actor_id| {
1307 let vnode_bitmap = actor_bitmaps
1308 .remove(actor_id)
1309 .flatten()
1310 .expect("actor bitmap shouldn't be none in hash fragment");
1311
1312 let bitmap = Bitmap::from(&vnode_bitmap.to_protobuf());
1313 (*actor_id as hash::ActorId, bitmap)
1314 })
1315 .collect();
1316
1317 let actor_mapping = ActorMapping::from_bitmaps(&all_actor_bitmaps);
1318
1319 actor_mapping.to_worker_slot(&actor_locations)
1320 }
1321 };
1322
1323 result.push(PbFragmentWorkerSlotMapping {
1324 fragment_id: fragment_id as u32,
1325 mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1326 })
1327 }
1328 result
1329}
1330
1331pub async fn get_fragment_actor_ids<C>(
1333 db: &C,
1334 fragment_ids: Vec<FragmentId>,
1335) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>>
1336where
1337 C: ConnectionTrait,
1338{
1339 let fragment_actors: Vec<(FragmentId, ActorId)> = Actor::find()
1340 .select_only()
1341 .columns([actor::Column::FragmentId, actor::Column::ActorId])
1342 .filter(actor::Column::FragmentId.is_in(fragment_ids))
1343 .into_tuple()
1344 .all(db)
1345 .await?;
1346
1347 Ok(fragment_actors.into_iter().into_group_map())
1348}
1349
1350pub async fn get_fragments_for_jobs<C>(
1355 db: &C,
1356 streaming_jobs: Vec<ObjectId>,
1357) -> MetaResult<(
1358 HashMap<SourceId, BTreeSet<FragmentId>>,
1359 HashSet<ActorId>,
1360 HashSet<FragmentId>,
1361)>
1362where
1363 C: ConnectionTrait,
1364{
1365 if streaming_jobs.is_empty() {
1366 return Ok((HashMap::default(), HashSet::default(), HashSet::default()));
1367 }
1368
1369 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1370 .select_only()
1371 .columns([
1372 fragment::Column::FragmentId,
1373 fragment::Column::FragmentTypeMask,
1374 fragment::Column::StreamNode,
1375 ])
1376 .filter(fragment::Column::JobId.is_in(streaming_jobs))
1377 .into_tuple()
1378 .all(db)
1379 .await?;
1380 let actors: Vec<ActorId> = Actor::find()
1381 .select_only()
1382 .column(actor::Column::ActorId)
1383 .filter(
1384 actor::Column::FragmentId.is_in(fragments.iter().map(|(id, _, _)| *id).collect_vec()),
1385 )
1386 .into_tuple()
1387 .all(db)
1388 .await?;
1389
1390 let fragment_ids = fragments
1391 .iter()
1392 .map(|(fragment_id, _, _)| *fragment_id)
1393 .collect();
1394
1395 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1396 for (fragment_id, mask, stream_node) in fragments {
1397 if mask & PbFragmentTypeFlag::Source as i32 == 0 {
1398 continue;
1399 }
1400 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1401 source_fragment_ids
1402 .entry(source_id as _)
1403 .or_default()
1404 .insert(fragment_id);
1405 }
1406 }
1407
1408 Ok((
1409 source_fragment_ids,
1410 actors.into_iter().collect(),
1411 fragment_ids,
1412 ))
1413}
1414
1415pub(crate) fn build_object_group_for_delete(
1420 partial_objects: Vec<PartialObject>,
1421) -> NotificationInfo {
1422 let mut objects = vec![];
1423 for obj in partial_objects {
1424 match obj.obj_type {
1425 ObjectType::Database => objects.push(PbObject {
1426 object_info: Some(PbObjectInfo::Database(PbDatabase {
1427 id: obj.oid as _,
1428 ..Default::default()
1429 })),
1430 }),
1431 ObjectType::Schema => objects.push(PbObject {
1432 object_info: Some(PbObjectInfo::Schema(PbSchema {
1433 id: obj.oid as _,
1434 database_id: obj.database_id.unwrap() as _,
1435 ..Default::default()
1436 })),
1437 }),
1438 ObjectType::Table => objects.push(PbObject {
1439 object_info: Some(PbObjectInfo::Table(PbTable {
1440 id: obj.oid as _,
1441 schema_id: obj.schema_id.unwrap() as _,
1442 database_id: obj.database_id.unwrap() as _,
1443 ..Default::default()
1444 })),
1445 }),
1446 ObjectType::Source => objects.push(PbObject {
1447 object_info: Some(PbObjectInfo::Source(PbSource {
1448 id: obj.oid as _,
1449 schema_id: obj.schema_id.unwrap() as _,
1450 database_id: obj.database_id.unwrap() as _,
1451 ..Default::default()
1452 })),
1453 }),
1454 ObjectType::Sink => objects.push(PbObject {
1455 object_info: Some(PbObjectInfo::Sink(PbSink {
1456 id: obj.oid as _,
1457 schema_id: obj.schema_id.unwrap() as _,
1458 database_id: obj.database_id.unwrap() as _,
1459 ..Default::default()
1460 })),
1461 }),
1462 ObjectType::Subscription => objects.push(PbObject {
1463 object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1464 id: obj.oid as _,
1465 schema_id: obj.schema_id.unwrap() as _,
1466 database_id: obj.database_id.unwrap() as _,
1467 ..Default::default()
1468 })),
1469 }),
1470 ObjectType::View => objects.push(PbObject {
1471 object_info: Some(PbObjectInfo::View(PbView {
1472 id: obj.oid as _,
1473 schema_id: obj.schema_id.unwrap() as _,
1474 database_id: obj.database_id.unwrap() as _,
1475 ..Default::default()
1476 })),
1477 }),
1478 ObjectType::Index => {
1479 objects.push(PbObject {
1480 object_info: Some(PbObjectInfo::Index(PbIndex {
1481 id: obj.oid as _,
1482 schema_id: obj.schema_id.unwrap() as _,
1483 database_id: obj.database_id.unwrap() as _,
1484 ..Default::default()
1485 })),
1486 });
1487 objects.push(PbObject {
1488 object_info: Some(PbObjectInfo::Table(PbTable {
1489 id: obj.oid as _,
1490 schema_id: obj.schema_id.unwrap() as _,
1491 database_id: obj.database_id.unwrap() as _,
1492 ..Default::default()
1493 })),
1494 });
1495 }
1496 ObjectType::Function => objects.push(PbObject {
1497 object_info: Some(PbObjectInfo::Function(PbFunction {
1498 id: obj.oid as _,
1499 schema_id: obj.schema_id.unwrap() as _,
1500 database_id: obj.database_id.unwrap() as _,
1501 ..Default::default()
1502 })),
1503 }),
1504 ObjectType::Connection => objects.push(PbObject {
1505 object_info: Some(PbObjectInfo::Connection(PbConnection {
1506 id: obj.oid as _,
1507 schema_id: obj.schema_id.unwrap() as _,
1508 database_id: obj.database_id.unwrap() as _,
1509 ..Default::default()
1510 })),
1511 }),
1512 ObjectType::Secret => objects.push(PbObject {
1513 object_info: Some(PbObjectInfo::Secret(PbSecret {
1514 id: obj.oid as _,
1515 schema_id: obj.schema_id.unwrap() as _,
1516 database_id: obj.database_id.unwrap() as _,
1517 ..Default::default()
1518 })),
1519 }),
1520 }
1521 }
1522 NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1523}
1524
1525pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1526 let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1527 .context("unable to parse table definition")
1528 .inspect_err(|e| {
1529 tracing::error!(
1530 target: "auto_schema_change",
1531 error = %e.as_report(),
1532 "failed to parse table definition")
1533 })
1534 .unwrap()
1535 .try_into()
1536 .unwrap();
1537 if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1538 cdc_table_info
1539 .clone()
1540 .map(|cdc_table_info| cdc_table_info.external_table_name)
1541 } else {
1542 None
1543 }
1544}
1545
/// Renames the relation `object_id` of kind `object_type` to `object_name` inside `txn`.
///
/// Returns the catalog objects that were updated together with the relation's previous name.
/// For an index, both its index table and the index itself are renamed, so two objects are
/// returned.
///
/// # Panics
///
/// Panics if the relation (or its `Object` row) cannot be found, and hits `unreachable!` for
/// any object type other than table, source, sink, subscription, view or index.
pub async fn rename_relation(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
) -> MetaResult<(Vec<PbObject>, String)> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename;

    let mut to_update_relations = vec![];
    // Renames one relation: loads it with its object row, updates name (and definition),
    // persists the change, records the updated object and evaluates to the old name.
    macro_rules! rename_relation {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let obj = obj.unwrap();
            let old_name = relation.name.clone();
            relation.name = object_name.into();
            // The definition of a view is left untouched; every other kind gets its
            // definition rewritten by `alter_relation_rename`.
            if obj.obj_type != ObjectType::View {
                relation.definition = alter_relation_rename(&relation.definition, object_name);
            }
            // Only the identity, name and definition columns are set on the update.
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                name: Set(object_name.into()),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
            });
            old_name
        }};
    }
    let old_name = match object_type {
        ObjectType::Table => rename_relation!(Table, table, table_id, object_id),
        ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id),
        ObjectType::Subscription => {
            rename_relation!(Subscription, subscription, subscription_id, object_id)
        }
        ObjectType::View => rename_relation!(View, view, view_id, object_id),
        ObjectType::Index => {
            let (mut index, obj) = Index::find_by_id(object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            index.name = object_name.into();
            let index_table_id = index.index_table_id;
            // Rename the index's table first; its previous name is what gets returned.
            let old_name = rename_relation!(Table, table, table_id, index_table_id);

            // Then give the index row itself the same new name.
            let active_model = index::ActiveModel {
                index_id: sea_orm::ActiveValue::Set(index.index_id),
                name: sea_orm::ActiveValue::Set(object_name.into()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
            });
            old_name
        }
        _ => unreachable!("only relation name can be altered."),
    };

    Ok((to_update_relations, old_name))
}
1622
1623pub async fn get_database_resource_group<C>(txn: &C, database_id: ObjectId) -> MetaResult<String>
1624where
1625 C: ConnectionTrait,
1626{
1627 let database_resource_group: Option<String> = Database::find_by_id(database_id)
1628 .select_only()
1629 .column(database::Column::ResourceGroup)
1630 .into_tuple()
1631 .one(txn)
1632 .await?
1633 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1634
1635 Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1636}
1637
1638pub async fn get_existing_job_resource_group<C>(
1639 txn: &C,
1640 streaming_job_id: ObjectId,
1641) -> MetaResult<String>
1642where
1643 C: ConnectionTrait,
1644{
1645 let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1646 StreamingJob::find_by_id(streaming_job_id)
1647 .select_only()
1648 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1649 .join(JoinType::InnerJoin, object::Relation::Database2.def())
1650 .column(streaming_job::Column::SpecificResourceGroup)
1651 .column(database::Column::ResourceGroup)
1652 .into_tuple()
1653 .one(txn)
1654 .await?
1655 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1656
1657 Ok(job_specific_resource_group.unwrap_or_else(|| {
1658 database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1659 }))
1660}
1661
1662pub fn filter_workers_by_resource_group(
1663 workers: &HashMap<u32, WorkerNode>,
1664 resource_group: &str,
1665) -> BTreeSet<WorkerId> {
1666 workers
1667 .iter()
1668 .filter(|&(_, worker)| {
1669 worker
1670 .resource_group()
1671 .map(|node_label| node_label.as_str() == resource_group)
1672 .unwrap_or(false)
1673 })
1674 .map(|(id, _)| (*id as WorkerId))
1675 .collect()
1676}
1677
/// Updates the definitions of all objects that refer to the renamed relation `object_id`.
///
/// The set of objects is what `get_referring_objects` returns, plus — when the renamed
/// relation is a table — the sinks listed in that table's `incoming_sinks` column. Each
/// definition is rewritten with `alter_relation_rename_refs(definition, old_name, object_name)`
/// and persisted in `txn`. Returns the updated catalog objects.
///
/// # Errors
///
/// Fails if a referring object is not a table, sink, subscription, view or index, or if the
/// renamed table cannot be found when loading its incoming sinks.
///
/// # Panics
///
/// Panics if a referring relation, its `Object` row, or an index's row cannot be found.
pub async fn rename_relation_refer(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
    old_name: &str,
) -> MetaResult<Vec<PbObject>> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename_refs;

    let mut to_update_relations = vec![];
    // Rewrites the definition of one referring relation, persists only the definition column
    // and records the updated object.
    macro_rules! rename_relation_ref {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            relation.definition =
                alter_relation_rename_refs(&relation.definition, old_name, object_name);
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj.unwrap()).into(),
                )),
            });
        }};
    }
    let mut objs = get_referring_objects(object_id, txn).await?;
    if object_type == ObjectType::Table {
        // A table's incoming sinks are added to the referring objects explicitly.
        let incoming_sinks: I32Array = Table::find_by_id(object_id)
            .select_only()
            .column(table::Column::IncomingSinks)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;

        objs.extend(
            incoming_sinks
                .into_inner()
                .into_iter()
                .map(|id| PartialObject {
                    oid: id,
                    obj_type: ObjectType::Sink,
                    // Only `oid` and `obj_type` are read in the loop below.
                    schema_id: None,
                    database_id: None,
                }),
        );
    }

    for obj in objs {
        match obj.obj_type {
            ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid),
            ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid),
            ObjectType::Subscription => {
                rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid)
            }
            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid),
            ObjectType::Index => {
                // For an index, the definition that gets rewritten is the one of its index
                // table, so resolve that table's id first.
                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid)
                    .select_only()
                    .column(index::Column::IndexTableId)
                    .into_tuple()
                    .one(txn)
                    .await?;
                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
            }
            _ => {
                bail!("only table, sink, subscription, view and index depend on other objects.")
            }
        }
    }

    Ok(to_update_relations)
}
1762
1763pub async fn validate_subscription_deletion<C>(txn: &C, subscription_id: ObjectId) -> MetaResult<()>
1767where
1768 C: ConnectionTrait,
1769{
1770 let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1771 .select_only()
1772 .column(subscription::Column::DependentTableId)
1773 .into_tuple()
1774 .one(txn)
1775 .await?
1776 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1777
1778 let cnt = Subscription::find()
1779 .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1780 .count(txn)
1781 .await?;
1782 if cnt > 1 {
1783 return Ok(());
1786 }
1787
1788 let obj_alias = Alias::new("o1");
1790 let used_by_alias = Alias::new("o2");
1791 let count = ObjectDependency::find()
1792 .join_as(
1793 JoinType::InnerJoin,
1794 object_dependency::Relation::Object2.def(),
1795 obj_alias.clone(),
1796 )
1797 .join_as(
1798 JoinType::InnerJoin,
1799 object_dependency::Relation::Object1.def(),
1800 used_by_alias.clone(),
1801 )
1802 .filter(
1803 object_dependency::Column::Oid
1804 .eq(upstream_table_id)
1805 .and(object_dependency::Column::UsedBy.ne(subscription_id))
1806 .and(
1807 Expr::col((obj_alias, object::Column::DatabaseId))
1808 .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1809 ),
1810 )
1811 .count(txn)
1812 .await?;
1813
1814 if count != 0 {
1815 return Err(MetaError::permission_denied(format!(
1816 "Referenced by {} cross-db objects.",
1817 count
1818 )));
1819 }
1820
1821 Ok(())
1822}
1823
#[cfg(test)]
mod tests {
    use super::*;

    /// The external table name is taken from the `TABLE '...'` clause of a CDC table DDL.
    #[test]
    fn test_extract_cdc_table_name() {
        let cases = [
            (
                "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'",
                "public.t1",
            ),
            (
                "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'",
                "mydb.t2",
            ),
        ];

        for (ddl, expected) in cases {
            assert_eq!(
                extract_external_table_name_from_definition(ddl),
                Some(expected.to_owned())
            );
        }
    }
}