1use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
23use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
24use risingwave_common::{bail, hash};
25use risingwave_meta_model::actor::ActorStatus;
26use risingwave_meta_model::fragment::DistributionType;
27use risingwave_meta_model::object::ObjectType;
28use risingwave_meta_model::prelude::*;
29use risingwave_meta_model::table::TableType;
30use risingwave_meta_model::{
31 ActorId, DataTypeArray, DatabaseId, DispatcherType, FragmentId, I32Array, JobStatus, ObjectId,
32 PrivilegeId, SchemaId, SourceId, StreamNode, StreamSourceInfo, TableId, UserId, VnodeBitmap,
33 WorkerId, actor, connection, database, fragment, fragment_relation, function, index, object,
34 object_dependency, schema, secret, sink, source, streaming_job, subscription, table, user,
35 user_privilege, view,
36};
37use risingwave_meta_model_migration::WithQuery;
38use risingwave_pb::catalog::{
39 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
40 PbSubscription, PbTable, PbView,
41};
42use risingwave_pb::common::WorkerNode;
43use risingwave_pb::meta::object::PbObjectInfo;
44use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
45use risingwave_pb::meta::{
46 FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
47};
48use risingwave_pb::stream_plan::{
49 PbDispatchOutputMapping, PbDispatcher, PbDispatcherType, PbFragmentTypeFlag,
50};
51use risingwave_pb::user::grant_privilege::{
52 PbAction, PbActionWithGrantOption, PbObject as PbGrantObject,
53};
54use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
55use risingwave_sqlparser::ast::Statement as SqlStatement;
56use risingwave_sqlparser::parser::Parser;
57use sea_orm::sea_query::{
58 Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
59 WithClause,
60};
61use sea_orm::{
62 ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
63 FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
64 RelationTrait, Set, Statement,
65};
66use thiserror_ext::AsReport;
67
68use crate::controller::ObjectModel;
69use crate::model::{FragmentActorDispatchers, FragmentDownstreamRelation};
70use crate::{MetaError, MetaResult};
71
72pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
97 let cte_alias = Alias::new("used_by_object_ids");
98 let cte_return_alias = Alias::new("used_by");
99
100 let mut base_query = SelectStatement::new()
101 .column(object_dependency::Column::UsedBy)
102 .from(ObjectDependency)
103 .and_where(object_dependency::Column::Oid.eq(obj_id))
104 .to_owned();
105
106 let belonged_obj_query = SelectStatement::new()
107 .column(object::Column::Oid)
108 .from(Object)
109 .and_where(
110 object::Column::DatabaseId
111 .eq(obj_id)
112 .or(object::Column::SchemaId.eq(obj_id)),
113 )
114 .to_owned();
115
116 let cte_referencing = Query::select()
117 .column((ObjectDependency, object_dependency::Column::UsedBy))
118 .from(ObjectDependency)
119 .inner_join(
120 cte_alias.clone(),
121 Expr::col((cte_alias.clone(), cte_return_alias.clone()))
122 .equals(object_dependency::Column::Oid),
123 )
124 .to_owned();
125
126 let common_table_expr = CommonTableExpression::new()
127 .query(
128 base_query
129 .union(UnionType::All, belonged_obj_query)
130 .union(UnionType::All, cte_referencing)
131 .to_owned(),
132 )
133 .column(cte_return_alias.clone())
134 .table_name(cte_alias.clone())
135 .to_owned();
136
137 SelectStatement::new()
138 .distinct()
139 .columns([
140 object::Column::Oid,
141 object::Column::ObjType,
142 object::Column::SchemaId,
143 object::Column::DatabaseId,
144 ])
145 .from(cte_alias.clone())
146 .inner_join(
147 Object,
148 Expr::col((cte_alias, cte_return_alias.clone())).equals(object::Column::Oid),
149 )
150 .order_by(object::Column::Oid, Order::Desc)
151 .to_owned()
152 .with(
153 WithClause::new()
154 .recursive(true)
155 .cte(common_table_expr)
156 .to_owned(),
157 )
158 .to_owned()
159}
160
161pub fn construct_sink_cycle_check_query(
186 target_table: ObjectId,
187 dependent_objects: Vec<ObjectId>,
188) -> WithQuery {
189 let cte_alias = Alias::new("used_by_object_ids_with_sink");
190 let depend_alias = Alias::new("obj_dependency_with_sink");
191
192 let mut base_query = SelectStatement::new()
193 .columns([
194 object_dependency::Column::Oid,
195 object_dependency::Column::UsedBy,
196 ])
197 .from(ObjectDependency)
198 .and_where(object_dependency::Column::Oid.eq(target_table))
199 .to_owned();
200
201 let query_sink_deps = SelectStatement::new()
202 .columns([sink::Column::SinkId, sink::Column::TargetTable])
203 .from(Sink)
204 .and_where(sink::Column::TargetTable.is_not_null())
205 .to_owned();
206
207 let cte_referencing = Query::select()
208 .column((depend_alias.clone(), object_dependency::Column::Oid))
209 .column((depend_alias.clone(), object_dependency::Column::UsedBy))
210 .from_subquery(
211 SelectStatement::new()
212 .columns([
213 object_dependency::Column::Oid,
214 object_dependency::Column::UsedBy,
215 ])
216 .from(ObjectDependency)
217 .union(UnionType::All, query_sink_deps)
218 .to_owned(),
219 depend_alias.clone(),
220 )
221 .inner_join(
222 cte_alias.clone(),
223 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).eq(Expr::col((
224 depend_alias.clone(),
225 object_dependency::Column::Oid,
226 ))),
227 )
228 .and_where(
229 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
230 cte_alias.clone(),
231 object_dependency::Column::Oid,
232 ))),
233 )
234 .to_owned();
235
236 let common_table_expr = CommonTableExpression::new()
237 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
238 .columns([
239 object_dependency::Column::Oid,
240 object_dependency::Column::UsedBy,
241 ])
242 .table_name(cte_alias.clone())
243 .to_owned();
244
245 SelectStatement::new()
246 .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
247 .from(cte_alias.clone())
248 .and_where(
249 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
250 .is_in(dependent_objects),
251 )
252 .to_owned()
253 .with(
254 WithClause::new()
255 .recursive(true)
256 .cte(common_table_expr)
257 .to_owned(),
258 )
259 .to_owned()
260}
261
262#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
263#[sea_orm(entity = "Object")]
264pub struct PartialObject {
265 pub oid: ObjectId,
266 pub obj_type: ObjectType,
267 pub schema_id: Option<SchemaId>,
268 pub database_id: Option<DatabaseId>,
269}
270
271#[derive(Clone, DerivePartialModel, FromQueryResult)]
272#[sea_orm(entity = "Fragment")]
273pub struct PartialFragmentStateTables {
274 pub fragment_id: FragmentId,
275 pub job_id: ObjectId,
276 pub state_table_ids: I32Array,
277}
278
279#[derive(Clone, DerivePartialModel, FromQueryResult)]
280#[sea_orm(entity = "Actor")]
281pub struct PartialActorLocation {
282 pub actor_id: ActorId,
283 pub fragment_id: FragmentId,
284 pub worker_id: WorkerId,
285 pub status: ActorStatus,
286}
287
288#[derive(FromQueryResult)]
289pub struct FragmentDesc {
290 pub fragment_id: FragmentId,
291 pub job_id: ObjectId,
292 pub fragment_type_mask: i32,
293 pub distribution_type: DistributionType,
294 pub state_table_ids: I32Array,
295 pub parallelism: i64,
296 pub vnode_count: i32,
297 pub stream_node: StreamNode,
298}
299
300pub async fn get_referring_objects_cascade<C>(
302 obj_id: ObjectId,
303 db: &C,
304) -> MetaResult<Vec<PartialObject>>
305where
306 C: ConnectionTrait,
307{
308 let query = construct_obj_dependency_query(obj_id);
309 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
310 let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
311 db.get_database_backend(),
312 sql,
313 values,
314 ))
315 .all(db)
316 .await?;
317 Ok(objects)
318}
319
320pub async fn check_sink_into_table_cycle<C>(
322 target_table: ObjectId,
323 dependent_objs: Vec<ObjectId>,
324 db: &C,
325) -> MetaResult<bool>
326where
327 C: ConnectionTrait,
328{
329 if dependent_objs.is_empty() {
330 return Ok(false);
331 }
332
333 if dependent_objs.contains(&target_table) {
335 return Ok(true);
336 }
337
338 let query = construct_sink_cycle_check_query(target_table, dependent_objs);
339 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
340
341 let res = db
342 .query_one(Statement::from_sql_and_values(
343 db.get_database_backend(),
344 sql,
345 values,
346 ))
347 .await?
348 .unwrap();
349
350 let cnt: i64 = res.try_get_by(0)?;
351
352 Ok(cnt != 0)
353}
354
355pub async fn ensure_object_id<C>(
357 object_type: ObjectType,
358 obj_id: ObjectId,
359 db: &C,
360) -> MetaResult<()>
361where
362 C: ConnectionTrait,
363{
364 let count = Object::find_by_id(obj_id).count(db).await?;
365 if count == 0 {
366 return Err(MetaError::catalog_id_not_found(
367 object_type.as_str(),
368 obj_id,
369 ));
370 }
371 Ok(())
372}
373
374pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
376where
377 C: ConnectionTrait,
378{
379 let count = User::find_by_id(user_id).count(db).await?;
380 if count == 0 {
381 return Err(anyhow!("user {} was concurrently dropped", user_id).into());
382 }
383 Ok(())
384}
385
386pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
388where
389 C: ConnectionTrait,
390{
391 let count = Database::find()
392 .filter(database::Column::Name.eq(name))
393 .count(db)
394 .await?;
395 if count > 0 {
396 assert_eq!(count, 1);
397 return Err(MetaError::catalog_duplicated("database", name));
398 }
399 Ok(())
400}
401
402pub async fn check_function_signature_duplicate<C>(
404 pb_function: &PbFunction,
405 db: &C,
406) -> MetaResult<()>
407where
408 C: ConnectionTrait,
409{
410 let count = Function::find()
411 .inner_join(Object)
412 .filter(
413 object::Column::DatabaseId
414 .eq(pb_function.database_id as DatabaseId)
415 .and(object::Column::SchemaId.eq(pb_function.schema_id as SchemaId))
416 .and(function::Column::Name.eq(&pb_function.name))
417 .and(
418 function::Column::ArgTypes
419 .eq(DataTypeArray::from(pb_function.arg_types.clone())),
420 ),
421 )
422 .count(db)
423 .await?;
424 if count > 0 {
425 assert_eq!(count, 1);
426 return Err(MetaError::catalog_duplicated("function", &pb_function.name));
427 }
428 Ok(())
429}
430
431pub async fn check_connection_name_duplicate<C>(
433 pb_connection: &PbConnection,
434 db: &C,
435) -> MetaResult<()>
436where
437 C: ConnectionTrait,
438{
439 let count = Connection::find()
440 .inner_join(Object)
441 .filter(
442 object::Column::DatabaseId
443 .eq(pb_connection.database_id as DatabaseId)
444 .and(object::Column::SchemaId.eq(pb_connection.schema_id as SchemaId))
445 .and(connection::Column::Name.eq(&pb_connection.name)),
446 )
447 .count(db)
448 .await?;
449 if count > 0 {
450 assert_eq!(count, 1);
451 return Err(MetaError::catalog_duplicated(
452 "connection",
453 &pb_connection.name,
454 ));
455 }
456 Ok(())
457}
458
459pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
460where
461 C: ConnectionTrait,
462{
463 let count = Secret::find()
464 .inner_join(Object)
465 .filter(
466 object::Column::DatabaseId
467 .eq(pb_secret.database_id as DatabaseId)
468 .and(object::Column::SchemaId.eq(pb_secret.schema_id as SchemaId))
469 .and(secret::Column::Name.eq(&pb_secret.name)),
470 )
471 .count(db)
472 .await?;
473 if count > 0 {
474 assert_eq!(count, 1);
475 return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
476 }
477 Ok(())
478}
479
480pub async fn check_subscription_name_duplicate<C>(
481 pb_subscription: &PbSubscription,
482 db: &C,
483) -> MetaResult<()>
484where
485 C: ConnectionTrait,
486{
487 let count = Subscription::find()
488 .inner_join(Object)
489 .filter(
490 object::Column::DatabaseId
491 .eq(pb_subscription.database_id as DatabaseId)
492 .and(object::Column::SchemaId.eq(pb_subscription.schema_id as SchemaId))
493 .and(subscription::Column::Name.eq(&pb_subscription.name)),
494 )
495 .count(db)
496 .await?;
497 if count > 0 {
498 assert_eq!(count, 1);
499 return Err(MetaError::catalog_duplicated(
500 "subscription",
501 &pb_subscription.name,
502 ));
503 }
504 Ok(())
505}
506
507pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
509where
510 C: ConnectionTrait,
511{
512 let count = User::find()
513 .filter(user::Column::Name.eq(name))
514 .count(db)
515 .await?;
516 if count > 0 {
517 assert_eq!(count, 1);
518 return Err(MetaError::catalog_duplicated("user", name));
519 }
520 Ok(())
521}
522
523pub async fn check_relation_name_duplicate<C>(
525 name: &str,
526 database_id: DatabaseId,
527 schema_id: SchemaId,
528 db: &C,
529) -> MetaResult<()>
530where
531 C: ConnectionTrait,
532{
533 macro_rules! check_duplicated {
534 ($obj_type:expr, $entity:ident, $table:ident) => {
535 let object_id = Object::find()
536 .select_only()
537 .column(object::Column::Oid)
538 .inner_join($entity)
539 .filter(
540 object::Column::DatabaseId
541 .eq(Some(database_id))
542 .and(object::Column::SchemaId.eq(Some(schema_id)))
543 .and($table::Column::Name.eq(name)),
544 )
545 .into_tuple::<ObjectId>()
546 .one(db)
547 .await?;
548 if let Some(oid) = object_id {
549 let check_creation = if $obj_type == ObjectType::View {
550 false
551 } else if $obj_type == ObjectType::Source {
552 let source_info = Source::find_by_id(oid)
553 .select_only()
554 .column(source::Column::SourceInfo)
555 .into_tuple::<Option<StreamSourceInfo>>()
556 .one(db)
557 .await?
558 .unwrap();
559 source_info.map_or(false, |info| info.to_protobuf().is_shared())
560 } else {
561 true
562 };
563 return if check_creation
564 && !matches!(
565 StreamingJob::find_by_id(oid)
566 .select_only()
567 .column(streaming_job::Column::JobStatus)
568 .into_tuple::<JobStatus>()
569 .one(db)
570 .await?,
571 Some(JobStatus::Created)
572 ) {
573 Err(MetaError::catalog_under_creation(
574 $obj_type.as_str(),
575 name,
576 oid,
577 ))
578 } else {
579 Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
580 };
581 }
582 };
583 }
584 check_duplicated!(ObjectType::Table, Table, table);
585 check_duplicated!(ObjectType::Source, Source, source);
586 check_duplicated!(ObjectType::Sink, Sink, sink);
587 check_duplicated!(ObjectType::Index, Index, index);
588 check_duplicated!(ObjectType::View, View, view);
589
590 Ok(())
591}
592
593pub async fn check_schema_name_duplicate<C>(
595 name: &str,
596 database_id: DatabaseId,
597 db: &C,
598) -> MetaResult<()>
599where
600 C: ConnectionTrait,
601{
602 let count = Object::find()
603 .inner_join(Schema)
604 .filter(
605 object::Column::ObjType
606 .eq(ObjectType::Schema)
607 .and(object::Column::DatabaseId.eq(Some(database_id)))
608 .and(schema::Column::Name.eq(name)),
609 )
610 .count(db)
611 .await?;
612 if count != 0 {
613 return Err(MetaError::catalog_duplicated("schema", name));
614 }
615
616 Ok(())
617}
618
619pub async fn ensure_object_not_refer<C>(
621 object_type: ObjectType,
622 object_id: ObjectId,
623 db: &C,
624) -> MetaResult<()>
625where
626 C: ConnectionTrait,
627{
628 let count = if object_type == ObjectType::Table {
630 ObjectDependency::find()
631 .join(
632 JoinType::InnerJoin,
633 object_dependency::Relation::Object1.def(),
634 )
635 .filter(
636 object_dependency::Column::Oid
637 .eq(object_id)
638 .and(object::Column::ObjType.ne(ObjectType::Index)),
639 )
640 .count(db)
641 .await?
642 } else {
643 ObjectDependency::find()
644 .filter(object_dependency::Column::Oid.eq(object_id))
645 .count(db)
646 .await?
647 };
648 if count != 0 {
649 return Err(MetaError::permission_denied(format!(
650 "{} used by {} other objects.",
651 object_type.as_str(),
652 count
653 )));
654 }
655 Ok(())
656}
657
658pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
660where
661 C: ConnectionTrait,
662{
663 let objs = ObjectDependency::find()
664 .filter(object_dependency::Column::Oid.eq(object_id))
665 .join(
666 JoinType::InnerJoin,
667 object_dependency::Relation::Object1.def(),
668 )
669 .into_partial_model()
670 .all(db)
671 .await?;
672
673 Ok(objs)
674}
675
676pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
678where
679 C: ConnectionTrait,
680{
681 let count = Object::find()
682 .filter(object::Column::SchemaId.eq(Some(schema_id)))
683 .count(db)
684 .await?;
685 if count != 0 {
686 return Err(MetaError::permission_denied("schema is not empty"));
687 }
688
689 Ok(())
690}
691
692pub async fn list_user_info_by_ids<C>(user_ids: Vec<UserId>, db: &C) -> MetaResult<Vec<PbUserInfo>>
694where
695 C: ConnectionTrait,
696{
697 let mut user_infos = vec![];
698 for user_id in user_ids {
699 let user = User::find_by_id(user_id)
700 .one(db)
701 .await?
702 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
703 let mut user_info: PbUserInfo = user.into();
704 user_info.grant_privileges = get_user_privilege(user_id, db).await?;
705 user_infos.push(user_info);
706 }
707 Ok(user_infos)
708}
709
710pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
712where
713 C: ConnectionTrait,
714{
715 let obj_owner: UserId = Object::find_by_id(object_id)
716 .select_only()
717 .column(object::Column::OwnerId)
718 .into_tuple()
719 .one(db)
720 .await?
721 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
722 Ok(obj_owner)
723}
724
725pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
750 let cte_alias = Alias::new("granted_privilege_ids");
751 let cte_return_privilege_alias = Alias::new("id");
752 let cte_return_user_alias = Alias::new("user_id");
753
754 let mut base_query = SelectStatement::new()
755 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
756 .from(UserPrivilege)
757 .and_where(user_privilege::Column::Id.is_in(ids))
758 .to_owned();
759
760 let cte_referencing = Query::select()
761 .columns([
762 (UserPrivilege, user_privilege::Column::Id),
763 (UserPrivilege, user_privilege::Column::UserId),
764 ])
765 .from(UserPrivilege)
766 .inner_join(
767 cte_alias.clone(),
768 Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
769 .equals(user_privilege::Column::DependentId),
770 )
771 .to_owned();
772
773 let common_table_expr = CommonTableExpression::new()
774 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
775 .columns([
776 cte_return_privilege_alias.clone(),
777 cte_return_user_alias.clone(),
778 ])
779 .table_name(cte_alias.clone())
780 .to_owned();
781
782 SelectStatement::new()
783 .columns([cte_return_privilege_alias, cte_return_user_alias])
784 .from(cte_alias.clone())
785 .to_owned()
786 .with(
787 WithClause::new()
788 .recursive(true)
789 .cte(common_table_expr)
790 .to_owned(),
791 )
792 .to_owned()
793}
794
795pub async fn get_internal_tables_by_id<C>(job_id: ObjectId, db: &C) -> MetaResult<Vec<TableId>>
796where
797 C: ConnectionTrait,
798{
799 let table_ids: Vec<TableId> = Table::find()
800 .select_only()
801 .column(table::Column::TableId)
802 .filter(
803 table::Column::TableType
804 .eq(TableType::Internal)
805 .and(table::Column::BelongsToJobId.eq(job_id)),
806 )
807 .into_tuple()
808 .all(db)
809 .await?;
810 Ok(table_ids)
811}
812
813pub async fn get_index_state_tables_by_table_id<C>(
814 table_id: TableId,
815 db: &C,
816) -> MetaResult<Vec<TableId>>
817where
818 C: ConnectionTrait,
819{
820 let mut index_table_ids: Vec<TableId> = Index::find()
821 .select_only()
822 .column(index::Column::IndexTableId)
823 .filter(index::Column::PrimaryTableId.eq(table_id))
824 .into_tuple()
825 .all(db)
826 .await?;
827
828 if !index_table_ids.is_empty() {
829 let internal_table_ids: Vec<TableId> = Table::find()
830 .select_only()
831 .column(table::Column::TableId)
832 .filter(
833 table::Column::TableType
834 .eq(TableType::Internal)
835 .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
836 )
837 .into_tuple()
838 .all(db)
839 .await?;
840
841 index_table_ids.extend(internal_table_ids.into_iter());
842 }
843
844 Ok(index_table_ids)
845}
846
847#[derive(Clone, DerivePartialModel, FromQueryResult)]
848#[sea_orm(entity = "UserPrivilege")]
849pub struct PartialUserPrivilege {
850 pub id: PrivilegeId,
851 pub user_id: UserId,
852}
853
854pub async fn get_referring_privileges_cascade<C>(
855 ids: Vec<PrivilegeId>,
856 db: &C,
857) -> MetaResult<Vec<PartialUserPrivilege>>
858where
859 C: ConnectionTrait,
860{
861 let query = construct_privilege_dependency_query(ids);
862 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
863 let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
864 db.get_database_backend(),
865 sql,
866 values,
867 ))
868 .all(db)
869 .await?;
870
871 Ok(privileges)
872}
873
874pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
876where
877 C: ConnectionTrait,
878{
879 let count = UserPrivilege::find()
880 .filter(user_privilege::Column::DependentId.is_in(ids))
881 .count(db)
882 .await?;
883 if count != 0 {
884 return Err(MetaError::permission_denied(format!(
885 "privileges granted to {} other ones.",
886 count
887 )));
888 }
889 Ok(())
890}
891
892pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
894where
895 C: ConnectionTrait,
896{
897 let user_privileges = UserPrivilege::find()
898 .find_also_related(Object)
899 .filter(user_privilege::Column::UserId.eq(user_id))
900 .all(db)
901 .await?;
902 Ok(user_privileges
903 .into_iter()
904 .map(|(privilege, object)| {
905 let object = object.unwrap();
906 let oid = object.oid as _;
907 let obj = match object.obj_type {
908 ObjectType::Database => PbGrantObject::DatabaseId(oid),
909 ObjectType::Schema => PbGrantObject::SchemaId(oid),
910 ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
911 ObjectType::Source => PbGrantObject::SourceId(oid),
912 ObjectType::Sink => PbGrantObject::SinkId(oid),
913 ObjectType::View => PbGrantObject::ViewId(oid),
914 ObjectType::Function => PbGrantObject::FunctionId(oid),
915 ObjectType::Connection => unreachable!("connection is not supported yet"),
916 ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
917 ObjectType::Secret => unreachable!("secret is not supported yet"),
918 };
919 PbGrantPrivilege {
920 action_with_opts: vec![PbActionWithGrantOption {
921 action: PbAction::from(privilege.action) as _,
922 with_grant_option: privilege.with_grant_option,
923 granted_by: privilege.granted_by as _,
924 }],
925 object: Some(obj),
926 }
927 })
928 .collect())
929}
930
931pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
933 match object {
934 PbGrantObject::DatabaseId(id)
935 | PbGrantObject::SchemaId(id)
936 | PbGrantObject::TableId(id)
937 | PbGrantObject::SourceId(id)
938 | PbGrantObject::SinkId(id)
939 | PbGrantObject::ViewId(id)
940 | PbGrantObject::FunctionId(id)
941 | PbGrantObject::SubscriptionId(id)
942 | PbGrantObject::ConnectionId(id)
943 | PbGrantObject::SecretId(id) => *id as _,
944 }
945}
946
947pub async fn insert_fragment_relations(
948 db: &impl ConnectionTrait,
949 downstream_fragment_relations: &FragmentDownstreamRelation,
950) -> MetaResult<()> {
951 for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
952 for downstream in downstreams {
953 let relation = fragment_relation::Model {
954 source_fragment_id: *upstream_fragment_id as _,
955 target_fragment_id: downstream.downstream_fragment_id as _,
956 dispatcher_type: downstream.dispatcher_type,
957 dist_key_indices: downstream
958 .dist_key_indices
959 .iter()
960 .map(|idx| *idx as i32)
961 .collect_vec()
962 .into(),
963 output_indices: downstream
964 .output_mapping
965 .indices
966 .iter()
967 .map(|idx| *idx as i32)
968 .collect_vec()
969 .into(),
970 output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
971 };
972 FragmentRelation::insert(relation.into_active_model())
973 .exec(db)
974 .await?;
975 }
976 }
977 Ok(())
978}
979
980pub async fn get_fragment_actor_dispatchers<C>(
981 db: &C,
982 fragment_ids: Vec<FragmentId>,
983) -> MetaResult<FragmentActorDispatchers>
984where
985 C: ConnectionTrait,
986{
987 type FragmentActorInfo = (
988 DistributionType,
989 Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
990 );
991 let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
992 let get_fragment_actors = |fragment_id: FragmentId| async move {
993 let result: MetaResult<FragmentActorInfo> = try {
994 let mut fragment_actors = Fragment::find_by_id(fragment_id)
995 .find_with_related(Actor)
996 .filter(actor::Column::Status.eq(ActorStatus::Running))
997 .all(db)
998 .await?;
999 if fragment_actors.is_empty() {
1000 return Err(anyhow!("failed to find fragment: {}", fragment_id).into());
1001 }
1002 assert_eq!(
1003 fragment_actors.len(),
1004 1,
1005 "find multiple fragment {:?}",
1006 fragment_actors
1007 );
1008 let (fragment, actors) = fragment_actors.pop().unwrap();
1009 (
1010 fragment.distribution_type,
1011 Arc::new(
1012 actors
1013 .into_iter()
1014 .map(|actor| {
1015 (
1016 actor.actor_id as _,
1017 actor
1018 .vnode_bitmap
1019 .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
1020 )
1021 })
1022 .collect(),
1023 ),
1024 )
1025 };
1026 result
1027 };
1028 let fragment_relations = FragmentRelation::find()
1029 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
1030 .all(db)
1031 .await?;
1032
1033 let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
1034 for fragment_relation::Model {
1035 source_fragment_id,
1036 target_fragment_id,
1037 dispatcher_type,
1038 dist_key_indices,
1039 output_indices,
1040 output_type_mapping,
1041 } in fragment_relations
1042 {
1043 let (source_fragment_distribution, source_fragment_actors) = {
1044 let (distribution, actors) = {
1045 match fragment_actor_cache.entry(source_fragment_id) {
1046 Entry::Occupied(entry) => entry.into_mut(),
1047 Entry::Vacant(entry) => {
1048 entry.insert(get_fragment_actors(source_fragment_id).await?)
1049 }
1050 }
1051 };
1052 (*distribution, actors.clone())
1053 };
1054 let (target_fragment_distribution, target_fragment_actors) = {
1055 let (distribution, actors) = {
1056 match fragment_actor_cache.entry(target_fragment_id) {
1057 Entry::Occupied(entry) => entry.into_mut(),
1058 Entry::Vacant(entry) => {
1059 entry.insert(get_fragment_actors(target_fragment_id).await?)
1060 }
1061 }
1062 };
1063 (*distribution, actors.clone())
1064 };
1065 let output_mapping = PbDispatchOutputMapping {
1066 indices: output_indices.into_u32_array(),
1067 types: output_type_mapping.unwrap_or_default().to_protobuf(),
1068 };
1069 let dispatchers = compose_dispatchers(
1070 source_fragment_distribution,
1071 &source_fragment_actors,
1072 target_fragment_id as _,
1073 target_fragment_distribution,
1074 &target_fragment_actors,
1075 dispatcher_type,
1076 dist_key_indices.into_u32_array(),
1077 output_mapping,
1078 );
1079 let actor_dispatchers_map = actor_dispatchers_map
1080 .entry(source_fragment_id as _)
1081 .or_default();
1082 for (actor_id, dispatchers) in dispatchers {
1083 actor_dispatchers_map
1084 .entry(actor_id as _)
1085 .or_default()
1086 .push(dispatchers);
1087 }
1088 }
1089 Ok(actor_dispatchers_map)
1090}
1091
1092pub fn compose_dispatchers(
1093 source_fragment_distribution: DistributionType,
1094 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1095 target_fragment_id: crate::model::FragmentId,
1096 target_fragment_distribution: DistributionType,
1097 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1098 dispatcher_type: DispatcherType,
1099 dist_key_indices: Vec<u32>,
1100 output_mapping: PbDispatchOutputMapping,
1101) -> HashMap<crate::model::ActorId, PbDispatcher> {
1102 match dispatcher_type {
1103 DispatcherType::Hash => {
1104 let dispatcher = PbDispatcher {
1105 r#type: PbDispatcherType::from(dispatcher_type) as _,
1106 dist_key_indices: dist_key_indices.clone(),
1107 output_mapping: output_mapping.into(),
1108 hash_mapping: Some(
1109 ActorMapping::from_bitmaps(
1110 &target_fragment_actors
1111 .iter()
1112 .map(|(actor_id, bitmap)| {
1113 (
1114 *actor_id as _,
1115 bitmap
1116 .clone()
1117 .expect("downstream hash dispatch must have distribution"),
1118 )
1119 })
1120 .collect(),
1121 )
1122 .to_protobuf(),
1123 ),
1124 dispatcher_id: target_fragment_id as _,
1125 downstream_actor_id: target_fragment_actors
1126 .keys()
1127 .map(|actor_id| *actor_id as _)
1128 .collect(),
1129 };
1130 source_fragment_actors
1131 .keys()
1132 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1133 .collect()
1134 }
1135 DispatcherType::Broadcast | DispatcherType::Simple => {
1136 let dispatcher = PbDispatcher {
1137 r#type: PbDispatcherType::from(dispatcher_type) as _,
1138 dist_key_indices: dist_key_indices.clone(),
1139 output_mapping: output_mapping.into(),
1140 hash_mapping: None,
1141 dispatcher_id: target_fragment_id as _,
1142 downstream_actor_id: target_fragment_actors
1143 .keys()
1144 .map(|actor_id| *actor_id as _)
1145 .collect(),
1146 };
1147 source_fragment_actors
1148 .keys()
1149 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1150 .collect()
1151 }
1152 DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1153 source_fragment_distribution,
1154 source_fragment_actors,
1155 target_fragment_distribution,
1156 target_fragment_actors,
1157 )
1158 .into_iter()
1159 .map(|(upstream_actor_id, downstream_actor_id)| {
1160 (
1161 upstream_actor_id,
1162 PbDispatcher {
1163 r#type: PbDispatcherType::NoShuffle as _,
1164 dist_key_indices: dist_key_indices.clone(),
1165 output_mapping: output_mapping.clone().into(),
1166 hash_mapping: None,
1167 dispatcher_id: target_fragment_id as _,
1168 downstream_actor_id: vec![downstream_actor_id as _],
1169 },
1170 )
1171 })
1172 .collect(),
1173 }
1174}
1175
1176pub fn resolve_no_shuffle_actor_dispatcher(
1178 source_fragment_distribution: DistributionType,
1179 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1180 target_fragment_distribution: DistributionType,
1181 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1182) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1183 assert_eq!(source_fragment_distribution, target_fragment_distribution);
1184 assert_eq!(
1185 source_fragment_actors.len(),
1186 target_fragment_actors.len(),
1187 "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1188 source_fragment_actors,
1189 target_fragment_actors
1190 );
1191 match source_fragment_distribution {
1192 DistributionType::Single => {
1193 let assert_singleton = |bitmap: &Option<Bitmap>| {
1194 assert!(
1195 bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1196 "not singleton: {:?}",
1197 bitmap
1198 );
1199 };
1200 assert_eq!(
1201 source_fragment_actors.len(),
1202 1,
1203 "singleton distribution actor count not 1: {:?}",
1204 source_fragment_distribution
1205 );
1206 assert_eq!(
1207 target_fragment_actors.len(),
1208 1,
1209 "singleton distribution actor count not 1: {:?}",
1210 target_fragment_distribution
1211 );
1212 let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1213 assert_singleton(bitmap);
1214 let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1215 assert_singleton(bitmap);
1216 vec![(*source_actor_id, *target_actor_id)]
1217 }
1218 DistributionType::Hash => {
1219 let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1220 .iter()
1221 .map(|(actor_id, bitmap)| {
1222 let bitmap = bitmap
1223 .as_ref()
1224 .expect("hash distribution should have bitmap");
1225 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1226 (first_vnode, (*actor_id, bitmap))
1227 })
1228 .collect();
1229 source_fragment_actors
1230 .iter()
1231 .map(|(source_actor_id, bitmap)| {
1232 let bitmap = bitmap
1233 .as_ref()
1234 .expect("hash distribution should have bitmap");
1235 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1236 let (target_actor_id, target_bitmap) =
1237 target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1238 panic!(
1239 "cannot find matched target actor: {} {:?} {:?} {:?}",
1240 source_actor_id,
1241 first_vnode,
1242 source_fragment_actors,
1243 target_fragment_actors
1244 );
1245 });
1246 assert_eq!(
1247 bitmap,
1248 target_bitmap,
1249 "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1250 source_actor_id,
1251 first_vnode,
1252 source_fragment_actors,
1253 target_fragment_actors
1254 );
1255 (*source_actor_id, target_actor_id)
1256 }).collect()
1257 }
1258 }
1259}
1260
1261pub async fn get_fragment_mappings<C>(
1263 db: &C,
1264 job_id: ObjectId,
1265) -> MetaResult<Vec<PbFragmentWorkerSlotMapping>>
1266where
1267 C: ConnectionTrait,
1268{
1269 let job_actors: Vec<(
1270 FragmentId,
1271 DistributionType,
1272 ActorId,
1273 Option<VnodeBitmap>,
1274 WorkerId,
1275 ActorStatus,
1276 )> = Actor::find()
1277 .select_only()
1278 .columns([
1279 fragment::Column::FragmentId,
1280 fragment::Column::DistributionType,
1281 ])
1282 .columns([
1283 actor::Column::ActorId,
1284 actor::Column::VnodeBitmap,
1285 actor::Column::WorkerId,
1286 actor::Column::Status,
1287 ])
1288 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1289 .filter(fragment::Column::JobId.eq(job_id))
1290 .into_tuple()
1291 .all(db)
1292 .await?;
1293
1294 Ok(rebuild_fragment_mapping_from_actors(job_actors))
1295}
1296
1297pub fn rebuild_fragment_mapping_from_actors(
1298 job_actors: Vec<(
1299 FragmentId,
1300 DistributionType,
1301 ActorId,
1302 Option<VnodeBitmap>,
1303 WorkerId,
1304 ActorStatus,
1305 )>,
1306) -> Vec<FragmentWorkerSlotMapping> {
1307 let mut all_actor_locations = HashMap::new();
1308 let mut actor_bitmaps = HashMap::new();
1309 let mut fragment_actors = HashMap::new();
1310 let mut fragment_dist = HashMap::new();
1311
1312 for (fragment_id, dist, actor_id, bitmap, worker_id, actor_status) in job_actors {
1313 if actor_status == ActorStatus::Inactive {
1314 continue;
1315 }
1316
1317 all_actor_locations
1318 .entry(fragment_id)
1319 .or_insert(HashMap::new())
1320 .insert(actor_id as hash::ActorId, worker_id as u32);
1321 actor_bitmaps.insert(actor_id, bitmap);
1322 fragment_actors
1323 .entry(fragment_id)
1324 .or_insert_with(Vec::new)
1325 .push(actor_id);
1326 fragment_dist.insert(fragment_id, dist);
1327 }
1328
1329 let mut result = vec![];
1330 for (fragment_id, dist) in fragment_dist {
1331 let mut actor_locations = all_actor_locations.remove(&fragment_id).unwrap();
1332 let fragment_worker_slot_mapping = match dist {
1333 DistributionType::Single => {
1334 let actor = fragment_actors
1335 .remove(&fragment_id)
1336 .unwrap()
1337 .into_iter()
1338 .exactly_one()
1339 .unwrap() as hash::ActorId;
1340 let actor_location = actor_locations.remove(&actor).unwrap();
1341
1342 WorkerSlotMapping::new_single(WorkerSlotId::new(actor_location, 0))
1343 }
1344 DistributionType::Hash => {
1345 let actors = fragment_actors.remove(&fragment_id).unwrap();
1346
1347 let all_actor_bitmaps: HashMap<_, _> = actors
1348 .iter()
1349 .map(|actor_id| {
1350 let vnode_bitmap = actor_bitmaps
1351 .remove(actor_id)
1352 .flatten()
1353 .expect("actor bitmap shouldn't be none in hash fragment");
1354
1355 let bitmap = Bitmap::from(&vnode_bitmap.to_protobuf());
1356 (*actor_id as hash::ActorId, bitmap)
1357 })
1358 .collect();
1359
1360 let actor_mapping = ActorMapping::from_bitmaps(&all_actor_bitmaps);
1361
1362 actor_mapping.to_worker_slot(&actor_locations)
1363 }
1364 };
1365
1366 result.push(PbFragmentWorkerSlotMapping {
1367 fragment_id: fragment_id as u32,
1368 mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1369 })
1370 }
1371 result
1372}
1373
1374pub async fn get_fragment_actor_ids<C>(
1376 db: &C,
1377 fragment_ids: Vec<FragmentId>,
1378) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>>
1379where
1380 C: ConnectionTrait,
1381{
1382 let fragment_actors: Vec<(FragmentId, ActorId)> = Actor::find()
1383 .select_only()
1384 .columns([actor::Column::FragmentId, actor::Column::ActorId])
1385 .filter(actor::Column::FragmentId.is_in(fragment_ids))
1386 .into_tuple()
1387 .all(db)
1388 .await?;
1389
1390 Ok(fragment_actors.into_iter().into_group_map())
1391}
1392
1393pub async fn get_fragments_for_jobs<C>(
1398 db: &C,
1399 streaming_jobs: Vec<ObjectId>,
1400) -> MetaResult<(
1401 HashMap<SourceId, BTreeSet<FragmentId>>,
1402 HashSet<ActorId>,
1403 HashSet<FragmentId>,
1404)>
1405where
1406 C: ConnectionTrait,
1407{
1408 if streaming_jobs.is_empty() {
1409 return Ok((HashMap::default(), HashSet::default(), HashSet::default()));
1410 }
1411
1412 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1413 .select_only()
1414 .columns([
1415 fragment::Column::FragmentId,
1416 fragment::Column::FragmentTypeMask,
1417 fragment::Column::StreamNode,
1418 ])
1419 .filter(fragment::Column::JobId.is_in(streaming_jobs))
1420 .into_tuple()
1421 .all(db)
1422 .await?;
1423 let actors: Vec<ActorId> = Actor::find()
1424 .select_only()
1425 .column(actor::Column::ActorId)
1426 .filter(
1427 actor::Column::FragmentId.is_in(fragments.iter().map(|(id, _, _)| *id).collect_vec()),
1428 )
1429 .into_tuple()
1430 .all(db)
1431 .await?;
1432
1433 let fragment_ids = fragments
1434 .iter()
1435 .map(|(fragment_id, _, _)| *fragment_id)
1436 .collect();
1437
1438 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1439 for (fragment_id, mask, stream_node) in fragments {
1440 if mask & PbFragmentTypeFlag::Source as i32 == 0 {
1441 continue;
1442 }
1443 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1444 source_fragment_ids
1445 .entry(source_id as _)
1446 .or_default()
1447 .insert(fragment_id);
1448 }
1449 }
1450
1451 Ok((
1452 source_fragment_ids,
1453 actors.into_iter().collect(),
1454 fragment_ids,
1455 ))
1456}
1457
1458pub(crate) fn build_object_group_for_delete(
1463 partial_objects: Vec<PartialObject>,
1464) -> NotificationInfo {
1465 let mut objects = vec![];
1466 for obj in partial_objects {
1467 match obj.obj_type {
1468 ObjectType::Database => objects.push(PbObject {
1469 object_info: Some(PbObjectInfo::Database(PbDatabase {
1470 id: obj.oid as _,
1471 ..Default::default()
1472 })),
1473 }),
1474 ObjectType::Schema => objects.push(PbObject {
1475 object_info: Some(PbObjectInfo::Schema(PbSchema {
1476 id: obj.oid as _,
1477 database_id: obj.database_id.unwrap() as _,
1478 ..Default::default()
1479 })),
1480 }),
1481 ObjectType::Table => objects.push(PbObject {
1482 object_info: Some(PbObjectInfo::Table(PbTable {
1483 id: obj.oid as _,
1484 schema_id: obj.schema_id.unwrap() as _,
1485 database_id: obj.database_id.unwrap() as _,
1486 ..Default::default()
1487 })),
1488 }),
1489 ObjectType::Source => objects.push(PbObject {
1490 object_info: Some(PbObjectInfo::Source(PbSource {
1491 id: obj.oid as _,
1492 schema_id: obj.schema_id.unwrap() as _,
1493 database_id: obj.database_id.unwrap() as _,
1494 ..Default::default()
1495 })),
1496 }),
1497 ObjectType::Sink => objects.push(PbObject {
1498 object_info: Some(PbObjectInfo::Sink(PbSink {
1499 id: obj.oid as _,
1500 schema_id: obj.schema_id.unwrap() as _,
1501 database_id: obj.database_id.unwrap() as _,
1502 ..Default::default()
1503 })),
1504 }),
1505 ObjectType::Subscription => objects.push(PbObject {
1506 object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1507 id: obj.oid as _,
1508 schema_id: obj.schema_id.unwrap() as _,
1509 database_id: obj.database_id.unwrap() as _,
1510 ..Default::default()
1511 })),
1512 }),
1513 ObjectType::View => objects.push(PbObject {
1514 object_info: Some(PbObjectInfo::View(PbView {
1515 id: obj.oid as _,
1516 schema_id: obj.schema_id.unwrap() as _,
1517 database_id: obj.database_id.unwrap() as _,
1518 ..Default::default()
1519 })),
1520 }),
1521 ObjectType::Index => {
1522 objects.push(PbObject {
1523 object_info: Some(PbObjectInfo::Index(PbIndex {
1524 id: obj.oid as _,
1525 schema_id: obj.schema_id.unwrap() as _,
1526 database_id: obj.database_id.unwrap() as _,
1527 ..Default::default()
1528 })),
1529 });
1530 objects.push(PbObject {
1531 object_info: Some(PbObjectInfo::Table(PbTable {
1532 id: obj.oid as _,
1533 schema_id: obj.schema_id.unwrap() as _,
1534 database_id: obj.database_id.unwrap() as _,
1535 ..Default::default()
1536 })),
1537 });
1538 }
1539 ObjectType::Function => objects.push(PbObject {
1540 object_info: Some(PbObjectInfo::Function(PbFunction {
1541 id: obj.oid as _,
1542 schema_id: obj.schema_id.unwrap() as _,
1543 database_id: obj.database_id.unwrap() as _,
1544 ..Default::default()
1545 })),
1546 }),
1547 ObjectType::Connection => objects.push(PbObject {
1548 object_info: Some(PbObjectInfo::Connection(PbConnection {
1549 id: obj.oid as _,
1550 schema_id: obj.schema_id.unwrap() as _,
1551 database_id: obj.database_id.unwrap() as _,
1552 ..Default::default()
1553 })),
1554 }),
1555 ObjectType::Secret => objects.push(PbObject {
1556 object_info: Some(PbObjectInfo::Secret(PbSecret {
1557 id: obj.oid as _,
1558 schema_id: obj.schema_id.unwrap() as _,
1559 database_id: obj.database_id.unwrap() as _,
1560 ..Default::default()
1561 })),
1562 }),
1563 }
1564 }
1565 NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1566}
1567
1568pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1569 let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1570 .context("unable to parse table definition")
1571 .inspect_err(|e| {
1572 tracing::error!(
1573 target: "auto_schema_change",
1574 error = %e.as_report(),
1575 "failed to parse table definition")
1576 })
1577 .unwrap()
1578 .try_into()
1579 .unwrap();
1580 if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1581 cdc_table_info
1582 .clone()
1583 .map(|cdc_table_info| cdc_table_info.external_table_name)
1584 } else {
1585 None
1586 }
1587}
1588
1589pub async fn rename_relation(
1592 txn: &DatabaseTransaction,
1593 object_type: ObjectType,
1594 object_id: ObjectId,
1595 object_name: &str,
1596) -> MetaResult<(Vec<PbObject>, String)> {
1597 use sea_orm::ActiveModelTrait;
1598
1599 use crate::controller::rename::alter_relation_rename;
1600
1601 let mut to_update_relations = vec![];
1602 macro_rules! rename_relation {
1604 ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1605 let (mut relation, obj) = $entity::find_by_id($object_id)
1606 .find_also_related(Object)
1607 .one(txn)
1608 .await?
1609 .unwrap();
1610 let obj = obj.unwrap();
1611 let old_name = relation.name.clone();
1612 relation.name = object_name.into();
1613 if obj.obj_type != ObjectType::View {
1614 relation.definition = alter_relation_rename(&relation.definition, object_name);
1615 }
1616 let active_model = $table::ActiveModel {
1617 $identity: Set(relation.$identity),
1618 name: Set(object_name.into()),
1619 definition: Set(relation.definition.clone()),
1620 ..Default::default()
1621 };
1622 active_model.update(txn).await?;
1623 to_update_relations.push(PbObject {
1624 object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
1625 });
1626 old_name
1627 }};
1628 }
1629 let old_name = match object_type {
1631 ObjectType::Table => {
1632 let associated_source_id: Option<SourceId> = Source::find()
1633 .select_only()
1634 .column(source::Column::SourceId)
1635 .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
1636 .into_tuple()
1637 .one(txn)
1638 .await?;
1639 if let Some(source_id) = associated_source_id {
1640 rename_relation!(Source, source, source_id, source_id);
1641 }
1642 rename_relation!(Table, table, table_id, object_id)
1643 }
1644 ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
1645 ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id),
1646 ObjectType::Subscription => {
1647 rename_relation!(Subscription, subscription, subscription_id, object_id)
1648 }
1649 ObjectType::View => rename_relation!(View, view, view_id, object_id),
1650 ObjectType::Index => {
1651 let (mut index, obj) = Index::find_by_id(object_id)
1652 .find_also_related(Object)
1653 .one(txn)
1654 .await?
1655 .unwrap();
1656 index.name = object_name.into();
1657 let index_table_id = index.index_table_id;
1658 let old_name = rename_relation!(Table, table, table_id, index_table_id);
1659
1660 let active_model = index::ActiveModel {
1662 index_id: sea_orm::ActiveValue::Set(index.index_id),
1663 name: sea_orm::ActiveValue::Set(object_name.into()),
1664 ..Default::default()
1665 };
1666 active_model.update(txn).await?;
1667 to_update_relations.push(PbObject {
1668 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1669 });
1670 old_name
1671 }
1672 _ => unreachable!("only relation name can be altered."),
1673 };
1674
1675 Ok((to_update_relations, old_name))
1676}
1677
1678pub async fn get_database_resource_group<C>(txn: &C, database_id: ObjectId) -> MetaResult<String>
1679where
1680 C: ConnectionTrait,
1681{
1682 let database_resource_group: Option<String> = Database::find_by_id(database_id)
1683 .select_only()
1684 .column(database::Column::ResourceGroup)
1685 .into_tuple()
1686 .one(txn)
1687 .await?
1688 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1689
1690 Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1691}
1692
1693pub async fn get_existing_job_resource_group<C>(
1694 txn: &C,
1695 streaming_job_id: ObjectId,
1696) -> MetaResult<String>
1697where
1698 C: ConnectionTrait,
1699{
1700 let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1701 StreamingJob::find_by_id(streaming_job_id)
1702 .select_only()
1703 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1704 .join(JoinType::InnerJoin, object::Relation::Database2.def())
1705 .column(streaming_job::Column::SpecificResourceGroup)
1706 .column(database::Column::ResourceGroup)
1707 .into_tuple()
1708 .one(txn)
1709 .await?
1710 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1711
1712 Ok(job_specific_resource_group.unwrap_or_else(|| {
1713 database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1714 }))
1715}
1716
1717pub fn filter_workers_by_resource_group(
1718 workers: &HashMap<u32, WorkerNode>,
1719 resource_group: &str,
1720) -> BTreeSet<WorkerId> {
1721 workers
1722 .iter()
1723 .filter(|&(_, worker)| {
1724 worker
1725 .resource_group()
1726 .map(|node_label| node_label.as_str() == resource_group)
1727 .unwrap_or(false)
1728 })
1729 .map(|(id, _)| (*id as WorkerId))
1730 .collect()
1731}
1732
1733pub async fn rename_relation_refer(
1736 txn: &DatabaseTransaction,
1737 object_type: ObjectType,
1738 object_id: ObjectId,
1739 object_name: &str,
1740 old_name: &str,
1741) -> MetaResult<Vec<PbObject>> {
1742 use sea_orm::ActiveModelTrait;
1743
1744 use crate::controller::rename::alter_relation_rename_refs;
1745
1746 let mut to_update_relations = vec![];
1747 macro_rules! rename_relation_ref {
1748 ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1749 let (mut relation, obj) = $entity::find_by_id($object_id)
1750 .find_also_related(Object)
1751 .one(txn)
1752 .await?
1753 .unwrap();
1754 relation.definition =
1755 alter_relation_rename_refs(&relation.definition, old_name, object_name);
1756 let active_model = $table::ActiveModel {
1757 $identity: Set(relation.$identity),
1758 definition: Set(relation.definition.clone()),
1759 ..Default::default()
1760 };
1761 active_model.update(txn).await?;
1762 to_update_relations.push(PbObject {
1763 object_info: Some(PbObjectInfo::$entity(
1764 ObjectModel(relation, obj.unwrap()).into(),
1765 )),
1766 });
1767 }};
1768 }
1769 let mut objs = get_referring_objects(object_id, txn).await?;
1770 if object_type == ObjectType::Table {
1771 let incoming_sinks: I32Array = Table::find_by_id(object_id)
1772 .select_only()
1773 .column(table::Column::IncomingSinks)
1774 .into_tuple()
1775 .one(txn)
1776 .await?
1777 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1778
1779 objs.extend(
1780 incoming_sinks
1781 .into_inner()
1782 .into_iter()
1783 .map(|id| PartialObject {
1784 oid: id,
1785 obj_type: ObjectType::Sink,
1786 schema_id: None,
1787 database_id: None,
1788 }),
1789 );
1790 }
1791
1792 for obj in objs {
1793 match obj.obj_type {
1794 ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid),
1795 ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid),
1796 ObjectType::Subscription => {
1797 rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid)
1798 }
1799 ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid),
1800 ObjectType::Index => {
1801 let index_table_id: Option<TableId> = Index::find_by_id(obj.oid)
1802 .select_only()
1803 .column(index::Column::IndexTableId)
1804 .into_tuple()
1805 .one(txn)
1806 .await?;
1807 rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
1808 }
1809 _ => {
1810 bail!("only table, sink, subscription, view and index depend on other objects.")
1811 }
1812 }
1813 }
1814
1815 Ok(to_update_relations)
1816}
1817
1818pub async fn validate_subscription_deletion<C>(txn: &C, subscription_id: ObjectId) -> MetaResult<()>
1822where
1823 C: ConnectionTrait,
1824{
1825 let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1826 .select_only()
1827 .column(subscription::Column::DependentTableId)
1828 .into_tuple()
1829 .one(txn)
1830 .await?
1831 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1832
1833 let cnt = Subscription::find()
1834 .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1835 .count(txn)
1836 .await?;
1837 if cnt > 1 {
1838 return Ok(());
1841 }
1842
1843 let obj_alias = Alias::new("o1");
1845 let used_by_alias = Alias::new("o2");
1846 let count = ObjectDependency::find()
1847 .join_as(
1848 JoinType::InnerJoin,
1849 object_dependency::Relation::Object2.def(),
1850 obj_alias.clone(),
1851 )
1852 .join_as(
1853 JoinType::InnerJoin,
1854 object_dependency::Relation::Object1.def(),
1855 used_by_alias.clone(),
1856 )
1857 .filter(
1858 object_dependency::Column::Oid
1859 .eq(upstream_table_id)
1860 .and(object_dependency::Column::UsedBy.ne(subscription_id))
1861 .and(
1862 Expr::col((obj_alias, object::Column::DatabaseId))
1863 .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1864 ),
1865 )
1866 .count(txn)
1867 .await?;
1868
1869 if count != 0 {
1870 return Err(MetaError::permission_denied(format!(
1871 "Referenced by {} cross-db objects.",
1872 count
1873 )));
1874 }
1875
1876 Ok(())
1877}
1878
1879#[cfg(test)]
1880mod tests {
1881 use super::*;
1882
1883 #[test]
1884 fn test_extract_cdc_table_name() {
1885 let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
1886 let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
1887 assert_eq!(
1888 extract_external_table_name_from_definition(ddl1),
1889 Some("public.t1".into())
1890 );
1891 assert_eq!(
1892 extract_external_table_name_from_definition(ddl2),
1893 Some("mydb.t2".into())
1894 );
1895 }
1896}