1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{
22 FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23};
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
25use risingwave_common::id::{JobId, SubscriptionId};
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
28use risingwave_common::types::{DataType, Datum};
29use risingwave_common::util::value_encoding::DatumToProtoExt;
30use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
31use risingwave_common::{bail, hash};
32use risingwave_meta_model::fragment::DistributionType;
33use risingwave_meta_model::object::ObjectType;
34use risingwave_meta_model::prelude::*;
35use risingwave_meta_model::streaming_job::BackfillOrders;
36use risingwave_meta_model::table::TableType;
37use risingwave_meta_model::user_privilege::Action;
38use risingwave_meta_model::{
39 ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
40 JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
41 TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
42 function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
43 subscription, table, user, user_default_privilege, user_privilege, view,
44};
45use risingwave_meta_model_migration::WithQuery;
46use risingwave_pb::catalog::{
47 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
48 PbSubscription, PbTable, PbView,
49};
50use risingwave_pb::common::{PbObjectType, WorkerNode};
51use risingwave_pb::expr::{PbExprNode, expr_node};
52use risingwave_pb::meta::object::PbObjectInfo;
53use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
54use risingwave_pb::meta::{
55 ObjectDependency as PbObjectDependency, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
56};
57use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
58use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
59use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
60use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
61use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
62use risingwave_sqlparser::ast::Statement as SqlStatement;
63use risingwave_sqlparser::parser::Parser;
64use sea_orm::sea_query::{
65 Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
66 WithClause,
67};
68use sea_orm::{
69 ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
70 FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
71 RelationTrait, Set, Statement,
72};
73use thiserror_ext::AsReport;
74use tracing::warn;
75
76use crate::barrier::SharedFragmentInfo;
77use crate::controller::ObjectModel;
78use crate::controller::fragment::FragmentTypeMaskExt;
79use crate::controller::scale::resolve_streaming_job_definition;
80use crate::model::{FragmentDownstreamRelation, StreamContext};
81use crate::{MetaError, MetaResult};
82
83pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
108 let cte_alias = Alias::new("used_by_object_ids");
109 let cte_return_alias = Alias::new("used_by");
110
111 let mut base_query = SelectStatement::new()
112 .column(object_dependency::Column::UsedBy)
113 .from(ObjectDependency)
114 .and_where(object_dependency::Column::Oid.eq(obj_id))
115 .to_owned();
116
117 let belonged_obj_query = SelectStatement::new()
118 .column(object::Column::Oid)
119 .from(Object)
120 .and_where(
121 object::Column::DatabaseId
122 .eq(obj_id)
123 .or(object::Column::SchemaId.eq(obj_id)),
124 )
125 .to_owned();
126
127 let cte_referencing = Query::select()
128 .column((ObjectDependency, object_dependency::Column::UsedBy))
129 .from(ObjectDependency)
130 .inner_join(
131 cte_alias.clone(),
132 Expr::col((cte_alias.clone(), cte_return_alias.clone()))
133 .equals(object_dependency::Column::Oid),
134 )
135 .to_owned();
136
137 let mut common_table_expr = CommonTableExpression::new();
138 common_table_expr
139 .query(
140 base_query
141 .union(UnionType::All, belonged_obj_query)
142 .union(UnionType::All, cte_referencing)
143 .to_owned(),
144 )
145 .column(cte_return_alias.clone())
146 .table_name(cte_alias.clone());
147
148 SelectStatement::new()
149 .distinct()
150 .columns([
151 object::Column::Oid,
152 object::Column::ObjType,
153 object::Column::SchemaId,
154 object::Column::DatabaseId,
155 ])
156 .from(cte_alias.clone())
157 .inner_join(
158 Object,
159 Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
160 )
161 .order_by(object::Column::Oid, Order::Desc)
162 .to_owned()
163 .with(
164 WithClause::new()
165 .recursive(true)
166 .cte(common_table_expr)
167 .to_owned(),
168 )
169}
170
171fn to_pb_object_type(obj_type: ObjectType) -> PbObjectType {
172 match obj_type {
173 ObjectType::Database => PbObjectType::Database,
174 ObjectType::Schema => PbObjectType::Schema,
175 ObjectType::Table => PbObjectType::Table,
176 ObjectType::Source => PbObjectType::Source,
177 ObjectType::Sink => PbObjectType::Sink,
178 ObjectType::View => PbObjectType::View,
179 ObjectType::Index => PbObjectType::Index,
180 ObjectType::Function => PbObjectType::Function,
181 ObjectType::Connection => PbObjectType::Connection,
182 ObjectType::Subscription => PbObjectType::Subscription,
183 ObjectType::Secret => PbObjectType::Secret,
184 }
185}
186
/// Collect object dependency edges as protobuf rows.
///
/// Two sources are merged:
/// 1. rows of `object_dependency`, joined twice against `object` so the
///    referenced object's type can be reported, and
/// 2. sink-into-table relations (`sink.target_table`), which are not stored
///    in `object_dependency`.
///
/// When `object_id` is `Some`, only edges touching that object are returned
/// (the `used_by` side for plain dependencies; either endpoint for sink edges).
/// When `include_creating` is `false`, edges whose depending streaming job is
/// not yet in `Created` status are dropped from the result.
async fn list_object_dependencies_impl(
    txn: &DatabaseTransaction,
    object_id: Option<ObjectId>,
    include_creating: bool,
) -> MetaResult<Vec<PbObjectDependency>> {
    // Alias for the second `object` join (the referenced side).
    let referenced_alias = Alias::new("referenced_obj");
    let mut query = ObjectDependency::find()
        .select_only()
        .columns([
            object_dependency::Column::Oid,
            object_dependency::Column::UsedBy,
        ])
        .column_as(
            Expr::col((referenced_alias.clone(), object::Column::ObjType)),
            "referenced_obj_type",
        )
        .join(
            JoinType::InnerJoin,
            object_dependency::Relation::Object1.def(),
        )
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object2.def(),
            referenced_alias.clone(),
        );
    if let Some(object_id) = object_id {
        query = query.filter(object_dependency::Column::UsedBy.eq(object_id));
    }
    // Note the direction flip: `oid` is the referenced object, `used_by` the
    // depending one.
    let mut obj_dependencies: Vec<PbObjectDependency> = query
        .into_tuple()
        .all(txn)
        .await?
        .into_iter()
        .map(|(oid, used_by, referenced_type)| PbObjectDependency {
            object_id: used_by,
            referenced_object_id: oid,
            referenced_object_type: to_pb_object_type(referenced_type) as i32,
        })
        .collect();

    // Sink-into-table edges: the target table "depends on" the sink.
    let mut sink_query = Sink::find()
        .select_only()
        .columns([sink::Column::SinkId, sink::Column::TargetTable])
        .filter(sink::Column::TargetTable.is_not_null());
    if let Some(object_id) = object_id {
        sink_query = sink_query.filter(
            sink::Column::SinkId
                .eq(object_id)
                .or(sink::Column::TargetTable.eq(object_id)),
        );
    }
    let sink_dependencies: Vec<(SinkId, TableId)> = sink_query.into_tuple().all(txn).await?;
    obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
        PbObjectDependency {
            object_id: table_id.into(),
            referenced_object_id: sink_id.into(),
            referenced_object_type: PbObjectType::Sink as i32,
        }
    }));

    if !include_creating {
        // Gather candidate depending-job ids, deduplicated, for one batched lookup.
        let mut streaming_job_ids = obj_dependencies
            .iter()
            .map(|dependency| dependency.object_id)
            .collect_vec();
        streaming_job_ids.sort_unstable();
        streaming_job_ids.dedup();

        if !streaming_job_ids.is_empty() {
            // Jobs among the candidates that are NOT yet fully created.
            let non_created_jobs: HashSet<JobId> = StreamingJob::find()
                .select_only()
                .columns([streaming_job::Column::JobId])
                .filter(
                    streaming_job::Column::JobId
                        .is_in(streaming_job_ids)
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .all(txn)
                .await?
                .into_iter()
                .collect();

            if !non_created_jobs.is_empty() {
                // Drop edges whose depending side is a still-creating job.
                obj_dependencies.retain(|dependency| {
                    !non_created_jobs.contains(&dependency.object_id.as_job_id())
                });
            }
        }
    }

    Ok(obj_dependencies)
}
281
282pub async fn list_object_dependencies(
284 txn: &DatabaseTransaction,
285 include_creating: bool,
286) -> MetaResult<Vec<PbObjectDependency>> {
287 list_object_dependencies_impl(txn, None, include_creating).await
288}
289
290pub async fn list_object_dependencies_by_object_id(
292 txn: &DatabaseTransaction,
293 object_id: ObjectId,
294) -> MetaResult<Vec<PbObjectDependency>> {
295 list_object_dependencies_impl(txn, Some(object_id), true).await
296}
297
298pub fn construct_sink_cycle_check_query(
323 target_table: ObjectId,
324 dependent_objects: Vec<ObjectId>,
325) -> WithQuery {
326 let cte_alias = Alias::new("used_by_object_ids_with_sink");
327 let depend_alias = Alias::new("obj_dependency_with_sink");
328
329 let mut base_query = SelectStatement::new()
330 .columns([
331 object_dependency::Column::Oid,
332 object_dependency::Column::UsedBy,
333 ])
334 .from(ObjectDependency)
335 .and_where(object_dependency::Column::Oid.eq(target_table))
336 .to_owned();
337
338 let query_sink_deps = SelectStatement::new()
339 .columns([sink::Column::SinkId, sink::Column::TargetTable])
340 .from(Sink)
341 .and_where(sink::Column::TargetTable.is_not_null())
342 .to_owned();
343
344 let cte_referencing = Query::select()
345 .column((depend_alias.clone(), object_dependency::Column::Oid))
346 .column((depend_alias.clone(), object_dependency::Column::UsedBy))
347 .from_subquery(
348 SelectStatement::new()
349 .columns([
350 object_dependency::Column::Oid,
351 object_dependency::Column::UsedBy,
352 ])
353 .from(ObjectDependency)
354 .union(UnionType::All, query_sink_deps)
355 .to_owned(),
356 depend_alias.clone(),
357 )
358 .inner_join(
359 cte_alias.clone(),
360 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
361 .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
362 )
363 .and_where(
364 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
365 cte_alias.clone(),
366 object_dependency::Column::Oid,
367 ))),
368 )
369 .to_owned();
370
371 let mut common_table_expr = CommonTableExpression::new();
372 common_table_expr
373 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
374 .columns([
375 object_dependency::Column::Oid,
376 object_dependency::Column::UsedBy,
377 ])
378 .table_name(cte_alias.clone());
379
380 SelectStatement::new()
381 .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
382 .from(cte_alias.clone())
383 .and_where(
384 Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
385 )
386 .to_owned()
387 .with(
388 WithClause::new()
389 .recursive(true)
390 .cte(common_table_expr)
391 .to_owned(),
392 )
393}
394
/// Minimal projection of an `object` row: id, type and containing schema/database.
#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
#[sea_orm(entity = "Object")]
pub struct PartialObject {
    pub oid: ObjectId,
    pub obj_type: ObjectType,
    // Nullable in the schema — presumably `None` for top-level objects
    // (databases/schemas); confirm against the `object` table definition.
    pub schema_id: Option<SchemaId>,
    pub database_id: Option<DatabaseId>,
}
403
/// Projection of a `fragment` row down to its job and owned state tables.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Fragment")]
pub struct PartialFragmentStateTables {
    pub fragment_id: FragmentId,
    pub job_id: ObjectId,
    pub state_table_ids: TableIdArray,
}
411
/// Placement of a single actor: which fragment it belongs to and which worker
/// node it runs on.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct PartialActorLocation {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub worker_id: WorkerId,
}
418
/// Flattened fragment descriptor assembled from a raw query result.
#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
pub struct FragmentDesc {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    // Raw bitmask of fragment type flags, kept as i32 to match the column type.
    pub fragment_type_mask: i32,
    pub distribution_type: DistributionType,
    pub state_table_ids: TableIdArray,
    // i64 — presumably produced by a SQL aggregate (e.g. COUNT of actors);
    // confirm against the query that populates this struct.
    pub parallelism: i64,
    pub vnode_count: i32,
    pub stream_node: StreamNode,
    pub parallelism_policy: String,
}
431
432pub async fn get_referring_objects_cascade<C>(
434 obj_id: ObjectId,
435 db: &C,
436) -> MetaResult<Vec<PartialObject>>
437where
438 C: ConnectionTrait,
439{
440 let query = construct_obj_dependency_query(obj_id);
441 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
442 let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
443 db.get_database_backend(),
444 sql,
445 values,
446 ))
447 .all(db)
448 .await?;
449 Ok(objects)
450}
451
452pub async fn check_sink_into_table_cycle<C>(
454 target_table: ObjectId,
455 dependent_objs: Vec<ObjectId>,
456 db: &C,
457) -> MetaResult<bool>
458where
459 C: ConnectionTrait,
460{
461 if dependent_objs.is_empty() {
462 return Ok(false);
463 }
464
465 if dependent_objs.contains(&target_table) {
467 return Ok(true);
468 }
469
470 let query = construct_sink_cycle_check_query(target_table, dependent_objs);
471 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
472
473 let res = db
474 .query_one(Statement::from_sql_and_values(
475 db.get_database_backend(),
476 sql,
477 values,
478 ))
479 .await?
480 .unwrap();
481
482 let cnt: i64 = res.try_get_by(0)?;
483
484 Ok(cnt != 0)
485}
486
487pub async fn ensure_object_id<C>(
489 object_type: ObjectType,
490 obj_id: impl Into<ObjectId>,
491 db: &C,
492) -> MetaResult<()>
493where
494 C: ConnectionTrait,
495{
496 let obj_id = obj_id.into();
497 let count = Object::find_by_id(obj_id).count(db).await?;
498 if count == 0 {
499 return Err(MetaError::catalog_id_not_found(
500 object_type.as_str(),
501 obj_id,
502 ));
503 }
504 Ok(())
505}
506
507pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
508where
509 C: ConnectionTrait,
510{
511 let count = Object::find_by_id(job_id).count(db).await?;
512 if count == 0 {
513 return Err(MetaError::cancelled(format!(
514 "job {} might be cancelled manually or by recovery",
515 job_id
516 )));
517 }
518 Ok(())
519}
520
521pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
523where
524 C: ConnectionTrait,
525{
526 let count = User::find_by_id(user_id).count(db).await?;
527 if count == 0 {
528 return Err(anyhow!("user {} was concurrently dropped", user_id).into());
529 }
530 Ok(())
531}
532
533pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
535where
536 C: ConnectionTrait,
537{
538 let count = Database::find()
539 .filter(database::Column::Name.eq(name))
540 .count(db)
541 .await?;
542 if count > 0 {
543 assert_eq!(count, 1);
544 return Err(MetaError::catalog_duplicated("database", name));
545 }
546 Ok(())
547}
548
549pub async fn check_function_signature_duplicate<C>(
551 pb_function: &PbFunction,
552 db: &C,
553) -> MetaResult<()>
554where
555 C: ConnectionTrait,
556{
557 let count = Function::find()
558 .inner_join(Object)
559 .filter(
560 object::Column::DatabaseId
561 .eq(pb_function.database_id)
562 .and(object::Column::SchemaId.eq(pb_function.schema_id))
563 .and(function::Column::Name.eq(&pb_function.name))
564 .and(
565 function::Column::ArgTypes
566 .eq(DataTypeArray::from(pb_function.arg_types.clone())),
567 ),
568 )
569 .count(db)
570 .await?;
571 if count > 0 {
572 assert_eq!(count, 1);
573 return Err(MetaError::catalog_duplicated("function", &pb_function.name));
574 }
575 Ok(())
576}
577
578pub async fn check_connection_name_duplicate<C>(
580 pb_connection: &PbConnection,
581 db: &C,
582) -> MetaResult<()>
583where
584 C: ConnectionTrait,
585{
586 let count = Connection::find()
587 .inner_join(Object)
588 .filter(
589 object::Column::DatabaseId
590 .eq(pb_connection.database_id)
591 .and(object::Column::SchemaId.eq(pb_connection.schema_id))
592 .and(connection::Column::Name.eq(&pb_connection.name)),
593 )
594 .count(db)
595 .await?;
596 if count > 0 {
597 assert_eq!(count, 1);
598 return Err(MetaError::catalog_duplicated(
599 "connection",
600 &pb_connection.name,
601 ));
602 }
603 Ok(())
604}
605
606pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
607where
608 C: ConnectionTrait,
609{
610 let count = Secret::find()
611 .inner_join(Object)
612 .filter(
613 object::Column::DatabaseId
614 .eq(pb_secret.database_id)
615 .and(object::Column::SchemaId.eq(pb_secret.schema_id))
616 .and(secret::Column::Name.eq(&pb_secret.name)),
617 )
618 .count(db)
619 .await?;
620 if count > 0 {
621 assert_eq!(count, 1);
622 return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
623 }
624 Ok(())
625}
626
627pub async fn check_subscription_name_duplicate<C>(
628 pb_subscription: &PbSubscription,
629 db: &C,
630) -> MetaResult<()>
631where
632 C: ConnectionTrait,
633{
634 let count = Subscription::find()
635 .inner_join(Object)
636 .filter(
637 object::Column::DatabaseId
638 .eq(pb_subscription.database_id)
639 .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
640 .and(subscription::Column::Name.eq(&pb_subscription.name)),
641 )
642 .count(db)
643 .await?;
644 if count > 0 {
645 assert_eq!(count, 1);
646 return Err(MetaError::catalog_duplicated(
647 "subscription",
648 &pb_subscription.name,
649 ));
650 }
651 Ok(())
652}
653
654pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
656where
657 C: ConnectionTrait,
658{
659 let count = User::find()
660 .filter(user::Column::Name.eq(name))
661 .count(db)
662 .await?;
663 if count > 0 {
664 assert_eq!(count, 1);
665 return Err(MetaError::catalog_duplicated("user", name));
666 }
667 Ok(())
668}
669
/// Reject relation creation when any table/source/sink/index/view with the
/// same name already exists in the given (database, schema).
///
/// For an existing streaming relation that is still being created, a
/// distinguishable `catalog_under_creation` error is returned instead of a
/// plain duplicate error, so callers can react differently.
pub async fn check_relation_name_duplicate<C>(
    name: &str,
    database_id: DatabaseId,
    schema_id: SchemaId,
    db: &C,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    // Expands to one lookup per relation kind; the first match short-circuits
    // the whole function via `return`.
    macro_rules! check_duplicated {
        ($obj_type:expr, $entity:ident, $table:ident) => {
            let object_id = Object::find()
                .select_only()
                .column(object::Column::Oid)
                .inner_join($entity)
                .filter(
                    object::Column::DatabaseId
                        .eq(Some(database_id))
                        .and(object::Column::SchemaId.eq(Some(schema_id)))
                        .and($table::Column::Name.eq(name)),
                )
                .into_tuple::<ObjectId>()
                .one(db)
                .await?;
            if let Some(oid) = object_id {
                // Decide whether the "under creation" check applies:
                // - views are not streaming jobs, so never;
                // - sources only when they are shared (those run as jobs);
                // - all other kinds always.
                let check_creation = if $obj_type == ObjectType::View {
                    false
                } else if $obj_type == ObjectType::Source {
                    let source_info = Source::find_by_id(oid.as_source_id())
                        .select_only()
                        .column(source::Column::SourceInfo)
                        .into_tuple::<Option<StreamSourceInfo>>()
                        .one(db)
                        .await?
                        .unwrap();
                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
                } else {
                    true
                };
                let job_id = oid.as_job_id();
                // Prefer the "under creation" error when the job exists but is
                // not yet in `Created` status.
                return if check_creation
                    && !matches!(
                        StreamingJob::find_by_id(job_id)
                            .select_only()
                            .column(streaming_job::Column::JobStatus)
                            .into_tuple::<JobStatus>()
                            .one(db)
                            .await?,
                        Some(JobStatus::Created)
                    ) {
                    Err(MetaError::catalog_under_creation(
                        $obj_type.as_str(),
                        name,
                        job_id,
                    ))
                } else {
                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
                };
            }
        };
    }
    check_duplicated!(ObjectType::Table, Table, table);
    check_duplicated!(ObjectType::Source, Source, source);
    check_duplicated!(ObjectType::Sink, Sink, sink);
    check_duplicated!(ObjectType::Index, Index, index);
    check_duplicated!(ObjectType::View, View, view);

    Ok(())
}
740
741pub async fn check_schema_name_duplicate<C>(
743 name: &str,
744 database_id: DatabaseId,
745 db: &C,
746) -> MetaResult<()>
747where
748 C: ConnectionTrait,
749{
750 let count = Object::find()
751 .inner_join(Schema)
752 .filter(
753 object::Column::ObjType
754 .eq(ObjectType::Schema)
755 .and(object::Column::DatabaseId.eq(Some(database_id)))
756 .and(schema::Column::Name.eq(name)),
757 )
758 .count(db)
759 .await?;
760 if count != 0 {
761 return Err(MetaError::catalog_duplicated("schema", name));
762 }
763
764 Ok(())
765}
766
/// Refuse a non-cascade `DROP` when other objects still depend on the target.
///
/// On refusal the error lists each dependent object (schema-qualified) plus a
/// hint on how to proceed. Index dependents are excluded: dropping a table
/// drops its indexes automatically.
pub async fn check_object_refer_for_drop<C>(
    object_type: ObjectType,
    object_id: ObjectId,
    db: &C,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    // Count direct dependents; for tables, indexes are not counted as blockers.
    let count = if object_type == ObjectType::Table {
        ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .filter(
                object_dependency::Column::Oid
                    .eq(object_id)
                    .and(object::Column::ObjType.ne(ObjectType::Index)),
            )
            .count(db)
            .await?
    } else {
        ObjectDependency::find()
            .filter(object_dependency::Column::Oid.eq(object_id))
            .count(db)
            .await?
    };
    if count != 0 {
        // Build a human-readable detail line per dependent, grouped by kind.
        let referring_objects = get_referring_objects(object_id, db).await?;
        let referring_objs_map = referring_objects
            .into_iter()
            .filter(|o| o.obj_type != ObjectType::Index)
            .into_group_map_by(|o| o.obj_type);
        let mut details = vec![];
        for (obj_type, objs) in referring_objs_map {
            match obj_type {
                ObjectType::Table => {
                    // Table dependents are materialized views in user terms.
                    let tables: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Table.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(table::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(tables.into_iter().map(|(schema_name, table_name)| {
                        format!(
                            "materialized view {}.{} depends on it",
                            schema_name, table_name
                        )
                    }));
                }
                ObjectType::Sink => {
                    let sinks: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Sink.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(sink::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    // Special case: an iceberg-engine table owns exactly one
                    // hidden sink; that sink must not block dropping the table.
                    if object_type == ObjectType::Table {
                        let engine = Table::find_by_id(object_id.as_table_id())
                            .select_only()
                            .column(table::Column::Engine)
                            .into_tuple::<table::Engine>()
                            .one(db)
                            .await?;
                        if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
                            continue;
                        }
                    }
                    details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
                        format!("sink {}.{} depends on it", schema_name, sink_name)
                    }));
                }
                ObjectType::View => {
                    let views: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::View.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(view::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(views.into_iter().map(|(schema_name, view_name)| {
                        format!("view {}.{} depends on it", schema_name, view_name)
                    }));
                }
                ObjectType::Subscription => {
                    let subscriptions: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Subscription.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(subscription::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(subscriptions.into_iter().map(
                        |(schema_name, subscription_name)| {
                            format!(
                                "subscription {}.{} depends on it",
                                schema_name, subscription_name
                            )
                        },
                    ));
                }
                ObjectType::Source => {
                    let sources: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Source.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(source::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(sources.into_iter().map(|(schema_name, view_name)| {
                        format!("source {}.{} depends on it", schema_name, view_name)
                    }));
                }
                ObjectType::Connection => {
                    let connections: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Connection.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(connection::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(connections.into_iter().map(|(schema_name, view_name)| {
                        format!("connection {}.{} depends on it", schema_name, view_name)
                    }));
                }
                // Databases/schemas/etc. cannot appear as dependents here.
                _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
            }
        }
        // All dependents were filtered out (e.g. the iceberg hidden sink): allow.
        if details.is_empty() {
            return Ok(());
        }

        return Err(MetaError::permission_denied(format!(
            "{} used by {} other objects. \nDETAIL: {}\n\
            {}",
            object_type.as_str(),
            details.len(),
            details.join("\n"),
            match object_type {
                ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
                    "HINT: DROP the dependent objects first.",
                ObjectType::Database | ObjectType::Schema => unreachable!(),
                _ => "HINT: Use DROP ... CASCADE to drop the dependent objects too.",
            }
        )));
    }
    Ok(())
}
946
947pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
949where
950 C: ConnectionTrait,
951{
952 let objs = ObjectDependency::find()
953 .filter(object_dependency::Column::Oid.eq(object_id))
954 .join(
955 JoinType::InnerJoin,
956 object_dependency::Relation::Object1.def(),
957 )
958 .into_partial_model()
959 .all(db)
960 .await?;
961
962 Ok(objs)
963}
964
965pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
967where
968 C: ConnectionTrait,
969{
970 let count = Object::find()
971 .filter(object::Column::SchemaId.eq(Some(schema_id)))
972 .count(db)
973 .await?;
974 if count != 0 {
975 return Err(MetaError::permission_denied("schema is not empty"));
976 }
977
978 Ok(())
979}
980
981pub async fn list_user_info_by_ids<C>(
983 user_ids: impl IntoIterator<Item = UserId>,
984 db: &C,
985) -> MetaResult<Vec<PbUserInfo>>
986where
987 C: ConnectionTrait,
988{
989 let mut user_infos = vec![];
990 for user_id in user_ids {
991 let user = User::find_by_id(user_id)
992 .one(db)
993 .await?
994 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
995 let mut user_info: PbUserInfo = user.into();
996 user_info.grant_privileges = get_user_privilege(user_id, db).await?;
997 user_infos.push(user_info);
998 }
999 Ok(user_infos)
1000}
1001
1002pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
1004where
1005 C: ConnectionTrait,
1006{
1007 let obj_owner: UserId = Object::find_by_id(object_id)
1008 .select_only()
1009 .column(object::Column::OwnerId)
1010 .into_tuple()
1011 .one(db)
1012 .await?
1013 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1014 Ok(obj_owner)
1015}
1016
1017pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
1042 let cte_alias = Alias::new("granted_privilege_ids");
1043 let cte_return_privilege_alias = Alias::new("id");
1044 let cte_return_user_alias = Alias::new("user_id");
1045
1046 let mut base_query = SelectStatement::new()
1047 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
1048 .from(UserPrivilege)
1049 .and_where(user_privilege::Column::Id.is_in(ids))
1050 .to_owned();
1051
1052 let cte_referencing = Query::select()
1053 .columns([
1054 (UserPrivilege, user_privilege::Column::Id),
1055 (UserPrivilege, user_privilege::Column::UserId),
1056 ])
1057 .from(UserPrivilege)
1058 .inner_join(
1059 cte_alias.clone(),
1060 Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
1061 .equals(user_privilege::Column::DependentId),
1062 )
1063 .to_owned();
1064
1065 let mut common_table_expr = CommonTableExpression::new();
1066 common_table_expr
1067 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
1068 .columns([
1069 cte_return_privilege_alias.clone(),
1070 cte_return_user_alias.clone(),
1071 ])
1072 .table_name(cte_alias.clone());
1073
1074 SelectStatement::new()
1075 .columns([cte_return_privilege_alias, cte_return_user_alias])
1076 .from(cte_alias)
1077 .to_owned()
1078 .with(
1079 WithClause::new()
1080 .recursive(true)
1081 .cte(common_table_expr)
1082 .to_owned(),
1083 )
1084}
1085
1086pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
1087where
1088 C: ConnectionTrait,
1089{
1090 let table_ids: Vec<TableId> = Table::find()
1091 .select_only()
1092 .column(table::Column::TableId)
1093 .filter(
1094 table::Column::TableType
1095 .eq(TableType::Internal)
1096 .and(table::Column::BelongsToJobId.eq(job_id)),
1097 )
1098 .into_tuple()
1099 .all(db)
1100 .await?;
1101 Ok(table_ids)
1102}
1103
1104pub async fn get_index_state_tables_by_table_id<C>(
1105 table_id: TableId,
1106 db: &C,
1107) -> MetaResult<Vec<TableId>>
1108where
1109 C: ConnectionTrait,
1110{
1111 let mut index_table_ids: Vec<TableId> = Index::find()
1112 .select_only()
1113 .column(index::Column::IndexTableId)
1114 .filter(index::Column::PrimaryTableId.eq(table_id))
1115 .into_tuple()
1116 .all(db)
1117 .await?;
1118
1119 if !index_table_ids.is_empty() {
1120 let internal_table_ids: Vec<TableId> = Table::find()
1121 .select_only()
1122 .column(table::Column::TableId)
1123 .filter(
1124 table::Column::TableType
1125 .eq(TableType::Internal)
1126 .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
1127 )
1128 .into_tuple()
1129 .all(db)
1130 .await?;
1131
1132 index_table_ids.extend(internal_table_ids.into_iter());
1133 }
1134
1135 Ok(index_table_ids)
1136}
1137
/// Collect the object ids of the hidden sink/source pair backing an Iceberg
/// table engine, given the table's object id.
///
/// Returns an empty vec when `object_id` is not a table, or when the table's
/// engine is not `Iceberg`. For the hidden sink, the ids of its internal
/// (state) tables are included as well. A missing sink or source is logged
/// with a warning but does not fail the call.
pub async fn get_iceberg_related_object_ids<C>(
    object_id: ObjectId,
    db: &C,
) -> MetaResult<Vec<ObjectId>>
where
    C: ConnectionTrait,
{
    let object = Object::find_by_id(object_id)
        .one(db)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
    // Only tables can be backed by the Iceberg engine.
    if object.obj_type != ObjectType::Table {
        return Ok(vec![]);
    }

    let table = Table::find_by_id(object_id.as_table_id())
        .one(db)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
    if !matches!(table.engine, Some(table::Engine::Iceberg)) {
        return Ok(vec![]);
    }

    // Tables always live inside a database and schema, so these are present.
    let database_id = object.database_id.unwrap();
    let schema_id = object.schema_id.unwrap();

    let mut related_objects = vec![];

    // The hidden sink shares the table's schema and is named by convention:
    // prefix + table name.
    let iceberg_sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name);
    let iceberg_sink_id = Sink::find()
        .inner_join(Object)
        .select_only()
        .column(sink::Column::SinkId)
        .filter(
            object::Column::DatabaseId
                .eq(database_id)
                .and(object::Column::SchemaId.eq(schema_id))
                .and(sink::Column::Name.eq(&iceberg_sink_name)),
        )
        .into_tuple::<SinkId>()
        .one(db)
        .await?;
    if let Some(sink_id) = iceberg_sink_id {
        related_objects.push(sink_id.as_object_id());
        // Also include the sink job's internal state tables.
        let sink_internal_tables = get_internal_tables_by_id(sink_id.as_job_id(), db).await?;
        related_objects.extend(
            sink_internal_tables
                .into_iter()
                .map(|tid| tid.as_object_id()),
        );
    } else {
        warn!(
            "iceberg table {} missing sink {}",
            table.name, iceberg_sink_name
        );
    }

    // Same naming convention for the hidden source.
    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
    let iceberg_source_id = Source::find()
        .inner_join(Object)
        .select_only()
        .column(source::Column::SourceId)
        .filter(
            object::Column::DatabaseId
                .eq(database_id)
                .and(object::Column::SchemaId.eq(schema_id))
                .and(source::Column::Name.eq(&iceberg_source_name)),
        )
        .into_tuple::<SourceId>()
        .one(db)
        .await?;
    if let Some(source_id) = iceberg_source_id {
        related_objects.push(source_id.as_object_id());
    } else {
        warn!(
            "iceberg table {} missing source {}",
            table.name, iceberg_source_name
        );
    }

    Ok(related_objects)
}
1221
1222pub(crate) async fn load_streaming_jobs_by_ids<C>(
1224 txn: &C,
1225 job_ids: impl IntoIterator<Item = JobId>,
1226) -> MetaResult<HashMap<JobId, streaming_job::Model>>
1227where
1228 C: ConnectionTrait,
1229{
1230 let job_ids: HashSet<JobId> = job_ids.into_iter().collect();
1231 if job_ids.is_empty() {
1232 return Ok(HashMap::new());
1233 }
1234 let jobs = streaming_job::Entity::find()
1235 .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
1236 .all(txn)
1237 .await?;
1238 Ok(jobs.into_iter().map(|job| (job.job_id, job)).collect())
1239}
1240
/// A partial projection of the `user_privilege` entity carrying only the
/// privilege id and the id of the user it is granted to.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "UserPrivilege")]
pub struct PartialUserPrivilege {
    // Id of the privilege entry itself.
    pub id: PrivilegeId,
    // Id of the grantee user.
    pub user_id: UserId,
}
1247
1248pub async fn get_referring_privileges_cascade<C>(
1249 ids: Vec<PrivilegeId>,
1250 db: &C,
1251) -> MetaResult<Vec<PartialUserPrivilege>>
1252where
1253 C: ConnectionTrait,
1254{
1255 let query = construct_privilege_dependency_query(ids);
1256 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1257 let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1258 db.get_database_backend(),
1259 sql,
1260 values,
1261 ))
1262 .all(db)
1263 .await?;
1264
1265 Ok(privileges)
1266}
1267
1268pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1270where
1271 C: ConnectionTrait,
1272{
1273 let count = UserPrivilege::find()
1274 .filter(user_privilege::Column::DependentId.is_in(ids))
1275 .count(db)
1276 .await?;
1277 if count != 0 {
1278 return Err(MetaError::permission_denied(format!(
1279 "privileges granted to {} other ones.",
1280 count
1281 )));
1282 }
1283 Ok(())
1284}
1285
/// Load all privileges granted to `user_id` as protobuf [`PbGrantPrivilege`]s.
///
/// Each privilege row becomes its own `PbGrantPrivilege` with a single action
/// entry; this function does not merge multiple actions on the same object.
pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
where
    C: ConnectionTrait,
{
    let user_privileges = UserPrivilege::find()
        .find_also_related(Object)
        .filter(user_privilege::Column::UserId.eq(user_id))
        .all(db)
        .await?;
    Ok(user_privileges
        .into_iter()
        .map(|(privilege, object)| {
            // Every privilege references an existing object, so the related
            // row must be present.
            let object = object.unwrap();
            // Wrap the raw oid into the variant-typed protobuf grant object.
            let obj = match object.obj_type {
                ObjectType::Database => PbGrantObject::DatabaseId(object.oid.as_database_id()),
                ObjectType::Schema => PbGrantObject::SchemaId(object.oid.as_schema_id()),
                // An index's grant is represented as a table grant on its id.
                ObjectType::Table | ObjectType::Index => {
                    PbGrantObject::TableId(object.oid.as_table_id())
                }
                ObjectType::Source => PbGrantObject::SourceId(object.oid.as_source_id()),
                ObjectType::Sink => PbGrantObject::SinkId(object.oid.as_sink_id()),
                ObjectType::View => PbGrantObject::ViewId(object.oid.as_view_id()),
                ObjectType::Function => PbGrantObject::FunctionId(object.oid.as_function_id()),
                ObjectType::Connection => {
                    PbGrantObject::ConnectionId(object.oid.as_connection_id())
                }
                ObjectType::Subscription => {
                    PbGrantObject::SubscriptionId(object.oid.as_subscription_id())
                }
                ObjectType::Secret => PbGrantObject::SecretId(object.oid.as_secret_id()),
            };
            PbGrantPrivilege {
                action_with_opts: vec![PbActionWithGrantOption {
                    action: PbAction::from(privilege.action) as _,
                    with_grant_option: privilege.with_grant_option,
                    granted_by: privilege.granted_by as _,
                }],
                object: Some(obj),
            }
        })
        .collect())
}
1329
1330pub async fn get_table_columns(
1331 txn: &impl ConnectionTrait,
1332 id: TableId,
1333) -> MetaResult<ColumnCatalogArray> {
1334 let columns = Table::find_by_id(id)
1335 .select_only()
1336 .columns([table::Column::Columns])
1337 .into_tuple::<ColumnCatalogArray>()
1338 .one(txn)
1339 .await?
1340 .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1341 Ok(columns)
1342}
1343
1344pub async fn grant_default_privileges_automatically<C>(
1347 db: &C,
1348 object_id: impl Into<ObjectId>,
1349) -> MetaResult<Vec<PbUserInfo>>
1350where
1351 C: ConnectionTrait,
1352{
1353 let object_id = object_id.into();
1354 let object = Object::find_by_id(object_id)
1355 .one(db)
1356 .await?
1357 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1358 assert_ne!(object.obj_type, ObjectType::Database);
1359
1360 let for_mview_filter = if object.obj_type == ObjectType::Table {
1361 let table_type = Table::find_by_id(object_id.as_table_id())
1362 .select_only()
1363 .column(table::Column::TableType)
1364 .into_tuple::<TableType>()
1365 .one(db)
1366 .await?
1367 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1368 user_default_privilege::Column::ForMaterializedView
1369 .eq(table_type == TableType::MaterializedView)
1370 } else {
1371 user_default_privilege::Column::ForMaterializedView.eq(false)
1372 };
1373 let schema_filter = if let Some(schema_id) = &object.schema_id {
1374 user_default_privilege::Column::SchemaId.eq(*schema_id)
1375 } else {
1376 user_default_privilege::Column::SchemaId.is_null()
1377 };
1378
1379 let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1380 .select_only()
1381 .columns([
1382 user_default_privilege::Column::Grantee,
1383 user_default_privilege::Column::GrantedBy,
1384 user_default_privilege::Column::Action,
1385 user_default_privilege::Column::WithGrantOption,
1386 ])
1387 .filter(
1388 user_default_privilege::Column::DatabaseId
1389 .eq(object.database_id.unwrap())
1390 .and(schema_filter)
1391 .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1392 .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1393 .and(for_mview_filter),
1394 )
1395 .into_tuple()
1396 .all(db)
1397 .await?;
1398 if default_privileges.is_empty() {
1399 return Ok(vec![]);
1400 }
1401
1402 let updated_user_ids = default_privileges
1403 .iter()
1404 .map(|(grantee, _, _, _)| *grantee)
1405 .collect::<HashSet<_>>();
1406
1407 for (grantee, granted_by, action, with_grant_option) in default_privileges {
1408 UserPrivilege::insert(user_privilege::ActiveModel {
1409 user_id: Set(grantee),
1410 oid: Set(object_id),
1411 granted_by: Set(granted_by),
1412 action: Set(action),
1413 with_grant_option: Set(with_grant_option),
1414 ..Default::default()
1415 })
1416 .exec(db)
1417 .await?;
1418 if action == Action::Select {
1419 let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1421 if !internal_table_ids.is_empty() {
1422 for internal_table_id in &internal_table_ids {
1423 UserPrivilege::insert(user_privilege::ActiveModel {
1424 user_id: Set(grantee),
1425 oid: Set(internal_table_id.as_object_id()),
1426 granted_by: Set(granted_by),
1427 action: Set(Action::Select),
1428 with_grant_option: Set(with_grant_option),
1429 ..Default::default()
1430 })
1431 .exec(db)
1432 .await?;
1433 }
1434 }
1435
1436 let iceberg_privilege_object_ids =
1438 get_iceberg_related_object_ids(object_id, db).await?;
1439 if !iceberg_privilege_object_ids.is_empty() {
1440 for iceberg_object_id in &iceberg_privilege_object_ids {
1441 UserPrivilege::insert(user_privilege::ActiveModel {
1442 user_id: Set(grantee),
1443 oid: Set(*iceberg_object_id),
1444 granted_by: Set(granted_by),
1445 action: Set(action),
1446 with_grant_option: Set(with_grant_option),
1447 ..Default::default()
1448 })
1449 .exec(db)
1450 .await?;
1451 }
1452 }
1453 }
1454 }
1455
1456 let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1457 Ok(updated_user_infos)
1458}
1459
/// Convert a protobuf grant object into the corresponding catalog `ObjectId`.
pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
    // Every variant wraps a typed id that converts losslessly into `ObjectId`.
    match object {
        PbGrantObject::DatabaseId(id) => (*id).into(),
        PbGrantObject::SchemaId(id) => (*id).into(),
        PbGrantObject::TableId(id) => (*id).into(),
        PbGrantObject::SourceId(id) => (*id).into(),
        PbGrantObject::SinkId(id) => (*id).into(),
        PbGrantObject::ViewId(id) => (*id).into(),
        PbGrantObject::FunctionId(id) => (*id).into(),
        PbGrantObject::SubscriptionId(id) => (*id).into(),
        PbGrantObject::ConnectionId(id) => (*id).into(),
        PbGrantObject::SecretId(id) => (*id).into(),
    }
}
1475
1476pub async fn insert_fragment_relations(
1477 db: &impl ConnectionTrait,
1478 downstream_fragment_relations: &FragmentDownstreamRelation,
1479) -> MetaResult<()> {
1480 let mut relations = vec![];
1481 for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1482 for downstream in downstreams {
1483 relations.push(
1484 fragment_relation::Model {
1485 source_fragment_id: *upstream_fragment_id as _,
1486 target_fragment_id: downstream.downstream_fragment_id as _,
1487 dispatcher_type: downstream.dispatcher_type,
1488 dist_key_indices: downstream
1489 .dist_key_indices
1490 .iter()
1491 .map(|idx| *idx as i32)
1492 .collect_vec()
1493 .into(),
1494 output_indices: downstream
1495 .output_mapping
1496 .indices
1497 .iter()
1498 .map(|idx| *idx as i32)
1499 .collect_vec()
1500 .into(),
1501 output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1502 }
1503 .into_active_model(),
1504 );
1505 }
1506 }
1507 if !relations.is_empty() {
1508 FragmentRelation::insert_many(relations).exec(db).await?;
1509 }
1510 Ok(())
1511}
1512
1513pub fn compose_dispatchers(
1514 source_fragment_distribution: DistributionType,
1515 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1516 target_fragment_id: crate::model::FragmentId,
1517 target_fragment_distribution: DistributionType,
1518 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1519 dispatcher_type: DispatcherType,
1520 dist_key_indices: Vec<u32>,
1521 output_mapping: PbDispatchOutputMapping,
1522) -> (
1523 HashMap<crate::model::ActorId, PbDispatcher>,
1524 Option<HashMap<crate::model::ActorId, crate::model::ActorId>>,
1525) {
1526 match dispatcher_type {
1527 DispatcherType::Hash => {
1528 let dispatcher = PbDispatcher {
1529 r#type: PbDispatcherType::from(dispatcher_type) as _,
1530 dist_key_indices,
1531 output_mapping: output_mapping.into(),
1532 hash_mapping: Some(
1533 ActorMapping::from_bitmaps(
1534 &target_fragment_actors
1535 .iter()
1536 .map(|(actor_id, bitmap)| {
1537 (
1538 *actor_id as _,
1539 bitmap
1540 .clone()
1541 .expect("downstream hash dispatch must have distribution"),
1542 )
1543 })
1544 .collect(),
1545 )
1546 .to_protobuf(),
1547 ),
1548 dispatcher_id: target_fragment_id,
1549 downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1550 };
1551 (
1552 source_fragment_actors
1553 .keys()
1554 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1555 .collect(),
1556 None,
1557 )
1558 }
1559 DispatcherType::Broadcast | DispatcherType::Simple => {
1560 let dispatcher = PbDispatcher {
1561 r#type: PbDispatcherType::from(dispatcher_type) as _,
1562 dist_key_indices,
1563 output_mapping: output_mapping.into(),
1564 hash_mapping: None,
1565 dispatcher_id: target_fragment_id,
1566 downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1567 };
1568 (
1569 source_fragment_actors
1570 .keys()
1571 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1572 .collect(),
1573 None,
1574 )
1575 }
1576 DispatcherType::NoShuffle => {
1577 let no_shuffle_map = resolve_no_shuffle_actor_mapping(
1578 source_fragment_distribution,
1579 source_fragment_actors
1580 .iter()
1581 .map(|(&id, bitmap)| (id, bitmap)),
1582 target_fragment_distribution,
1583 target_fragment_actors
1584 .iter()
1585 .map(|(&id, bitmap)| (id, bitmap)),
1586 );
1587 let dispatchers = no_shuffle_map
1588 .iter()
1589 .map(|(&upstream_actor_id, &downstream_actor_id)| {
1590 (
1591 upstream_actor_id,
1592 PbDispatcher {
1593 r#type: PbDispatcherType::NoShuffle as _,
1594 dist_key_indices: dist_key_indices.clone(),
1595 output_mapping: output_mapping.clone().into(),
1596 hash_mapping: None,
1597 dispatcher_id: target_fragment_id,
1598 downstream_actor_id: vec![downstream_actor_id],
1599 },
1600 )
1601 })
1602 .collect();
1603 (dispatchers, Some(no_shuffle_map))
1604 }
1605 }
1606}
1607
/// Pair up upstream and downstream actors of a no-shuffle edge one-to-one.
///
/// Both fragments must share the same distribution type. For `Single`
/// distribution the two singleton actors are paired directly; for `Hash`
/// distribution actors are matched via their vnode bitmaps (keyed on the
/// first vnode, then asserted to be fully equal).
///
/// # Panics
/// Panics when the distributions differ, the actor counts don't line up, or
/// bitmaps don't match exactly — all of which indicate a broken no-shuffle
/// plan.
pub fn resolve_no_shuffle_actor_mapping<
    'a,
    ActorId: Copy + Eq + std::hash::Hash + std::fmt::Debug,
>(
    source_fragment_distribution: DistributionType,
    source_fragment_actors: impl IntoIterator<Item = (ActorId, &'a Option<Bitmap>)>,
    target_fragment_distribution: DistributionType,
    target_fragment_actors: impl IntoIterator<Item = (ActorId, &'a Option<Bitmap>)>,
) -> HashMap<ActorId, ActorId> {
    assert_eq!(source_fragment_distribution, target_fragment_distribution);

    match source_fragment_distribution {
        DistributionType::Single => {
            // A singleton actor either has no bitmap or owns all vnodes.
            let assert_singleton = |bitmap: &Option<Bitmap>| {
                assert!(
                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
                    "not singleton: {:?}",
                    bitmap
                );
            };
            let (source_actor_id, bitmap) = source_fragment_actors
                .into_iter()
                .exactly_one()
                .ok()
                .expect("Single distribution should have exactly one source actor");
            assert_singleton(bitmap);
            let (target_actor_id, bitmap) = target_fragment_actors
                .into_iter()
                .exactly_one()
                .ok()
                .expect("Single distribution should have exactly one target actor");
            assert_singleton(bitmap);
            HashMap::from([(source_actor_id, target_actor_id)])
        }
        DistributionType::Hash => {
            // Index target actors by the first vnode they own; since vnode
            // ownership is disjoint across actors, the first vnode uniquely
            // identifies an actor's bitmap.
            let mut target_by_vnode: HashMap<_, _> = target_fragment_actors
                .into_iter()
                .map(|(actor_id, bitmap)| {
                    let bitmap = bitmap
                        .as_ref()
                        .expect("hash distribution should have bitmap");
                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
                    (first_vnode, (actor_id, bitmap))
                })
                .collect();

            let target_count = target_by_vnode.len();

            // Match each source actor to the target actor owning the same
            // first vnode, and assert the full bitmaps agree.
            let mapping: HashMap<_, _> = source_fragment_actors
                .into_iter()
                .map(|(source_actor_id, source_bitmap)| {
                    let source_bitmap = source_bitmap
                        .as_ref()
                        .expect("hash distribution should have bitmap");
                    let first_vnode = source_bitmap
                        .iter_vnodes()
                        .next()
                        .expect("non-empty bitmap");
                    let (target_actor_id, target_bitmap) =
                        target_by_vnode.remove(&first_vnode).unwrap_or_else(|| {
                            panic!(
                                "cannot find matched target actor: {:?} first_vnode {:?}",
                                source_actor_id, first_vnode,
                            )
                        });
                    assert_eq!(
                        source_bitmap, target_bitmap,
                        "bitmap mismatch for source {:?} target {:?} at first_vnode {:?}",
                        source_actor_id, target_actor_id, first_vnode,
                    );
                    (source_actor_id, target_actor_id)
                })
                .collect();

            // Every target actor must have been claimed by exactly one source.
            assert_eq!(
                mapping.len(),
                target_count,
                "no-shuffle should have equal upstream downstream actor count: {} vs {}",
                mapping.len(),
                target_count,
            );

            mapping
        }
    }
}
1705
/// Rebuild the worker-slot mapping of a fragment from its current actors.
///
/// For `Single` distribution the sole actor's worker hosts slot 0; for `Hash`
/// distribution the vnode -> actor mapping is reconstructed from the actors'
/// bitmaps and then translated to worker slots via each actor's location.
pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
    let fragment_worker_slot_mapping = match fragment.distribution_type {
        DistributionType::Single => {
            // Singleton fragments must have exactly one actor.
            let actor = fragment.actors.values().exactly_one().unwrap();
            WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
        }
        DistributionType::Hash => {
            let actor_bitmaps: HashMap<_, _> = fragment
                .actors
                .iter()
                .map(|(actor_id, actor_info)| {
                    let vnode_bitmap = actor_info
                        .vnode_bitmap
                        .as_ref()
                        .cloned()
                        .expect("actor bitmap shouldn't be none in hash fragment");

                    (*actor_id as hash::ActorId, vnode_bitmap)
                })
                .collect();

            let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);

            // actor id -> hosting worker, needed to turn the actor mapping
            // into a worker-slot mapping.
            let actor_locations = fragment
                .actors
                .iter()
                .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
                .collect();

            actor_mapping.to_worker_slot(&actor_locations)
        }
    };

    PbFragmentWorkerSlotMapping {
        fragment_id: fragment.fragment_id,
        mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
    }
}
1744
1745pub async fn get_fragments_for_jobs<C>(
1751 db: &C,
1752 streaming_jobs: Vec<JobId>,
1753) -> MetaResult<(
1754 HashMap<SourceId, BTreeSet<FragmentId>>,
1755 HashSet<FragmentId>,
1756 HashSet<FragmentId>,
1757)>
1758where
1759 C: ConnectionTrait,
1760{
1761 if streaming_jobs.is_empty() {
1762 return Ok((HashMap::default(), HashSet::default(), HashSet::default()));
1763 }
1764
1765 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1766 .select_only()
1767 .columns([
1768 fragment::Column::FragmentId,
1769 fragment::Column::FragmentTypeMask,
1770 fragment::Column::StreamNode,
1771 ])
1772 .filter(fragment::Column::JobId.is_in(streaming_jobs))
1773 .into_tuple()
1774 .all(db)
1775 .await?;
1776
1777 let fragment_ids: HashSet<_> = fragments
1778 .iter()
1779 .map(|(fragment_id, _, _)| *fragment_id)
1780 .collect();
1781
1782 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1783 let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1784 for (fragment_id, mask, stream_node) in fragments {
1785 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1786 && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1787 {
1788 source_fragment_ids
1789 .entry(source_id)
1790 .or_default()
1791 .insert(fragment_id);
1792 }
1793 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1794 sink_fragment_ids.insert(fragment_id);
1795 }
1796 }
1797
1798 Ok((source_fragment_ids, sink_fragment_ids, fragment_ids))
1799}
1800
/// Build the notification payload announcing the deletion of the given
/// objects.
///
/// Each partial object is converted into a skeleton protobuf catalog object
/// carrying only the ids (id / schema_id / database_id) that subscribers need
/// to evict it. An `Index` additionally emits its index table so both entries
/// are dropped.
pub(crate) fn build_object_group_for_delete(
    partial_objects: Vec<PartialObject>,
) -> NotificationInfo {
    let mut objects = vec![];
    for obj in partial_objects {
        match obj.obj_type {
            ObjectType::Database => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Database(PbDatabase {
                    id: obj.oid.as_database_id(),
                    ..Default::default()
                })),
            }),
            ObjectType::Schema => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Schema(PbSchema {
                    id: obj.oid.as_schema_id(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Table => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Table(PbTable {
                    id: obj.oid.as_table_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Source => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(PbSource {
                    id: obj.oid.as_source_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Sink => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Sink(PbSink {
                    id: obj.oid.as_sink_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Subscription => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
                    id: obj.oid.as_subscription_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::View => objects.push(PbObject {
                object_info: Some(PbObjectInfo::View(PbView {
                    id: obj.oid.as_view_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            // An index is backed by a table with the same oid: notify the
            // deletion of both catalog entries.
            ObjectType::Index => {
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(PbIndex {
                        id: obj.oid.as_index_id(),
                        schema_id: obj.schema_id.unwrap(),
                        database_id: obj.database_id.unwrap(),
                        ..Default::default()
                    })),
                });
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(PbTable {
                        id: obj.oid.as_table_id(),
                        schema_id: obj.schema_id.unwrap(),
                        database_id: obj.database_id.unwrap(),
                        ..Default::default()
                    })),
                });
            }
            ObjectType::Function => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Function(PbFunction {
                    id: obj.oid.as_function_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Connection => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Connection(PbConnection {
                    id: obj.oid.as_connection_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Secret => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Secret(PbSecret {
                    id: obj.oid.as_secret_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
        }
    }
    NotificationInfo::ObjectGroup(PbObjectGroup {
        objects,
        dependencies: vec![],
    })
}
1913
1914pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1915 let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1916 .context("unable to parse table definition")
1917 .inspect_err(|e| {
1918 tracing::error!(
1919 target: "auto_schema_change",
1920 error = %e.as_report(),
1921 "failed to parse table definition")
1922 })
1923 .unwrap()
1924 .try_into()
1925 .unwrap();
1926 if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1927 cdc_table_info
1928 .clone()
1929 .map(|cdc_table_info| cdc_table_info.external_table_name)
1930 } else {
1931 None
1932 }
1933}
1934
/// Rename a relation (table / source / sink / subscription / view / index) to
/// `object_name`, rewriting its stored `definition` accordingly.
///
/// Returns the protobuf objects to notify subscribers about, together with the
/// relation's old name. For tables with an associated source, the source is
/// renamed as well; for indexes, both the index entry and its index table are
/// renamed.
pub async fn rename_relation(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
) -> MetaResult<(Vec<PbObject>, String)> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename;

    let mut to_update_relations = vec![];
    // Shared rename routine: update name (and, except for views, the
    // definition), persist the change, and queue a notification object.
    macro_rules! rename_relation {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let obj = obj.unwrap();
            let old_name = relation.name.clone();
            relation.name = object_name.into();
            if obj.obj_type != ObjectType::View {
                relation.definition = alter_relation_rename(&relation.definition, object_name);
            }
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                name: Set(object_name.into()),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
                .one(txn)
                .await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj, streaming_job).into(),
                )),
            });
            old_name
        }};
    }
    let old_name = match object_type {
        ObjectType::Table => {
            // A table created with a connector has an associated source that
            // must carry the same name.
            let associated_source_id: Option<SourceId> = Source::find()
                .select_only()
                .column(source::Column::SourceId)
                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
                .into_tuple()
                .one(txn)
                .await?;
            if let Some(source_id) = associated_source_id {
                rename_relation!(Source, source, source_id, source_id);
            }
            rename_relation!(Table, table, table_id, object_id.as_table_id())
        }
        ObjectType::Source => {
            rename_relation!(Source, source, source_id, object_id.as_source_id())
        }
        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
        ObjectType::Subscription => {
            rename_relation!(
                Subscription,
                subscription,
                subscription_id,
                object_id.as_subscription_id()
            )
        }
        ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
        ObjectType::Index => {
            let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let streaming_job = streaming_job::Entity::find_by_id(index.index_id.as_job_id())
                .one(txn)
                .await?;
            index.name = object_name.into();
            let index_table_id = index.index_table_id;
            // Rename the backing index table first, then the index itself.
            let old_name = rename_relation!(Table, table, table_id, index_table_id);

            let active_model = index::ActiveModel {
                index_id: sea_orm::ActiveValue::Set(index.index_id),
                name: sea_orm::ActiveValue::Set(object_name.into()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::Index(
                    ObjectModel(index, obj.unwrap(), streaming_job).into(),
                )),
            });
            old_name
        }
        _ => unreachable!("only relation name can be altered."),
    };

    Ok((to_update_relations, old_name))
}
2040
2041pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
2042where
2043 C: ConnectionTrait,
2044{
2045 let database_resource_group: Option<String> = Database::find_by_id(database_id)
2046 .select_only()
2047 .column(database::Column::ResourceGroup)
2048 .into_tuple()
2049 .one(txn)
2050 .await?
2051 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
2052
2053 Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
2054}
2055
/// Resolve the effective resource group of an existing streaming job:
/// the job-specific group if set, otherwise the group of the job's database,
/// otherwise the default resource group.
pub async fn get_existing_job_resource_group<C>(
    txn: &C,
    streaming_job_id: JobId,
) -> MetaResult<String>
where
    C: ConnectionTrait,
{
    // Join job -> object -> database to fetch both candidate groups at once.
    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
        StreamingJob::find_by_id(streaming_job_id)
            .select_only()
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(streaming_job::Column::SpecificResourceGroup)
            .column(database::Column::ResourceGroup)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;

    Ok(job_specific_resource_group.unwrap_or_else(|| {
        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
    }))
}
2079
2080pub fn filter_workers_by_resource_group(
2081 workers: &HashMap<WorkerId, WorkerNode>,
2082 resource_group: &str,
2083) -> BTreeSet<WorkerId> {
2084 workers
2085 .iter()
2086 .filter(|&(_, worker)| {
2087 worker
2088 .resource_group()
2089 .map(|node_label| node_label.as_str() == resource_group)
2090 .unwrap_or(false)
2091 })
2092 .map(|(id, _)| *id)
2093 .collect()
2094}
2095
/// After a relation has been renamed from `old_name` to `object_name`, rewrite
/// the stored definitions of every object that refers to it.
///
/// For tables, sinks that sink *into* the table (`TargetTable`) are included
/// in addition to the regular dependency edges. Returns the protobuf objects
/// to notify subscribers about.
pub async fn rename_relation_refer(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
    old_name: &str,
) -> MetaResult<Vec<PbObject>> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename_refs;

    let mut to_update_relations = vec![];
    // Shared routine: rewrite references inside the definition, persist it,
    // and queue a notification object.
    macro_rules! rename_relation_ref {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            relation.definition =
                alter_relation_rename_refs(&relation.definition, old_name, object_name);
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
                .one(txn)
                .await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj.unwrap(), streaming_job).into(),
                )),
            });
        }};
    }
    let mut objs = get_referring_objects(object_id, txn).await?;
    if object_type == ObjectType::Table {
        // Sinks targeting this table mention it in their definitions but are
        // not regular dependents; pick them up explicitly.
        let incoming_sinks: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.eq(object_id))
            .into_tuple()
            .all(txn)
            .await?;

        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
            oid: id.as_object_id(),
            obj_type: ObjectType::Sink,
            schema_id: None,
            database_id: None,
        }));
    }

    for obj in objs {
        match obj.obj_type {
            ObjectType::Table => {
                rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
            }
            ObjectType::Sink => {
                rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
            }
            ObjectType::Subscription => {
                rename_relation_ref!(
                    Subscription,
                    subscription,
                    subscription_id,
                    obj.oid.as_subscription_id()
                )
            }
            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
            ObjectType::Index => {
                // Indexes store their definition on the backing index table.
                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
                    .select_only()
                    .column(index::Column::IndexTableId)
                    .into_tuple()
                    .one(txn)
                    .await?;
                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
            }
            _ => {
                bail!(
                    "only the table, sink, subscription, view and index will depend on other objects."
                )
            }
        }
    }

    Ok(to_update_relations)
}
2189
/// Check that dropping the given subscription would not break cross-database
/// access to its upstream table.
///
/// If other subscriptions on the same upstream table remain, deletion is
/// always allowed. Otherwise, deletion is rejected while any object from a
/// *different* database still depends on the upstream table.
pub async fn validate_subscription_deletion<C>(
    txn: &C,
    subscription_id: SubscriptionId,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
        .select_only()
        .column(subscription::Column::DependentTableId)
        .into_tuple()
        .one(txn)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;

    // More than one subscription on the table (i.e. besides this one) means
    // cross-db readers remain covered after deletion.
    let cnt = Subscription::find()
        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
        .count(txn)
        .await?;
    if cnt > 1 {
        return Ok(());
    }

    // Self-join the dependency table against `object` twice: once for the
    // depended-on object, once for the dependent, to compare their databases.
    let obj_alias = Alias::new("o1");
    let used_by_alias = Alias::new("o2");
    let count = ObjectDependency::find()
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object2.def(),
            obj_alias.clone(),
        )
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object1.def(),
            used_by_alias.clone(),
        )
        .filter(
            object_dependency::Column::Oid
                .eq(upstream_table_id)
                .and(object_dependency::Column::UsedBy.ne(subscription_id))
                .and(
                    Expr::col((obj_alias, object::Column::DatabaseId))
                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
                ),
        )
        .count(txn)
        .await?;

    if count != 0 {
        return Err(MetaError::permission_denied(format!(
            "Referenced by {} cross-db objects.",
            count
        )));
    }

    Ok(())
}
2253
2254pub async fn fetch_target_fragments<C>(
2255 txn: &C,
2256 src_fragment_id: impl IntoIterator<Item = FragmentId>,
2257) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
2258where
2259 C: ConnectionTrait,
2260{
2261 let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
2262 .select_only()
2263 .columns([
2264 fragment_relation::Column::SourceFragmentId,
2265 fragment_relation::Column::TargetFragmentId,
2266 ])
2267 .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
2268 .into_tuple()
2269 .all(txn)
2270 .await?;
2271
2272 let source_target_fragments = source_target_fragments.into_iter().into_group_map();
2273
2274 Ok(source_target_fragments)
2275}
2276
2277pub async fn get_sink_fragment_by_ids<C>(
2278 txn: &C,
2279 sink_ids: Vec<SinkId>,
2280) -> MetaResult<HashMap<SinkId, FragmentId>>
2281where
2282 C: ConnectionTrait,
2283{
2284 let sink_num = sink_ids.len();
2285 let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
2286 .select_only()
2287 .columns([fragment::Column::JobId, fragment::Column::FragmentId])
2288 .filter(
2289 fragment::Column::JobId
2290 .is_in(sink_ids)
2291 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2292 )
2293 .into_tuple()
2294 .all(txn)
2295 .await?;
2296
2297 if sink_fragment_ids.len() != sink_num {
2298 return Err(anyhow::anyhow!(
2299 "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2300 sink_fragment_ids.len(),
2301 sink_num
2302 )
2303 .into());
2304 }
2305
2306 Ok(sink_fragment_ids.into_iter().collect())
2307}
2308
2309pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2310where
2311 C: ConnectionTrait,
2312{
2313 let mview_fragment: Vec<i32> = Fragment::find()
2314 .select_only()
2315 .column(fragment::Column::FragmentTypeMask)
2316 .filter(
2317 fragment::Column::JobId
2318 .eq(table_id)
2319 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2320 )
2321 .into_tuple()
2322 .all(txn)
2323 .await?;
2324
2325 let mview_fragment_len = mview_fragment.len();
2326 if mview_fragment_len != 1 {
2327 bail!(
2328 "expected exactly one mview fragment for table {}, found {}",
2329 table_id,
2330 mview_fragment_len
2331 );
2332 }
2333
2334 let mview_fragment = mview_fragment.into_iter().next().unwrap();
2335 let migrated =
2336 FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2337
2338 Ok(migrated)
2339}
2340
2341pub async fn try_get_iceberg_table_by_downstream_sink<C>(
2342 txn: &C,
2343 sink_id: SinkId,
2344) -> MetaResult<Option<TableId>>
2345where
2346 C: ConnectionTrait,
2347{
2348 let sink = Sink::find_by_id(sink_id).one(txn).await?;
2349 let Some(sink) = sink else {
2350 return Ok(None);
2351 };
2352
2353 if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
2354 let object_ids: Vec<ObjectId> = ObjectDependency::find()
2355 .select_only()
2356 .column(object_dependency::Column::Oid)
2357 .filter(object_dependency::Column::UsedBy.eq(sink_id))
2358 .into_tuple()
2359 .all(txn)
2360 .await?;
2361 let mut iceberg_table_ids = vec![];
2362 for object_id in object_ids {
2363 let table_id = object_id.as_table_id();
2364 if let Some(table_engine) = Table::find_by_id(table_id)
2365 .select_only()
2366 .column(table::Column::Engine)
2367 .into_tuple::<table::Engine>()
2368 .one(txn)
2369 .await?
2370 && table_engine == table::Engine::Iceberg
2371 {
2372 iceberg_table_ids.push(table_id);
2373 }
2374 }
2375 if iceberg_table_ids.len() == 1 {
2376 return Ok(Some(iceberg_table_ids[0]));
2377 }
2378 }
2379 Ok(None)
2380}
2381
2382pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
2383where
2384 C: ConnectionTrait,
2385{
2386 if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
2387 .select_only()
2388 .column(table::Column::Engine)
2389 .into_tuple::<table::Engine>()
2390 .one(txn)
2391 .await?
2392 && engine == table::Engine::Iceberg
2393 {
2394 return Ok(true);
2395 }
2396 if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
2397 .select_only()
2398 .column(sink::Column::Name)
2399 .into_tuple::<String>()
2400 .one(txn)
2401 .await?
2402 && sink_name.starts_with(ICEBERG_SINK_PREFIX)
2403 {
2404 return Ok(true);
2405 }
2406 Ok(false)
2407}
2408
2409pub async fn find_dirty_iceberg_table_jobs<C>(
2410 txn: &C,
2411 database_id: Option<DatabaseId>,
2412) -> MetaResult<Vec<PartialObject>>
2413where
2414 C: ConnectionTrait,
2415{
2416 let mut filter_condition = streaming_job::Column::JobStatus
2417 .ne(JobStatus::Created)
2418 .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
2419 .and(streaming_job::Column::CreateType.eq(CreateType::Background));
2420 if let Some(database_id) = database_id {
2421 filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
2422 }
2423 let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
2424 .select_only()
2425 .columns([
2426 object::Column::Oid,
2427 object::Column::ObjType,
2428 object::Column::SchemaId,
2429 object::Column::DatabaseId,
2430 ])
2431 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2432 .filter(filter_condition)
2433 .into_partial_model()
2434 .all(txn)
2435 .await?;
2436
2437 let mut dirty_iceberg_table_jobs = vec![];
2438 for job in creating_table_sink_jobs {
2439 if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
2440 tracing::info!("Found dirty iceberg job with id: {}", job.oid);
2441 dirty_iceberg_table_jobs.push(job);
2442 }
2443 }
2444
2445 Ok(dirty_iceberg_table_jobs)
2446}
2447
2448pub fn build_select_node_list(
2449 from: &[ColumnCatalog],
2450 to: &[ColumnCatalog],
2451) -> MetaResult<Vec<PbExprNode>> {
2452 let mut exprs = Vec::with_capacity(to.len());
2453 let idx_by_col_id = from
2454 .iter()
2455 .enumerate()
2456 .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2457 .collect::<HashMap<_, _>>();
2458
2459 for to_col in to {
2460 let to_col = to_col.column_desc.as_ref().unwrap();
2461 let to_col_type_ref = to_col.column_type.as_ref().unwrap();
2462 let to_col_type = DataType::from(to_col_type_ref);
2463 if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2464 let from_col_type = DataType::from(
2465 from[*from_idx]
2466 .column_desc
2467 .as_ref()
2468 .unwrap()
2469 .column_type
2470 .as_ref()
2471 .unwrap(),
2472 );
2473 if !to_col_type.equals_datatype(&from_col_type) {
2474 return Err(anyhow!(
2475 "Column type mismatch: {:?} != {:?}",
2476 from_col_type,
2477 to_col_type
2478 )
2479 .into());
2480 }
2481 exprs.push(PbExprNode {
2482 function_type: expr_node::Type::Unspecified.into(),
2483 return_type: Some(to_col_type_ref.clone()),
2484 rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2485 });
2486 } else {
2487 let to_default_node =
2488 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2489 expr,
2490 ..
2491 })) = &to_col.generated_or_default_column
2492 {
2493 expr.clone().unwrap()
2494 } else {
2495 let null = Datum::None.to_protobuf();
2496 PbExprNode {
2497 function_type: expr_node::Type::Unspecified.into(),
2498 return_type: Some(to_col_type_ref.clone()),
2499 rex_node: Some(expr_node::RexNode::Constant(null)),
2500 }
2501 };
2502 exprs.push(to_default_node);
2503 }
2504 }
2505
2506 Ok(exprs)
2507}
2508
/// Auxiliary per-job information loaded from the `streaming_job` catalog table,
/// assembled by `get_streaming_job_extra_info`.
#[derive(Clone, Debug, Default)]
pub struct StreamingJobExtraInfo {
    // Timezone configured for the job, if any.
    pub timezone: Option<String>,
    // Serialized config override; empty string when none was stored.
    pub config_override: Arc<str>,
    // Parsed adaptive parallelism strategy; `None` when none was stored.
    pub adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
    // SQL definition of the job; empty when it could not be resolved.
    pub job_definition: String,
    // Backfill ordering stored for the job, if any.
    pub backfill_orders: Option<BackfillOrders>,
}
2517
impl StreamingJobExtraInfo {
    /// Assemble the subset of fields that form the job's `StreamContext`
    /// (timezone, config override, and adaptive parallelism strategy).
    pub fn stream_context(&self) -> StreamContext {
        StreamContext {
            timezone: self.timezone.clone(),
            config_override: self.config_override.clone(),
            adaptive_parallelism_strategy: self.adaptive_parallelism_strategy,
        }
    }
}
2527
// Row shape selected by `get_streaming_job_extra_info`:
// (job_id, timezone, config_override, adaptive_parallelism_strategy, backfill_orders).
type StreamingJobExtraInfoRow = (
    JobId,
    Option<String>,
    Option<String>,
    Option<String>,
    Option<BackfillOrders>,
);
2536
2537pub async fn get_streaming_job_extra_info<C>(
2538 txn: &C,
2539 job_ids: Vec<JobId>,
2540) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2541where
2542 C: ConnectionTrait,
2543{
2544 let pairs: Vec<StreamingJobExtraInfoRow> = StreamingJob::find()
2545 .select_only()
2546 .columns([
2547 streaming_job::Column::JobId,
2548 streaming_job::Column::Timezone,
2549 streaming_job::Column::ConfigOverride,
2550 streaming_job::Column::AdaptiveParallelismStrategy,
2551 streaming_job::Column::BackfillOrders,
2552 ])
2553 .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2554 .into_tuple()
2555 .all(txn)
2556 .await?;
2557
2558 let job_ids = job_ids.into_iter().collect();
2559
2560 let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2561
2562 let result = pairs
2563 .into_iter()
2564 .map(
2565 |(job_id, timezone, config_override, strategy, backfill_orders)| {
2566 let job_definition = definitions.remove(&job_id).unwrap_or_default();
2567 let adaptive_parallelism_strategy = strategy.as_deref().map(|s| {
2568 parse_strategy(s).expect("strategy should be validated before storing")
2569 });
2570 (
2571 job_id,
2572 StreamingJobExtraInfo {
2573 timezone,
2574 config_override: config_override.unwrap_or_default().into(),
2575 adaptive_parallelism_strategy,
2576 job_definition,
2577 backfill_orders,
2578 },
2579 )
2580 },
2581 )
2582 .collect();
2583
2584 Ok(result)
2585}
2586
#[cfg(test)]
mod tests {
    use super::*;

    /// The external table name embedded in a CDC `CREATE TABLE ... FROM ... TABLE '<name>'`
    /// definition should be extracted verbatim.
    #[test]
    fn test_extract_cdc_table_name() {
        let cases = [
            (
                "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'",
                "public.t1",
            ),
            (
                "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'",
                "mydb.t2",
            ),
        ];
        for (definition, expected) in cases {
            assert_eq!(
                extract_external_table_name_from_definition(definition),
                Some(expected.into())
            );
        }
    }
}