1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{
22 FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23};
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
25use risingwave_common::id::{JobId, SubscriptionId};
26use risingwave_common::types::{DataType, Datum};
27use risingwave_common::util::value_encoding::DatumToProtoExt;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_common::{bail, hash};
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_meta_model::object::ObjectType;
32use risingwave_meta_model::prelude::*;
33use risingwave_meta_model::table::TableType;
34use risingwave_meta_model::user_privilege::Action;
35use risingwave_meta_model::{
36 ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
37 JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
38 TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
39 function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
40 subscription, table, user, user_default_privilege, user_privilege, view,
41};
42use risingwave_meta_model_migration::WithQuery;
43use risingwave_pb::catalog::{
44 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
45 PbSubscription, PbTable, PbView,
46};
47use risingwave_pb::common::WorkerNode;
48use risingwave_pb::expr::{PbExprNode, expr_node};
49use risingwave_pb::meta::object::PbObjectInfo;
50use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
51use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
52use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
53use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
54use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
55use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
56use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
57use risingwave_sqlparser::ast::Statement as SqlStatement;
58use risingwave_sqlparser::parser::Parser;
59use sea_orm::sea_query::{
60 Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
61 WithClause,
62};
63use sea_orm::{
64 ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
65 FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
66 RelationTrait, Set, Statement,
67};
68use thiserror_ext::AsReport;
69use tracing::warn;
70
71use crate::barrier::{SharedActorInfos, SharedFragmentInfo};
72use crate::controller::ObjectModel;
73use crate::controller::fragment::FragmentTypeMaskExt;
74use crate::controller::scale::resolve_streaming_job_definition;
75use crate::model::{FragmentDownstreamRelation, StreamContext};
76use crate::{MetaError, MetaResult};
77
/// Builds a recursive CTE query listing every object that refers to `obj_id`, directly or
/// transitively.
///
/// The CTE `used_by_object_ids(used_by)` is seeded with:
/// - the `used_by` column of every `object_dependency` row whose `oid` is `obj_id`, and
/// - every object whose `database_id` or `schema_id` equals `obj_id` (i.e. the objects contained
///   in it when `obj_id` is a database or schema).
///
/// The CTE is then expanded recursively by following `object_dependency` rows from `oid` to
/// `used_by`. The outer query joins the collected ids back to `object` and returns the distinct
/// `(oid, obj_type, schema_id, database_id)` tuples, ordered by `oid` descending.
pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
    let cte_alias = Alias::new("used_by_object_ids");
    let cte_return_alias = Alias::new("used_by");

    // Anchor part 1: direct dependents of `obj_id`.
    let mut base_query = SelectStatement::new()
        .column(object_dependency::Column::UsedBy)
        .from(ObjectDependency)
        .and_where(object_dependency::Column::Oid.eq(obj_id))
        .to_owned();

    // Anchor part 2: objects placed inside `obj_id` (only matches when it is a database/schema).
    let belonged_obj_query = SelectStatement::new()
        .column(object::Column::Oid)
        .from(Object)
        .and_where(
            object::Column::DatabaseId
                .eq(obj_id)
                .or(object::Column::SchemaId.eq(obj_id)),
        )
        .to_owned();

    // Recursive step: objects that use any object already collected in the CTE.
    let cte_referencing = Query::select()
        .column((ObjectDependency, object_dependency::Column::UsedBy))
        .from(ObjectDependency)
        .inner_join(
            cte_alias.clone(),
            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
                .equals(object_dependency::Column::Oid),
        )
        .to_owned();

    // Anchor parts UNION ALL the recursive step, exposed as `used_by_object_ids(used_by)`.
    let mut common_table_expr = CommonTableExpression::new();
    common_table_expr
        .query(
            base_query
                .union(UnionType::All, belonged_obj_query)
                .union(UnionType::All, cte_referencing)
                .to_owned(),
        )
        .column(cte_return_alias.clone())
        .table_name(cte_alias.clone());

    // `distinct` collapses objects reached through more than one path, since the CTE uses
    // UNION ALL and may contain duplicates.
    SelectStatement::new()
        .distinct()
        .columns([
            object::Column::Oid,
            object::Column::ObjType,
            object::Column::SchemaId,
            object::Column::DatabaseId,
        ])
        .from(cte_alias.clone())
        .inner_join(
            Object,
            Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
        )
        .order_by(object::Column::Oid, Order::Desc)
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
}
165
/// Builds a recursive CTE query that counts how many edges reachable downstream of
/// `target_table` arrive at one of `dependent_objects`.
///
/// Edges are the `object_dependency` rows (`oid` -> `used_by`) plus one sink-into-table edge
/// (`sink_id` -> `target_table`) for every `sink` row with a non-null `target_table`. The CTE
/// `used_by_object_ids_with_sink(oid, used_by)` starts from the dependency rows whose `oid` is
/// `target_table` and keeps following edges from each collected `used_by`.
///
/// The single result column is `COUNT(used_by)` over the collected rows whose `used_by` is in
/// `dependent_objects`. The caller treats a non-zero count as a cycle.
pub fn construct_sink_cycle_check_query(
    target_table: ObjectId,
    dependent_objects: Vec<ObjectId>,
) -> WithQuery {
    let cte_alias = Alias::new("used_by_object_ids_with_sink");
    let depend_alias = Alias::new("obj_dependency_with_sink");

    // Anchor: dependency edges leaving `target_table`.
    let mut base_query = SelectStatement::new()
        .columns([
            object_dependency::Column::Oid,
            object_dependency::Column::UsedBy,
        ])
        .from(ObjectDependency)
        .and_where(object_dependency::Column::Oid.eq(target_table))
        .to_owned();

    // Sink-into-table edges, shaped as (oid = sink_id, used_by = target_table).
    let query_sink_deps = SelectStatement::new()
        .columns([sink::Column::SinkId, sink::Column::TargetTable])
        .from(Sink)
        .and_where(sink::Column::TargetTable.is_not_null())
        .to_owned();

    // Recursive step over the combined edge set: follow edges whose source is a `used_by`
    // already collected in the CTE.
    let cte_referencing = Query::select()
        .column((depend_alias.clone(), object_dependency::Column::Oid))
        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
        .from_subquery(
            SelectStatement::new()
                .columns([
                    object_dependency::Column::Oid,
                    object_dependency::Column::UsedBy,
                ])
                .from(ObjectDependency)
                .union(UnionType::All, query_sink_deps)
                .to_owned(),
            depend_alias.clone(),
        )
        .inner_join(
            cte_alias.clone(),
            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
                .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
        )
        // Do not expand from rows whose `oid` equals their `used_by` (self-referencing edges).
        .and_where(
            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
                cte_alias.clone(),
                object_dependency::Column::Oid,
            ))),
        )
        .to_owned();

    let mut common_table_expr = CommonTableExpression::new();
    common_table_expr
        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
        .columns([
            object_dependency::Column::Oid,
            object_dependency::Column::UsedBy,
        ])
        .table_name(cte_alias.clone());

    // Count reached rows whose target is one of the objects the new sink depends on.
    SelectStatement::new()
        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
        .from(cte_alias.clone())
        .and_where(
            Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
        )
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
}
262
/// Partial projection of an `Object` row: its id, type, and the schema/database ids it carries.
#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
#[sea_orm(entity = "Object")]
pub struct PartialObject {
    pub oid: ObjectId,
    pub obj_type: ObjectType,
    // Nullable in the model; presumably `None` for objects not placed in a schema
    // (e.g. databases/schemas themselves) — TODO confirm.
    pub schema_id: Option<SchemaId>,
    pub database_id: Option<DatabaseId>,
}
271
/// Partial projection of a `Fragment` row: the fragment id, the id of the job it belongs to,
/// and its state table ids.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Fragment")]
pub struct PartialFragmentStateTables {
    pub fragment_id: FragmentId,
    pub job_id: ObjectId,
    pub state_table_ids: TableIdArray,
}
279
/// An actor id paired with its fragment id and the worker id recorded for it.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct PartialActorLocation {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub worker_id: WorkerId,
}
286
/// Row type (decoded via `FromQueryResult`) describing a single fragment.
///
/// NOTE(review): the exact source of `parallelism` and `parallelism_policy` depends on the
/// query that selects into this struct (not visible here) — confirm against the caller.
#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
pub struct FragmentDesc {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    // Raw bitmask; presumably interpreted via `FragmentTypeMask` — TODO confirm.
    pub fragment_type_mask: i32,
    pub distribution_type: DistributionType,
    pub state_table_ids: TableIdArray,
    pub parallelism: i64,
    pub vnode_count: i32,
    pub stream_node: StreamNode,
    pub parallelism_policy: String,
}
299
300pub async fn get_referring_objects_cascade<C>(
302 obj_id: ObjectId,
303 db: &C,
304) -> MetaResult<Vec<PartialObject>>
305where
306 C: ConnectionTrait,
307{
308 let query = construct_obj_dependency_query(obj_id);
309 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
310 let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
311 db.get_database_backend(),
312 sql,
313 values,
314 ))
315 .all(db)
316 .await?;
317 Ok(objects)
318}
319
320pub async fn check_sink_into_table_cycle<C>(
322 target_table: ObjectId,
323 dependent_objs: Vec<ObjectId>,
324 db: &C,
325) -> MetaResult<bool>
326where
327 C: ConnectionTrait,
328{
329 if dependent_objs.is_empty() {
330 return Ok(false);
331 }
332
333 if dependent_objs.contains(&target_table) {
335 return Ok(true);
336 }
337
338 let query = construct_sink_cycle_check_query(target_table, dependent_objs);
339 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
340
341 let res = db
342 .query_one(Statement::from_sql_and_values(
343 db.get_database_backend(),
344 sql,
345 values,
346 ))
347 .await?
348 .unwrap();
349
350 let cnt: i64 = res.try_get_by(0)?;
351
352 Ok(cnt != 0)
353}
354
355pub async fn ensure_object_id<C>(
357 object_type: ObjectType,
358 obj_id: impl Into<ObjectId>,
359 db: &C,
360) -> MetaResult<()>
361where
362 C: ConnectionTrait,
363{
364 let obj_id = obj_id.into();
365 let count = Object::find_by_id(obj_id).count(db).await?;
366 if count == 0 {
367 return Err(MetaError::catalog_id_not_found(
368 object_type.as_str(),
369 obj_id,
370 ));
371 }
372 Ok(())
373}
374
375pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
376where
377 C: ConnectionTrait,
378{
379 let count = Object::find_by_id(job_id).count(db).await?;
380 if count == 0 {
381 return Err(MetaError::cancelled(format!(
382 "job {} might be cancelled manually or by recovery",
383 job_id
384 )));
385 }
386 Ok(())
387}
388
389pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
391where
392 C: ConnectionTrait,
393{
394 let count = User::find_by_id(user_id).count(db).await?;
395 if count == 0 {
396 return Err(anyhow!("user {} was concurrently dropped", user_id).into());
397 }
398 Ok(())
399}
400
401pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
403where
404 C: ConnectionTrait,
405{
406 let count = Database::find()
407 .filter(database::Column::Name.eq(name))
408 .count(db)
409 .await?;
410 if count > 0 {
411 assert_eq!(count, 1);
412 return Err(MetaError::catalog_duplicated("database", name));
413 }
414 Ok(())
415}
416
417pub async fn check_function_signature_duplicate<C>(
419 pb_function: &PbFunction,
420 db: &C,
421) -> MetaResult<()>
422where
423 C: ConnectionTrait,
424{
425 let count = Function::find()
426 .inner_join(Object)
427 .filter(
428 object::Column::DatabaseId
429 .eq(pb_function.database_id)
430 .and(object::Column::SchemaId.eq(pb_function.schema_id))
431 .and(function::Column::Name.eq(&pb_function.name))
432 .and(
433 function::Column::ArgTypes
434 .eq(DataTypeArray::from(pb_function.arg_types.clone())),
435 ),
436 )
437 .count(db)
438 .await?;
439 if count > 0 {
440 assert_eq!(count, 1);
441 return Err(MetaError::catalog_duplicated("function", &pb_function.name));
442 }
443 Ok(())
444}
445
446pub async fn check_connection_name_duplicate<C>(
448 pb_connection: &PbConnection,
449 db: &C,
450) -> MetaResult<()>
451where
452 C: ConnectionTrait,
453{
454 let count = Connection::find()
455 .inner_join(Object)
456 .filter(
457 object::Column::DatabaseId
458 .eq(pb_connection.database_id)
459 .and(object::Column::SchemaId.eq(pb_connection.schema_id))
460 .and(connection::Column::Name.eq(&pb_connection.name)),
461 )
462 .count(db)
463 .await?;
464 if count > 0 {
465 assert_eq!(count, 1);
466 return Err(MetaError::catalog_duplicated(
467 "connection",
468 &pb_connection.name,
469 ));
470 }
471 Ok(())
472}
473
474pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
475where
476 C: ConnectionTrait,
477{
478 let count = Secret::find()
479 .inner_join(Object)
480 .filter(
481 object::Column::DatabaseId
482 .eq(pb_secret.database_id)
483 .and(object::Column::SchemaId.eq(pb_secret.schema_id))
484 .and(secret::Column::Name.eq(&pb_secret.name)),
485 )
486 .count(db)
487 .await?;
488 if count > 0 {
489 assert_eq!(count, 1);
490 return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
491 }
492 Ok(())
493}
494
495pub async fn check_subscription_name_duplicate<C>(
496 pb_subscription: &PbSubscription,
497 db: &C,
498) -> MetaResult<()>
499where
500 C: ConnectionTrait,
501{
502 let count = Subscription::find()
503 .inner_join(Object)
504 .filter(
505 object::Column::DatabaseId
506 .eq(pb_subscription.database_id)
507 .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
508 .and(subscription::Column::Name.eq(&pb_subscription.name)),
509 )
510 .count(db)
511 .await?;
512 if count > 0 {
513 assert_eq!(count, 1);
514 return Err(MetaError::catalog_duplicated(
515 "subscription",
516 &pb_subscription.name,
517 ));
518 }
519 Ok(())
520}
521
522pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
524where
525 C: ConnectionTrait,
526{
527 let count = User::find()
528 .filter(user::Column::Name.eq(name))
529 .count(db)
530 .await?;
531 if count > 0 {
532 assert_eq!(count, 1);
533 return Err(MetaError::catalog_duplicated("user", name));
534 }
535 Ok(())
536}
537
538pub async fn check_relation_name_duplicate<C>(
540 name: &str,
541 database_id: DatabaseId,
542 schema_id: SchemaId,
543 db: &C,
544) -> MetaResult<()>
545where
546 C: ConnectionTrait,
547{
548 macro_rules! check_duplicated {
549 ($obj_type:expr, $entity:ident, $table:ident) => {
550 let object_id = Object::find()
551 .select_only()
552 .column(object::Column::Oid)
553 .inner_join($entity)
554 .filter(
555 object::Column::DatabaseId
556 .eq(Some(database_id))
557 .and(object::Column::SchemaId.eq(Some(schema_id)))
558 .and($table::Column::Name.eq(name)),
559 )
560 .into_tuple::<ObjectId>()
561 .one(db)
562 .await?;
563 if let Some(oid) = object_id {
564 let check_creation = if $obj_type == ObjectType::View {
565 false
566 } else if $obj_type == ObjectType::Source {
567 let source_info = Source::find_by_id(oid.as_source_id())
568 .select_only()
569 .column(source::Column::SourceInfo)
570 .into_tuple::<Option<StreamSourceInfo>>()
571 .one(db)
572 .await?
573 .unwrap();
574 source_info.map_or(false, |info| info.to_protobuf().is_shared())
575 } else {
576 true
577 };
578 let job_id = oid.as_job_id();
579 return if check_creation
580 && !matches!(
581 StreamingJob::find_by_id(job_id)
582 .select_only()
583 .column(streaming_job::Column::JobStatus)
584 .into_tuple::<JobStatus>()
585 .one(db)
586 .await?,
587 Some(JobStatus::Created)
588 ) {
589 Err(MetaError::catalog_under_creation(
590 $obj_type.as_str(),
591 name,
592 job_id,
593 ))
594 } else {
595 Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
596 };
597 }
598 };
599 }
600 check_duplicated!(ObjectType::Table, Table, table);
601 check_duplicated!(ObjectType::Source, Source, source);
602 check_duplicated!(ObjectType::Sink, Sink, sink);
603 check_duplicated!(ObjectType::Index, Index, index);
604 check_duplicated!(ObjectType::View, View, view);
605
606 Ok(())
607}
608
609pub async fn check_schema_name_duplicate<C>(
611 name: &str,
612 database_id: DatabaseId,
613 db: &C,
614) -> MetaResult<()>
615where
616 C: ConnectionTrait,
617{
618 let count = Object::find()
619 .inner_join(Schema)
620 .filter(
621 object::Column::ObjType
622 .eq(ObjectType::Schema)
623 .and(object::Column::DatabaseId.eq(Some(database_id)))
624 .and(schema::Column::Name.eq(name)),
625 )
626 .count(db)
627 .await?;
628 if count != 0 {
629 return Err(MetaError::catalog_duplicated("schema", name));
630 }
631
632 Ok(())
633}
634
635pub async fn check_object_refer_for_drop<C>(
638 object_type: ObjectType,
639 object_id: ObjectId,
640 db: &C,
641) -> MetaResult<()>
642where
643 C: ConnectionTrait,
644{
645 let count = if object_type == ObjectType::Table {
647 ObjectDependency::find()
648 .join(
649 JoinType::InnerJoin,
650 object_dependency::Relation::Object1.def(),
651 )
652 .filter(
653 object_dependency::Column::Oid
654 .eq(object_id)
655 .and(object::Column::ObjType.ne(ObjectType::Index)),
656 )
657 .count(db)
658 .await?
659 } else {
660 ObjectDependency::find()
661 .filter(object_dependency::Column::Oid.eq(object_id))
662 .count(db)
663 .await?
664 };
665 if count != 0 {
666 let referring_objects = get_referring_objects(object_id, db).await?;
668 let referring_objs_map = referring_objects
669 .into_iter()
670 .filter(|o| o.obj_type != ObjectType::Index)
671 .into_group_map_by(|o| o.obj_type);
672 let mut details = vec![];
673 for (obj_type, objs) in referring_objs_map {
674 match obj_type {
675 ObjectType::Table => {
676 let tables: Vec<(String, String)> = Object::find()
677 .join(JoinType::InnerJoin, object::Relation::Table.def())
678 .join(JoinType::InnerJoin, object::Relation::Database2.def())
679 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
680 .select_only()
681 .column(schema::Column::Name)
682 .column(table::Column::Name)
683 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
684 .into_tuple()
685 .all(db)
686 .await?;
687 details.extend(tables.into_iter().map(|(schema_name, table_name)| {
688 format!(
689 "materialized view {}.{} depends on it",
690 schema_name, table_name
691 )
692 }));
693 }
694 ObjectType::Sink => {
695 let sinks: Vec<(String, String)> = Object::find()
696 .join(JoinType::InnerJoin, object::Relation::Sink.def())
697 .join(JoinType::InnerJoin, object::Relation::Database2.def())
698 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
699 .select_only()
700 .column(schema::Column::Name)
701 .column(sink::Column::Name)
702 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
703 .into_tuple()
704 .all(db)
705 .await?;
706 if object_type == ObjectType::Table {
707 let engine = Table::find_by_id(object_id.as_table_id())
708 .select_only()
709 .column(table::Column::Engine)
710 .into_tuple::<table::Engine>()
711 .one(db)
712 .await?;
713 if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
714 continue;
715 }
716 }
717 details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
718 format!("sink {}.{} depends on it", schema_name, sink_name)
719 }));
720 }
721 ObjectType::View => {
722 let views: Vec<(String, String)> = Object::find()
723 .join(JoinType::InnerJoin, object::Relation::View.def())
724 .join(JoinType::InnerJoin, object::Relation::Database2.def())
725 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
726 .select_only()
727 .column(schema::Column::Name)
728 .column(view::Column::Name)
729 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
730 .into_tuple()
731 .all(db)
732 .await?;
733 details.extend(views.into_iter().map(|(schema_name, view_name)| {
734 format!("view {}.{} depends on it", schema_name, view_name)
735 }));
736 }
737 ObjectType::Subscription => {
738 let subscriptions: Vec<(String, String)> = Object::find()
739 .join(JoinType::InnerJoin, object::Relation::Subscription.def())
740 .join(JoinType::InnerJoin, object::Relation::Database2.def())
741 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
742 .select_only()
743 .column(schema::Column::Name)
744 .column(subscription::Column::Name)
745 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
746 .into_tuple()
747 .all(db)
748 .await?;
749 details.extend(subscriptions.into_iter().map(
750 |(schema_name, subscription_name)| {
751 format!(
752 "subscription {}.{} depends on it",
753 schema_name, subscription_name
754 )
755 },
756 ));
757 }
758 ObjectType::Source => {
759 let sources: Vec<(String, String)> = Object::find()
760 .join(JoinType::InnerJoin, object::Relation::Source.def())
761 .join(JoinType::InnerJoin, object::Relation::Database2.def())
762 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
763 .select_only()
764 .column(schema::Column::Name)
765 .column(source::Column::Name)
766 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
767 .into_tuple()
768 .all(db)
769 .await?;
770 details.extend(sources.into_iter().map(|(schema_name, view_name)| {
771 format!("source {}.{} depends on it", schema_name, view_name)
772 }));
773 }
774 ObjectType::Connection => {
775 let connections: Vec<(String, String)> = Object::find()
776 .join(JoinType::InnerJoin, object::Relation::Connection.def())
777 .join(JoinType::InnerJoin, object::Relation::Database2.def())
778 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
779 .select_only()
780 .column(schema::Column::Name)
781 .column(connection::Column::Name)
782 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
783 .into_tuple()
784 .all(db)
785 .await?;
786 details.extend(connections.into_iter().map(|(schema_name, view_name)| {
787 format!("connection {}.{} depends on it", schema_name, view_name)
788 }));
789 }
790 _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
792 }
793 }
794 if details.is_empty() {
795 return Ok(());
796 }
797
798 return Err(MetaError::permission_denied(format!(
799 "{} used by {} other objects. \nDETAIL: {}\n\
800 {}",
801 object_type.as_str(),
802 details.len(),
803 details.join("\n"),
804 match object_type {
805 ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
806 "HINT: DROP the dependent objects first.",
807 ObjectType::Database | ObjectType::Schema => unreachable!(),
808 _ => "HINT: Use DROP ... CASCADE to drop the dependent objects too.",
809 }
810 )));
811 }
812 Ok(())
813}
814
815pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
817where
818 C: ConnectionTrait,
819{
820 let objs = ObjectDependency::find()
821 .filter(object_dependency::Column::Oid.eq(object_id))
822 .join(
823 JoinType::InnerJoin,
824 object_dependency::Relation::Object1.def(),
825 )
826 .into_partial_model()
827 .all(db)
828 .await?;
829
830 Ok(objs)
831}
832
833pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
835where
836 C: ConnectionTrait,
837{
838 let count = Object::find()
839 .filter(object::Column::SchemaId.eq(Some(schema_id)))
840 .count(db)
841 .await?;
842 if count != 0 {
843 return Err(MetaError::permission_denied("schema is not empty"));
844 }
845
846 Ok(())
847}
848
849pub async fn list_user_info_by_ids<C>(
851 user_ids: impl IntoIterator<Item = UserId>,
852 db: &C,
853) -> MetaResult<Vec<PbUserInfo>>
854where
855 C: ConnectionTrait,
856{
857 let mut user_infos = vec![];
858 for user_id in user_ids {
859 let user = User::find_by_id(user_id)
860 .one(db)
861 .await?
862 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
863 let mut user_info: PbUserInfo = user.into();
864 user_info.grant_privileges = get_user_privilege(user_id, db).await?;
865 user_infos.push(user_info);
866 }
867 Ok(user_infos)
868}
869
870pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
872where
873 C: ConnectionTrait,
874{
875 let obj_owner: UserId = Object::find_by_id(object_id)
876 .select_only()
877 .column(object::Column::OwnerId)
878 .into_tuple()
879 .one(db)
880 .await?
881 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
882 Ok(obj_owner)
883}
884
/// Builds a recursive CTE query that collects the privileges in `ids` together with every
/// privilege derived from them, directly or transitively.
///
/// The CTE `granted_privilege_ids(id, user_id)` is seeded with the `user_privilege` rows whose
/// `id` is in `ids`. It is expanded with rows whose `dependent_id` equals an `id` already
/// collected. The query returns the collected `(id, user_id)` pairs. Because UNION ALL is used
/// without DISTINCT, a privilege can appear more than once, e.g. when `ids` contains both a
/// privilege and one derived from it.
pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
    let cte_alias = Alias::new("granted_privilege_ids");
    let cte_return_privilege_alias = Alias::new("id");
    let cte_return_user_alias = Alias::new("user_id");

    // Anchor: the requested privileges.
    let mut base_query = SelectStatement::new()
        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
        .from(UserPrivilege)
        .and_where(user_privilege::Column::Id.is_in(ids))
        .to_owned();

    // Recursive step: privileges whose `dependent_id` points at a privilege already collected.
    let cte_referencing = Query::select()
        .columns([
            (UserPrivilege, user_privilege::Column::Id),
            (UserPrivilege, user_privilege::Column::UserId),
        ])
        .from(UserPrivilege)
        .inner_join(
            cte_alias.clone(),
            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
                .equals(user_privilege::Column::DependentId),
        )
        .to_owned();

    let mut common_table_expr = CommonTableExpression::new();
    common_table_expr
        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
        .columns([
            cte_return_privilege_alias.clone(),
            cte_return_user_alias.clone(),
        ])
        .table_name(cte_alias.clone());

    SelectStatement::new()
        .columns([cte_return_privilege_alias, cte_return_user_alias])
        .from(cte_alias)
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
}
953
954pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
955where
956 C: ConnectionTrait,
957{
958 let table_ids: Vec<TableId> = Table::find()
959 .select_only()
960 .column(table::Column::TableId)
961 .filter(
962 table::Column::TableType
963 .eq(TableType::Internal)
964 .and(table::Column::BelongsToJobId.eq(job_id)),
965 )
966 .into_tuple()
967 .all(db)
968 .await?;
969 Ok(table_ids)
970}
971
972pub async fn get_index_state_tables_by_table_id<C>(
973 table_id: TableId,
974 db: &C,
975) -> MetaResult<Vec<TableId>>
976where
977 C: ConnectionTrait,
978{
979 let mut index_table_ids: Vec<TableId> = Index::find()
980 .select_only()
981 .column(index::Column::IndexTableId)
982 .filter(index::Column::PrimaryTableId.eq(table_id))
983 .into_tuple()
984 .all(db)
985 .await?;
986
987 if !index_table_ids.is_empty() {
988 let internal_table_ids: Vec<TableId> = Table::find()
989 .select_only()
990 .column(table::Column::TableId)
991 .filter(
992 table::Column::TableType
993 .eq(TableType::Internal)
994 .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
995 )
996 .into_tuple()
997 .all(db)
998 .await?;
999
1000 index_table_ids.extend(internal_table_ids.into_iter());
1001 }
1002
1003 Ok(index_table_ids)
1004}
1005
1006pub async fn get_iceberg_related_object_ids<C>(
1008 object_id: ObjectId,
1009 db: &C,
1010) -> MetaResult<Vec<ObjectId>>
1011where
1012 C: ConnectionTrait,
1013{
1014 let object = Object::find_by_id(object_id)
1015 .one(db)
1016 .await?
1017 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1018 if object.obj_type != ObjectType::Table {
1019 return Ok(vec![]);
1020 }
1021
1022 let table = Table::find_by_id(object_id.as_table_id())
1023 .one(db)
1024 .await?
1025 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1026 if !matches!(table.engine, Some(table::Engine::Iceberg)) {
1027 return Ok(vec![]);
1028 }
1029
1030 let database_id = object.database_id.unwrap();
1031 let schema_id = object.schema_id.unwrap();
1032
1033 let mut related_objects = vec![];
1034
1035 let iceberg_sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name);
1036 let iceberg_sink_id = Sink::find()
1037 .inner_join(Object)
1038 .select_only()
1039 .column(sink::Column::SinkId)
1040 .filter(
1041 object::Column::DatabaseId
1042 .eq(database_id)
1043 .and(object::Column::SchemaId.eq(schema_id))
1044 .and(sink::Column::Name.eq(&iceberg_sink_name)),
1045 )
1046 .into_tuple::<SinkId>()
1047 .one(db)
1048 .await?;
1049 if let Some(sink_id) = iceberg_sink_id {
1050 related_objects.push(sink_id.as_object_id());
1051 let sink_internal_tables = get_internal_tables_by_id(sink_id.as_job_id(), db).await?;
1052 related_objects.extend(
1053 sink_internal_tables
1054 .into_iter()
1055 .map(|tid| tid.as_object_id()),
1056 );
1057 } else {
1058 warn!(
1059 "iceberg table {} missing sink {}",
1060 table.name, iceberg_sink_name
1061 );
1062 }
1063
1064 let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
1065 let iceberg_source_id = Source::find()
1066 .inner_join(Object)
1067 .select_only()
1068 .column(source::Column::SourceId)
1069 .filter(
1070 object::Column::DatabaseId
1071 .eq(database_id)
1072 .and(object::Column::SchemaId.eq(schema_id))
1073 .and(source::Column::Name.eq(&iceberg_source_name)),
1074 )
1075 .into_tuple::<SourceId>()
1076 .one(db)
1077 .await?;
1078 if let Some(source_id) = iceberg_source_id {
1079 related_objects.push(source_id.as_object_id());
1080 } else {
1081 warn!(
1082 "iceberg table {} missing source {}",
1083 table.name, iceberg_source_name
1084 );
1085 }
1086
1087 Ok(related_objects)
1088}
1089
/// Partial projection of a `UserPrivilege` row: the privilege id and the user it is granted to.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "UserPrivilege")]
pub struct PartialUserPrivilege {
    pub id: PrivilegeId,
    pub user_id: UserId,
}
1096
1097pub async fn get_referring_privileges_cascade<C>(
1098 ids: Vec<PrivilegeId>,
1099 db: &C,
1100) -> MetaResult<Vec<PartialUserPrivilege>>
1101where
1102 C: ConnectionTrait,
1103{
1104 let query = construct_privilege_dependency_query(ids);
1105 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1106 let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1107 db.get_database_backend(),
1108 sql,
1109 values,
1110 ))
1111 .all(db)
1112 .await?;
1113
1114 Ok(privileges)
1115}
1116
1117pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1119where
1120 C: ConnectionTrait,
1121{
1122 let count = UserPrivilege::find()
1123 .filter(user_privilege::Column::DependentId.is_in(ids))
1124 .count(db)
1125 .await?;
1126 if count != 0 {
1127 return Err(MetaError::permission_denied(format!(
1128 "privileges granted to {} other ones.",
1129 count
1130 )));
1131 }
1132 Ok(())
1133}
1134
1135pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1137where
1138 C: ConnectionTrait,
1139{
1140 let user_privileges = UserPrivilege::find()
1141 .find_also_related(Object)
1142 .filter(user_privilege::Column::UserId.eq(user_id))
1143 .all(db)
1144 .await?;
1145 Ok(user_privileges
1146 .into_iter()
1147 .map(|(privilege, object)| {
1148 let object = object.unwrap();
1149 let oid = object.oid.as_raw_id();
1150 let obj = match object.obj_type {
1151 ObjectType::Database => PbGrantObject::DatabaseId(oid),
1152 ObjectType::Schema => PbGrantObject::SchemaId(oid),
1153 ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
1154 ObjectType::Source => PbGrantObject::SourceId(oid),
1155 ObjectType::Sink => PbGrantObject::SinkId(oid),
1156 ObjectType::View => PbGrantObject::ViewId(oid),
1157 ObjectType::Function => PbGrantObject::FunctionId(oid),
1158 ObjectType::Connection => PbGrantObject::ConnectionId(oid),
1159 ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
1160 ObjectType::Secret => PbGrantObject::SecretId(oid),
1161 };
1162 PbGrantPrivilege {
1163 action_with_opts: vec![PbActionWithGrantOption {
1164 action: PbAction::from(privilege.action) as _,
1165 with_grant_option: privilege.with_grant_option,
1166 granted_by: privilege.granted_by as _,
1167 }],
1168 object: Some(obj),
1169 }
1170 })
1171 .collect())
1172}
1173
1174pub async fn get_table_columns(
1175 txn: &impl ConnectionTrait,
1176 id: TableId,
1177) -> MetaResult<ColumnCatalogArray> {
1178 let columns = Table::find_by_id(id)
1179 .select_only()
1180 .columns([table::Column::Columns])
1181 .into_tuple::<ColumnCatalogArray>()
1182 .one(txn)
1183 .await?
1184 .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1185 Ok(columns)
1186}
1187
1188pub async fn grant_default_privileges_automatically<C>(
1191 db: &C,
1192 object_id: impl Into<ObjectId>,
1193) -> MetaResult<Vec<PbUserInfo>>
1194where
1195 C: ConnectionTrait,
1196{
1197 let object_id = object_id.into();
1198 let object = Object::find_by_id(object_id)
1199 .one(db)
1200 .await?
1201 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1202 assert_ne!(object.obj_type, ObjectType::Database);
1203
1204 let for_mview_filter = if object.obj_type == ObjectType::Table {
1205 let table_type = Table::find_by_id(object_id.as_table_id())
1206 .select_only()
1207 .column(table::Column::TableType)
1208 .into_tuple::<TableType>()
1209 .one(db)
1210 .await?
1211 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1212 user_default_privilege::Column::ForMaterializedView
1213 .eq(table_type == TableType::MaterializedView)
1214 } else {
1215 user_default_privilege::Column::ForMaterializedView.eq(false)
1216 };
1217 let schema_filter = if let Some(schema_id) = &object.schema_id {
1218 user_default_privilege::Column::SchemaId.eq(*schema_id)
1219 } else {
1220 user_default_privilege::Column::SchemaId.is_null()
1221 };
1222
1223 let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1224 .select_only()
1225 .columns([
1226 user_default_privilege::Column::Grantee,
1227 user_default_privilege::Column::GrantedBy,
1228 user_default_privilege::Column::Action,
1229 user_default_privilege::Column::WithGrantOption,
1230 ])
1231 .filter(
1232 user_default_privilege::Column::DatabaseId
1233 .eq(object.database_id.unwrap())
1234 .and(schema_filter)
1235 .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1236 .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1237 .and(for_mview_filter),
1238 )
1239 .into_tuple()
1240 .all(db)
1241 .await?;
1242 if default_privileges.is_empty() {
1243 return Ok(vec![]);
1244 }
1245
1246 let updated_user_ids = default_privileges
1247 .iter()
1248 .map(|(grantee, _, _, _)| *grantee)
1249 .collect::<HashSet<_>>();
1250
1251 for (grantee, granted_by, action, with_grant_option) in default_privileges {
1252 UserPrivilege::insert(user_privilege::ActiveModel {
1253 user_id: Set(grantee),
1254 oid: Set(object_id),
1255 granted_by: Set(granted_by),
1256 action: Set(action),
1257 with_grant_option: Set(with_grant_option),
1258 ..Default::default()
1259 })
1260 .exec(db)
1261 .await?;
1262 if action == Action::Select {
1263 let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1265 if !internal_table_ids.is_empty() {
1266 for internal_table_id in &internal_table_ids {
1267 UserPrivilege::insert(user_privilege::ActiveModel {
1268 user_id: Set(grantee),
1269 oid: Set(internal_table_id.as_object_id()),
1270 granted_by: Set(granted_by),
1271 action: Set(Action::Select),
1272 with_grant_option: Set(with_grant_option),
1273 ..Default::default()
1274 })
1275 .exec(db)
1276 .await?;
1277 }
1278 }
1279
1280 let iceberg_privilege_object_ids =
1282 get_iceberg_related_object_ids(object_id, db).await?;
1283 if !iceberg_privilege_object_ids.is_empty() {
1284 for iceberg_object_id in &iceberg_privilege_object_ids {
1285 UserPrivilege::insert(user_privilege::ActiveModel {
1286 user_id: Set(grantee),
1287 oid: Set(*iceberg_object_id),
1288 granted_by: Set(granted_by),
1289 action: Set(action),
1290 with_grant_option: Set(with_grant_option),
1291 ..Default::default()
1292 })
1293 .exec(db)
1294 .await?;
1295 }
1296 }
1297 }
1298 }
1299
1300 let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1301 Ok(updated_user_infos)
1302}
1303
1304pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1306 match object {
1307 PbGrantObject::DatabaseId(id)
1308 | PbGrantObject::SchemaId(id)
1309 | PbGrantObject::TableId(id)
1310 | PbGrantObject::SourceId(id)
1311 | PbGrantObject::SinkId(id)
1312 | PbGrantObject::ViewId(id)
1313 | PbGrantObject::FunctionId(id)
1314 | PbGrantObject::SubscriptionId(id)
1315 | PbGrantObject::ConnectionId(id)
1316 | PbGrantObject::SecretId(id) => (*id).into(),
1317 }
1318}
1319
1320pub async fn insert_fragment_relations(
1321 db: &impl ConnectionTrait,
1322 downstream_fragment_relations: &FragmentDownstreamRelation,
1323) -> MetaResult<()> {
1324 for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1325 for downstream in downstreams {
1326 let relation = fragment_relation::Model {
1327 source_fragment_id: *upstream_fragment_id as _,
1328 target_fragment_id: downstream.downstream_fragment_id as _,
1329 dispatcher_type: downstream.dispatcher_type,
1330 dist_key_indices: downstream
1331 .dist_key_indices
1332 .iter()
1333 .map(|idx| *idx as i32)
1334 .collect_vec()
1335 .into(),
1336 output_indices: downstream
1337 .output_mapping
1338 .indices
1339 .iter()
1340 .map(|idx| *idx as i32)
1341 .collect_vec()
1342 .into(),
1343 output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1344 };
1345 FragmentRelation::insert(relation.into_active_model())
1346 .exec(db)
1347 .await?;
1348 }
1349 }
1350 Ok(())
1351}
1352
1353pub fn compose_dispatchers(
1354 source_fragment_distribution: DistributionType,
1355 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1356 target_fragment_id: crate::model::FragmentId,
1357 target_fragment_distribution: DistributionType,
1358 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1359 dispatcher_type: DispatcherType,
1360 dist_key_indices: Vec<u32>,
1361 output_mapping: PbDispatchOutputMapping,
1362) -> HashMap<crate::model::ActorId, PbDispatcher> {
1363 match dispatcher_type {
1364 DispatcherType::Hash => {
1365 let dispatcher = PbDispatcher {
1366 r#type: PbDispatcherType::from(dispatcher_type) as _,
1367 dist_key_indices,
1368 output_mapping: output_mapping.into(),
1369 hash_mapping: Some(
1370 ActorMapping::from_bitmaps(
1371 &target_fragment_actors
1372 .iter()
1373 .map(|(actor_id, bitmap)| {
1374 (
1375 *actor_id as _,
1376 bitmap
1377 .clone()
1378 .expect("downstream hash dispatch must have distribution"),
1379 )
1380 })
1381 .collect(),
1382 )
1383 .to_protobuf(),
1384 ),
1385 dispatcher_id: target_fragment_id.as_raw_id() as _,
1386 downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1387 };
1388 source_fragment_actors
1389 .keys()
1390 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1391 .collect()
1392 }
1393 DispatcherType::Broadcast | DispatcherType::Simple => {
1394 let dispatcher = PbDispatcher {
1395 r#type: PbDispatcherType::from(dispatcher_type) as _,
1396 dist_key_indices,
1397 output_mapping: output_mapping.into(),
1398 hash_mapping: None,
1399 dispatcher_id: target_fragment_id.as_raw_id() as _,
1400 downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1401 };
1402 source_fragment_actors
1403 .keys()
1404 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1405 .collect()
1406 }
1407 DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1408 source_fragment_distribution,
1409 source_fragment_actors,
1410 target_fragment_distribution,
1411 target_fragment_actors,
1412 )
1413 .into_iter()
1414 .map(|(upstream_actor_id, downstream_actor_id)| {
1415 (
1416 upstream_actor_id,
1417 PbDispatcher {
1418 r#type: PbDispatcherType::NoShuffle as _,
1419 dist_key_indices: dist_key_indices.clone(),
1420 output_mapping: output_mapping.clone().into(),
1421 hash_mapping: None,
1422 dispatcher_id: target_fragment_id.as_raw_id() as _,
1423 downstream_actor_id: vec![downstream_actor_id],
1424 },
1425 )
1426 })
1427 .collect(),
1428 }
1429}
1430
1431pub fn resolve_no_shuffle_actor_dispatcher(
1433 source_fragment_distribution: DistributionType,
1434 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1435 target_fragment_distribution: DistributionType,
1436 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1437) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1438 assert_eq!(source_fragment_distribution, target_fragment_distribution);
1439 assert_eq!(
1440 source_fragment_actors.len(),
1441 target_fragment_actors.len(),
1442 "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1443 source_fragment_actors,
1444 target_fragment_actors
1445 );
1446 match source_fragment_distribution {
1447 DistributionType::Single => {
1448 let assert_singleton = |bitmap: &Option<Bitmap>| {
1449 assert!(
1450 bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1451 "not singleton: {:?}",
1452 bitmap
1453 );
1454 };
1455 assert_eq!(
1456 source_fragment_actors.len(),
1457 1,
1458 "singleton distribution actor count not 1: {:?}",
1459 source_fragment_distribution
1460 );
1461 assert_eq!(
1462 target_fragment_actors.len(),
1463 1,
1464 "singleton distribution actor count not 1: {:?}",
1465 target_fragment_distribution
1466 );
1467 let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1468 assert_singleton(bitmap);
1469 let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1470 assert_singleton(bitmap);
1471 vec![(*source_actor_id, *target_actor_id)]
1472 }
1473 DistributionType::Hash => {
1474 let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1475 .iter()
1476 .map(|(actor_id, bitmap)| {
1477 let bitmap = bitmap
1478 .as_ref()
1479 .expect("hash distribution should have bitmap");
1480 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1481 (first_vnode, (*actor_id, bitmap))
1482 })
1483 .collect();
1484 source_fragment_actors
1485 .iter()
1486 .map(|(source_actor_id, bitmap)| {
1487 let bitmap = bitmap
1488 .as_ref()
1489 .expect("hash distribution should have bitmap");
1490 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1491 let (target_actor_id, target_bitmap) =
1492 target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1493 panic!(
1494 "cannot find matched target actor: {} {:?} {:?} {:?}",
1495 source_actor_id,
1496 first_vnode,
1497 source_fragment_actors,
1498 target_fragment_actors
1499 );
1500 });
1501 assert_eq!(
1502 bitmap,
1503 target_bitmap,
1504 "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1505 source_actor_id,
1506 first_vnode,
1507 source_fragment_actors,
1508 target_fragment_actors
1509 );
1510 (*source_actor_id, target_actor_id)
1511 }).collect()
1512 }
1513 }
1514}
1515
1516pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1517 let fragment_worker_slot_mapping = match fragment.distribution_type {
1518 DistributionType::Single => {
1519 let actor = fragment.actors.values().exactly_one().unwrap();
1520 WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1521 }
1522 DistributionType::Hash => {
1523 let actor_bitmaps: HashMap<_, _> = fragment
1524 .actors
1525 .iter()
1526 .map(|(actor_id, actor_info)| {
1527 let vnode_bitmap = actor_info
1528 .vnode_bitmap
1529 .as_ref()
1530 .cloned()
1531 .expect("actor bitmap shouldn't be none in hash fragment");
1532
1533 (*actor_id as hash::ActorId, vnode_bitmap)
1534 })
1535 .collect();
1536
1537 let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1538
1539 let actor_locations = fragment
1540 .actors
1541 .iter()
1542 .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
1543 .collect();
1544
1545 actor_mapping.to_worker_slot(&actor_locations)
1546 }
1547 };
1548
1549 PbFragmentWorkerSlotMapping {
1550 fragment_id: fragment.fragment_id,
1551 mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1552 }
1553}
1554
1555pub async fn get_fragments_for_jobs<C>(
1561 db: &C,
1562 actor_info: &SharedActorInfos,
1563 streaming_jobs: Vec<JobId>,
1564) -> MetaResult<(
1565 HashMap<SourceId, BTreeSet<FragmentId>>,
1566 HashSet<FragmentId>,
1567 HashSet<ActorId>,
1568 HashSet<FragmentId>,
1569)>
1570where
1571 C: ConnectionTrait,
1572{
1573 if streaming_jobs.is_empty() {
1574 return Ok((
1575 HashMap::default(),
1576 HashSet::default(),
1577 HashSet::default(),
1578 HashSet::default(),
1579 ));
1580 }
1581
1582 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1583 .select_only()
1584 .columns([
1585 fragment::Column::FragmentId,
1586 fragment::Column::FragmentTypeMask,
1587 fragment::Column::StreamNode,
1588 ])
1589 .filter(fragment::Column::JobId.is_in(streaming_jobs))
1590 .into_tuple()
1591 .all(db)
1592 .await?;
1593
1594 let fragment_ids: HashSet<_> = fragments
1595 .iter()
1596 .map(|(fragment_id, _, _)| *fragment_id)
1597 .collect();
1598
1599 let actors = {
1600 let guard = actor_info.read_guard();
1601 fragment_ids
1602 .iter()
1603 .flat_map(|id| guard.get_fragment(*id as _))
1604 .flat_map(|f| f.actors.keys().cloned().map(|id| id as _))
1605 .collect::<HashSet<_>>()
1606 };
1607
1608 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1609 let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1610 for (fragment_id, mask, stream_node) in fragments {
1611 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1612 && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1613 {
1614 source_fragment_ids
1615 .entry(source_id)
1616 .or_default()
1617 .insert(fragment_id);
1618 }
1619 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1620 sink_fragment_ids.insert(fragment_id);
1621 }
1622 }
1623
1624 Ok((
1625 source_fragment_ids,
1626 sink_fragment_ids,
1627 actors.into_iter().collect(),
1628 fragment_ids,
1629 ))
1630}
1631
1632pub(crate) fn build_object_group_for_delete(
1637 partial_objects: Vec<PartialObject>,
1638) -> NotificationInfo {
1639 let mut objects = vec![];
1640 for obj in partial_objects {
1641 match obj.obj_type {
1642 ObjectType::Database => objects.push(PbObject {
1643 object_info: Some(PbObjectInfo::Database(PbDatabase {
1644 id: obj.oid.as_database_id(),
1645 ..Default::default()
1646 })),
1647 }),
1648 ObjectType::Schema => objects.push(PbObject {
1649 object_info: Some(PbObjectInfo::Schema(PbSchema {
1650 id: obj.oid.as_schema_id(),
1651 database_id: obj.database_id.unwrap(),
1652 ..Default::default()
1653 })),
1654 }),
1655 ObjectType::Table => objects.push(PbObject {
1656 object_info: Some(PbObjectInfo::Table(PbTable {
1657 id: obj.oid.as_table_id(),
1658 schema_id: obj.schema_id.unwrap(),
1659 database_id: obj.database_id.unwrap(),
1660 ..Default::default()
1661 })),
1662 }),
1663 ObjectType::Source => objects.push(PbObject {
1664 object_info: Some(PbObjectInfo::Source(PbSource {
1665 id: obj.oid.as_source_id(),
1666 schema_id: obj.schema_id.unwrap(),
1667 database_id: obj.database_id.unwrap(),
1668 ..Default::default()
1669 })),
1670 }),
1671 ObjectType::Sink => objects.push(PbObject {
1672 object_info: Some(PbObjectInfo::Sink(PbSink {
1673 id: obj.oid.as_sink_id(),
1674 schema_id: obj.schema_id.unwrap(),
1675 database_id: obj.database_id.unwrap(),
1676 ..Default::default()
1677 })),
1678 }),
1679 ObjectType::Subscription => objects.push(PbObject {
1680 object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1681 id: obj.oid.as_subscription_id(),
1682 schema_id: obj.schema_id.unwrap(),
1683 database_id: obj.database_id.unwrap(),
1684 ..Default::default()
1685 })),
1686 }),
1687 ObjectType::View => objects.push(PbObject {
1688 object_info: Some(PbObjectInfo::View(PbView {
1689 id: obj.oid.as_view_id(),
1690 schema_id: obj.schema_id.unwrap(),
1691 database_id: obj.database_id.unwrap(),
1692 ..Default::default()
1693 })),
1694 }),
1695 ObjectType::Index => {
1696 objects.push(PbObject {
1697 object_info: Some(PbObjectInfo::Index(PbIndex {
1698 id: obj.oid.as_index_id(),
1699 schema_id: obj.schema_id.unwrap(),
1700 database_id: obj.database_id.unwrap(),
1701 ..Default::default()
1702 })),
1703 });
1704 objects.push(PbObject {
1705 object_info: Some(PbObjectInfo::Table(PbTable {
1706 id: obj.oid.as_table_id(),
1707 schema_id: obj.schema_id.unwrap(),
1708 database_id: obj.database_id.unwrap(),
1709 ..Default::default()
1710 })),
1711 });
1712 }
1713 ObjectType::Function => objects.push(PbObject {
1714 object_info: Some(PbObjectInfo::Function(PbFunction {
1715 id: obj.oid.as_function_id(),
1716 schema_id: obj.schema_id.unwrap(),
1717 database_id: obj.database_id.unwrap(),
1718 ..Default::default()
1719 })),
1720 }),
1721 ObjectType::Connection => objects.push(PbObject {
1722 object_info: Some(PbObjectInfo::Connection(PbConnection {
1723 id: obj.oid.as_connection_id(),
1724 schema_id: obj.schema_id.unwrap(),
1725 database_id: obj.database_id.unwrap(),
1726 ..Default::default()
1727 })),
1728 }),
1729 ObjectType::Secret => objects.push(PbObject {
1730 object_info: Some(PbObjectInfo::Secret(PbSecret {
1731 id: obj.oid.as_secret_id(),
1732 schema_id: obj.schema_id.unwrap(),
1733 database_id: obj.database_id.unwrap(),
1734 ..Default::default()
1735 })),
1736 }),
1737 }
1738 }
1739 NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1740}
1741
1742pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1743 let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1744 .context("unable to parse table definition")
1745 .inspect_err(|e| {
1746 tracing::error!(
1747 target: "auto_schema_change",
1748 error = %e.as_report(),
1749 "failed to parse table definition")
1750 })
1751 .unwrap()
1752 .try_into()
1753 .unwrap();
1754 if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1755 cdc_table_info
1756 .clone()
1757 .map(|cdc_table_info| cdc_table_info.external_table_name)
1758 } else {
1759 None
1760 }
1761}
1762
/// Renames the relation `object_id` of kind `object_type` to `object_name` inside
/// `txn`.
///
/// The relation row's `name` is replaced. For every kind except views, its stored
/// `definition` is also passed through `alter_relation_rename`. Each updated
/// relation is collected as a [`PbObject`], together with its object row.
///
/// * `Table`: a source whose `optional_associated_table_id` equals this table is
///   renamed as well, before the table itself.
/// * `Index`: the index table (via its `index_table_id`) and the index row are
///   both renamed; only the index table's definition is rewritten.
///
/// Returns the updated objects and the previous name. For an index, this is the
/// previous name of its index table.
///
/// # Panics
/// Panics if the relation or its object row is missing, or if `object_type` is
/// not one of the relation kinds handled below.
pub async fn rename_relation(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
) -> MetaResult<(Vec<PbObject>, String)> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename;

    let mut to_update_relations = vec![];
    // Renames one relation of entity `$entity` (model module `$table`, key field
    // `$identity`) with id `$object_id`, persists `name` and `definition`, pushes
    // the updated object onto `to_update_relations`, and evaluates to the old name.
    macro_rules! rename_relation {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let obj = obj.unwrap();
            let old_name = relation.name.clone();
            relation.name = object_name.into();
            // Views keep their stored definition unchanged.
            // NOTE(review): reason not visible here — confirm.
            if obj.obj_type != ObjectType::View {
                relation.definition = alter_relation_rename(&relation.definition, object_name);
            }
            // Only the key, `name` and `definition` are set, so the update touches
            // just those columns.
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                name: Set(object_name.into()),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
            });
            old_name
        }};
    }
    let old_name = match object_type {
        ObjectType::Table => {
            // A table may have an associated source that must carry the same name.
            let associated_source_id: Option<SourceId> = Source::find()
                .select_only()
                .column(source::Column::SourceId)
                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
                .into_tuple()
                .one(txn)
                .await?;
            if let Some(source_id) = associated_source_id {
                rename_relation!(Source, source, source_id, source_id);
            }
            rename_relation!(Table, table, table_id, object_id.as_table_id())
        }
        ObjectType::Source => {
            rename_relation!(Source, source, source_id, object_id.as_source_id())
        }
        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
        ObjectType::Subscription => {
            rename_relation!(
                Subscription,
                subscription,
                subscription_id,
                object_id.as_subscription_id()
            )
        }
        ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
        ObjectType::Index => {
            let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            index.name = object_name.into();
            let index_table_id = index.index_table_id;
            // The returned old name is the one of the index table.
            let old_name = rename_relation!(Table, table, table_id, index_table_id);

            // Only the index row's name is updated; it has no definition rewrite here.
            let active_model = index::ActiveModel {
                index_id: sea_orm::ActiveValue::Set(index.index_id),
                name: sea_orm::ActiveValue::Set(object_name.into()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
            });
            old_name
        }
        _ => unreachable!("only relation name can be altered."),
    };

    Ok((to_update_relations, old_name))
}
1858
1859pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
1860where
1861 C: ConnectionTrait,
1862{
1863 let database_resource_group: Option<String> = Database::find_by_id(database_id)
1864 .select_only()
1865 .column(database::Column::ResourceGroup)
1866 .into_tuple()
1867 .one(txn)
1868 .await?
1869 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1870
1871 Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1872}
1873
1874pub async fn get_existing_job_resource_group<C>(
1875 txn: &C,
1876 streaming_job_id: JobId,
1877) -> MetaResult<String>
1878where
1879 C: ConnectionTrait,
1880{
1881 let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1882 StreamingJob::find_by_id(streaming_job_id)
1883 .select_only()
1884 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1885 .join(JoinType::InnerJoin, object::Relation::Database2.def())
1886 .column(streaming_job::Column::SpecificResourceGroup)
1887 .column(database::Column::ResourceGroup)
1888 .into_tuple()
1889 .one(txn)
1890 .await?
1891 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1892
1893 Ok(job_specific_resource_group.unwrap_or_else(|| {
1894 database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1895 }))
1896}
1897
1898pub fn filter_workers_by_resource_group(
1899 workers: &HashMap<WorkerId, WorkerNode>,
1900 resource_group: &str,
1901) -> BTreeSet<WorkerId> {
1902 workers
1903 .iter()
1904 .filter(|&(_, worker)| {
1905 worker
1906 .resource_group()
1907 .map(|node_label| node_label.as_str() == resource_group)
1908 .unwrap_or(false)
1909 })
1910 .map(|(id, _)| *id)
1911 .collect()
1912}
1913
/// Rewrites the definitions of relations that refer to a just-renamed object.
///
/// For every object returned by `get_referring_objects(object_id, ..)`, the
/// stored `definition` is passed through `alter_relation_rename_refs` with
/// `old_name` and `object_name` and written back. When the renamed object is a
/// table, sinks whose `target_table` is that table are processed as well. For a
/// referring index, the definition of its index table is rewritten. Each updated
/// relation is collected as a [`PbObject`].
///
/// # Errors
/// Returns an error if a referring object is not a table, sink, subscription,
/// view or index, and propagates database errors.
///
/// # Panics
/// Panics if a referring relation, its object row, or an index's table id is
/// missing.
pub async fn rename_relation_refer(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
    old_name: &str,
) -> MetaResult<Vec<PbObject>> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename_refs;

    let mut to_update_relations = vec![];
    // Rewrites and persists the `definition` of relation `$object_id` of entity
    // `$entity` (model module `$table`, key field `$identity`), then pushes the
    // updated object onto `to_update_relations`.
    macro_rules! rename_relation_ref {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            relation.definition =
                alter_relation_rename_refs(&relation.definition, old_name, object_name);
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj.unwrap()).into(),
                )),
            });
        }};
    }
    let mut objs = get_referring_objects(object_id, txn).await?;
    if object_type == ObjectType::Table {
        // Sinks writing into this table are looked up via `target_table` and
        // processed too. Only `oid` and `obj_type` are read below, so the schema and
        // database ids are left empty.
        let incoming_sinks: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.eq(object_id))
            .into_tuple()
            .all(txn)
            .await?;

        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
            oid: id.as_object_id(),
            obj_type: ObjectType::Sink,
            schema_id: None,
            database_id: None,
        }));
    }

    for obj in objs {
        match obj.obj_type {
            ObjectType::Table => {
                rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
            }
            ObjectType::Sink => {
                rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
            }
            ObjectType::Subscription => {
                rename_relation_ref!(
                    Subscription,
                    subscription,
                    subscription_id,
                    obj.oid.as_subscription_id()
                )
            }
            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
            ObjectType::Index => {
                // An index's definition is stored on its index table.
                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
                    .select_only()
                    .column(index::Column::IndexTableId)
                    .into_tuple()
                    .one(txn)
                    .await?;
                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
            }
            _ => {
                bail!(
                    "only the table, sink, subscription, view and index will depend on other objects."
                )
            }
        }
    }

    Ok(to_update_relations)
}
2004
/// Checks whether subscription `subscription_id` may be dropped.
///
/// Deletion is always allowed when another subscription exists on the same
/// upstream table. Otherwise it is rejected if any other object that depends on
/// the upstream table lives in a different database than the table.
///
/// # Errors
/// Returns a catalog-not-found error if the subscription does not exist. Returns
/// a permission-denied error carrying the number of such cross-database
/// dependents. Propagates database errors.
pub async fn validate_subscription_deletion<C>(
    txn: &C,
    subscription_id: SubscriptionId,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
        .select_only()
        .column(subscription::Column::DependentTableId)
        .into_tuple()
        .one(txn)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;

    let cnt = Subscription::find()
        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
        .count(txn)
        .await?;
    if cnt > 1 {
        // Other subscriptions on the same table remain after this deletion, so no
        // further check is needed.
        // NOTE(review): presumably cross-database consumers rely on the table
        // keeping at least one subscription — confirm.
        return Ok(());
    }

    // Self-join `object` twice: `o1` for the depended-upon object and `o2` for the
    // dependent one. NOTE(review): assumes `Object2` joins on `oid` and `Object1`
    // on `used_by`, matching the alias names — confirm against the relation defs.
    let obj_alias = Alias::new("o1");
    let used_by_alias = Alias::new("o2");
    let count = ObjectDependency::find()
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object2.def(),
            obj_alias.clone(),
        )
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object1.def(),
            used_by_alias.clone(),
        )
        // Dependents of the upstream table, other than this subscription, whose
        // database differs from the aliased upstream object's database.
        .filter(
            object_dependency::Column::Oid
                .eq(upstream_table_id)
                .and(object_dependency::Column::UsedBy.ne(subscription_id))
                .and(
                    Expr::col((obj_alias, object::Column::DatabaseId))
                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
                ),
        )
        .count(txn)
        .await?;

    if count != 0 {
        return Err(MetaError::permission_denied(format!(
            "Referenced by {} cross-db objects.",
            count
        )));
    }

    Ok(())
}
2068
2069pub async fn fetch_target_fragments<C>(
2070 txn: &C,
2071 src_fragment_id: impl IntoIterator<Item = FragmentId>,
2072) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
2073where
2074 C: ConnectionTrait,
2075{
2076 let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
2077 .select_only()
2078 .columns([
2079 fragment_relation::Column::SourceFragmentId,
2080 fragment_relation::Column::TargetFragmentId,
2081 ])
2082 .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
2083 .into_tuple()
2084 .all(txn)
2085 .await?;
2086
2087 let source_target_fragments = source_target_fragments.into_iter().into_group_map();
2088
2089 Ok(source_target_fragments)
2090}
2091
2092pub async fn get_sink_fragment_by_ids<C>(
2093 txn: &C,
2094 sink_ids: Vec<SinkId>,
2095) -> MetaResult<HashMap<SinkId, FragmentId>>
2096where
2097 C: ConnectionTrait,
2098{
2099 let sink_num = sink_ids.len();
2100 let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
2101 .select_only()
2102 .columns([fragment::Column::JobId, fragment::Column::FragmentId])
2103 .filter(
2104 fragment::Column::JobId
2105 .is_in(sink_ids)
2106 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2107 )
2108 .into_tuple()
2109 .all(txn)
2110 .await?;
2111
2112 if sink_fragment_ids.len() != sink_num {
2113 return Err(anyhow::anyhow!(
2114 "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2115 sink_fragment_ids.len(),
2116 sink_num
2117 )
2118 .into());
2119 }
2120
2121 Ok(sink_fragment_ids.into_iter().collect())
2122}
2123
2124pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2125where
2126 C: ConnectionTrait,
2127{
2128 let mview_fragment: Vec<i32> = Fragment::find()
2129 .select_only()
2130 .column(fragment::Column::FragmentTypeMask)
2131 .filter(
2132 fragment::Column::JobId
2133 .eq(table_id)
2134 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2135 )
2136 .into_tuple()
2137 .all(txn)
2138 .await?;
2139
2140 let mview_fragment_len = mview_fragment.len();
2141 if mview_fragment_len != 1 {
2142 bail!(
2143 "expected exactly one mview fragment for table {}, found {}",
2144 table_id,
2145 mview_fragment_len
2146 );
2147 }
2148
2149 let mview_fragment = mview_fragment.into_iter().next().unwrap();
2150 let migrated =
2151 FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2152
2153 Ok(migrated)
2154}
2155
2156pub async fn try_get_iceberg_table_by_downstream_sink<C>(
2157 txn: &C,
2158 sink_id: SinkId,
2159) -> MetaResult<Option<TableId>>
2160where
2161 C: ConnectionTrait,
2162{
2163 let sink = Sink::find_by_id(sink_id).one(txn).await?;
2164 let Some(sink) = sink else {
2165 return Ok(None);
2166 };
2167
2168 if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
2169 let object_ids: Vec<ObjectId> = ObjectDependency::find()
2170 .select_only()
2171 .column(object_dependency::Column::Oid)
2172 .filter(object_dependency::Column::UsedBy.eq(sink_id))
2173 .into_tuple()
2174 .all(txn)
2175 .await?;
2176 let mut iceberg_table_ids = vec![];
2177 for object_id in object_ids {
2178 let table_id = object_id.as_table_id();
2179 if let Some(table_engine) = Table::find_by_id(table_id)
2180 .select_only()
2181 .column(table::Column::Engine)
2182 .into_tuple::<table::Engine>()
2183 .one(txn)
2184 .await?
2185 && table_engine == table::Engine::Iceberg
2186 {
2187 iceberg_table_ids.push(table_id);
2188 }
2189 }
2190 if iceberg_table_ids.len() == 1 {
2191 return Ok(Some(iceberg_table_ids[0]));
2192 }
2193 }
2194 Ok(None)
2195}
2196
2197pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
2198where
2199 C: ConnectionTrait,
2200{
2201 if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
2202 .select_only()
2203 .column(table::Column::Engine)
2204 .into_tuple::<table::Engine>()
2205 .one(txn)
2206 .await?
2207 && engine == table::Engine::Iceberg
2208 {
2209 return Ok(true);
2210 }
2211 if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
2212 .select_only()
2213 .column(sink::Column::Name)
2214 .into_tuple::<String>()
2215 .one(txn)
2216 .await?
2217 && sink_name.starts_with(ICEBERG_SINK_PREFIX)
2218 {
2219 return Ok(true);
2220 }
2221 Ok(false)
2222}
2223
2224pub async fn find_dirty_iceberg_table_jobs<C>(
2225 txn: &C,
2226 database_id: Option<DatabaseId>,
2227) -> MetaResult<Vec<PartialObject>>
2228where
2229 C: ConnectionTrait,
2230{
2231 let mut filter_condition = streaming_job::Column::JobStatus
2232 .ne(JobStatus::Created)
2233 .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
2234 .and(streaming_job::Column::CreateType.eq(CreateType::Background));
2235 if let Some(database_id) = database_id {
2236 filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
2237 }
2238 let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
2239 .select_only()
2240 .columns([
2241 object::Column::Oid,
2242 object::Column::ObjType,
2243 object::Column::SchemaId,
2244 object::Column::DatabaseId,
2245 ])
2246 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2247 .filter(filter_condition)
2248 .into_partial_model()
2249 .all(txn)
2250 .await?;
2251
2252 let mut dirty_iceberg_table_jobs = vec![];
2253 for job in creating_table_sink_jobs {
2254 if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
2255 tracing::info!("Found dirty iceberg job with id: {}", job.oid);
2256 dirty_iceberg_table_jobs.push(job);
2257 }
2258 }
2259
2260 Ok(dirty_iceberg_table_jobs)
2261}
2262
2263pub fn build_select_node_list(
2264 from: &[ColumnCatalog],
2265 to: &[ColumnCatalog],
2266) -> MetaResult<Vec<PbExprNode>> {
2267 let mut exprs = Vec::with_capacity(to.len());
2268 let idx_by_col_id = from
2269 .iter()
2270 .enumerate()
2271 .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2272 .collect::<HashMap<_, _>>();
2273
2274 for to_col in to {
2275 let to_col = to_col.column_desc.as_ref().unwrap();
2276 let to_col_type_ref = to_col.column_type.as_ref().unwrap();
2277 let to_col_type = DataType::from(to_col_type_ref);
2278 if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2279 let from_col_type = DataType::from(
2280 from[*from_idx]
2281 .column_desc
2282 .as_ref()
2283 .unwrap()
2284 .column_type
2285 .as_ref()
2286 .unwrap(),
2287 );
2288 if !to_col_type.equals_datatype(&from_col_type) {
2289 return Err(anyhow!(
2290 "Column type mismatch: {:?} != {:?}",
2291 from_col_type,
2292 to_col_type
2293 )
2294 .into());
2295 }
2296 exprs.push(PbExprNode {
2297 function_type: expr_node::Type::Unspecified.into(),
2298 return_type: Some(to_col_type_ref.clone()),
2299 rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2300 });
2301 } else {
2302 let to_default_node =
2303 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2304 expr,
2305 ..
2306 })) = &to_col.generated_or_default_column
2307 {
2308 expr.clone().unwrap()
2309 } else {
2310 let null = Datum::None.to_protobuf();
2311 PbExprNode {
2312 function_type: expr_node::Type::Unspecified.into(),
2313 return_type: Some(to_col_type_ref.clone()),
2314 rex_node: Some(expr_node::RexNode::Constant(null)),
2315 }
2316 };
2317 exprs.push(to_default_node);
2318 }
2319 }
2320
2321 Ok(exprs)
2322}
2323
/// Extra per-job metadata read from the `streaming_job` row plus the job's resolved
/// definition, as assembled by `get_streaming_job_extra_info`.
#[derive(Clone, Debug, Default)]
pub struct StreamingJobExtraInfo {
    /// Value of the `timezone` column of the job's `streaming_job` row (`None` if NULL).
    pub timezone: Option<String>,
    /// Value of the `config_override` column; an empty string when the column is NULL.
    /// Stored as `Arc<str>` so it can be shared cheaply (e.g. into a `StreamContext`).
    pub config_override: Arc<str>,
    /// Job definition returned by `resolve_streaming_job_definition`; empty if none was found.
    pub job_definition: String,
}
2330
2331impl StreamingJobExtraInfo {
2332 pub fn stream_context(&self) -> StreamContext {
2333 StreamContext {
2334 timezone: self.timezone.clone(),
2335 config_override: self.config_override.clone(),
2336 }
2337 }
2338}
2339
2340pub async fn get_streaming_job_extra_info<C>(
2341 txn: &C,
2342 job_ids: Vec<JobId>,
2343) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2344where
2345 C: ConnectionTrait,
2346{
2347 let pairs: Vec<(JobId, Option<String>, Option<String>)> = StreamingJob::find()
2348 .select_only()
2349 .columns([
2350 streaming_job::Column::JobId,
2351 streaming_job::Column::Timezone,
2352 streaming_job::Column::ConfigOverride,
2353 ])
2354 .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2355 .into_tuple()
2356 .all(txn)
2357 .await?;
2358
2359 let job_ids = job_ids.into_iter().collect();
2360
2361 let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2362
2363 let result = pairs
2364 .into_iter()
2365 .map(|(job_id, timezone, config_override)| {
2366 let job_definition = definitions.remove(&job_id).unwrap_or_default();
2367 (
2368 job_id,
2369 StreamingJobExtraInfo {
2370 timezone,
2371 config_override: config_override.unwrap_or_default().into(),
2372 job_definition,
2373 },
2374 )
2375 })
2376 .collect();
2377
2378 Ok(result)
2379}
2380
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_cdc_table_name() {
        // (CDC table DDL, expected external table name)
        let cases = [
            ("CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'", "public.t1"),
            ("CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'", "mydb.t2"),
        ];
        for (ddl, expected) in cases {
            assert_eq!(
                extract_external_table_name_from_definition(ddl),
                Some(expected.into())
            );
        }
    }
}