1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{
22 FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23};
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
25use risingwave_common::id::{JobId, SubscriptionId};
26use risingwave_common::types::{DataType, Datum};
27use risingwave_common::util::value_encoding::DatumToProtoExt;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_common::{bail, hash};
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_meta_model::object::ObjectType;
32use risingwave_meta_model::prelude::*;
33use risingwave_meta_model::streaming_job::BackfillOrders;
34use risingwave_meta_model::table::TableType;
35use risingwave_meta_model::user_privilege::Action;
36use risingwave_meta_model::{
37 ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
38 JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
39 TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
40 function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
41 subscription, table, user, user_default_privilege, user_privilege, view,
42};
43use risingwave_meta_model_migration::WithQuery;
44use risingwave_pb::catalog::{
45 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
46 PbSubscription, PbTable, PbView,
47};
48use risingwave_pb::common::{PbObjectType, WorkerNode};
49use risingwave_pb::expr::{PbExprNode, expr_node};
50use risingwave_pb::meta::object::PbObjectInfo;
51use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
52use risingwave_pb::meta::{
53 ObjectDependency as PbObjectDependency, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
54};
55use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
56use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
57use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
58use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
59use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
60use risingwave_sqlparser::ast::Statement as SqlStatement;
61use risingwave_sqlparser::parser::Parser;
62use sea_orm::sea_query::{
63 Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
64 WithClause,
65};
66use sea_orm::{
67 ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
68 FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
69 RelationTrait, Set, Statement,
70};
71use thiserror_ext::AsReport;
72use tracing::warn;
73
74use crate::barrier::SharedFragmentInfo;
75use crate::controller::ObjectModel;
76use crate::controller::fragment::FragmentTypeMaskExt;
77use crate::controller::scale::resolve_streaming_job_definition;
78use crate::model::{FragmentDownstreamRelation, StreamContext};
79use crate::{MetaError, MetaResult};
80
/// Build a recursive-CTE query that resolves every object (transitively)
/// depending on `obj_id`.
///
/// The CTE `used_by_object_ids(used_by)` is seeded with:
/// - direct dependents from `object_dependency` where `oid = obj_id`, and
/// - objects *belonging to* `obj_id` when it is a database or schema
///   (matched via `object.database_id` / `object.schema_id`),
///
/// and then expanded by repeatedly joining `object_dependency.oid` against the
/// CTE's `used_by` column. The outer query joins the CTE back onto `object`
/// to return distinct `(oid, obj_type, schema_id, database_id)` rows,
/// ordered by `oid` descending.
pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
    let cte_alias = Alias::new("used_by_object_ids");
    let cte_return_alias = Alias::new("used_by");

    // Seed 1: rows directly referencing `obj_id`.
    let mut base_query = SelectStatement::new()
        .column(object_dependency::Column::UsedBy)
        .from(ObjectDependency)
        .and_where(object_dependency::Column::Oid.eq(obj_id))
        .to_owned();

    // Seed 2: objects contained in `obj_id` when it is a database/schema.
    let belonged_obj_query = SelectStatement::new()
        .column(object::Column::Oid)
        .from(Object)
        .and_where(
            object::Column::DatabaseId
                .eq(obj_id)
                .or(object::Column::SchemaId.eq(obj_id)),
        )
        .to_owned();

    // Recursive step: dependents of anything already in the CTE.
    let cte_referencing = Query::select()
        .column((ObjectDependency, object_dependency::Column::UsedBy))
        .from(ObjectDependency)
        .inner_join(
            cte_alias.clone(),
            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
                .equals(object_dependency::Column::Oid),
        )
        .to_owned();

    let mut common_table_expr = CommonTableExpression::new();
    common_table_expr
        .query(
            base_query
                .union(UnionType::All, belonged_obj_query)
                .union(UnionType::All, cte_referencing)
                .to_owned(),
        )
        .column(cte_return_alias.clone())
        .table_name(cte_alias.clone());

    // Final projection: resolve CTE ids back into object metadata.
    SelectStatement::new()
        .distinct()
        .columns([
            object::Column::Oid,
            object::Column::ObjType,
            object::Column::SchemaId,
            object::Column::DatabaseId,
        ])
        .from(cte_alias.clone())
        .inner_join(
            Object,
            Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
        )
        .order_by(object::Column::Oid, Order::Desc)
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
}
168
/// Convert a meta-model [`ObjectType`] into its protobuf counterpart.
///
/// The mapping is 1:1 and exhaustive, so adding a new `ObjectType` variant
/// will surface as a compile error here.
fn to_pb_object_type(obj_type: ObjectType) -> PbObjectType {
    match obj_type {
        ObjectType::Database => PbObjectType::Database,
        ObjectType::Schema => PbObjectType::Schema,
        ObjectType::Table => PbObjectType::Table,
        ObjectType::Source => PbObjectType::Source,
        ObjectType::Sink => PbObjectType::Sink,
        ObjectType::View => PbObjectType::View,
        ObjectType::Index => PbObjectType::Index,
        ObjectType::Function => PbObjectType::Function,
        ObjectType::Connection => PbObjectType::Connection,
        ObjectType::Subscription => PbObjectType::Subscription,
        ObjectType::Secret => PbObjectType::Secret,
    }
}
184
/// Shared implementation for listing object dependencies.
///
/// Collects rows from `object_dependency` (joined with `object` to obtain the
/// referenced object's type) plus the implicit "sink into table" dependencies
/// stored on `sink.target_table`. When `object_id` is given, only dependencies
/// where that object is the *dependent* side are returned. When
/// `include_creating` is false, dependencies whose dependent streaming job has
/// not reached `JobStatus::Created` are filtered out.
async fn list_object_dependencies_impl(
    txn: &DatabaseTransaction,
    object_id: Option<ObjectId>,
    include_creating: bool,
) -> MetaResult<Vec<PbObjectDependency>> {
    let referenced_alias = Alias::new("referenced_obj");
    let mut query = ObjectDependency::find()
        .select_only()
        .columns([
            object_dependency::Column::Oid,
            object_dependency::Column::UsedBy,
        ])
        .column_as(
            Expr::col((referenced_alias.clone(), object::Column::ObjType)),
            "referenced_obj_type",
        )
        .join(
            JoinType::InnerJoin,
            object_dependency::Relation::Object1.def(),
        )
        // Second join on `object`, aliased, to fetch the referenced side's type.
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object2.def(),
            referenced_alias.clone(),
        );
    if let Some(object_id) = object_id {
        query = query.filter(object_dependency::Column::UsedBy.eq(object_id));
    }
    // Note the direction: `used_by` is the dependent object, `oid` the referenced one.
    let mut obj_dependencies: Vec<PbObjectDependency> = query
        .into_tuple()
        .all(txn)
        .await?
        .into_iter()
        .map(|(oid, used_by, referenced_type)| PbObjectDependency {
            object_id: used_by,
            referenced_object_id: oid,
            referenced_object_type: to_pb_object_type(referenced_type) as i32,
        })
        .collect();

    // Sinks writing into a table form an implicit table -> sink dependency
    // that is not recorded in `object_dependency`.
    let mut sink_query = Sink::find()
        .select_only()
        .columns([sink::Column::SinkId, sink::Column::TargetTable])
        .filter(sink::Column::TargetTable.is_not_null());
    if let Some(object_id) = object_id {
        sink_query = sink_query.filter(
            sink::Column::SinkId
                .eq(object_id)
                .or(sink::Column::TargetTable.eq(object_id)),
        );
    }
    let sink_dependencies: Vec<(SinkId, TableId)> = sink_query.into_tuple().all(txn).await?;
    obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
        PbObjectDependency {
            object_id: table_id.into(),
            referenced_object_id: sink_id.into(),
            referenced_object_type: PbObjectType::Sink as i32,
        }
    }));

    if !include_creating {
        // Drop dependencies whose dependent job is still being created
        // (or is in any other non-`Created` status).
        let mut streaming_job_ids = obj_dependencies
            .iter()
            .map(|dependency| dependency.object_id)
            .collect_vec();
        streaming_job_ids.sort_unstable();
        streaming_job_ids.dedup();

        if !streaming_job_ids.is_empty() {
            let non_created_jobs: HashSet<JobId> = StreamingJob::find()
                .select_only()
                .columns([streaming_job::Column::JobId])
                .filter(
                    streaming_job::Column::JobId
                        .is_in(streaming_job_ids)
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .all(txn)
                .await?
                .into_iter()
                .collect();

            if !non_created_jobs.is_empty() {
                obj_dependencies.retain(|dependency| {
                    !non_created_jobs.contains(&dependency.object_id.as_job_id())
                });
            }
        }
    }

    Ok(obj_dependencies)
}
279
/// List all object dependencies across the catalog, including the implicit
/// sink-into-table dependencies.
///
/// When `include_creating` is false, dependencies whose dependent streaming
/// job has not reached the `Created` status are filtered out.
pub async fn list_object_dependencies(
    txn: &DatabaseTransaction,
    include_creating: bool,
) -> MetaResult<Vec<PbObjectDependency>> {
    list_object_dependencies_impl(txn, None, include_creating).await
}
287
/// List the dependencies where `object_id` is the *dependent* object,
/// i.e. the objects that `object_id` relies on. Creating jobs are included.
pub async fn list_object_dependencies_by_object_id(
    txn: &DatabaseTransaction,
    object_id: ObjectId,
) -> MetaResult<Vec<PbObjectDependency>> {
    list_object_dependencies_impl(txn, Some(object_id), true).await
}
295
/// Build a recursive-CTE query used to detect cycles introduced by a
/// CREATE SINK ... INTO `target_table`.
///
/// The dependency edge set is the union of `object_dependency` and the
/// implicit edges `(sink_id, target_table)` from the `sink` table. Starting
/// from `target_table`, the CTE walks downstream; the outer query counts how
/// many reached rows fall into `dependent_objects` — a non-zero count means
/// the new sink would close a cycle.
pub fn construct_sink_cycle_check_query(
    target_table: ObjectId,
    dependent_objects: Vec<ObjectId>,
) -> WithQuery {
    let cte_alias = Alias::new("used_by_object_ids_with_sink");
    let depend_alias = Alias::new("obj_dependency_with_sink");

    // Seed: direct dependents of the target table.
    let mut base_query = SelectStatement::new()
        .columns([
            object_dependency::Column::Oid,
            object_dependency::Column::UsedBy,
        ])
        .from(ObjectDependency)
        .and_where(object_dependency::Column::Oid.eq(target_table))
        .to_owned();

    // Implicit edges: every sink-into-table pair acts as a dependency edge.
    let query_sink_deps = SelectStatement::new()
        .columns([sink::Column::SinkId, sink::Column::TargetTable])
        .from(Sink)
        .and_where(sink::Column::TargetTable.is_not_null())
        .to_owned();

    // Recursive step over the unioned edge set.
    let cte_referencing = Query::select()
        .column((depend_alias.clone(), object_dependency::Column::Oid))
        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
        .from_subquery(
            SelectStatement::new()
                .columns([
                    object_dependency::Column::Oid,
                    object_dependency::Column::UsedBy,
                ])
                .from(ObjectDependency)
                .union(UnionType::All, query_sink_deps)
                .to_owned(),
            depend_alias.clone(),
        )
        .inner_join(
            cte_alias.clone(),
            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
                .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
        )
        // Skip degenerate self-edges to avoid trivial infinite recursion.
        .and_where(
            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
                cte_alias.clone(),
                object_dependency::Column::Oid,
            ))),
        )
        .to_owned();

    let mut common_table_expr = CommonTableExpression::new();
    common_table_expr
        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
        .columns([
            object_dependency::Column::Oid,
            object_dependency::Column::UsedBy,
        ])
        .table_name(cte_alias.clone());

    // Outer query: count reachable objects that the new sink depends on.
    SelectStatement::new()
        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
        .from(cte_alias.clone())
        .and_where(
            Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
        )
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
}
392
/// Lightweight projection of the `object` table used by dependency queries.
#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
#[sea_orm(entity = "Object")]
pub struct PartialObject {
    // Object id (primary key of `object`).
    pub oid: ObjectId,
    // Catalog type of the object (table/source/sink/...).
    pub obj_type: ObjectType,
    // Owning schema, if any (`None` e.g. for databases).
    pub schema_id: Option<SchemaId>,
    // Owning database, if any.
    pub database_id: Option<DatabaseId>,
}
401
/// Projection of `fragment` carrying only the state-table ownership info.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Fragment")]
pub struct PartialFragmentStateTables {
    pub fragment_id: FragmentId,
    // The streaming job this fragment belongs to.
    pub job_id: ObjectId,
    // State tables maintained by this fragment.
    pub state_table_ids: TableIdArray,
}
409
/// Placement of a single actor: which fragment it belongs to and which
/// worker node it is scheduled on.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct PartialActorLocation {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub worker_id: WorkerId,
}
416
/// Descriptor of a fragment as materialized by aggregate queries
/// (constructed via `FromQueryResult`, not tied to a single entity).
#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
pub struct FragmentDesc {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    // Bitmask of `FragmentTypeFlag`s; stored raw as i32.
    pub fragment_type_mask: i32,
    pub distribution_type: DistributionType,
    pub state_table_ids: TableIdArray,
    // Actor count of the fragment; i64 because it comes from a SQL COUNT.
    pub parallelism: i64,
    pub vnode_count: i32,
    pub stream_node: StreamNode,
    pub parallelism_policy: String,
}
429
430pub async fn get_referring_objects_cascade<C>(
432 obj_id: ObjectId,
433 db: &C,
434) -> MetaResult<Vec<PartialObject>>
435where
436 C: ConnectionTrait,
437{
438 let query = construct_obj_dependency_query(obj_id);
439 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
440 let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
441 db.get_database_backend(),
442 sql,
443 values,
444 ))
445 .all(db)
446 .await?;
447 Ok(objects)
448}
449
450pub async fn check_sink_into_table_cycle<C>(
452 target_table: ObjectId,
453 dependent_objs: Vec<ObjectId>,
454 db: &C,
455) -> MetaResult<bool>
456where
457 C: ConnectionTrait,
458{
459 if dependent_objs.is_empty() {
460 return Ok(false);
461 }
462
463 if dependent_objs.contains(&target_table) {
465 return Ok(true);
466 }
467
468 let query = construct_sink_cycle_check_query(target_table, dependent_objs);
469 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
470
471 let res = db
472 .query_one(Statement::from_sql_and_values(
473 db.get_database_backend(),
474 sql,
475 values,
476 ))
477 .await?
478 .unwrap();
479
480 let cnt: i64 = res.try_get_by(0)?;
481
482 Ok(cnt != 0)
483}
484
485pub async fn ensure_object_id<C>(
487 object_type: ObjectType,
488 obj_id: impl Into<ObjectId>,
489 db: &C,
490) -> MetaResult<()>
491where
492 C: ConnectionTrait,
493{
494 let obj_id = obj_id.into();
495 let count = Object::find_by_id(obj_id).count(db).await?;
496 if count == 0 {
497 return Err(MetaError::catalog_id_not_found(
498 object_type.as_str(),
499 obj_id,
500 ));
501 }
502 Ok(())
503}
504
505pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
506where
507 C: ConnectionTrait,
508{
509 let count = Object::find_by_id(job_id).count(db).await?;
510 if count == 0 {
511 return Err(MetaError::cancelled(format!(
512 "job {} might be cancelled manually or by recovery",
513 job_id
514 )));
515 }
516 Ok(())
517}
518
519pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
521where
522 C: ConnectionTrait,
523{
524 let count = User::find_by_id(user_id).count(db).await?;
525 if count == 0 {
526 return Err(anyhow!("user {} was concurrently dropped", user_id).into());
527 }
528 Ok(())
529}
530
531pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
533where
534 C: ConnectionTrait,
535{
536 let count = Database::find()
537 .filter(database::Column::Name.eq(name))
538 .count(db)
539 .await?;
540 if count > 0 {
541 assert_eq!(count, 1);
542 return Err(MetaError::catalog_duplicated("database", name));
543 }
544 Ok(())
545}
546
547pub async fn check_function_signature_duplicate<C>(
549 pb_function: &PbFunction,
550 db: &C,
551) -> MetaResult<()>
552where
553 C: ConnectionTrait,
554{
555 let count = Function::find()
556 .inner_join(Object)
557 .filter(
558 object::Column::DatabaseId
559 .eq(pb_function.database_id)
560 .and(object::Column::SchemaId.eq(pb_function.schema_id))
561 .and(function::Column::Name.eq(&pb_function.name))
562 .and(
563 function::Column::ArgTypes
564 .eq(DataTypeArray::from(pb_function.arg_types.clone())),
565 ),
566 )
567 .count(db)
568 .await?;
569 if count > 0 {
570 assert_eq!(count, 1);
571 return Err(MetaError::catalog_duplicated("function", &pb_function.name));
572 }
573 Ok(())
574}
575
576pub async fn check_connection_name_duplicate<C>(
578 pb_connection: &PbConnection,
579 db: &C,
580) -> MetaResult<()>
581where
582 C: ConnectionTrait,
583{
584 let count = Connection::find()
585 .inner_join(Object)
586 .filter(
587 object::Column::DatabaseId
588 .eq(pb_connection.database_id)
589 .and(object::Column::SchemaId.eq(pb_connection.schema_id))
590 .and(connection::Column::Name.eq(&pb_connection.name)),
591 )
592 .count(db)
593 .await?;
594 if count > 0 {
595 assert_eq!(count, 1);
596 return Err(MetaError::catalog_duplicated(
597 "connection",
598 &pb_connection.name,
599 ));
600 }
601 Ok(())
602}
603
604pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
605where
606 C: ConnectionTrait,
607{
608 let count = Secret::find()
609 .inner_join(Object)
610 .filter(
611 object::Column::DatabaseId
612 .eq(pb_secret.database_id)
613 .and(object::Column::SchemaId.eq(pb_secret.schema_id))
614 .and(secret::Column::Name.eq(&pb_secret.name)),
615 )
616 .count(db)
617 .await?;
618 if count > 0 {
619 assert_eq!(count, 1);
620 return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
621 }
622 Ok(())
623}
624
625pub async fn check_subscription_name_duplicate<C>(
626 pb_subscription: &PbSubscription,
627 db: &C,
628) -> MetaResult<()>
629where
630 C: ConnectionTrait,
631{
632 let count = Subscription::find()
633 .inner_join(Object)
634 .filter(
635 object::Column::DatabaseId
636 .eq(pb_subscription.database_id)
637 .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
638 .and(subscription::Column::Name.eq(&pb_subscription.name)),
639 )
640 .count(db)
641 .await?;
642 if count > 0 {
643 assert_eq!(count, 1);
644 return Err(MetaError::catalog_duplicated(
645 "subscription",
646 &pb_subscription.name,
647 ));
648 }
649 Ok(())
650}
651
652pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
654where
655 C: ConnectionTrait,
656{
657 let count = User::find()
658 .filter(user::Column::Name.eq(name))
659 .count(db)
660 .await?;
661 if count > 0 {
662 assert_eq!(count, 1);
663 return Err(MetaError::catalog_duplicated("user", name));
664 }
665 Ok(())
666}
667
/// Check that no relation (table/source/sink/index/view) named `name` exists
/// in the given database/schema.
///
/// Relations share a namespace, so each relation kind is probed in turn.
/// If a conflicting relation is still being created (its streaming job has
/// not reached `Created`), a `catalog_under_creation` error is returned so
/// the caller can distinguish it from a plain duplicate. Views never have a
/// streaming job, and only *shared* sources do, so those skip the job check.
pub async fn check_relation_name_duplicate<C>(
    name: &str,
    database_id: DatabaseId,
    schema_id: SchemaId,
    db: &C,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    // Expands to one lookup per relation kind; early-returns from the
    // enclosing function when a conflict is found.
    macro_rules! check_duplicated {
        ($obj_type:expr, $entity:ident, $table:ident) => {
            let object_id = Object::find()
                .select_only()
                .column(object::Column::Oid)
                .inner_join($entity)
                .filter(
                    object::Column::DatabaseId
                        .eq(Some(database_id))
                        .and(object::Column::SchemaId.eq(Some(schema_id)))
                        .and($table::Column::Name.eq(name)),
                )
                .into_tuple::<ObjectId>()
                .one(db)
                .await?;
            if let Some(oid) = object_id {
                // Decide whether the conflicting relation can be "under creation":
                // views never are; sources only when shared (i.e. backed by a job).
                let check_creation = if $obj_type == ObjectType::View {
                    false
                } else if $obj_type == ObjectType::Source {
                    let source_info = Source::find_by_id(oid.as_source_id())
                        .select_only()
                        .column(source::Column::SourceInfo)
                        .into_tuple::<Option<StreamSourceInfo>>()
                        .one(db)
                        .await?
                        .unwrap();
                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
                } else {
                    true
                };
                let job_id = oid.as_job_id();
                return if check_creation
                    && !matches!(
                        StreamingJob::find_by_id(job_id)
                            .select_only()
                            .column(streaming_job::Column::JobStatus)
                            .into_tuple::<JobStatus>()
                            .one(db)
                            .await?,
                        Some(JobStatus::Created)
                    ) {
                    Err(MetaError::catalog_under_creation(
                        $obj_type.as_str(),
                        name,
                        job_id,
                    ))
                } else {
                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
                };
            }
        };
    }
    check_duplicated!(ObjectType::Table, Table, table);
    check_duplicated!(ObjectType::Source, Source, source);
    check_duplicated!(ObjectType::Sink, Sink, sink);
    check_duplicated!(ObjectType::Index, Index, index);
    check_duplicated!(ObjectType::View, View, view);

    Ok(())
}
738
739pub async fn check_schema_name_duplicate<C>(
741 name: &str,
742 database_id: DatabaseId,
743 db: &C,
744) -> MetaResult<()>
745where
746 C: ConnectionTrait,
747{
748 let count = Object::find()
749 .inner_join(Schema)
750 .filter(
751 object::Column::ObjType
752 .eq(ObjectType::Schema)
753 .and(object::Column::DatabaseId.eq(Some(database_id)))
754 .and(schema::Column::Name.eq(name)),
755 )
756 .count(db)
757 .await?;
758 if count != 0 {
759 return Err(MetaError::catalog_duplicated("schema", name));
760 }
761
762 Ok(())
763}
764
/// Reject a plain (non-cascade) DROP when other objects still depend on
/// `object_id`.
///
/// Index dependencies are ignored: indexes are dropped together with their
/// primary table. For each remaining referring object a human-readable
/// `schema.name depends on it` detail line is collected, and a
/// `permission_denied` error with a kind-specific HINT is returned. Dropping
/// an iceberg-engine table tolerates its single internal sink dependency.
pub async fn check_object_refer_for_drop<C>(
    object_type: ObjectType,
    object_id: ObjectId,
    db: &C,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    // For tables, exclude index dependents from the count up front; for all
    // other object kinds, any dependent blocks the drop.
    let count = if object_type == ObjectType::Table {
        ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .filter(
                object_dependency::Column::Oid
                    .eq(object_id)
                    .and(object::Column::ObjType.ne(ObjectType::Index)),
            )
            .count(db)
            .await?
    } else {
        ObjectDependency::find()
            .filter(object_dependency::Column::Oid.eq(object_id))
            .count(db)
            .await?
    };
    if count != 0 {
        let referring_objects = get_referring_objects(object_id, db).await?;
        // Group dependents by kind (indexes dropped implicitly, so skipped).
        let referring_objs_map = referring_objects
            .into_iter()
            .filter(|o| o.obj_type != ObjectType::Index)
            .into_group_map_by(|o| o.obj_type);
        let mut details = vec![];
        for (obj_type, objs) in referring_objs_map {
            match obj_type {
                ObjectType::Table => {
                    // Dependent tables here are materialized views on it.
                    let tables: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Table.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(table::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(tables.into_iter().map(|(schema_name, table_name)| {
                        format!(
                            "materialized view {}.{} depends on it",
                            schema_name, table_name
                        )
                    }));
                }
                ObjectType::Sink => {
                    let sinks: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Sink.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(sink::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    // An iceberg-engine table always has exactly one internal
                    // sink feeding it; that sink must not block the drop.
                    if object_type == ObjectType::Table {
                        let engine = Table::find_by_id(object_id.as_table_id())
                            .select_only()
                            .column(table::Column::Engine)
                            .into_tuple::<table::Engine>()
                            .one(db)
                            .await?;
                        if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
                            continue;
                        }
                    }
                    details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
                        format!("sink {}.{} depends on it", schema_name, sink_name)
                    }));
                }
                ObjectType::View => {
                    let views: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::View.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(view::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(views.into_iter().map(|(schema_name, view_name)| {
                        format!("view {}.{} depends on it", schema_name, view_name)
                    }));
                }
                ObjectType::Subscription => {
                    let subscriptions: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Subscription.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(subscription::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(subscriptions.into_iter().map(
                        |(schema_name, subscription_name)| {
                            format!(
                                "subscription {}.{} depends on it",
                                schema_name, subscription_name
                            )
                        },
                    ));
                }
                ObjectType::Source => {
                    let sources: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Source.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(source::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(sources.into_iter().map(|(schema_name, view_name)| {
                        format!("source {}.{} depends on it", schema_name, view_name)
                    }));
                }
                ObjectType::Connection => {
                    let connections: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Connection.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(connection::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(connections.into_iter().map(|(schema_name, view_name)| {
                        format!("connection {}.{} depends on it", schema_name, view_name)
                    }));
                }
                _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
            }
        }
        // All dependents were tolerated (e.g. the internal iceberg sink).
        if details.is_empty() {
            return Ok(());
        }

        return Err(MetaError::permission_denied(format!(
            "{} used by {} other objects. \nDETAIL: {}\n\
            {}",
            object_type.as_str(),
            details.len(),
            details.join("\n"),
            match object_type {
                ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
                    "HINT: DROP the dependent objects first.",
                ObjectType::Database | ObjectType::Schema => unreachable!(),
                _ => "HINT: Use DROP ... CASCADE to drop the dependent objects too.",
            }
        )));
    }
    Ok(())
}
944
945pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
947where
948 C: ConnectionTrait,
949{
950 let objs = ObjectDependency::find()
951 .filter(object_dependency::Column::Oid.eq(object_id))
952 .join(
953 JoinType::InnerJoin,
954 object_dependency::Relation::Object1.def(),
955 )
956 .into_partial_model()
957 .all(db)
958 .await?;
959
960 Ok(objs)
961}
962
963pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
965where
966 C: ConnectionTrait,
967{
968 let count = Object::find()
969 .filter(object::Column::SchemaId.eq(Some(schema_id)))
970 .count(db)
971 .await?;
972 if count != 0 {
973 return Err(MetaError::permission_denied("schema is not empty"));
974 }
975
976 Ok(())
977}
978
979pub async fn list_user_info_by_ids<C>(
981 user_ids: impl IntoIterator<Item = UserId>,
982 db: &C,
983) -> MetaResult<Vec<PbUserInfo>>
984where
985 C: ConnectionTrait,
986{
987 let mut user_infos = vec![];
988 for user_id in user_ids {
989 let user = User::find_by_id(user_id)
990 .one(db)
991 .await?
992 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
993 let mut user_info: PbUserInfo = user.into();
994 user_info.grant_privileges = get_user_privilege(user_id, db).await?;
995 user_infos.push(user_info);
996 }
997 Ok(user_infos)
998}
999
1000pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
1002where
1003 C: ConnectionTrait,
1004{
1005 let obj_owner: UserId = Object::find_by_id(object_id)
1006 .select_only()
1007 .column(object::Column::OwnerId)
1008 .into_tuple()
1009 .one(db)
1010 .await?
1011 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1012 Ok(obj_owner)
1013}
1014
/// Build a recursive-CTE query resolving all privileges granted (directly or
/// transitively via `WITH GRANT OPTION` re-grants) from the given privilege
/// `ids`.
///
/// The CTE `granted_privilege_ids(id, user_id)` starts from the given ids and
/// follows `user_privilege.dependent_id` links, returning each reachable
/// privilege id together with the user holding it.
pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
    let cte_alias = Alias::new("granted_privilege_ids");
    let cte_return_privilege_alias = Alias::new("id");
    let cte_return_user_alias = Alias::new("user_id");

    // Seed: the privileges being revoked/inspected.
    let mut base_query = SelectStatement::new()
        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
        .from(UserPrivilege)
        .and_where(user_privilege::Column::Id.is_in(ids))
        .to_owned();

    // Recursive step: privileges whose `dependent_id` is already in the CTE.
    let cte_referencing = Query::select()
        .columns([
            (UserPrivilege, user_privilege::Column::Id),
            (UserPrivilege, user_privilege::Column::UserId),
        ])
        .from(UserPrivilege)
        .inner_join(
            cte_alias.clone(),
            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
                .equals(user_privilege::Column::DependentId),
        )
        .to_owned();

    let mut common_table_expr = CommonTableExpression::new();
    common_table_expr
        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
        .columns([
            cte_return_privilege_alias.clone(),
            cte_return_user_alias.clone(),
        ])
        .table_name(cte_alias.clone());

    SelectStatement::new()
        .columns([cte_return_privilege_alias, cte_return_user_alias])
        .from(cte_alias)
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
}
1083
1084pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
1085where
1086 C: ConnectionTrait,
1087{
1088 let table_ids: Vec<TableId> = Table::find()
1089 .select_only()
1090 .column(table::Column::TableId)
1091 .filter(
1092 table::Column::TableType
1093 .eq(TableType::Internal)
1094 .and(table::Column::BelongsToJobId.eq(job_id)),
1095 )
1096 .into_tuple()
1097 .all(db)
1098 .await?;
1099 Ok(table_ids)
1100}
1101
1102pub async fn get_index_state_tables_by_table_id<C>(
1103 table_id: TableId,
1104 db: &C,
1105) -> MetaResult<Vec<TableId>>
1106where
1107 C: ConnectionTrait,
1108{
1109 let mut index_table_ids: Vec<TableId> = Index::find()
1110 .select_only()
1111 .column(index::Column::IndexTableId)
1112 .filter(index::Column::PrimaryTableId.eq(table_id))
1113 .into_tuple()
1114 .all(db)
1115 .await?;
1116
1117 if !index_table_ids.is_empty() {
1118 let internal_table_ids: Vec<TableId> = Table::find()
1119 .select_only()
1120 .column(table::Column::TableId)
1121 .filter(
1122 table::Column::TableType
1123 .eq(TableType::Internal)
1124 .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
1125 )
1126 .into_tuple()
1127 .all(db)
1128 .await?;
1129
1130 index_table_ids.extend(internal_table_ids.into_iter());
1131 }
1132
1133 Ok(index_table_ids)
1134}
1135
/// For an iceberg-engine table, collect the object ids of its auxiliary
/// catalog objects: the internal iceberg sink (plus that sink's internal
/// tables) and the internal iceberg source.
///
/// Returns an empty vec if `object_id` is not a table or not iceberg-engine.
/// The auxiliary objects are located by naming convention
/// (`ICEBERG_SINK_PREFIX` / `ICEBERG_SOURCE_PREFIX` + table name) within the
/// table's own database/schema; a missing counterpart only logs a warning.
pub async fn get_iceberg_related_object_ids<C>(
    object_id: ObjectId,
    db: &C,
) -> MetaResult<Vec<ObjectId>>
where
    C: ConnectionTrait,
{
    let object = Object::find_by_id(object_id)
        .one(db)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
    if object.obj_type != ObjectType::Table {
        return Ok(vec![]);
    }

    let table = Table::find_by_id(object_id.as_table_id())
        .one(db)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
    if !matches!(table.engine, Some(table::Engine::Iceberg)) {
        return Ok(vec![]);
    }

    // Tables always belong to a database and schema, so these are present.
    let database_id = object.database_id.unwrap();
    let schema_id = object.schema_id.unwrap();

    let mut related_objects = vec![];

    // Locate the internal sink by its conventional name in the same schema.
    let iceberg_sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name);
    let iceberg_sink_id = Sink::find()
        .inner_join(Object)
        .select_only()
        .column(sink::Column::SinkId)
        .filter(
            object::Column::DatabaseId
                .eq(database_id)
                .and(object::Column::SchemaId.eq(schema_id))
                .and(sink::Column::Name.eq(&iceberg_sink_name)),
        )
        .into_tuple::<SinkId>()
        .one(db)
        .await?;
    if let Some(sink_id) = iceberg_sink_id {
        related_objects.push(sink_id.as_object_id());
        // The sink is a streaming job of its own; include its state tables.
        let sink_internal_tables = get_internal_tables_by_id(sink_id.as_job_id(), db).await?;
        related_objects.extend(
            sink_internal_tables
                .into_iter()
                .map(|tid| tid.as_object_id()),
        );
    } else {
        warn!(
            "iceberg table {} missing sink {}",
            table.name, iceberg_sink_name
        );
    }

    // Locate the internal source by its conventional name in the same schema.
    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
    let iceberg_source_id = Source::find()
        .inner_join(Object)
        .select_only()
        .column(source::Column::SourceId)
        .filter(
            object::Column::DatabaseId
                .eq(database_id)
                .and(object::Column::SchemaId.eq(schema_id))
                .and(source::Column::Name.eq(&iceberg_source_name)),
        )
        .into_tuple::<SourceId>()
        .one(db)
        .await?;
    if let Some(source_id) = iceberg_source_id {
        related_objects.push(source_id.as_object_id());
    } else {
        warn!(
            "iceberg table {} missing source {}",
            table.name, iceberg_source_name
        );
    }

    Ok(related_objects)
}
1219
1220pub(crate) async fn load_streaming_jobs_by_ids<C>(
1222 txn: &C,
1223 job_ids: impl IntoIterator<Item = JobId>,
1224) -> MetaResult<HashMap<JobId, streaming_job::Model>>
1225where
1226 C: ConnectionTrait,
1227{
1228 let job_ids: HashSet<JobId> = job_ids.into_iter().collect();
1229 if job_ids.is_empty() {
1230 return Ok(HashMap::new());
1231 }
1232 let jobs = streaming_job::Entity::find()
1233 .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
1234 .all(txn)
1235 .await?;
1236 Ok(jobs.into_iter().map(|job| (job.job_id, job)).collect())
1237}
1238
/// Projection of a `user_privilege` row carrying only the privilege id and
/// the id of the user it was granted to (used e.g. when resolving cascading
/// privilege dependencies).
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "UserPrivilege")]
pub struct PartialUserPrivilege {
    /// Primary key of the privilege row.
    pub id: PrivilegeId,
    /// The grantee of the privilege.
    pub user_id: UserId,
}
1245
1246pub async fn get_referring_privileges_cascade<C>(
1247 ids: Vec<PrivilegeId>,
1248 db: &C,
1249) -> MetaResult<Vec<PartialUserPrivilege>>
1250where
1251 C: ConnectionTrait,
1252{
1253 let query = construct_privilege_dependency_query(ids);
1254 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1255 let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1256 db.get_database_backend(),
1257 sql,
1258 values,
1259 ))
1260 .all(db)
1261 .await?;
1262
1263 Ok(privileges)
1264}
1265
1266pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1268where
1269 C: ConnectionTrait,
1270{
1271 let count = UserPrivilege::find()
1272 .filter(user_privilege::Column::DependentId.is_in(ids))
1273 .count(db)
1274 .await?;
1275 if count != 0 {
1276 return Err(MetaError::permission_denied(format!(
1277 "privileges granted to {} other ones.",
1278 count
1279 )));
1280 }
1281 Ok(())
1282}
1283
1284pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1286where
1287 C: ConnectionTrait,
1288{
1289 let user_privileges = UserPrivilege::find()
1290 .find_also_related(Object)
1291 .filter(user_privilege::Column::UserId.eq(user_id))
1292 .all(db)
1293 .await?;
1294 Ok(user_privileges
1295 .into_iter()
1296 .map(|(privilege, object)| {
1297 let object = object.unwrap();
1298 let obj = match object.obj_type {
1299 ObjectType::Database => PbGrantObject::DatabaseId(object.oid.as_database_id()),
1300 ObjectType::Schema => PbGrantObject::SchemaId(object.oid.as_schema_id()),
1301 ObjectType::Table | ObjectType::Index => {
1302 PbGrantObject::TableId(object.oid.as_table_id())
1303 }
1304 ObjectType::Source => PbGrantObject::SourceId(object.oid.as_source_id()),
1305 ObjectType::Sink => PbGrantObject::SinkId(object.oid.as_sink_id()),
1306 ObjectType::View => PbGrantObject::ViewId(object.oid.as_view_id()),
1307 ObjectType::Function => PbGrantObject::FunctionId(object.oid.as_function_id()),
1308 ObjectType::Connection => {
1309 PbGrantObject::ConnectionId(object.oid.as_connection_id())
1310 }
1311 ObjectType::Subscription => {
1312 PbGrantObject::SubscriptionId(object.oid.as_subscription_id())
1313 }
1314 ObjectType::Secret => PbGrantObject::SecretId(object.oid.as_secret_id()),
1315 };
1316 PbGrantPrivilege {
1317 action_with_opts: vec![PbActionWithGrantOption {
1318 action: PbAction::from(privilege.action) as _,
1319 with_grant_option: privilege.with_grant_option,
1320 granted_by: privilege.granted_by as _,
1321 }],
1322 object: Some(obj),
1323 }
1324 })
1325 .collect())
1326}
1327
1328pub async fn get_table_columns(
1329 txn: &impl ConnectionTrait,
1330 id: TableId,
1331) -> MetaResult<ColumnCatalogArray> {
1332 let columns = Table::find_by_id(id)
1333 .select_only()
1334 .columns([table::Column::Columns])
1335 .into_tuple::<ColumnCatalogArray>()
1336 .one(txn)
1337 .await?
1338 .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1339 Ok(columns)
1340}
1341
/// Apply matching default privileges to a newly created object and return the
/// users whose privilege sets changed.
///
/// Default-privilege entries are matched on the object's database, schema
/// (including schema-less entries), owner, object type, and — for tables —
/// whether the table is a materialized view. For every match a concrete
/// privilege row is inserted. `SELECT` grants are additionally propagated to
/// the job's internal state tables and, for iceberg tables, to the hidden
/// sink/source objects.
pub async fn grant_default_privileges_automatically<C>(
    db: &C,
    object_id: impl Into<ObjectId>,
) -> MetaResult<Vec<PbUserInfo>>
where
    C: ConnectionTrait,
{
    let object_id = object_id.into();
    let object = Object::find_by_id(object_id)
        .one(db)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
    // Default privileges apply to objects inside a database, never to a
    // database itself.
    assert_ne!(object.obj_type, ObjectType::Database);

    // Tables and materialized views share `ObjectType::Table`; the table type
    // decides whether "for materialized view" default entries apply.
    let for_mview_filter = if object.obj_type == ObjectType::Table {
        let table_type = Table::find_by_id(object_id.as_table_id())
            .select_only()
            .column(table::Column::TableType)
            .into_tuple::<TableType>()
            .one(db)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
        user_default_privilege::Column::ForMaterializedView
            .eq(table_type == TableType::MaterializedView)
    } else {
        user_default_privilege::Column::ForMaterializedView.eq(false)
    };
    // Schema-less objects match default entries whose schema is NULL.
    let schema_filter = if let Some(schema_id) = &object.schema_id {
        user_default_privilege::Column::SchemaId.eq(*schema_id)
    } else {
        user_default_privilege::Column::SchemaId.is_null()
    };

    // (grantee, granted_by, action, with_grant_option) tuples to materialize.
    let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
        .select_only()
        .columns([
            user_default_privilege::Column::Grantee,
            user_default_privilege::Column::GrantedBy,
            user_default_privilege::Column::Action,
            user_default_privilege::Column::WithGrantOption,
        ])
        .filter(
            user_default_privilege::Column::DatabaseId
                .eq(object.database_id.unwrap())
                .and(schema_filter)
                .and(user_default_privilege::Column::UserId.eq(object.owner_id))
                .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
                .and(for_mview_filter),
        )
        .into_tuple()
        .all(db)
        .await?;
    if default_privileges.is_empty() {
        return Ok(vec![]);
    }

    // Users to re-read and notify once all inserts are done.
    let updated_user_ids = default_privileges
        .iter()
        .map(|(grantee, _, _, _)| *grantee)
        .collect::<HashSet<_>>();

    for (grantee, granted_by, action, with_grant_option) in default_privileges {
        UserPrivilege::insert(user_privilege::ActiveModel {
            user_id: Set(grantee),
            oid: Set(object_id),
            granted_by: Set(granted_by),
            action: Set(action),
            with_grant_option: Set(with_grant_option),
            ..Default::default()
        })
        .exec(db)
        .await?;
        if action == Action::Select {
            // SELECT also covers the job's internal state tables.
            let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
            if !internal_table_ids.is_empty() {
                for internal_table_id in &internal_table_ids {
                    UserPrivilege::insert(user_privilege::ActiveModel {
                        user_id: Set(grantee),
                        oid: Set(internal_table_id.as_object_id()),
                        granted_by: Set(granted_by),
                        action: Set(Action::Select),
                        with_grant_option: Set(with_grant_option),
                        ..Default::default()
                    })
                    .exec(db)
                    .await?;
                }
            }

            // For iceberg tables, also grant on the hidden sink/source pair
            // (and the sink's internal tables) backing the table.
            let iceberg_privilege_object_ids =
                get_iceberg_related_object_ids(object_id, db).await?;
            if !iceberg_privilege_object_ids.is_empty() {
                for iceberg_object_id in &iceberg_privilege_object_ids {
                    UserPrivilege::insert(user_privilege::ActiveModel {
                        user_id: Set(grantee),
                        oid: Set(*iceberg_object_id),
                        granted_by: Set(granted_by),
                        action: Set(action),
                        with_grant_option: Set(with_grant_option),
                        ..Default::default()
                    })
                    .exec(db)
                    .await?;
                }
            }
        }
    }

    let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
    Ok(updated_user_infos)
}
1457
1458pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1460 match object {
1461 PbGrantObject::DatabaseId(id) => (*id).into(),
1462 PbGrantObject::SchemaId(id) => (*id).into(),
1463 PbGrantObject::TableId(id) => (*id).into(),
1464 PbGrantObject::SourceId(id) => (*id).into(),
1465 PbGrantObject::SinkId(id) => (*id).into(),
1466 PbGrantObject::ViewId(id) => (*id).into(),
1467 PbGrantObject::FunctionId(id) => (*id).into(),
1468 PbGrantObject::SubscriptionId(id) => (*id).into(),
1469 PbGrantObject::ConnectionId(id) => (*id).into(),
1470 PbGrantObject::SecretId(id) => (*id).into(),
1471 }
1472}
1473
1474pub async fn insert_fragment_relations(
1475 db: &impl ConnectionTrait,
1476 downstream_fragment_relations: &FragmentDownstreamRelation,
1477) -> MetaResult<()> {
1478 let mut relations = vec![];
1479 for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1480 for downstream in downstreams {
1481 relations.push(
1482 fragment_relation::Model {
1483 source_fragment_id: *upstream_fragment_id as _,
1484 target_fragment_id: downstream.downstream_fragment_id as _,
1485 dispatcher_type: downstream.dispatcher_type,
1486 dist_key_indices: downstream
1487 .dist_key_indices
1488 .iter()
1489 .map(|idx| *idx as i32)
1490 .collect_vec()
1491 .into(),
1492 output_indices: downstream
1493 .output_mapping
1494 .indices
1495 .iter()
1496 .map(|idx| *idx as i32)
1497 .collect_vec()
1498 .into(),
1499 output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1500 }
1501 .into_active_model(),
1502 );
1503 }
1504 }
1505 if !relations.is_empty() {
1506 FragmentRelation::insert_many(relations).exec(db).await?;
1507 }
1508 Ok(())
1509}
1510
/// Build the dispatcher each source actor should use to ship rows to the
/// target fragment, according to the dispatch strategy.
///
/// Returns:
/// - a map from source actor id to its [`PbDispatcher`], and
/// - for `NoShuffle` only, the 1:1 source→target actor pairing (`None` for
///   all other dispatcher types).
///
/// # Panics
/// Panics when actor metadata contradicts the dispatcher type, e.g. a hash
/// target actor without a vnode bitmap, or no-shuffle fragments whose actors
/// cannot be paired (see [`resolve_no_shuffle_actor_mapping`]).
pub fn compose_dispatchers(
    source_fragment_distribution: DistributionType,
    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
    target_fragment_id: crate::model::FragmentId,
    target_fragment_distribution: DistributionType,
    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
    dispatcher_type: DispatcherType,
    dist_key_indices: Vec<u32>,
    output_mapping: PbDispatchOutputMapping,
) -> (
    HashMap<crate::model::ActorId, PbDispatcher>,
    Option<HashMap<crate::model::ActorId, crate::model::ActorId>>,
) {
    match dispatcher_type {
        DispatcherType::Hash => {
            // Every source actor shares one hash dispatcher: the hash mapping
            // is built from the target actors' vnode bitmaps.
            let dispatcher = PbDispatcher {
                r#type: PbDispatcherType::from(dispatcher_type) as _,
                dist_key_indices,
                output_mapping: output_mapping.into(),
                hash_mapping: Some(
                    ActorMapping::from_bitmaps(
                        &target_fragment_actors
                            .iter()
                            .map(|(actor_id, bitmap)| {
                                (
                                    *actor_id as _,
                                    bitmap
                                        .clone()
                                        .expect("downstream hash dispatch must have distribution"),
                                )
                            })
                            .collect(),
                    )
                    .to_protobuf(),
                ),
                dispatcher_id: target_fragment_id,
                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
            };
            (
                source_fragment_actors
                    .keys()
                    .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
                    .collect(),
                None,
            )
        }
        DispatcherType::Broadcast | DispatcherType::Simple => {
            // Broadcast/simple dispatch needs no hash mapping; all source
            // actors share one dispatcher addressing every target actor.
            let dispatcher = PbDispatcher {
                r#type: PbDispatcherType::from(dispatcher_type) as _,
                dist_key_indices,
                output_mapping: output_mapping.into(),
                hash_mapping: None,
                dispatcher_id: target_fragment_id,
                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
            };
            (
                source_fragment_actors
                    .keys()
                    .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
                    .collect(),
                None,
            )
        }
        DispatcherType::NoShuffle => {
            // Pair source and target actors 1:1, then point each source
            // actor's dispatcher at exactly its paired target actor.
            let no_shuffle_map = resolve_no_shuffle_actor_mapping(
                source_fragment_distribution,
                source_fragment_actors
                    .iter()
                    .map(|(&id, bitmap)| (id, bitmap)),
                target_fragment_distribution,
                target_fragment_actors
                    .iter()
                    .map(|(&id, bitmap)| (id, bitmap)),
            );
            let dispatchers = no_shuffle_map
                .iter()
                .map(|(&upstream_actor_id, &downstream_actor_id)| {
                    (
                        upstream_actor_id,
                        PbDispatcher {
                            r#type: PbDispatcherType::NoShuffle as _,
                            dist_key_indices: dist_key_indices.clone(),
                            output_mapping: output_mapping.clone().into(),
                            hash_mapping: None,
                            dispatcher_id: target_fragment_id,
                            downstream_actor_id: vec![downstream_actor_id],
                        },
                    )
                })
                .collect();
            (dispatchers, Some(no_shuffle_map))
        }
    }
}
1605
/// Pair the actors of two no-shuffle-connected fragments 1:1.
///
/// Both fragments must have the same [`DistributionType`]:
/// - `Single`: each side has exactly one actor (whose bitmap, if present,
///   must cover all vnodes); the two are paired directly.
/// - `Hash`: actors are matched by vnode ownership — each target actor is
///   keyed by the first vnode of its bitmap, each source actor looks up the
///   target owning its own first vnode, and full bitmap equality is asserted.
///
/// # Panics
/// Panics when the distributions differ or the actors cannot be matched
/// exactly (different parallelism or vnode assignment).
pub fn resolve_no_shuffle_actor_mapping<
    'a,
    ActorId: Copy + Eq + std::hash::Hash + std::fmt::Debug,
>(
    source_fragment_distribution: DistributionType,
    source_fragment_actors: impl IntoIterator<Item = (ActorId, &'a Option<Bitmap>)>,
    target_fragment_distribution: DistributionType,
    target_fragment_actors: impl IntoIterator<Item = (ActorId, &'a Option<Bitmap>)>,
) -> HashMap<ActorId, ActorId> {
    assert_eq!(source_fragment_distribution, target_fragment_distribution);

    match source_fragment_distribution {
        DistributionType::Single => {
            // A singleton actor either has no bitmap or owns every vnode.
            let assert_singleton = |bitmap: &Option<Bitmap>| {
                assert!(
                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
                    "not singleton: {:?}",
                    bitmap
                );
            };
            let (source_actor_id, bitmap) = source_fragment_actors
                .into_iter()
                .exactly_one()
                .ok()
                .expect("Single distribution should have exactly one source actor");
            assert_singleton(bitmap);
            let (target_actor_id, bitmap) = target_fragment_actors
                .into_iter()
                .exactly_one()
                .ok()
                .expect("Single distribution should have exactly one target actor");
            assert_singleton(bitmap);
            HashMap::from([(source_actor_id, target_actor_id)])
        }
        DistributionType::Hash => {
            // Index the target actors by the first vnode each one owns.
            let mut target_by_vnode: HashMap<_, _> = target_fragment_actors
                .into_iter()
                .map(|(actor_id, bitmap)| {
                    let bitmap = bitmap
                        .as_ref()
                        .expect("hash distribution should have bitmap");
                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
                    (first_vnode, (actor_id, bitmap))
                })
                .collect();

            let target_count = target_by_vnode.len();

            // Match every source actor to the target actor owning the same
            // first vnode, removing matched targets as we go so each target
            // is used at most once.
            let mapping: HashMap<_, _> = source_fragment_actors
                .into_iter()
                .map(|(source_actor_id, source_bitmap)| {
                    let source_bitmap = source_bitmap
                        .as_ref()
                        .expect("hash distribution should have bitmap");
                    let first_vnode = source_bitmap
                        .iter_vnodes()
                        .next()
                        .expect("non-empty bitmap");
                    let (target_actor_id, target_bitmap) =
                        target_by_vnode.remove(&first_vnode).unwrap_or_else(|| {
                            panic!(
                                "cannot find matched target actor: {:?} first_vnode {:?}",
                                source_actor_id, first_vnode,
                            )
                        });
                    // Paired actors must own the exact same vnode set.
                    assert_eq!(
                        source_bitmap, target_bitmap,
                        "bitmap mismatch for source {:?} target {:?} at first_vnode {:?}",
                        source_actor_id, target_actor_id, first_vnode,
                    );
                    (source_actor_id, target_actor_id)
                })
                .collect();

            assert_eq!(
                mapping.len(),
                target_count,
                "no-shuffle should have equal upstream downstream actor count: {} vs {}",
                mapping.len(),
                target_count,
            );

            mapping
        }
    }
}
1703
/// Rebuild the worker-slot mapping advertised for a fragment from its current
/// actor placement.
///
/// `Single` fragments map to slot 0 of the worker running their only actor.
/// `Hash` fragments convert the actors' vnode bitmaps into an actor mapping
/// and then translate it into worker slots via each actor's worker location.
pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
    let fragment_worker_slot_mapping = match fragment.distribution_type {
        DistributionType::Single => {
            // A single-distribution fragment must have exactly one actor.
            let actor = fragment.actors.values().exactly_one().unwrap();
            WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
        }
        DistributionType::Hash => {
            let actor_bitmaps: HashMap<_, _> = fragment
                .actors
                .iter()
                .map(|(actor_id, actor_info)| {
                    let vnode_bitmap = actor_info
                        .vnode_bitmap
                        .as_ref()
                        .cloned()
                        .expect("actor bitmap shouldn't be none in hash fragment");

                    (*actor_id as hash::ActorId, vnode_bitmap)
                })
                .collect();

            let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);

            // Where each actor currently runs, for the slot translation.
            let actor_locations = fragment
                .actors
                .iter()
                .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
                .collect();

            actor_mapping.to_worker_slot(&actor_locations)
        }
    };

    PbFragmentWorkerSlotMapping {
        fragment_id: fragment.fragment_id,
        mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
    }
}
1742
1743pub async fn get_fragments_for_jobs<C>(
1749 db: &C,
1750 streaming_jobs: Vec<JobId>,
1751) -> MetaResult<(
1752 HashMap<SourceId, BTreeSet<FragmentId>>,
1753 HashSet<FragmentId>,
1754 HashSet<FragmentId>,
1755)>
1756where
1757 C: ConnectionTrait,
1758{
1759 if streaming_jobs.is_empty() {
1760 return Ok((HashMap::default(), HashSet::default(), HashSet::default()));
1761 }
1762
1763 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1764 .select_only()
1765 .columns([
1766 fragment::Column::FragmentId,
1767 fragment::Column::FragmentTypeMask,
1768 fragment::Column::StreamNode,
1769 ])
1770 .filter(fragment::Column::JobId.is_in(streaming_jobs))
1771 .into_tuple()
1772 .all(db)
1773 .await?;
1774
1775 let fragment_ids: HashSet<_> = fragments
1776 .iter()
1777 .map(|(fragment_id, _, _)| *fragment_id)
1778 .collect();
1779
1780 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1781 let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1782 for (fragment_id, mask, stream_node) in fragments {
1783 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1784 && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1785 {
1786 source_fragment_ids
1787 .entry(source_id)
1788 .or_default()
1789 .insert(fragment_id);
1790 }
1791 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1792 sink_fragment_ids.insert(fragment_id);
1793 }
1794 }
1795
1796 Ok((source_fragment_ids, sink_fragment_ids, fragment_ids))
1797}
1798
/// Build the notification payload announcing the deletion of the given
/// objects to subscribers.
///
/// Each partial object is translated into a minimal protobuf catalog object
/// carrying only the ids needed to remove it from caches. An index emits two
/// deletions: the index itself and its backing index table (both share the
/// same object id).
pub(crate) fn build_object_group_for_delete(
    partial_objects: Vec<PartialObject>,
) -> NotificationInfo {
    let mut objects = vec![];
    for obj in partial_objects {
        match obj.obj_type {
            ObjectType::Database => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Database(PbDatabase {
                    id: obj.oid.as_database_id(),
                    ..Default::default()
                })),
            }),
            ObjectType::Schema => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Schema(PbSchema {
                    id: obj.oid.as_schema_id(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Table => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Table(PbTable {
                    id: obj.oid.as_table_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Source => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(PbSource {
                    id: obj.oid.as_source_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Sink => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Sink(PbSink {
                    id: obj.oid.as_sink_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Subscription => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
                    id: obj.oid.as_subscription_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::View => objects.push(PbObject {
                object_info: Some(PbObjectInfo::View(PbView {
                    id: obj.oid.as_view_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            // An index is announced twice: once as the index object and once
            // as its backing table, which uses the same oid.
            ObjectType::Index => {
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(PbIndex {
                        id: obj.oid.as_index_id(),
                        schema_id: obj.schema_id.unwrap(),
                        database_id: obj.database_id.unwrap(),
                        ..Default::default()
                    })),
                });
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(PbTable {
                        id: obj.oid.as_table_id(),
                        schema_id: obj.schema_id.unwrap(),
                        database_id: obj.database_id.unwrap(),
                        ..Default::default()
                    })),
                });
            }
            ObjectType::Function => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Function(PbFunction {
                    id: obj.oid.as_function_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Connection => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Connection(PbConnection {
                    id: obj.oid.as_connection_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Secret => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Secret(PbSecret {
                    id: obj.oid.as_secret_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
        }
    }
    NotificationInfo::ObjectGroup(PbObjectGroup {
        objects,
        dependencies: vec![],
    })
}
1911
1912pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1913 let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1914 .context("unable to parse table definition")
1915 .inspect_err(|e| {
1916 tracing::error!(
1917 target: "auto_schema_change",
1918 error = %e.as_report(),
1919 "failed to parse table definition")
1920 })
1921 .unwrap()
1922 .try_into()
1923 .unwrap();
1924 if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1925 cdc_table_info
1926 .clone()
1927 .map(|cdc_table_info| cdc_table_info.external_table_name)
1928 } else {
1929 None
1930 }
1931}
1932
/// Rename a relation (table/source/sink/subscription/view/index) to
/// `object_name`, rewriting its stored `CREATE ...` definition accordingly.
///
/// Returns the refreshed protobuf objects to notify plus the old name.
/// Special cases: a table with an associated source renames the source too;
/// an index renames both the index row and its backing index table.
pub async fn rename_relation(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
) -> MetaResult<(Vec<PbObject>, String)> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename;

    let mut to_update_relations = vec![];
    // Renames one relation row: updates the `name` column (and the stored
    // definition — views keep their original definition text untouched),
    // persists the change, and records the refreshed catalog object for
    // notification. Evaluates to the old name.
    macro_rules! rename_relation {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let obj = obj.unwrap();
            let old_name = relation.name.clone();
            relation.name = object_name.into();
            if obj.obj_type != ObjectType::View {
                relation.definition = alter_relation_rename(&relation.definition, object_name);
            }
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                name: Set(object_name.into()),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
                .one(txn)
                .await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj, streaming_job).into(),
                )),
            });
            old_name
        }};
    }
    let old_name = match object_type {
        ObjectType::Table => {
            // A table created from a source shares its name with that source;
            // rename the associated source first when one exists.
            let associated_source_id: Option<SourceId> = Source::find()
                .select_only()
                .column(source::Column::SourceId)
                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
                .into_tuple()
                .one(txn)
                .await?;
            if let Some(source_id) = associated_source_id {
                rename_relation!(Source, source, source_id, source_id);
            }
            rename_relation!(Table, table, table_id, object_id.as_table_id())
        }
        ObjectType::Source => {
            rename_relation!(Source, source, source_id, object_id.as_source_id())
        }
        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
        ObjectType::Subscription => {
            rename_relation!(
                Subscription,
                subscription,
                subscription_id,
                object_id.as_subscription_id()
            )
        }
        ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
        ObjectType::Index => {
            let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let streaming_job = streaming_job::Entity::find_by_id(index.index_id.as_job_id())
                .one(txn)
                .await?;
            index.name = object_name.into();
            let index_table_id = index.index_table_id;
            // Rename the backing index table first (it carries the definition).
            let old_name = rename_relation!(Table, table, table_id, index_table_id);

            // Then update the index row's own name.
            let active_model = index::ActiveModel {
                index_id: sea_orm::ActiveValue::Set(index.index_id),
                name: sea_orm::ActiveValue::Set(object_name.into()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::Index(
                    ObjectModel(index, obj.unwrap(), streaming_job).into(),
                )),
            });
            old_name
        }
        _ => unreachable!("only relation name can be altered."),
    };

    Ok((to_update_relations, old_name))
}
2038
2039pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
2040where
2041 C: ConnectionTrait,
2042{
2043 let database_resource_group: Option<String> = Database::find_by_id(database_id)
2044 .select_only()
2045 .column(database::Column::ResourceGroup)
2046 .into_tuple()
2047 .one(txn)
2048 .await?
2049 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
2050
2051 Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
2052}
2053
2054pub async fn get_existing_job_resource_group<C>(
2055 txn: &C,
2056 streaming_job_id: JobId,
2057) -> MetaResult<String>
2058where
2059 C: ConnectionTrait,
2060{
2061 let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
2062 StreamingJob::find_by_id(streaming_job_id)
2063 .select_only()
2064 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2065 .join(JoinType::InnerJoin, object::Relation::Database2.def())
2066 .column(streaming_job::Column::SpecificResourceGroup)
2067 .column(database::Column::ResourceGroup)
2068 .into_tuple()
2069 .one(txn)
2070 .await?
2071 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
2072
2073 Ok(job_specific_resource_group.unwrap_or_else(|| {
2074 database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
2075 }))
2076}
2077
2078pub fn filter_workers_by_resource_group(
2079 workers: &HashMap<WorkerId, WorkerNode>,
2080 resource_group: &str,
2081) -> BTreeSet<WorkerId> {
2082 workers
2083 .iter()
2084 .filter(|&(_, worker)| {
2085 worker
2086 .resource_group()
2087 .map(|node_label| node_label.as_str() == resource_group)
2088 .unwrap_or(false)
2089 })
2090 .map(|(id, _)| *id)
2091 .collect()
2092}
2093
/// Rewrite the stored definitions of all relations referring to a renamed
/// object, replacing `old_name` with `object_name`.
///
/// Besides direct dependents, sinks whose target table is the renamed object
/// are rewritten too. Returns the refreshed protobuf objects to notify.
pub async fn rename_relation_refer(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
    old_name: &str,
) -> MetaResult<Vec<PbObject>> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename_refs;

    let mut to_update_relations = vec![];
    // Rewrites the references inside one dependent relation's definition,
    // persists it, and records the refreshed catalog object for notification.
    macro_rules! rename_relation_ref {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            relation.definition =
                alter_relation_rename_refs(&relation.definition, old_name, object_name);
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
                .one(txn)
                .await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj.unwrap(), streaming_job).into(),
                )),
            });
        }};
    }
    let mut objs = get_referring_objects(object_id, txn).await?;
    if object_type == ObjectType::Table {
        // Sinks targeting the renamed table refer to it through
        // `target_table` rather than an object dependency; include them too.
        let incoming_sinks: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.eq(object_id))
            .into_tuple()
            .all(txn)
            .await?;

        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
            oid: id.as_object_id(),
            obj_type: ObjectType::Sink,
            schema_id: None,
            database_id: None,
        }));
    }

    for obj in objs {
        match obj.obj_type {
            ObjectType::Table => {
                rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
            }
            ObjectType::Sink => {
                rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
            }
            ObjectType::Subscription => {
                rename_relation_ref!(
                    Subscription,
                    subscription,
                    subscription_id,
                    obj.oid.as_subscription_id()
                )
            }
            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
            // For an index, the definition lives on its backing index table.
            ObjectType::Index => {
                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
                    .select_only()
                    .column(index::Column::IndexTableId)
                    .into_tuple()
                    .one(txn)
                    .await?;
                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
            }
            _ => {
                bail!(
                    "only the table, sink, subscription, view and index will depend on other objects."
                )
            }
        }
    }

    Ok(to_update_relations)
}
2187
/// Check whether the given subscription may be dropped.
///
/// If other subscriptions still exist on the same upstream table, dropping is
/// always allowed. Otherwise (this would be the last subscription), the drop
/// is rejected when objects from a *different* database depend on the
/// upstream table — presumably because cross-database access relies on the
/// table retaining a subscription; see the cross-db dependency check below.
pub async fn validate_subscription_deletion<C>(
    txn: &C,
    subscription_id: SubscriptionId,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
        .select_only()
        .column(subscription::Column::DependentTableId)
        .into_tuple()
        .one(txn)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;

    let cnt = Subscription::find()
        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
        .count(txn)
        .await?;
    if cnt > 1 {
        // Other subscriptions remain on the table; safe to drop this one.
        return Ok(());
    }

    // Count dependencies on the upstream table whose dependent object lives
    // in a different database than the table itself, excluding this
    // subscription. The object table is joined twice: once for the
    // depended-on object ("o1") and once for the dependent object ("o2").
    let obj_alias = Alias::new("o1");
    let used_by_alias = Alias::new("o2");
    let count = ObjectDependency::find()
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object2.def(),
            obj_alias.clone(),
        )
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object1.def(),
            used_by_alias.clone(),
        )
        .filter(
            object_dependency::Column::Oid
                .eq(upstream_table_id)
                .and(object_dependency::Column::UsedBy.ne(subscription_id))
                .and(
                    Expr::col((obj_alias, object::Column::DatabaseId))
                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
                ),
        )
        .count(txn)
        .await?;

    if count != 0 {
        return Err(MetaError::permission_denied(format!(
            "Referenced by {} cross-db objects.",
            count
        )));
    }

    Ok(())
}
2251
2252pub async fn fetch_target_fragments<C>(
2253 txn: &C,
2254 src_fragment_id: impl IntoIterator<Item = FragmentId>,
2255) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
2256where
2257 C: ConnectionTrait,
2258{
2259 let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
2260 .select_only()
2261 .columns([
2262 fragment_relation::Column::SourceFragmentId,
2263 fragment_relation::Column::TargetFragmentId,
2264 ])
2265 .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
2266 .into_tuple()
2267 .all(txn)
2268 .await?;
2269
2270 let source_target_fragments = source_target_fragments.into_iter().into_group_map();
2271
2272 Ok(source_target_fragments)
2273}
2274
2275pub async fn get_sink_fragment_by_ids<C>(
2276 txn: &C,
2277 sink_ids: Vec<SinkId>,
2278) -> MetaResult<HashMap<SinkId, FragmentId>>
2279where
2280 C: ConnectionTrait,
2281{
2282 let sink_num = sink_ids.len();
2283 let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
2284 .select_only()
2285 .columns([fragment::Column::JobId, fragment::Column::FragmentId])
2286 .filter(
2287 fragment::Column::JobId
2288 .is_in(sink_ids)
2289 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2290 )
2291 .into_tuple()
2292 .all(txn)
2293 .await?;
2294
2295 if sink_fragment_ids.len() != sink_num {
2296 return Err(anyhow::anyhow!(
2297 "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2298 sink_fragment_ids.len(),
2299 sink_num
2300 )
2301 .into());
2302 }
2303
2304 Ok(sink_fragment_ids.into_iter().collect())
2305}
2306
2307pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2308where
2309 C: ConnectionTrait,
2310{
2311 let mview_fragment: Vec<i32> = Fragment::find()
2312 .select_only()
2313 .column(fragment::Column::FragmentTypeMask)
2314 .filter(
2315 fragment::Column::JobId
2316 .eq(table_id)
2317 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2318 )
2319 .into_tuple()
2320 .all(txn)
2321 .await?;
2322
2323 let mview_fragment_len = mview_fragment.len();
2324 if mview_fragment_len != 1 {
2325 bail!(
2326 "expected exactly one mview fragment for table {}, found {}",
2327 table_id,
2328 mview_fragment_len
2329 );
2330 }
2331
2332 let mview_fragment = mview_fragment.into_iter().next().unwrap();
2333 let migrated =
2334 FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2335
2336 Ok(migrated)
2337}
2338
2339pub async fn try_get_iceberg_table_by_downstream_sink<C>(
2340 txn: &C,
2341 sink_id: SinkId,
2342) -> MetaResult<Option<TableId>>
2343where
2344 C: ConnectionTrait,
2345{
2346 let sink = Sink::find_by_id(sink_id).one(txn).await?;
2347 let Some(sink) = sink else {
2348 return Ok(None);
2349 };
2350
2351 if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
2352 let object_ids: Vec<ObjectId> = ObjectDependency::find()
2353 .select_only()
2354 .column(object_dependency::Column::Oid)
2355 .filter(object_dependency::Column::UsedBy.eq(sink_id))
2356 .into_tuple()
2357 .all(txn)
2358 .await?;
2359 let mut iceberg_table_ids = vec![];
2360 for object_id in object_ids {
2361 let table_id = object_id.as_table_id();
2362 if let Some(table_engine) = Table::find_by_id(table_id)
2363 .select_only()
2364 .column(table::Column::Engine)
2365 .into_tuple::<table::Engine>()
2366 .one(txn)
2367 .await?
2368 && table_engine == table::Engine::Iceberg
2369 {
2370 iceberg_table_ids.push(table_id);
2371 }
2372 }
2373 if iceberg_table_ids.len() == 1 {
2374 return Ok(Some(iceberg_table_ids[0]));
2375 }
2376 }
2377 Ok(None)
2378}
2379
2380pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
2381where
2382 C: ConnectionTrait,
2383{
2384 if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
2385 .select_only()
2386 .column(table::Column::Engine)
2387 .into_tuple::<table::Engine>()
2388 .one(txn)
2389 .await?
2390 && engine == table::Engine::Iceberg
2391 {
2392 return Ok(true);
2393 }
2394 if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
2395 .select_only()
2396 .column(sink::Column::Name)
2397 .into_tuple::<String>()
2398 .one(txn)
2399 .await?
2400 && sink_name.starts_with(ICEBERG_SINK_PREFIX)
2401 {
2402 return Ok(true);
2403 }
2404 Ok(false)
2405}
2406
2407pub async fn find_dirty_iceberg_table_jobs<C>(
2408 txn: &C,
2409 database_id: Option<DatabaseId>,
2410) -> MetaResult<Vec<PartialObject>>
2411where
2412 C: ConnectionTrait,
2413{
2414 let mut filter_condition = streaming_job::Column::JobStatus
2415 .ne(JobStatus::Created)
2416 .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
2417 .and(streaming_job::Column::CreateType.eq(CreateType::Background));
2418 if let Some(database_id) = database_id {
2419 filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
2420 }
2421 let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
2422 .select_only()
2423 .columns([
2424 object::Column::Oid,
2425 object::Column::ObjType,
2426 object::Column::SchemaId,
2427 object::Column::DatabaseId,
2428 ])
2429 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2430 .filter(filter_condition)
2431 .into_partial_model()
2432 .all(txn)
2433 .await?;
2434
2435 let mut dirty_iceberg_table_jobs = vec![];
2436 for job in creating_table_sink_jobs {
2437 if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
2438 tracing::info!("Found dirty iceberg job with id: {}", job.oid);
2439 dirty_iceberg_table_jobs.push(job);
2440 }
2441 }
2442
2443 Ok(dirty_iceberg_table_jobs)
2444}
2445
2446pub fn build_select_node_list(
2447 from: &[ColumnCatalog],
2448 to: &[ColumnCatalog],
2449) -> MetaResult<Vec<PbExprNode>> {
2450 let mut exprs = Vec::with_capacity(to.len());
2451 let idx_by_col_id = from
2452 .iter()
2453 .enumerate()
2454 .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2455 .collect::<HashMap<_, _>>();
2456
2457 for to_col in to {
2458 let to_col = to_col.column_desc.as_ref().unwrap();
2459 let to_col_type_ref = to_col.column_type.as_ref().unwrap();
2460 let to_col_type = DataType::from(to_col_type_ref);
2461 if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2462 let from_col_type = DataType::from(
2463 from[*from_idx]
2464 .column_desc
2465 .as_ref()
2466 .unwrap()
2467 .column_type
2468 .as_ref()
2469 .unwrap(),
2470 );
2471 if !to_col_type.equals_datatype(&from_col_type) {
2472 return Err(anyhow!(
2473 "Column type mismatch: {:?} != {:?}",
2474 from_col_type,
2475 to_col_type
2476 )
2477 .into());
2478 }
2479 exprs.push(PbExprNode {
2480 function_type: expr_node::Type::Unspecified.into(),
2481 return_type: Some(to_col_type_ref.clone()),
2482 rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2483 });
2484 } else {
2485 let to_default_node =
2486 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2487 expr,
2488 ..
2489 })) = &to_col.generated_or_default_column
2490 {
2491 expr.clone().unwrap()
2492 } else {
2493 let null = Datum::None.to_protobuf();
2494 PbExprNode {
2495 function_type: expr_node::Type::Unspecified.into(),
2496 return_type: Some(to_col_type_ref.clone()),
2497 rex_node: Some(expr_node::RexNode::Constant(null)),
2498 }
2499 };
2500 exprs.push(to_default_node);
2501 }
2502 }
2503
2504 Ok(exprs)
2505}
2506
/// Auxiliary per-job information loaded from the `streaming_job` table,
/// bundled with the resolved job definition.
#[derive(Clone, Debug, Default)]
pub struct StreamingJobExtraInfo {
    /// Timezone associated with the job, if any.
    pub timezone: Option<String>,
    /// Config override for the job; empty when none was stored.
    pub config_override: Arc<str>,
    /// SQL definition of the job; empty string when it could not be resolved.
    pub job_definition: String,
    /// Optional backfill ordering for the job.
    pub backfill_orders: Option<BackfillOrders>,
}
2514
impl StreamingJobExtraInfo {
    /// Builds a `StreamContext` from the job's timezone and config override.
    pub fn stream_context(&self) -> StreamContext {
        StreamContext {
            timezone: self.timezone.clone(),
            config_override: self.config_override.clone(),
        }
    }
}
2523
/// Row shape selected from `streaming_job`:
/// (job id, timezone, config override, backfill orders).
type StreamingJobExtraInfoRow = (
    JobId,
    Option<String>,
    Option<String>,
    Option<BackfillOrders>,
);
2531
2532pub async fn get_streaming_job_extra_info<C>(
2533 txn: &C,
2534 job_ids: Vec<JobId>,
2535) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2536where
2537 C: ConnectionTrait,
2538{
2539 let pairs: Vec<StreamingJobExtraInfoRow> = StreamingJob::find()
2540 .select_only()
2541 .columns([
2542 streaming_job::Column::JobId,
2543 streaming_job::Column::Timezone,
2544 streaming_job::Column::ConfigOverride,
2545 streaming_job::Column::BackfillOrders,
2546 ])
2547 .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2548 .into_tuple()
2549 .all(txn)
2550 .await?;
2551
2552 let job_ids = job_ids.into_iter().collect();
2553
2554 let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2555
2556 let result = pairs
2557 .into_iter()
2558 .map(|(job_id, timezone, config_override, backfill_orders)| {
2559 let job_definition = definitions.remove(&job_id).unwrap_or_default();
2560 (
2561 job_id,
2562 StreamingJobExtraInfo {
2563 timezone,
2564 config_override: config_override.unwrap_or_default().into(),
2565 job_definition,
2566 backfill_orders,
2567 },
2568 )
2569 })
2570 .collect();
2571
2572 Ok(result)
2573}
2574
#[cfg(test)]
mod tests {
    use super::*;

    /// The external table name embedded in a
    /// `CREATE TABLE ... FROM <source> TABLE '<name>'` definition should be
    /// extracted verbatim.
    #[test]
    fn test_extract_cdc_table_name() {
        let cases = [
            (
                "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'",
                "public.t1",
            ),
            (
                "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'",
                "mydb.t2",
            ),
        ];
        for (ddl, expected) in cases {
            assert_eq!(
                extract_external_table_name_from_definition(ddl),
                Some(expected.into())
            );
        }
    }
}