1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{
22 FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23};
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
25use risingwave_common::id::{JobId, SubscriptionId};
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
28use risingwave_common::types::{DataType, Datum};
29use risingwave_common::util::value_encoding::DatumToProtoExt;
30use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
31use risingwave_common::{bail, hash};
32use risingwave_meta_model::fragment::DistributionType;
33use risingwave_meta_model::object::ObjectType;
34use risingwave_meta_model::prelude::*;
35use risingwave_meta_model::streaming_job::BackfillOrders;
36use risingwave_meta_model::table::TableType;
37use risingwave_meta_model::user_privilege::Action;
38use risingwave_meta_model::{
39 ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
40 JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
41 TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
42 function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
43 subscription, table, user, user_default_privilege, user_privilege, view,
44};
45use risingwave_meta_model_migration::WithQuery;
46use risingwave_pb::catalog::{
47 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
48 PbSubscription, PbTable, PbView,
49};
50use risingwave_pb::common::{PbObjectType, WorkerNode};
51use risingwave_pb::expr::{PbExprNode, expr_node};
52use risingwave_pb::meta::object::PbObjectInfo;
53use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
54use risingwave_pb::meta::{
55 ObjectDependency as PbObjectDependency, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
56};
57use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
58use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
59use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
60use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
61use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
62use risingwave_sqlparser::ast::Statement as SqlStatement;
63use risingwave_sqlparser::parser::Parser;
64use sea_orm::sea_query::{
65 Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
66 WithClause,
67};
68use sea_orm::{
69 ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
70 FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
71 RelationTrait, Set, Statement,
72};
73use thiserror_ext::AsReport;
74use tracing::warn;
75
76use crate::barrier::{SharedActorInfos, SharedFragmentInfo};
77use crate::controller::ObjectModel;
78use crate::controller::fragment::FragmentTypeMaskExt;
79use crate::controller::scale::resolve_streaming_job_definition;
80use crate::model::{FragmentDownstreamRelation, StreamContext};
81use crate::{MetaError, MetaResult};
82
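/// Build a recursive CTE query that finds every object transitively depending on `obj_id`,
/// including objects that belong to it when `obj_id` is a database or schema.
///
/// The generated statement is roughly equivalent to the following sketch (illustrative only;
/// the exact SQL depends on the database backend):
///
/// ```sql
/// WITH RECURSIVE used_by_object_ids (used_by) AS (
///     SELECT used_by FROM object_dependency WHERE oid = <obj_id>
///     UNION ALL
///     SELECT oid FROM object WHERE database_id = <obj_id> OR schema_id = <obj_id>
///     UNION ALL
///     SELECT object_dependency.used_by
///     FROM object_dependency
///     INNER JOIN used_by_object_ids ON used_by_object_ids.used_by = object_dependency.oid
/// )
/// SELECT DISTINCT oid, obj_type, schema_id, database_id
/// FROM used_by_object_ids
/// INNER JOIN object ON used_by_object_ids.used_by = object.oid
/// ORDER BY oid DESC;
/// ```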
83pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
108 let cte_alias = Alias::new("used_by_object_ids");
109 let cte_return_alias = Alias::new("used_by");
110
111 let mut base_query = SelectStatement::new()
112 .column(object_dependency::Column::UsedBy)
113 .from(ObjectDependency)
114 .and_where(object_dependency::Column::Oid.eq(obj_id))
115 .to_owned();
116
117 let belonged_obj_query = SelectStatement::new()
118 .column(object::Column::Oid)
119 .from(Object)
120 .and_where(
121 object::Column::DatabaseId
122 .eq(obj_id)
123 .or(object::Column::SchemaId.eq(obj_id)),
124 )
125 .to_owned();
126
127 let cte_referencing = Query::select()
128 .column((ObjectDependency, object_dependency::Column::UsedBy))
129 .from(ObjectDependency)
130 .inner_join(
131 cte_alias.clone(),
132 Expr::col((cte_alias.clone(), cte_return_alias.clone()))
133 .equals(object_dependency::Column::Oid),
134 )
135 .to_owned();
136
137 let mut common_table_expr = CommonTableExpression::new();
138 common_table_expr
139 .query(
140 base_query
141 .union(UnionType::All, belonged_obj_query)
142 .union(UnionType::All, cte_referencing)
143 .to_owned(),
144 )
145 .column(cte_return_alias.clone())
146 .table_name(cte_alias.clone());
147
148 SelectStatement::new()
149 .distinct()
150 .columns([
151 object::Column::Oid,
152 object::Column::ObjType,
153 object::Column::SchemaId,
154 object::Column::DatabaseId,
155 ])
156 .from(cte_alias.clone())
157 .inner_join(
158 Object,
159 Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
160 )
161 .order_by(object::Column::Oid, Order::Desc)
162 .to_owned()
163 .with(
164 WithClause::new()
165 .recursive(true)
166 .cte(common_table_expr)
167 .to_owned(),
168 )
169}
170
171fn to_pb_object_type(obj_type: ObjectType) -> PbObjectType {
172 match obj_type {
173 ObjectType::Database => PbObjectType::Database,
174 ObjectType::Schema => PbObjectType::Schema,
175 ObjectType::Table => PbObjectType::Table,
176 ObjectType::Source => PbObjectType::Source,
177 ObjectType::Sink => PbObjectType::Sink,
178 ObjectType::View => PbObjectType::View,
179 ObjectType::Index => PbObjectType::Index,
180 ObjectType::Function => PbObjectType::Function,
181 ObjectType::Connection => PbObjectType::Connection,
182 ObjectType::Subscription => PbObjectType::Subscription,
183 ObjectType::Secret => PbObjectType::Secret,
184 }
185}
186
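/// List dependency edges as `(object_id, referenced_object_id)` pairs, combining the
/// `object_dependency` table with the implicit edges from sinks to their target tables.
///
/// * `object_id`: if set, only dependencies of that (depending) object are returned.
/// * `include_creating`: if `false`, edges whose depending streaming job has not reached the
///   `Created` status are filtered out.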
187async fn list_object_dependencies_impl(
189 txn: &DatabaseTransaction,
190 object_id: Option<ObjectId>,
191 include_creating: bool,
192) -> MetaResult<Vec<PbObjectDependency>> {
193 let referenced_alias = Alias::new("referenced_obj");
194 let mut query = ObjectDependency::find()
195 .select_only()
196 .columns([
197 object_dependency::Column::Oid,
198 object_dependency::Column::UsedBy,
199 ])
200 .column_as(
201 Expr::col((referenced_alias.clone(), object::Column::ObjType)),
202 "referenced_obj_type",
203 )
204 .join(
205 JoinType::InnerJoin,
206 object_dependency::Relation::Object1.def(),
207 )
208 .join_as(
209 JoinType::InnerJoin,
210 object_dependency::Relation::Object2.def(),
211 referenced_alias.clone(),
212 );
213 if let Some(object_id) = object_id {
214 query = query.filter(object_dependency::Column::UsedBy.eq(object_id));
215 }
216 let mut obj_dependencies: Vec<PbObjectDependency> = query
217 .into_tuple()
218 .all(txn)
219 .await?
220 .into_iter()
221 .map(|(oid, used_by, referenced_type)| PbObjectDependency {
222 object_id: used_by,
223 referenced_object_id: oid,
224 referenced_object_type: to_pb_object_type(referenced_type) as i32,
225 })
226 .collect();
227
228 let mut sink_query = Sink::find()
229 .select_only()
230 .columns([sink::Column::SinkId, sink::Column::TargetTable])
231 .filter(sink::Column::TargetTable.is_not_null());
232 if let Some(object_id) = object_id {
233 sink_query = sink_query.filter(
234 sink::Column::SinkId
235 .eq(object_id)
236 .or(sink::Column::TargetTable.eq(object_id)),
237 );
238 }
239 let sink_dependencies: Vec<(SinkId, TableId)> = sink_query.into_tuple().all(txn).await?;
240 obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
241 PbObjectDependency {
242 object_id: table_id.into(),
243 referenced_object_id: sink_id.into(),
244 referenced_object_type: PbObjectType::Sink as i32,
245 }
246 }));
247
248 if !include_creating {
249 let mut streaming_job_ids = obj_dependencies
250 .iter()
251 .map(|dependency| dependency.object_id)
252 .collect_vec();
253 streaming_job_ids.sort_unstable();
254 streaming_job_ids.dedup();
255
256 if !streaming_job_ids.is_empty() {
257 let non_created_jobs: HashSet<JobId> = StreamingJob::find()
258 .select_only()
259 .columns([streaming_job::Column::JobId])
260 .filter(
261 streaming_job::Column::JobId
262 .is_in(streaming_job_ids)
263 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
264 )
265 .into_tuple()
266 .all(txn)
267 .await?
268 .into_iter()
269 .collect();
270
271 if !non_created_jobs.is_empty() {
272 obj_dependencies.retain(|dependency| {
273 !non_created_jobs.contains(&dependency.object_id.as_job_id())
274 });
275 }
276 }
277 }
278
279 Ok(obj_dependencies)
280}
281
282pub async fn list_object_dependencies(
284 txn: &DatabaseTransaction,
285 include_creating: bool,
286) -> MetaResult<Vec<PbObjectDependency>> {
287 list_object_dependencies_impl(txn, None, include_creating).await
288}
289
290pub async fn list_object_dependencies_by_object_id(
292 txn: &DatabaseTransaction,
293 object_id: ObjectId,
294) -> MetaResult<Vec<PbObjectDependency>> {
295 list_object_dependencies_impl(txn, Some(object_id), true).await
296}
297
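/// Build a recursive CTE query used to detect whether creating a sink into `target_table`
/// would introduce a cycle: it walks the dependency graph (extended with existing
/// sink-to-target-table edges) starting from `target_table` and counts how many of the
/// reachable objects appear in `dependent_objects`.
///
/// The generated statement is roughly equivalent to the following sketch (illustrative only;
/// the exact SQL depends on the database backend):
///
/// ```sql
/// WITH RECURSIVE used_by_object_ids_with_sink (oid, used_by) AS (
///     SELECT oid, used_by FROM object_dependency WHERE oid = <target_table>
///     UNION ALL
///     SELECT obj_dependency_with_sink.oid, obj_dependency_with_sink.used_by
///     FROM (
///         SELECT oid, used_by FROM object_dependency
///         UNION ALL
///         SELECT sink_id, target_table FROM sink WHERE target_table IS NOT NULL
///     ) AS obj_dependency_with_sink
///     INNER JOIN used_by_object_ids_with_sink
///         ON used_by_object_ids_with_sink.used_by = obj_dependency_with_sink.oid
///     WHERE used_by_object_ids_with_sink.used_by <> used_by_object_ids_with_sink.oid
/// )
/// SELECT COUNT(used_by) FROM used_by_object_ids_with_sink
/// WHERE used_by IN (<dependent_objects>);
/// ```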
298pub fn construct_sink_cycle_check_query(
323 target_table: ObjectId,
324 dependent_objects: Vec<ObjectId>,
325) -> WithQuery {
326 let cte_alias = Alias::new("used_by_object_ids_with_sink");
327 let depend_alias = Alias::new("obj_dependency_with_sink");
328
329 let mut base_query = SelectStatement::new()
330 .columns([
331 object_dependency::Column::Oid,
332 object_dependency::Column::UsedBy,
333 ])
334 .from(ObjectDependency)
335 .and_where(object_dependency::Column::Oid.eq(target_table))
336 .to_owned();
337
338 let query_sink_deps = SelectStatement::new()
339 .columns([sink::Column::SinkId, sink::Column::TargetTable])
340 .from(Sink)
341 .and_where(sink::Column::TargetTable.is_not_null())
342 .to_owned();
343
344 let cte_referencing = Query::select()
345 .column((depend_alias.clone(), object_dependency::Column::Oid))
346 .column((depend_alias.clone(), object_dependency::Column::UsedBy))
347 .from_subquery(
348 SelectStatement::new()
349 .columns([
350 object_dependency::Column::Oid,
351 object_dependency::Column::UsedBy,
352 ])
353 .from(ObjectDependency)
354 .union(UnionType::All, query_sink_deps)
355 .to_owned(),
356 depend_alias.clone(),
357 )
358 .inner_join(
359 cte_alias.clone(),
360 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
361 .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
362 )
363 .and_where(
364 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
365 cte_alias.clone(),
366 object_dependency::Column::Oid,
367 ))),
368 )
369 .to_owned();
370
371 let mut common_table_expr = CommonTableExpression::new();
372 common_table_expr
373 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
374 .columns([
375 object_dependency::Column::Oid,
376 object_dependency::Column::UsedBy,
377 ])
378 .table_name(cte_alias.clone());
379
380 SelectStatement::new()
381 .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
382 .from(cte_alias.clone())
383 .and_where(
384 Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
385 )
386 .to_owned()
387 .with(
388 WithClause::new()
389 .recursive(true)
390 .cte(common_table_expr)
391 .to_owned(),
392 )
393}
394
395#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
396#[sea_orm(entity = "Object")]
397pub struct PartialObject {
398 pub oid: ObjectId,
399 pub obj_type: ObjectType,
400 pub schema_id: Option<SchemaId>,
401 pub database_id: Option<DatabaseId>,
402}
403
404#[derive(Clone, DerivePartialModel, FromQueryResult)]
405#[sea_orm(entity = "Fragment")]
406pub struct PartialFragmentStateTables {
407 pub fragment_id: FragmentId,
408 pub job_id: ObjectId,
409 pub state_table_ids: TableIdArray,
410}
411
412#[derive(Clone, Eq, PartialEq, Debug)]
413pub struct PartialActorLocation {
414 pub actor_id: ActorId,
415 pub fragment_id: FragmentId,
416 pub worker_id: WorkerId,
417}
418
419#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
420pub struct FragmentDesc {
421 pub fragment_id: FragmentId,
422 pub job_id: JobId,
423 pub fragment_type_mask: i32,
424 pub distribution_type: DistributionType,
425 pub state_table_ids: TableIdArray,
426 pub parallelism: i64,
427 pub vnode_count: i32,
428 pub stream_node: StreamNode,
429 pub parallelism_policy: String,
430}
431
432pub async fn get_referring_objects_cascade<C>(
434 obj_id: ObjectId,
435 db: &C,
436) -> MetaResult<Vec<PartialObject>>
437where
438 C: ConnectionTrait,
439{
440 let query = construct_obj_dependency_query(obj_id);
441 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
442 let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
443 db.get_database_backend(),
444 sql,
445 values,
446 ))
447 .all(db)
448 .await?;
449 Ok(objects)
450}
451
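/// Return `true` if creating a sink from `dependent_objs` (the upstream relations of the new
/// sink) into `target_table` would form a cycle in the dependency graph, taking existing
/// sink-into-table edges into account.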
452pub async fn check_sink_into_table_cycle<C>(
454 target_table: ObjectId,
455 dependent_objs: Vec<ObjectId>,
456 db: &C,
457) -> MetaResult<bool>
458where
459 C: ConnectionTrait,
460{
461 if dependent_objs.is_empty() {
462 return Ok(false);
463 }
464
465 if dependent_objs.contains(&target_table) {
467 return Ok(true);
468 }
469
470 let query = construct_sink_cycle_check_query(target_table, dependent_objs);
471 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
472
473 let res = db
474 .query_one(Statement::from_sql_and_values(
475 db.get_database_backend(),
476 sql,
477 values,
478 ))
479 .await?
480 .unwrap();
481
482 let cnt: i64 = res.try_get_by(0)?;
483
484 Ok(cnt != 0)
485}
486
487pub async fn ensure_object_id<C>(
489 object_type: ObjectType,
490 obj_id: impl Into<ObjectId>,
491 db: &C,
492) -> MetaResult<()>
493where
494 C: ConnectionTrait,
495{
496 let obj_id = obj_id.into();
497 let count = Object::find_by_id(obj_id).count(db).await?;
498 if count == 0 {
499 return Err(MetaError::catalog_id_not_found(
500 object_type.as_str(),
501 obj_id,
502 ));
503 }
504 Ok(())
505}
506
507pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
508where
509 C: ConnectionTrait,
510{
511 let count = Object::find_by_id(job_id).count(db).await?;
512 if count == 0 {
513 return Err(MetaError::cancelled(format!(
514 "job {} might be cancelled manually or by recovery",
515 job_id
516 )));
517 }
518 Ok(())
519}
520
521pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
523where
524 C: ConnectionTrait,
525{
526 let count = User::find_by_id(user_id).count(db).await?;
527 if count == 0 {
528 return Err(anyhow!("user {} was concurrently dropped", user_id).into());
529 }
530 Ok(())
531}
532
533pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
535where
536 C: ConnectionTrait,
537{
538 let count = Database::find()
539 .filter(database::Column::Name.eq(name))
540 .count(db)
541 .await?;
542 if count > 0 {
543 assert_eq!(count, 1);
544 return Err(MetaError::catalog_duplicated("database", name));
545 }
546 Ok(())
547}
548
549pub async fn check_function_signature_duplicate<C>(
551 pb_function: &PbFunction,
552 db: &C,
553) -> MetaResult<()>
554where
555 C: ConnectionTrait,
556{
557 let count = Function::find()
558 .inner_join(Object)
559 .filter(
560 object::Column::DatabaseId
561 .eq(pb_function.database_id)
562 .and(object::Column::SchemaId.eq(pb_function.schema_id))
563 .and(function::Column::Name.eq(&pb_function.name))
564 .and(
565 function::Column::ArgTypes
566 .eq(DataTypeArray::from(pb_function.arg_types.clone())),
567 ),
568 )
569 .count(db)
570 .await?;
571 if count > 0 {
572 assert_eq!(count, 1);
573 return Err(MetaError::catalog_duplicated("function", &pb_function.name));
574 }
575 Ok(())
576}
577
578pub async fn check_connection_name_duplicate<C>(
580 pb_connection: &PbConnection,
581 db: &C,
582) -> MetaResult<()>
583where
584 C: ConnectionTrait,
585{
586 let count = Connection::find()
587 .inner_join(Object)
588 .filter(
589 object::Column::DatabaseId
590 .eq(pb_connection.database_id)
591 .and(object::Column::SchemaId.eq(pb_connection.schema_id))
592 .and(connection::Column::Name.eq(&pb_connection.name)),
593 )
594 .count(db)
595 .await?;
596 if count > 0 {
597 assert_eq!(count, 1);
598 return Err(MetaError::catalog_duplicated(
599 "connection",
600 &pb_connection.name,
601 ));
602 }
603 Ok(())
604}
605
606pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
607where
608 C: ConnectionTrait,
609{
610 let count = Secret::find()
611 .inner_join(Object)
612 .filter(
613 object::Column::DatabaseId
614 .eq(pb_secret.database_id)
615 .and(object::Column::SchemaId.eq(pb_secret.schema_id))
616 .and(secret::Column::Name.eq(&pb_secret.name)),
617 )
618 .count(db)
619 .await?;
620 if count > 0 {
621 assert_eq!(count, 1);
622 return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
623 }
624 Ok(())
625}
626
627pub async fn check_subscription_name_duplicate<C>(
628 pb_subscription: &PbSubscription,
629 db: &C,
630) -> MetaResult<()>
631where
632 C: ConnectionTrait,
633{
634 let count = Subscription::find()
635 .inner_join(Object)
636 .filter(
637 object::Column::DatabaseId
638 .eq(pb_subscription.database_id)
639 .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
640 .and(subscription::Column::Name.eq(&pb_subscription.name)),
641 )
642 .count(db)
643 .await?;
644 if count > 0 {
645 assert_eq!(count, 1);
646 return Err(MetaError::catalog_duplicated(
647 "subscription",
648 &pb_subscription.name,
649 ));
650 }
651 Ok(())
652}
653
654pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
656where
657 C: ConnectionTrait,
658{
659 let count = User::find()
660 .filter(user::Column::Name.eq(name))
661 .count(db)
662 .await?;
663 if count > 0 {
664 assert_eq!(count, 1);
665 return Err(MetaError::catalog_duplicated("user", name));
666 }
667 Ok(())
668}
669
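/// Check whether a table/source/sink/index/view named `name` already exists under the given
/// database and schema. If the conflicting relation is a streaming job that has not finished
/// creation (except for views and non-shared sources), a `catalog_under_creation` error is
/// returned; otherwise a `catalog_duplicated` error is returned.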
670pub async fn check_relation_name_duplicate<C>(
672 name: &str,
673 database_id: DatabaseId,
674 schema_id: SchemaId,
675 db: &C,
676) -> MetaResult<()>
677where
678 C: ConnectionTrait,
679{
680 macro_rules! check_duplicated {
681 ($obj_type:expr, $entity:ident, $table:ident) => {
682 let object_id = Object::find()
683 .select_only()
684 .column(object::Column::Oid)
685 .inner_join($entity)
686 .filter(
687 object::Column::DatabaseId
688 .eq(Some(database_id))
689 .and(object::Column::SchemaId.eq(Some(schema_id)))
690 .and($table::Column::Name.eq(name)),
691 )
692 .into_tuple::<ObjectId>()
693 .one(db)
694 .await?;
695 if let Some(oid) = object_id {
696 let check_creation = if $obj_type == ObjectType::View {
697 false
698 } else if $obj_type == ObjectType::Source {
699 let source_info = Source::find_by_id(oid.as_source_id())
700 .select_only()
701 .column(source::Column::SourceInfo)
702 .into_tuple::<Option<StreamSourceInfo>>()
703 .one(db)
704 .await?
705 .unwrap();
706 source_info.map_or(false, |info| info.to_protobuf().is_shared())
707 } else {
708 true
709 };
710 let job_id = oid.as_job_id();
711 return if check_creation
712 && !matches!(
713 StreamingJob::find_by_id(job_id)
714 .select_only()
715 .column(streaming_job::Column::JobStatus)
716 .into_tuple::<JobStatus>()
717 .one(db)
718 .await?,
719 Some(JobStatus::Created)
720 ) {
721 Err(MetaError::catalog_under_creation(
722 $obj_type.as_str(),
723 name,
724 job_id,
725 ))
726 } else {
727 Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
728 };
729 }
730 };
731 }
732 check_duplicated!(ObjectType::Table, Table, table);
733 check_duplicated!(ObjectType::Source, Source, source);
734 check_duplicated!(ObjectType::Sink, Sink, sink);
735 check_duplicated!(ObjectType::Index, Index, index);
736 check_duplicated!(ObjectType::View, View, view);
737
738 Ok(())
739}
740
741pub async fn check_schema_name_duplicate<C>(
743 name: &str,
744 database_id: DatabaseId,
745 db: &C,
746) -> MetaResult<()>
747where
748 C: ConnectionTrait,
749{
750 let count = Object::find()
751 .inner_join(Schema)
752 .filter(
753 object::Column::ObjType
754 .eq(ObjectType::Schema)
755 .and(object::Column::DatabaseId.eq(Some(database_id)))
756 .and(schema::Column::Name.eq(name)),
757 )
758 .count(db)
759 .await?;
760 if count != 0 {
761 return Err(MetaError::catalog_duplicated("schema", name));
762 }
763
764 Ok(())
765}
766
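/// Ensure that the object to be dropped (without `CASCADE`) is not referenced by other
/// objects. Indexes are ignored, and for an iceberg-engine table its single auto-created
/// iceberg sink is tolerated. On failure, the error lists the referring relations and hints
/// at either dropping them first or using `DROP ... CASCADE`.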
767pub async fn check_object_refer_for_drop<C>(
770 object_type: ObjectType,
771 object_id: ObjectId,
772 db: &C,
773) -> MetaResult<()>
774where
775 C: ConnectionTrait,
776{
777 let count = if object_type == ObjectType::Table {
779 ObjectDependency::find()
780 .join(
781 JoinType::InnerJoin,
782 object_dependency::Relation::Object1.def(),
783 )
784 .filter(
785 object_dependency::Column::Oid
786 .eq(object_id)
787 .and(object::Column::ObjType.ne(ObjectType::Index)),
788 )
789 .count(db)
790 .await?
791 } else {
792 ObjectDependency::find()
793 .filter(object_dependency::Column::Oid.eq(object_id))
794 .count(db)
795 .await?
796 };
797 if count != 0 {
798 let referring_objects = get_referring_objects(object_id, db).await?;
800 let referring_objs_map = referring_objects
801 .into_iter()
802 .filter(|o| o.obj_type != ObjectType::Index)
803 .into_group_map_by(|o| o.obj_type);
804 let mut details = vec![];
805 for (obj_type, objs) in referring_objs_map {
806 match obj_type {
807 ObjectType::Table => {
808 let tables: Vec<(String, String)> = Object::find()
809 .join(JoinType::InnerJoin, object::Relation::Table.def())
810 .join(JoinType::InnerJoin, object::Relation::Database2.def())
811 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
812 .select_only()
813 .column(schema::Column::Name)
814 .column(table::Column::Name)
815 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
816 .into_tuple()
817 .all(db)
818 .await?;
819 details.extend(tables.into_iter().map(|(schema_name, table_name)| {
820 format!(
821 "materialized view {}.{} depends on it",
822 schema_name, table_name
823 )
824 }));
825 }
826 ObjectType::Sink => {
827 let sinks: Vec<(String, String)> = Object::find()
828 .join(JoinType::InnerJoin, object::Relation::Sink.def())
829 .join(JoinType::InnerJoin, object::Relation::Database2.def())
830 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
831 .select_only()
832 .column(schema::Column::Name)
833 .column(sink::Column::Name)
834 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
835 .into_tuple()
836 .all(db)
837 .await?;
838 if object_type == ObjectType::Table {
839 let engine = Table::find_by_id(object_id.as_table_id())
840 .select_only()
841 .column(table::Column::Engine)
842 .into_tuple::<table::Engine>()
843 .one(db)
844 .await?;
845 if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
846 continue;
847 }
848 }
849 details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
850 format!("sink {}.{} depends on it", schema_name, sink_name)
851 }));
852 }
853 ObjectType::View => {
854 let views: Vec<(String, String)> = Object::find()
855 .join(JoinType::InnerJoin, object::Relation::View.def())
856 .join(JoinType::InnerJoin, object::Relation::Database2.def())
857 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
858 .select_only()
859 .column(schema::Column::Name)
860 .column(view::Column::Name)
861 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
862 .into_tuple()
863 .all(db)
864 .await?;
865 details.extend(views.into_iter().map(|(schema_name, view_name)| {
866 format!("view {}.{} depends on it", schema_name, view_name)
867 }));
868 }
869 ObjectType::Subscription => {
870 let subscriptions: Vec<(String, String)> = Object::find()
871 .join(JoinType::InnerJoin, object::Relation::Subscription.def())
872 .join(JoinType::InnerJoin, object::Relation::Database2.def())
873 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
874 .select_only()
875 .column(schema::Column::Name)
876 .column(subscription::Column::Name)
877 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
878 .into_tuple()
879 .all(db)
880 .await?;
881 details.extend(subscriptions.into_iter().map(
882 |(schema_name, subscription_name)| {
883 format!(
884 "subscription {}.{} depends on it",
885 schema_name, subscription_name
886 )
887 },
888 ));
889 }
890 ObjectType::Source => {
891 let sources: Vec<(String, String)> = Object::find()
892 .join(JoinType::InnerJoin, object::Relation::Source.def())
893 .join(JoinType::InnerJoin, object::Relation::Database2.def())
894 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
895 .select_only()
896 .column(schema::Column::Name)
897 .column(source::Column::Name)
898 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
899 .into_tuple()
900 .all(db)
901 .await?;
                    details.extend(sources.into_iter().map(|(schema_name, source_name)| {
                        format!("source {}.{} depends on it", schema_name, source_name)
                    }));
905 }
906 ObjectType::Connection => {
907 let connections: Vec<(String, String)> = Object::find()
908 .join(JoinType::InnerJoin, object::Relation::Connection.def())
909 .join(JoinType::InnerJoin, object::Relation::Database2.def())
910 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
911 .select_only()
912 .column(schema::Column::Name)
913 .column(connection::Column::Name)
914 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
915 .into_tuple()
916 .all(db)
917 .await?;
                    details.extend(connections.into_iter().map(
                        |(schema_name, connection_name)| {
                            format!("connection {}.{} depends on it", schema_name, connection_name)
                        },
                    ));
921 }
922 _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
924 }
925 }
926 if details.is_empty() {
927 return Ok(());
928 }
929
930 return Err(MetaError::permission_denied(format!(
931 "{} used by {} other objects. \nDETAIL: {}\n\
932 {}",
933 object_type.as_str(),
934 details.len(),
935 details.join("\n"),
936 match object_type {
937 ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
938 "HINT: DROP the dependent objects first.",
939 ObjectType::Database | ObjectType::Schema => unreachable!(),
940 _ => "HINT: Use DROP ... CASCADE to drop the dependent objects too.",
941 }
942 )));
943 }
944 Ok(())
945}
946
947pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
949where
950 C: ConnectionTrait,
951{
952 let objs = ObjectDependency::find()
953 .filter(object_dependency::Column::Oid.eq(object_id))
954 .join(
955 JoinType::InnerJoin,
956 object_dependency::Relation::Object1.def(),
957 )
958 .into_partial_model()
959 .all(db)
960 .await?;
961
962 Ok(objs)
963}
964
965pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
967where
968 C: ConnectionTrait,
969{
970 let count = Object::find()
971 .filter(object::Column::SchemaId.eq(Some(schema_id)))
972 .count(db)
973 .await?;
974 if count != 0 {
975 return Err(MetaError::permission_denied("schema is not empty"));
976 }
977
978 Ok(())
979}
980
981pub async fn list_user_info_by_ids<C>(
983 user_ids: impl IntoIterator<Item = UserId>,
984 db: &C,
985) -> MetaResult<Vec<PbUserInfo>>
986where
987 C: ConnectionTrait,
988{
989 let mut user_infos = vec![];
990 for user_id in user_ids {
991 let user = User::find_by_id(user_id)
992 .one(db)
993 .await?
994 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
995 let mut user_info: PbUserInfo = user.into();
996 user_info.grant_privileges = get_user_privilege(user_id, db).await?;
997 user_infos.push(user_info);
998 }
999 Ok(user_infos)
1000}
1001
1002pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
1004where
1005 C: ConnectionTrait,
1006{
1007 let obj_owner: UserId = Object::find_by_id(object_id)
1008 .select_only()
1009 .column(object::Column::OwnerId)
1010 .into_tuple()
1011 .one(db)
1012 .await?
1013 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1014 Ok(obj_owner)
1015}
1016
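/// Build a recursive CTE query that returns `(id, user_id)` for the given privileges together
/// with every privilege transitively granted based on them (via `dependent_id`).
///
/// The generated statement is roughly equivalent to the following sketch (illustrative only;
/// the exact SQL depends on the database backend):
///
/// ```sql
/// WITH RECURSIVE granted_privilege_ids (id, user_id) AS (
///     SELECT id, user_id FROM user_privilege WHERE id IN (<ids>)
///     UNION ALL
///     SELECT user_privilege.id, user_privilege.user_id
///     FROM user_privilege
///     INNER JOIN granted_privilege_ids ON granted_privilege_ids.id = user_privilege.dependent_id
/// )
/// SELECT id, user_id FROM granted_privilege_ids;
/// ```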
1017pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
1042 let cte_alias = Alias::new("granted_privilege_ids");
1043 let cte_return_privilege_alias = Alias::new("id");
1044 let cte_return_user_alias = Alias::new("user_id");
1045
1046 let mut base_query = SelectStatement::new()
1047 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
1048 .from(UserPrivilege)
1049 .and_where(user_privilege::Column::Id.is_in(ids))
1050 .to_owned();
1051
1052 let cte_referencing = Query::select()
1053 .columns([
1054 (UserPrivilege, user_privilege::Column::Id),
1055 (UserPrivilege, user_privilege::Column::UserId),
1056 ])
1057 .from(UserPrivilege)
1058 .inner_join(
1059 cte_alias.clone(),
1060 Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
1061 .equals(user_privilege::Column::DependentId),
1062 )
1063 .to_owned();
1064
1065 let mut common_table_expr = CommonTableExpression::new();
1066 common_table_expr
1067 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
1068 .columns([
1069 cte_return_privilege_alias.clone(),
1070 cte_return_user_alias.clone(),
1071 ])
1072 .table_name(cte_alias.clone());
1073
1074 SelectStatement::new()
1075 .columns([cte_return_privilege_alias, cte_return_user_alias])
1076 .from(cte_alias)
1077 .to_owned()
1078 .with(
1079 WithClause::new()
1080 .recursive(true)
1081 .cte(common_table_expr)
1082 .to_owned(),
1083 )
1084}
1085
1086pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
1087where
1088 C: ConnectionTrait,
1089{
1090 let table_ids: Vec<TableId> = Table::find()
1091 .select_only()
1092 .column(table::Column::TableId)
1093 .filter(
1094 table::Column::TableType
1095 .eq(TableType::Internal)
1096 .and(table::Column::BelongsToJobId.eq(job_id)),
1097 )
1098 .into_tuple()
1099 .all(db)
1100 .await?;
1101 Ok(table_ids)
1102}
1103
1104pub async fn get_index_state_tables_by_table_id<C>(
1105 table_id: TableId,
1106 db: &C,
1107) -> MetaResult<Vec<TableId>>
1108where
1109 C: ConnectionTrait,
1110{
1111 let mut index_table_ids: Vec<TableId> = Index::find()
1112 .select_only()
1113 .column(index::Column::IndexTableId)
1114 .filter(index::Column::PrimaryTableId.eq(table_id))
1115 .into_tuple()
1116 .all(db)
1117 .await?;
1118
1119 if !index_table_ids.is_empty() {
1120 let internal_table_ids: Vec<TableId> = Table::find()
1121 .select_only()
1122 .column(table::Column::TableId)
1123 .filter(
1124 table::Column::TableType
1125 .eq(TableType::Internal)
1126 .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
1127 )
1128 .into_tuple()
1129 .all(db)
1130 .await?;
1131
1132 index_table_ids.extend(internal_table_ids.into_iter());
1133 }
1134
1135 Ok(index_table_ids)
1136}
1137
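/// For a table backed by the iceberg engine, return the object ids of its hidden companion
/// objects: the iceberg sink (named with `ICEBERG_SINK_PREFIX`) plus that sink's internal
/// state tables, and the iceberg source (named with `ICEBERG_SOURCE_PREFIX`). Returns an
/// empty list for non-table objects or tables that do not use the iceberg engine.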
1138pub async fn get_iceberg_related_object_ids<C>(
1140 object_id: ObjectId,
1141 db: &C,
1142) -> MetaResult<Vec<ObjectId>>
1143where
1144 C: ConnectionTrait,
1145{
1146 let object = Object::find_by_id(object_id)
1147 .one(db)
1148 .await?
1149 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1150 if object.obj_type != ObjectType::Table {
1151 return Ok(vec![]);
1152 }
1153
1154 let table = Table::find_by_id(object_id.as_table_id())
1155 .one(db)
1156 .await?
1157 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1158 if !matches!(table.engine, Some(table::Engine::Iceberg)) {
1159 return Ok(vec![]);
1160 }
1161
1162 let database_id = object.database_id.unwrap();
1163 let schema_id = object.schema_id.unwrap();
1164
1165 let mut related_objects = vec![];
1166
1167 let iceberg_sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name);
1168 let iceberg_sink_id = Sink::find()
1169 .inner_join(Object)
1170 .select_only()
1171 .column(sink::Column::SinkId)
1172 .filter(
1173 object::Column::DatabaseId
1174 .eq(database_id)
1175 .and(object::Column::SchemaId.eq(schema_id))
1176 .and(sink::Column::Name.eq(&iceberg_sink_name)),
1177 )
1178 .into_tuple::<SinkId>()
1179 .one(db)
1180 .await?;
1181 if let Some(sink_id) = iceberg_sink_id {
1182 related_objects.push(sink_id.as_object_id());
1183 let sink_internal_tables = get_internal_tables_by_id(sink_id.as_job_id(), db).await?;
1184 related_objects.extend(
1185 sink_internal_tables
1186 .into_iter()
1187 .map(|tid| tid.as_object_id()),
1188 );
1189 } else {
1190 warn!(
1191 "iceberg table {} missing sink {}",
1192 table.name, iceberg_sink_name
1193 );
1194 }
1195
1196 let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
1197 let iceberg_source_id = Source::find()
1198 .inner_join(Object)
1199 .select_only()
1200 .column(source::Column::SourceId)
1201 .filter(
1202 object::Column::DatabaseId
1203 .eq(database_id)
1204 .and(object::Column::SchemaId.eq(schema_id))
1205 .and(source::Column::Name.eq(&iceberg_source_name)),
1206 )
1207 .into_tuple::<SourceId>()
1208 .one(db)
1209 .await?;
1210 if let Some(source_id) = iceberg_source_id {
1211 related_objects.push(source_id.as_object_id());
1212 } else {
1213 warn!(
1214 "iceberg table {} missing source {}",
1215 table.name, iceberg_source_name
1216 );
1217 }
1218
1219 Ok(related_objects)
1220}
1221
1222pub(crate) async fn load_streaming_jobs_by_ids<C>(
1224 txn: &C,
1225 job_ids: impl IntoIterator<Item = JobId>,
1226) -> MetaResult<HashMap<JobId, streaming_job::Model>>
1227where
1228 C: ConnectionTrait,
1229{
1230 let job_ids: HashSet<JobId> = job_ids.into_iter().collect();
1231 if job_ids.is_empty() {
1232 return Ok(HashMap::new());
1233 }
1234 let jobs = streaming_job::Entity::find()
1235 .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
1236 .all(txn)
1237 .await?;
1238 Ok(jobs.into_iter().map(|job| (job.job_id, job)).collect())
1239}
1240
1241#[derive(Clone, DerivePartialModel, FromQueryResult)]
1242#[sea_orm(entity = "UserPrivilege")]
1243pub struct PartialUserPrivilege {
1244 pub id: PrivilegeId,
1245 pub user_id: UserId,
1246}
1247
1248pub async fn get_referring_privileges_cascade<C>(
1249 ids: Vec<PrivilegeId>,
1250 db: &C,
1251) -> MetaResult<Vec<PartialUserPrivilege>>
1252where
1253 C: ConnectionTrait,
1254{
1255 let query = construct_privilege_dependency_query(ids);
1256 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1257 let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1258 db.get_database_backend(),
1259 sql,
1260 values,
1261 ))
1262 .all(db)
1263 .await?;
1264
1265 Ok(privileges)
1266}
1267
1268pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1270where
1271 C: ConnectionTrait,
1272{
1273 let count = UserPrivilege::find()
1274 .filter(user_privilege::Column::DependentId.is_in(ids))
1275 .count(db)
1276 .await?;
1277 if count != 0 {
1278 return Err(MetaError::permission_denied(format!(
1279 "privileges granted to {} other ones.",
1280 count
1281 )));
1282 }
1283 Ok(())
1284}
1285
1286pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1288where
1289 C: ConnectionTrait,
1290{
1291 let user_privileges = UserPrivilege::find()
1292 .find_also_related(Object)
1293 .filter(user_privilege::Column::UserId.eq(user_id))
1294 .all(db)
1295 .await?;
1296 Ok(user_privileges
1297 .into_iter()
1298 .map(|(privilege, object)| {
1299 let object = object.unwrap();
1300 let obj = match object.obj_type {
1301 ObjectType::Database => PbGrantObject::DatabaseId(object.oid.as_database_id()),
1302 ObjectType::Schema => PbGrantObject::SchemaId(object.oid.as_schema_id()),
1303 ObjectType::Table | ObjectType::Index => {
1304 PbGrantObject::TableId(object.oid.as_table_id())
1305 }
1306 ObjectType::Source => PbGrantObject::SourceId(object.oid.as_source_id()),
1307 ObjectType::Sink => PbGrantObject::SinkId(object.oid.as_sink_id()),
1308 ObjectType::View => PbGrantObject::ViewId(object.oid.as_view_id()),
1309 ObjectType::Function => PbGrantObject::FunctionId(object.oid.as_function_id()),
1310 ObjectType::Connection => {
1311 PbGrantObject::ConnectionId(object.oid.as_connection_id())
1312 }
1313 ObjectType::Subscription => {
1314 PbGrantObject::SubscriptionId(object.oid.as_subscription_id())
1315 }
1316 ObjectType::Secret => PbGrantObject::SecretId(object.oid.as_secret_id()),
1317 };
1318 PbGrantPrivilege {
1319 action_with_opts: vec![PbActionWithGrantOption {
1320 action: PbAction::from(privilege.action) as _,
1321 with_grant_option: privilege.with_grant_option,
1322 granted_by: privilege.granted_by as _,
1323 }],
1324 object: Some(obj),
1325 }
1326 })
1327 .collect())
1328}
1329
1330pub async fn get_table_columns(
1331 txn: &impl ConnectionTrait,
1332 id: TableId,
1333) -> MetaResult<ColumnCatalogArray> {
1334 let columns = Table::find_by_id(id)
1335 .select_only()
1336 .columns([table::Column::Columns])
1337 .into_tuple::<ColumnCatalogArray>()
1338 .one(txn)
1339 .await?
1340 .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1341 Ok(columns)
1342}
1343
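/// Apply matching `user_default_privilege` entries to a newly created object: the configured
/// actions are granted to the grantees, and `SELECT` grants are additionally propagated to the
/// job's internal state tables and, for iceberg tables, to the hidden iceberg sink/source
/// objects. Returns the refreshed user infos of all affected grantees for notification.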
1344pub async fn grant_default_privileges_automatically<C>(
1347 db: &C,
1348 object_id: impl Into<ObjectId>,
1349) -> MetaResult<Vec<PbUserInfo>>
1350where
1351 C: ConnectionTrait,
1352{
1353 let object_id = object_id.into();
1354 let object = Object::find_by_id(object_id)
1355 .one(db)
1356 .await?
1357 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1358 assert_ne!(object.obj_type, ObjectType::Database);
1359
1360 let for_mview_filter = if object.obj_type == ObjectType::Table {
1361 let table_type = Table::find_by_id(object_id.as_table_id())
1362 .select_only()
1363 .column(table::Column::TableType)
1364 .into_tuple::<TableType>()
1365 .one(db)
1366 .await?
1367 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1368 user_default_privilege::Column::ForMaterializedView
1369 .eq(table_type == TableType::MaterializedView)
1370 } else {
1371 user_default_privilege::Column::ForMaterializedView.eq(false)
1372 };
1373 let schema_filter = if let Some(schema_id) = &object.schema_id {
1374 user_default_privilege::Column::SchemaId.eq(*schema_id)
1375 } else {
1376 user_default_privilege::Column::SchemaId.is_null()
1377 };
1378
1379 let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1380 .select_only()
1381 .columns([
1382 user_default_privilege::Column::Grantee,
1383 user_default_privilege::Column::GrantedBy,
1384 user_default_privilege::Column::Action,
1385 user_default_privilege::Column::WithGrantOption,
1386 ])
1387 .filter(
1388 user_default_privilege::Column::DatabaseId
1389 .eq(object.database_id.unwrap())
1390 .and(schema_filter)
1391 .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1392 .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1393 .and(for_mview_filter),
1394 )
1395 .into_tuple()
1396 .all(db)
1397 .await?;
1398 if default_privileges.is_empty() {
1399 return Ok(vec![]);
1400 }
1401
1402 let updated_user_ids = default_privileges
1403 .iter()
1404 .map(|(grantee, _, _, _)| *grantee)
1405 .collect::<HashSet<_>>();
1406
1407 for (grantee, granted_by, action, with_grant_option) in default_privileges {
1408 UserPrivilege::insert(user_privilege::ActiveModel {
1409 user_id: Set(grantee),
1410 oid: Set(object_id),
1411 granted_by: Set(granted_by),
1412 action: Set(action),
1413 with_grant_option: Set(with_grant_option),
1414 ..Default::default()
1415 })
1416 .exec(db)
1417 .await?;
1418 if action == Action::Select {
1419 let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1421 if !internal_table_ids.is_empty() {
1422 for internal_table_id in &internal_table_ids {
1423 UserPrivilege::insert(user_privilege::ActiveModel {
1424 user_id: Set(grantee),
1425 oid: Set(internal_table_id.as_object_id()),
1426 granted_by: Set(granted_by),
1427 action: Set(Action::Select),
1428 with_grant_option: Set(with_grant_option),
1429 ..Default::default()
1430 })
1431 .exec(db)
1432 .await?;
1433 }
1434 }
1435
1436 let iceberg_privilege_object_ids =
1438 get_iceberg_related_object_ids(object_id, db).await?;
1439 if !iceberg_privilege_object_ids.is_empty() {
1440 for iceberg_object_id in &iceberg_privilege_object_ids {
1441 UserPrivilege::insert(user_privilege::ActiveModel {
1442 user_id: Set(grantee),
1443 oid: Set(*iceberg_object_id),
1444 granted_by: Set(granted_by),
1445 action: Set(action),
1446 with_grant_option: Set(with_grant_option),
1447 ..Default::default()
1448 })
1449 .exec(db)
1450 .await?;
1451 }
1452 }
1453 }
1454 }
1455
1456 let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1457 Ok(updated_user_infos)
1458}
1459
1460pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1462 match object {
1463 PbGrantObject::DatabaseId(id) => (*id).into(),
1464 PbGrantObject::SchemaId(id) => (*id).into(),
1465 PbGrantObject::TableId(id) => (*id).into(),
1466 PbGrantObject::SourceId(id) => (*id).into(),
1467 PbGrantObject::SinkId(id) => (*id).into(),
1468 PbGrantObject::ViewId(id) => (*id).into(),
1469 PbGrantObject::FunctionId(id) => (*id).into(),
1470 PbGrantObject::SubscriptionId(id) => (*id).into(),
1471 PbGrantObject::ConnectionId(id) => (*id).into(),
1472 PbGrantObject::SecretId(id) => (*id).into(),
1473 }
1474}
1475
1476pub async fn insert_fragment_relations(
1477 db: &impl ConnectionTrait,
1478 downstream_fragment_relations: &FragmentDownstreamRelation,
1479) -> MetaResult<()> {
1480 let mut relations = vec![];
1481 for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1482 for downstream in downstreams {
1483 relations.push(
1484 fragment_relation::Model {
1485 source_fragment_id: *upstream_fragment_id as _,
1486 target_fragment_id: downstream.downstream_fragment_id as _,
1487 dispatcher_type: downstream.dispatcher_type,
1488 dist_key_indices: downstream
1489 .dist_key_indices
1490 .iter()
1491 .map(|idx| *idx as i32)
1492 .collect_vec()
1493 .into(),
1494 output_indices: downstream
1495 .output_mapping
1496 .indices
1497 .iter()
1498 .map(|idx| *idx as i32)
1499 .collect_vec()
1500 .into(),
1501 output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1502 }
1503 .into_active_model(),
1504 );
1505 }
1506 }
1507 if !relations.is_empty() {
1508 FragmentRelation::insert_many(relations).exec(db).await?;
1509 }
1510 Ok(())
1511}
1512
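/// Compose the dispatcher each upstream actor should use towards the target fragment:
/// * `Hash`: a shared dispatcher carrying an `ActorMapping` built from the downstream actors'
///   vnode bitmaps.
/// * `Broadcast` / `Simple`: a shared dispatcher fanning out to all downstream actors.
/// * `NoShuffle`: one dispatcher per upstream actor, each targeting exactly one paired
///   downstream actor (see `resolve_no_shuffle_actor_dispatcher`).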
1513pub fn compose_dispatchers(
1514 source_fragment_distribution: DistributionType,
1515 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1516 target_fragment_id: crate::model::FragmentId,
1517 target_fragment_distribution: DistributionType,
1518 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1519 dispatcher_type: DispatcherType,
1520 dist_key_indices: Vec<u32>,
1521 output_mapping: PbDispatchOutputMapping,
1522) -> HashMap<crate::model::ActorId, PbDispatcher> {
1523 match dispatcher_type {
1524 DispatcherType::Hash => {
1525 let dispatcher = PbDispatcher {
1526 r#type: PbDispatcherType::from(dispatcher_type) as _,
1527 dist_key_indices,
1528 output_mapping: output_mapping.into(),
1529 hash_mapping: Some(
1530 ActorMapping::from_bitmaps(
1531 &target_fragment_actors
1532 .iter()
1533 .map(|(actor_id, bitmap)| {
1534 (
1535 *actor_id as _,
1536 bitmap
1537 .clone()
1538 .expect("downstream hash dispatch must have distribution"),
1539 )
1540 })
1541 .collect(),
1542 )
1543 .to_protobuf(),
1544 ),
1545 dispatcher_id: target_fragment_id,
1546 downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1547 };
1548 source_fragment_actors
1549 .keys()
1550 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1551 .collect()
1552 }
1553 DispatcherType::Broadcast | DispatcherType::Simple => {
1554 let dispatcher = PbDispatcher {
1555 r#type: PbDispatcherType::from(dispatcher_type) as _,
1556 dist_key_indices,
1557 output_mapping: output_mapping.into(),
1558 hash_mapping: None,
1559 dispatcher_id: target_fragment_id,
1560 downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1561 };
1562 source_fragment_actors
1563 .keys()
1564 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1565 .collect()
1566 }
1567 DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1568 source_fragment_distribution,
1569 source_fragment_actors,
1570 target_fragment_distribution,
1571 target_fragment_actors,
1572 )
1573 .into_iter()
1574 .map(|(upstream_actor_id, downstream_actor_id)| {
1575 (
1576 upstream_actor_id,
1577 PbDispatcher {
1578 r#type: PbDispatcherType::NoShuffle as _,
1579 dist_key_indices: dist_key_indices.clone(),
1580 output_mapping: output_mapping.clone().into(),
1581 hash_mapping: None,
1582 dispatcher_id: target_fragment_id,
1583 downstream_actor_id: vec![downstream_actor_id],
1584 },
1585 )
1586 })
1587 .collect(),
1588 }
1589}
1590
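/// Pair up upstream and downstream actors for a no-shuffle edge. Both fragments must share the
/// same distribution type and actor count: singleton fragments yield the single pair, while
/// hash-distributed fragments are matched by the first vnode of each actor's bitmap, asserting
/// that the paired bitmaps are identical.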
1591pub fn resolve_no_shuffle_actor_dispatcher(
1593 source_fragment_distribution: DistributionType,
1594 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1595 target_fragment_distribution: DistributionType,
1596 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1597) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1598 assert_eq!(source_fragment_distribution, target_fragment_distribution);
1599 assert_eq!(
1600 source_fragment_actors.len(),
1601 target_fragment_actors.len(),
1602 "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1603 source_fragment_actors,
1604 target_fragment_actors
1605 );
1606 match source_fragment_distribution {
1607 DistributionType::Single => {
1608 let assert_singleton = |bitmap: &Option<Bitmap>| {
1609 assert!(
1610 bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1611 "not singleton: {:?}",
1612 bitmap
1613 );
1614 };
1615 assert_eq!(
1616 source_fragment_actors.len(),
1617 1,
1618 "singleton distribution actor count not 1: {:?}",
1619 source_fragment_distribution
1620 );
1621 assert_eq!(
1622 target_fragment_actors.len(),
1623 1,
1624 "singleton distribution actor count not 1: {:?}",
1625 target_fragment_distribution
1626 );
1627 let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1628 assert_singleton(bitmap);
1629 let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1630 assert_singleton(bitmap);
1631 vec![(*source_actor_id, *target_actor_id)]
1632 }
1633 DistributionType::Hash => {
1634 let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1635 .iter()
1636 .map(|(actor_id, bitmap)| {
1637 let bitmap = bitmap
1638 .as_ref()
1639 .expect("hash distribution should have bitmap");
1640 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1641 (first_vnode, (*actor_id, bitmap))
1642 })
1643 .collect();
1644 source_fragment_actors
1645 .iter()
1646 .map(|(source_actor_id, bitmap)| {
1647 let bitmap = bitmap
1648 .as_ref()
1649 .expect("hash distribution should have bitmap");
1650 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1651 let (target_actor_id, target_bitmap) =
1652 target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1653 panic!(
1654 "cannot find matched target actor: {} {:?} {:?} {:?}",
1655 source_actor_id,
1656 first_vnode,
1657 source_fragment_actors,
1658 target_fragment_actors
1659 );
1660 });
1661 assert_eq!(
1662 bitmap,
1663 target_bitmap,
1664 "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1665 source_actor_id,
1666 first_vnode,
1667 source_fragment_actors,
1668 target_fragment_actors
1669 );
1670 (*source_actor_id, target_actor_id)
1671 }).collect()
1672 }
1673 }
1674}
1675
1676pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1677 let fragment_worker_slot_mapping = match fragment.distribution_type {
1678 DistributionType::Single => {
1679 let actor = fragment.actors.values().exactly_one().unwrap();
1680 WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1681 }
1682 DistributionType::Hash => {
1683 let actor_bitmaps: HashMap<_, _> = fragment
1684 .actors
1685 .iter()
1686 .map(|(actor_id, actor_info)| {
1687 let vnode_bitmap = actor_info
1688 .vnode_bitmap
1689 .as_ref()
1690 .cloned()
1691 .expect("actor bitmap shouldn't be none in hash fragment");
1692
1693 (*actor_id as hash::ActorId, vnode_bitmap)
1694 })
1695 .collect();
1696
1697 let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1698
1699 let actor_locations = fragment
1700 .actors
1701 .iter()
1702 .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
1703 .collect();
1704
1705 actor_mapping.to_worker_slot(&actor_locations)
1706 }
1707 };
1708
1709 PbFragmentWorkerSlotMapping {
1710 fragment_id: fragment.fragment_id,
1711 mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1712 }
1713}
1714
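/// Collect fragment information for the given streaming jobs. Returns a tuple of
/// `(source_id -> fragment ids of source fragments, sink fragment ids, actor ids of all
/// fragments, all fragment ids)`, with actor ids resolved from the shared in-memory actor
/// info rather than the database.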
1715pub async fn get_fragments_for_jobs<C>(
1721 db: &C,
1722 actor_info: &SharedActorInfos,
1723 streaming_jobs: Vec<JobId>,
1724) -> MetaResult<(
1725 HashMap<SourceId, BTreeSet<FragmentId>>,
1726 HashSet<FragmentId>,
1727 HashSet<ActorId>,
1728 HashSet<FragmentId>,
1729)>
1730where
1731 C: ConnectionTrait,
1732{
1733 if streaming_jobs.is_empty() {
1734 return Ok((
1735 HashMap::default(),
1736 HashSet::default(),
1737 HashSet::default(),
1738 HashSet::default(),
1739 ));
1740 }
1741
1742 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1743 .select_only()
1744 .columns([
1745 fragment::Column::FragmentId,
1746 fragment::Column::FragmentTypeMask,
1747 fragment::Column::StreamNode,
1748 ])
1749 .filter(fragment::Column::JobId.is_in(streaming_jobs))
1750 .into_tuple()
1751 .all(db)
1752 .await?;
1753
1754 let fragment_ids: HashSet<_> = fragments
1755 .iter()
1756 .map(|(fragment_id, _, _)| *fragment_id)
1757 .collect();
1758
1759 let actors = {
1760 let guard = actor_info.read_guard();
1761 fragment_ids
1762 .iter()
1763 .flat_map(|id| guard.get_fragment(*id as _))
1764 .flat_map(|f| f.actors.keys().cloned().map(|id| id as _))
1765 .collect::<HashSet<_>>()
1766 };
1767
1768 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1769 let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1770 for (fragment_id, mask, stream_node) in fragments {
1771 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1772 && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1773 {
1774 source_fragment_ids
1775 .entry(source_id)
1776 .or_default()
1777 .insert(fragment_id);
1778 }
1779 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1780 sink_fragment_ids.insert(fragment_id);
1781 }
1782 }
1783
1784 Ok((
1785 source_fragment_ids,
1786 sink_fragment_ids,
1787 actors.into_iter().collect(),
1788 fragment_ids,
1789 ))
1790}
1791
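/// Build the notification payload for a batch of deleted objects, keeping only the ids and the
/// owning schema/database of each object. An index additionally emits an entry for its
/// associated index table.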
1792pub(crate) fn build_object_group_for_delete(
1797 partial_objects: Vec<PartialObject>,
1798) -> NotificationInfo {
1799 let mut objects = vec![];
1800 for obj in partial_objects {
1801 match obj.obj_type {
1802 ObjectType::Database => objects.push(PbObject {
1803 object_info: Some(PbObjectInfo::Database(PbDatabase {
1804 id: obj.oid.as_database_id(),
1805 ..Default::default()
1806 })),
1807 }),
1808 ObjectType::Schema => objects.push(PbObject {
1809 object_info: Some(PbObjectInfo::Schema(PbSchema {
1810 id: obj.oid.as_schema_id(),
1811 database_id: obj.database_id.unwrap(),
1812 ..Default::default()
1813 })),
1814 }),
1815 ObjectType::Table => objects.push(PbObject {
1816 object_info: Some(PbObjectInfo::Table(PbTable {
1817 id: obj.oid.as_table_id(),
1818 schema_id: obj.schema_id.unwrap(),
1819 database_id: obj.database_id.unwrap(),
1820 ..Default::default()
1821 })),
1822 }),
1823 ObjectType::Source => objects.push(PbObject {
1824 object_info: Some(PbObjectInfo::Source(PbSource {
1825 id: obj.oid.as_source_id(),
1826 schema_id: obj.schema_id.unwrap(),
1827 database_id: obj.database_id.unwrap(),
1828 ..Default::default()
1829 })),
1830 }),
1831 ObjectType::Sink => objects.push(PbObject {
1832 object_info: Some(PbObjectInfo::Sink(PbSink {
1833 id: obj.oid.as_sink_id(),
1834 schema_id: obj.schema_id.unwrap(),
1835 database_id: obj.database_id.unwrap(),
1836 ..Default::default()
1837 })),
1838 }),
1839 ObjectType::Subscription => objects.push(PbObject {
1840 object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1841 id: obj.oid.as_subscription_id(),
1842 schema_id: obj.schema_id.unwrap(),
1843 database_id: obj.database_id.unwrap(),
1844 ..Default::default()
1845 })),
1846 }),
1847 ObjectType::View => objects.push(PbObject {
1848 object_info: Some(PbObjectInfo::View(PbView {
1849 id: obj.oid.as_view_id(),
1850 schema_id: obj.schema_id.unwrap(),
1851 database_id: obj.database_id.unwrap(),
1852 ..Default::default()
1853 })),
1854 }),
1855 ObjectType::Index => {
1856 objects.push(PbObject {
1857 object_info: Some(PbObjectInfo::Index(PbIndex {
1858 id: obj.oid.as_index_id(),
1859 schema_id: obj.schema_id.unwrap(),
1860 database_id: obj.database_id.unwrap(),
1861 ..Default::default()
1862 })),
1863 });
1864 objects.push(PbObject {
1865 object_info: Some(PbObjectInfo::Table(PbTable {
1866 id: obj.oid.as_table_id(),
1867 schema_id: obj.schema_id.unwrap(),
1868 database_id: obj.database_id.unwrap(),
1869 ..Default::default()
1870 })),
1871 });
1872 }
1873 ObjectType::Function => objects.push(PbObject {
1874 object_info: Some(PbObjectInfo::Function(PbFunction {
1875 id: obj.oid.as_function_id(),
1876 schema_id: obj.schema_id.unwrap(),
1877 database_id: obj.database_id.unwrap(),
1878 ..Default::default()
1879 })),
1880 }),
1881 ObjectType::Connection => objects.push(PbObject {
1882 object_info: Some(PbObjectInfo::Connection(PbConnection {
1883 id: obj.oid.as_connection_id(),
1884 schema_id: obj.schema_id.unwrap(),
1885 database_id: obj.database_id.unwrap(),
1886 ..Default::default()
1887 })),
1888 }),
1889 ObjectType::Secret => objects.push(PbObject {
1890 object_info: Some(PbObjectInfo::Secret(PbSecret {
1891 id: obj.oid.as_secret_id(),
1892 schema_id: obj.schema_id.unwrap(),
1893 database_id: obj.database_id.unwrap(),
1894 ..Default::default()
1895 })),
1896 }),
1897 }
1898 }
1899 NotificationInfo::ObjectGroup(PbObjectGroup {
1900 objects,
1901 dependencies: vec![],
1902 })
1903}
1904
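/// Parse a `CREATE TABLE` definition and return the external table name of its CDC table info,
/// if any. Note that this panics (after logging) when the definition cannot be parsed as a
/// single SQL statement.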
1905pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1906 let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1907 .context("unable to parse table definition")
1908 .inspect_err(|e| {
1909 tracing::error!(
1910 target: "auto_schema_change",
1911 error = %e.as_report(),
1912 "failed to parse table definition")
1913 })
1914 .unwrap()
1915 .try_into()
1916 .unwrap();
1917 if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1918 cdc_table_info
1919 .clone()
1920 .map(|cdc_table_info| cdc_table_info.external_table_name)
1921 } else {
1922 None
1923 }
1924}
1925
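/// Rename a relation in the catalog and rewrite its stored definition accordingly (views keep
/// their definition untouched). Renaming a table also renames its associated source, if any;
/// renaming an index also renames the underlying index table. Returns the catalog objects to
/// notify together with the old name.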
pub async fn rename_relation(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
) -> MetaResult<(Vec<PbObject>, String)> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename;

    let mut to_update_relations = vec![];
    macro_rules! rename_relation {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let obj = obj.unwrap();
            let old_name = relation.name.clone();
            relation.name = object_name.into();
            if obj.obj_type != ObjectType::View {
                relation.definition = alter_relation_rename(&relation.definition, object_name);
            }
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                name: Set(object_name.into()),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
                .one(txn)
                .await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj, streaming_job).into(),
                )),
            });
            old_name
        }};
    }
    let old_name = match object_type {
        ObjectType::Table => {
            let associated_source_id: Option<SourceId> = Source::find()
                .select_only()
                .column(source::Column::SourceId)
                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
                .into_tuple()
                .one(txn)
                .await?;
            if let Some(source_id) = associated_source_id {
                rename_relation!(Source, source, source_id, source_id);
            }
            rename_relation!(Table, table, table_id, object_id.as_table_id())
        }
        ObjectType::Source => {
            rename_relation!(Source, source, source_id, object_id.as_source_id())
        }
        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
        ObjectType::Subscription => {
            rename_relation!(
                Subscription,
                subscription,
                subscription_id,
                object_id.as_subscription_id()
            )
        }
        ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
        ObjectType::Index => {
            let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let streaming_job = streaming_job::Entity::find_by_id(index.index_id.as_job_id())
                .one(txn)
                .await?;
            index.name = object_name.into();
            let index_table_id = index.index_table_id;
            let old_name = rename_relation!(Table, table, table_id, index_table_id);

            let active_model = index::ActiveModel {
                index_id: sea_orm::ActiveValue::Set(index.index_id),
                name: sea_orm::ActiveValue::Set(object_name.into()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::Index(
                    ObjectModel(index, obj.unwrap(), streaming_job).into(),
                )),
            });
            old_name
        }
        _ => unreachable!("only relation name can be altered."),
    };

    Ok((to_update_relations, old_name))
}

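/// Resolves the resource group of a database, falling back to [`DEFAULT_RESOURCE_GROUP`] when the
/// database has no explicit resource group configured.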
pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
where
    C: ConnectionTrait,
{
    let database_resource_group: Option<String> = Database::find_by_id(database_id)
        .select_only()
        .column(database::Column::ResourceGroup)
        .into_tuple()
        .one(txn)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;

    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
}

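/// Resolves the effective resource group of an existing streaming job: the job-specific resource
/// group if set, otherwise the owning database's resource group, otherwise
/// [`DEFAULT_RESOURCE_GROUP`].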
pub async fn get_existing_job_resource_group<C>(
    txn: &C,
    streaming_job_id: JobId,
) -> MetaResult<String>
where
    C: ConnectionTrait,
{
    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
        StreamingJob::find_by_id(streaming_job_id)
            .select_only()
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(streaming_job::Column::SpecificResourceGroup)
            .column(database::Column::ResourceGroup)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;

    Ok(job_specific_resource_group.unwrap_or_else(|| {
        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
    }))
}

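/// Returns the ids of the workers whose resource group label matches `resource_group`. Workers
/// without a resource group label are filtered out.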
pub fn filter_workers_by_resource_group(
    workers: &HashMap<WorkerId, WorkerNode>,
    resource_group: &str,
) -> BTreeSet<WorkerId> {
    workers
        .iter()
        .filter(|&(_, worker)| {
            worker
                .resource_group()
                .map(|node_label| node_label.as_str() == resource_group)
                .unwrap_or(false)
        })
        .map(|(id, _)| *id)
        .collect()
}

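/// After a relation has been renamed via [`rename_relation`], rewrites the definitions of all
/// objects that reference it (tables, sinks, subscriptions, views and index state tables, plus
/// sinks that write into the renamed table), replacing `old_name` with `object_name`.
///
/// Returns the updated catalog objects to notify to frontends.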
pub async fn rename_relation_refer(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
    old_name: &str,
) -> MetaResult<Vec<PbObject>> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename_refs;

    let mut to_update_relations = vec![];
    macro_rules! rename_relation_ref {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            relation.definition =
                alter_relation_rename_refs(&relation.definition, old_name, object_name);
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
                .one(txn)
                .await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj.unwrap(), streaming_job).into(),
                )),
            });
        }};
    }
    let mut objs = get_referring_objects(object_id, txn).await?;
    if object_type == ObjectType::Table {
        let incoming_sinks: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.eq(object_id))
            .into_tuple()
            .all(txn)
            .await?;

        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
            oid: id.as_object_id(),
            obj_type: ObjectType::Sink,
            schema_id: None,
            database_id: None,
        }));
    }

    for obj in objs {
        match obj.obj_type {
            ObjectType::Table => {
                rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
            }
            ObjectType::Sink => {
                rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
            }
            ObjectType::Subscription => {
                rename_relation_ref!(
                    Subscription,
                    subscription,
                    subscription_id,
                    obj.oid.as_subscription_id()
                )
            }
            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
            ObjectType::Index => {
                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
                    .select_only()
                    .column(index::Column::IndexTableId)
                    .into_tuple()
                    .one(txn)
                    .await?;
                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
            }
            _ => {
                bail!(
                    "only tables, sinks, subscriptions, views and indexes can depend on other objects"
                )
            }
        }
    }

    Ok(to_update_relations)
}

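/// Validates that a subscription can be dropped: if it is the last subscription on its upstream
/// table, the drop is rejected while objects from other databases still depend on that table,
/// since cross-database references rely on a subscription being present.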
pub async fn validate_subscription_deletion<C>(
    txn: &C,
    subscription_id: SubscriptionId,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
        .select_only()
        .column(subscription::Column::DependentTableId)
        .into_tuple()
        .one(txn)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;

    let cnt = Subscription::find()
        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
        .count(txn)
        .await?;
    if cnt > 1 {
        // Other subscriptions still exist on the same upstream table, so dropping this one cannot
        // break any cross-database reference.
        return Ok(());
    }

    // This is the last subscription on the upstream table: reject the drop if any object in a
    // different database (other than this subscription) still references that table.
    let obj_alias = Alias::new("o1");
    let used_by_alias = Alias::new("o2");
    let count = ObjectDependency::find()
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object2.def(),
            obj_alias.clone(),
        )
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object1.def(),
            used_by_alias.clone(),
        )
        .filter(
            object_dependency::Column::Oid
                .eq(upstream_table_id)
                .and(object_dependency::Column::UsedBy.ne(subscription_id))
                .and(
                    Expr::col((obj_alias, object::Column::DatabaseId))
                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
                ),
        )
        .count(txn)
        .await?;

    if count != 0 {
        return Err(MetaError::permission_denied(format!(
            "Referenced by {} cross-db objects.",
            count
        )));
    }

    Ok(())
}

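/// Fetches the downstream (target) fragments of each given source fragment from the
/// `fragment_relation` table, grouped by source fragment id.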
pub async fn fetch_target_fragments<C>(
    txn: &C,
    src_fragment_id: impl IntoIterator<Item = FragmentId>,
) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
where
    C: ConnectionTrait,
{
    let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
        .select_only()
        .columns([
            fragment_relation::Column::SourceFragmentId,
            fragment_relation::Column::TargetFragmentId,
        ])
        .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
        .into_tuple()
        .all(txn)
        .await?;

    let source_target_fragments = source_target_fragments.into_iter().into_group_map();

    Ok(source_target_fragments)
}

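/// Looks up the sink fragment of each sink job in `sink_ids`, returning a map from sink id to its
/// sink fragment id. Errors if the number of sink fragments found does not match the number of
/// sinks, i.e. each sink must have exactly one sink fragment.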
pub async fn get_sink_fragment_by_ids<C>(
    txn: &C,
    sink_ids: Vec<SinkId>,
) -> MetaResult<HashMap<SinkId, FragmentId>>
where
    C: ConnectionTrait,
{
    let sink_num = sink_ids.len();
    let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
        .select_only()
        .columns([fragment::Column::JobId, fragment::Column::FragmentId])
        .filter(
            fragment::Column::JobId
                .is_in(sink_ids)
                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
        )
        .into_tuple()
        .all(txn)
        .await?;

    if sink_fragment_ids.len() != sink_num {
        return Err(anyhow::anyhow!(
            "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
            sink_fragment_ids.len(),
            sink_num
        )
        .into());
    }

    Ok(sink_fragment_ids.into_iter().collect())
}

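/// Returns whether the mview fragment of `table_id` already carries
/// [`FragmentTypeFlag::UpstreamSinkUnion`], which is treated here as the marker that the table has
/// been migrated. Errors unless the table has exactly one mview fragment.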
pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
where
    C: ConnectionTrait,
{
    let mview_fragment: Vec<i32> = Fragment::find()
        .select_only()
        .column(fragment::Column::FragmentTypeMask)
        .filter(
            fragment::Column::JobId
                .eq(table_id)
                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
        )
        .into_tuple()
        .all(txn)
        .await?;

    let mview_fragment_len = mview_fragment.len();
    if mview_fragment_len != 1 {
        bail!(
            "expected exactly one mview fragment for table {}, found {}",
            table_id,
            mview_fragment_len
        );
    }

    let mview_fragment = mview_fragment.into_iter().next().unwrap();
    let migrated =
        FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);

    Ok(migrated)
}

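/// If `sink_id` refers to the internal sink of an iceberg-engine table (its name starts with
/// [`ICEBERG_SINK_PREFIX`]), returns the id of the unique iceberg-engine table the sink depends
/// on. Returns `None` otherwise, or when no single such table can be determined.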
pub async fn try_get_iceberg_table_by_downstream_sink<C>(
    txn: &C,
    sink_id: SinkId,
) -> MetaResult<Option<TableId>>
where
    C: ConnectionTrait,
{
    let sink = Sink::find_by_id(sink_id).one(txn).await?;
    let Some(sink) = sink else {
        return Ok(None);
    };

    if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
        let object_ids: Vec<ObjectId> = ObjectDependency::find()
            .select_only()
            .column(object_dependency::Column::Oid)
            .filter(object_dependency::Column::UsedBy.eq(sink_id))
            .into_tuple()
            .all(txn)
            .await?;
        let mut iceberg_table_ids = vec![];
        for object_id in object_ids {
            let table_id = object_id.as_table_id();
            if let Some(table_engine) = Table::find_by_id(table_id)
                .select_only()
                .column(table::Column::Engine)
                .into_tuple::<table::Engine>()
                .one(txn)
                .await?
                && table_engine == table::Engine::Iceberg
            {
                iceberg_table_ids.push(table_id);
            }
        }
        if iceberg_table_ids.len() == 1 {
            return Ok(Some(iceberg_table_ids[0]));
        }
    }
    Ok(None)
}

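/// Returns whether the streaming job is part of an iceberg-engine table, i.e. it is either the
/// table job of a table created with the iceberg engine, or an internal sink whose name starts
/// with [`ICEBERG_SINK_PREFIX`].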
pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
where
    C: ConnectionTrait,
{
    if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
        .select_only()
        .column(table::Column::Engine)
        .into_tuple::<table::Engine>()
        .one(txn)
        .await?
        && engine == table::Engine::Iceberg
    {
        return Ok(true);
    }
    if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
        .select_only()
        .column(sink::Column::Name)
        .into_tuple::<String>()
        .one(txn)
        .await?
        && sink_name.starts_with(ICEBERG_SINK_PREFIX)
    {
        return Ok(true);
    }
    Ok(false)
}

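/// Finds background table/sink streaming jobs that have not reached the `Created` status and that
/// belong to an iceberg-engine table, optionally restricted to `database_id`. Such jobs are
/// treated as dirty leftovers.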
pub async fn find_dirty_iceberg_table_jobs<C>(
    txn: &C,
    database_id: Option<DatabaseId>,
) -> MetaResult<Vec<PartialObject>>
where
    C: ConnectionTrait,
{
    let mut filter_condition = streaming_job::Column::JobStatus
        .ne(JobStatus::Created)
        .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
        .and(streaming_job::Column::CreateType.eq(CreateType::Background));
    if let Some(database_id) = database_id {
        filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
    }
    let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
        .select_only()
        .columns([
            object::Column::Oid,
            object::Column::ObjType,
            object::Column::SchemaId,
            object::Column::DatabaseId,
        ])
        .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
        .filter(filter_condition)
        .into_partial_model()
        .all(txn)
        .await?;

    let mut dirty_iceberg_table_jobs = vec![];
    for job in creating_table_sink_jobs {
        if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
            tracing::info!("Found dirty iceberg job with id: {}", job.oid);
            dirty_iceberg_table_jobs.push(job);
        }
    }

    Ok(dirty_iceberg_table_jobs)
}

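/// Builds the select list that projects rows of the `from` schema into the `to` schema, matching
/// columns by column id:
/// - a column present in both schemas becomes an `InputRef` to its index in `from` (the data types
///   must match exactly, otherwise an error is returned);
/// - a column only present in `to` becomes its default expression, or a `NULL` constant of the
///   target type if it has no default.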
pub fn build_select_node_list(
    from: &[ColumnCatalog],
    to: &[ColumnCatalog],
) -> MetaResult<Vec<PbExprNode>> {
    let mut exprs = Vec::with_capacity(to.len());
    let idx_by_col_id = from
        .iter()
        .enumerate()
        .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
        .collect::<HashMap<_, _>>();

    for to_col in to {
        let to_col = to_col.column_desc.as_ref().unwrap();
        let to_col_type_ref = to_col.column_type.as_ref().unwrap();
        let to_col_type = DataType::from(to_col_type_ref);
        if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
            let from_col_type = DataType::from(
                from[*from_idx]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_type
                    .as_ref()
                    .unwrap(),
            );
            if !to_col_type.equals_datatype(&from_col_type) {
                return Err(anyhow!(
                    "Column type mismatch: {:?} != {:?}",
                    from_col_type,
                    to_col_type
                )
                .into());
            }
            exprs.push(PbExprNode {
                function_type: expr_node::Type::Unspecified.into(),
                return_type: Some(to_col_type_ref.clone()),
                rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
            });
        } else {
            let to_default_node =
                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                    expr,
                    ..
                })) = &to_col.generated_or_default_column
                {
                    expr.clone().unwrap()
                } else {
                    let null = Datum::None.to_protobuf();
                    PbExprNode {
                        function_type: expr_node::Type::Unspecified.into(),
                        return_type: Some(to_col_type_ref.clone()),
                        rex_node: Some(expr_node::RexNode::Constant(null)),
                    }
                };
            exprs.push(to_default_node);
        }
    }

    Ok(exprs)
}

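/// Per-job metadata loaded from the `streaming_job` table (plus the resolved job definition) that
/// is not part of the fragment graph itself: timezone, config override, adaptive parallelism
/// strategy and backfill orders.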
#[derive(Clone, Debug, Default)]
pub struct StreamingJobExtraInfo {
    pub timezone: Option<String>,
    pub config_override: Arc<str>,
    pub adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
    pub job_definition: String,
    pub backfill_orders: Option<BackfillOrders>,
}

impl StreamingJobExtraInfo {
    /// Extracts the [`StreamContext`] part of this info (timezone, config override and adaptive
    /// parallelism strategy).
    pub fn stream_context(&self) -> StreamContext {
        StreamContext {
            timezone: self.timezone.clone(),
            config_override: self.config_override.clone(),
            adaptive_parallelism_strategy: self.adaptive_parallelism_strategy,
        }
    }
}

/// Raw row shape selected from the `streaming_job` table:
/// `(job_id, timezone, config_override, adaptive_parallelism_strategy, backfill_orders)`,
/// with the strategy still in its stored string form.
type StreamingJobExtraInfoRow = (
    JobId,
    Option<String>,
    Option<String>,
    Option<String>,
    Option<BackfillOrders>,
);

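/// Loads [`StreamingJobExtraInfo`] for the given job ids in a single query, resolving each job's
/// definition via [`resolve_streaming_job_definition`]. Jobs missing from the `streaming_job`
/// table are silently omitted from the result.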
pub async fn get_streaming_job_extra_info<C>(
    txn: &C,
    job_ids: Vec<JobId>,
) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
where
    C: ConnectionTrait,
{
    let pairs: Vec<StreamingJobExtraInfoRow> = StreamingJob::find()
        .select_only()
        .columns([
            streaming_job::Column::JobId,
            streaming_job::Column::Timezone,
            streaming_job::Column::ConfigOverride,
            streaming_job::Column::AdaptiveParallelismStrategy,
            streaming_job::Column::BackfillOrders,
        ])
        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let job_ids = job_ids.into_iter().collect();

    let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;

    let result = pairs
        .into_iter()
        .map(
            |(job_id, timezone, config_override, strategy, backfill_orders)| {
                let job_definition = definitions.remove(&job_id).unwrap_or_default();
                let adaptive_parallelism_strategy = strategy.as_deref().map(|s| {
                    parse_strategy(s).expect("strategy should be validated before storing")
                });
                (
                    job_id,
                    StreamingJobExtraInfo {
                        timezone,
                        config_override: config_override.unwrap_or_default().into(),
                        adaptive_parallelism_strategy,
                        job_definition,
                        backfill_orders,
                    },
                )
            },
        )
        .collect();

    Ok(result)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_cdc_table_name() {
        let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
        let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
        assert_eq!(
            extract_external_table_name_from_definition(ddl1),
            Some("public.t1".into())
        );
        assert_eq!(
            extract_external_table_name_from_definition(ddl2),
            Some("mydb.t2".into())
        );
    }
}