1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{
22 FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23};
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
25use risingwave_common::id::{JobId, SubscriptionId};
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
28use risingwave_common::types::{DataType, Datum};
29use risingwave_common::util::value_encoding::DatumToProtoExt;
30use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
31use risingwave_common::{bail, hash};
32use risingwave_meta_model::fragment::DistributionType;
33use risingwave_meta_model::object::ObjectType;
34use risingwave_meta_model::prelude::*;
35use risingwave_meta_model::streaming_job::BackfillOrders;
36use risingwave_meta_model::table::TableType;
37use risingwave_meta_model::user_privilege::Action;
38use risingwave_meta_model::{
39 ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
40 JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
41 TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
42 function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
43 subscription, table, user, user_default_privilege, user_privilege, view,
44};
45use risingwave_meta_model_migration::WithQuery;
46use risingwave_pb::catalog::{
47 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
48 PbSubscription, PbTable, PbView,
49};
50use risingwave_pb::common::WorkerNode;
51use risingwave_pb::expr::{PbExprNode, expr_node};
52use risingwave_pb::meta::object::PbObjectInfo;
53use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
54use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
55use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
56use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
57use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
58use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
59use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
60use risingwave_sqlparser::ast::Statement as SqlStatement;
61use risingwave_sqlparser::parser::Parser;
62use sea_orm::sea_query::{
63 Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
64 WithClause,
65};
66use sea_orm::{
67 ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
68 FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
69 RelationTrait, Set, Statement,
70};
71use thiserror_ext::AsReport;
72use tracing::warn;
73
74use crate::barrier::{SharedActorInfos, SharedFragmentInfo};
75use crate::controller::ObjectModel;
76use crate::controller::fragment::FragmentTypeMaskExt;
77use crate::controller::scale::resolve_streaming_job_definition;
78use crate::model::{FragmentDownstreamRelation, StreamContext};
79use crate::{MetaError, MetaResult};
80
/// Build a recursive CTE query that returns every object transitively
/// depending on `obj_id` (via `object_dependency`), plus — when `obj_id` is a
/// database or schema — every object that belongs to it.
///
/// The outer query selects `(oid, obj_type, schema_id, database_id)` from
/// `object` for each collected id, de-duplicated with `DISTINCT` and ordered
/// by `oid` descending.
pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
    let cte_alias = Alias::new("used_by_object_ids");
    let cte_return_alias = Alias::new("used_by");

    // Seed of the recursion: direct dependents of `obj_id`.
    let mut base_query = SelectStatement::new()
        .column(object_dependency::Column::UsedBy)
        .from(ObjectDependency)
        .and_where(object_dependency::Column::Oid.eq(obj_id))
        .to_owned();

    // Objects contained in `obj_id` when it is a database or a schema.
    let belonged_obj_query = SelectStatement::new()
        .column(object::Column::Oid)
        .from(Object)
        .and_where(
            object::Column::DatabaseId
                .eq(obj_id)
                .or(object::Column::SchemaId.eq(obj_id)),
        )
        .to_owned();

    // Recursive step: dependents of anything already in the CTE frontier.
    let cte_referencing = Query::select()
        .column((ObjectDependency, object_dependency::Column::UsedBy))
        .from(ObjectDependency)
        .inner_join(
            cte_alias.clone(),
            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
                .equals(object_dependency::Column::Oid),
        )
        .to_owned();

    let mut common_table_expr = CommonTableExpression::new();
    common_table_expr
        .query(
            base_query
                .union(UnionType::All, belonged_obj_query)
                .union(UnionType::All, cte_referencing)
                .to_owned(),
        )
        .column(cte_return_alias.clone())
        .table_name(cte_alias.clone());

    // Join the collected ids back to `object` to return full partial rows.
    SelectStatement::new()
        .distinct()
        .columns([
            object::Column::Oid,
            object::Column::ObjType,
            object::Column::SchemaId,
            object::Column::DatabaseId,
        ])
        .from(cte_alias.clone())
        .inner_join(
            Object,
            Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
        )
        .order_by(object::Column::Oid, Order::Desc)
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
}
168
/// Build a recursive CTE query that counts how many of `dependent_objects`
/// are transitively downstream of `target_table`, where the edge set is
/// `object_dependency` augmented with existing sink-into-table edges
/// (`sink.target_table`).
///
/// A non-zero count means that creating a sink from `dependent_objects` into
/// `target_table` would close a dependency cycle.
pub fn construct_sink_cycle_check_query(
    target_table: ObjectId,
    dependent_objects: Vec<ObjectId>,
) -> WithQuery {
    let cte_alias = Alias::new("used_by_object_ids_with_sink");
    let depend_alias = Alias::new("obj_dependency_with_sink");

    // Seed of the recursion: direct dependents of the target table.
    let mut base_query = SelectStatement::new()
        .columns([
            object_dependency::Column::Oid,
            object_dependency::Column::UsedBy,
        ])
        .from(ObjectDependency)
        .and_where(object_dependency::Column::Oid.eq(target_table))
        .to_owned();

    // Existing sink-into-table edges, shaped as (oid, used_by) pairs.
    let query_sink_deps = SelectStatement::new()
        .columns([sink::Column::SinkId, sink::Column::TargetTable])
        .from(Sink)
        .and_where(sink::Column::TargetTable.is_not_null())
        .to_owned();

    // Recursive step: follow the unioned edge relation from the CTE frontier,
    // skipping trivial self-edges.
    let cte_referencing = Query::select()
        .column((depend_alias.clone(), object_dependency::Column::Oid))
        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
        .from_subquery(
            SelectStatement::new()
                .columns([
                    object_dependency::Column::Oid,
                    object_dependency::Column::UsedBy,
                ])
                .from(ObjectDependency)
                .union(UnionType::All, query_sink_deps)
                .to_owned(),
            depend_alias.clone(),
        )
        .inner_join(
            cte_alias.clone(),
            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
                .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
        )
        .and_where(
            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
                cte_alias.clone(),
                object_dependency::Column::Oid,
            ))),
        )
        .to_owned();

    let mut common_table_expr = CommonTableExpression::new();
    common_table_expr
        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
        .columns([
            object_dependency::Column::Oid,
            object_dependency::Column::UsedBy,
        ])
        .table_name(cte_alias.clone());

    // Outer query: count reachable ids that fall in `dependent_objects`.
    SelectStatement::new()
        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
        .from(cte_alias.clone())
        .and_where(
            Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
        )
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
}
265
/// Minimal projection of an `object` row: the id, its type, and its parent
/// schema/database ids (nullable, since databases and schemas have no parent
/// of the corresponding kind).
#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
#[sea_orm(entity = "Object")]
pub struct PartialObject {
    pub oid: ObjectId,
    pub obj_type: ObjectType,
    pub schema_id: Option<SchemaId>,
    pub database_id: Option<DatabaseId>,
}
274
/// Projection of a `fragment` row down to its id, the streaming job it
/// belongs to, and the ids of its state tables.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Fragment")]
pub struct PartialFragmentStateTables {
    pub fragment_id: FragmentId,
    pub job_id: ObjectId,
    pub state_table_ids: TableIdArray,
}
282
/// Placement of a single actor: which fragment it implements and which worker
/// node it is scheduled on.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct PartialActorLocation {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub worker_id: WorkerId,
}
289
/// Flattened fragment description as produced by ad-hoc listing queries
/// (hence `FromQueryResult` rather than a partial model).
#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
pub struct FragmentDesc {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    // Raw bitmask; decode with `FragmentTypeMask` / `FragmentTypeFlag`.
    pub fragment_type_mask: i32,
    pub distribution_type: DistributionType,
    pub state_table_ids: TableIdArray,
    // `i64` because it originates from a SQL aggregate (e.g. COUNT).
    pub parallelism: i64,
    pub vnode_count: i32,
    pub stream_node: StreamNode,
    pub parallelism_policy: String,
}
302
303pub async fn get_referring_objects_cascade<C>(
305 obj_id: ObjectId,
306 db: &C,
307) -> MetaResult<Vec<PartialObject>>
308where
309 C: ConnectionTrait,
310{
311 let query = construct_obj_dependency_query(obj_id);
312 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
313 let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
314 db.get_database_backend(),
315 sql,
316 values,
317 ))
318 .all(db)
319 .await?;
320 Ok(objects)
321}
322
323pub async fn check_sink_into_table_cycle<C>(
325 target_table: ObjectId,
326 dependent_objs: Vec<ObjectId>,
327 db: &C,
328) -> MetaResult<bool>
329where
330 C: ConnectionTrait,
331{
332 if dependent_objs.is_empty() {
333 return Ok(false);
334 }
335
336 if dependent_objs.contains(&target_table) {
338 return Ok(true);
339 }
340
341 let query = construct_sink_cycle_check_query(target_table, dependent_objs);
342 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
343
344 let res = db
345 .query_one(Statement::from_sql_and_values(
346 db.get_database_backend(),
347 sql,
348 values,
349 ))
350 .await?
351 .unwrap();
352
353 let cnt: i64 = res.try_get_by(0)?;
354
355 Ok(cnt != 0)
356}
357
358pub async fn ensure_object_id<C>(
360 object_type: ObjectType,
361 obj_id: impl Into<ObjectId>,
362 db: &C,
363) -> MetaResult<()>
364where
365 C: ConnectionTrait,
366{
367 let obj_id = obj_id.into();
368 let count = Object::find_by_id(obj_id).count(db).await?;
369 if count == 0 {
370 return Err(MetaError::catalog_id_not_found(
371 object_type.as_str(),
372 obj_id,
373 ));
374 }
375 Ok(())
376}
377
378pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
379where
380 C: ConnectionTrait,
381{
382 let count = Object::find_by_id(job_id).count(db).await?;
383 if count == 0 {
384 return Err(MetaError::cancelled(format!(
385 "job {} might be cancelled manually or by recovery",
386 job_id
387 )));
388 }
389 Ok(())
390}
391
392pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
394where
395 C: ConnectionTrait,
396{
397 let count = User::find_by_id(user_id).count(db).await?;
398 if count == 0 {
399 return Err(anyhow!("user {} was concurrently dropped", user_id).into());
400 }
401 Ok(())
402}
403
404pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
406where
407 C: ConnectionTrait,
408{
409 let count = Database::find()
410 .filter(database::Column::Name.eq(name))
411 .count(db)
412 .await?;
413 if count > 0 {
414 assert_eq!(count, 1);
415 return Err(MetaError::catalog_duplicated("database", name));
416 }
417 Ok(())
418}
419
420pub async fn check_function_signature_duplicate<C>(
422 pb_function: &PbFunction,
423 db: &C,
424) -> MetaResult<()>
425where
426 C: ConnectionTrait,
427{
428 let count = Function::find()
429 .inner_join(Object)
430 .filter(
431 object::Column::DatabaseId
432 .eq(pb_function.database_id)
433 .and(object::Column::SchemaId.eq(pb_function.schema_id))
434 .and(function::Column::Name.eq(&pb_function.name))
435 .and(
436 function::Column::ArgTypes
437 .eq(DataTypeArray::from(pb_function.arg_types.clone())),
438 ),
439 )
440 .count(db)
441 .await?;
442 if count > 0 {
443 assert_eq!(count, 1);
444 return Err(MetaError::catalog_duplicated("function", &pb_function.name));
445 }
446 Ok(())
447}
448
449pub async fn check_connection_name_duplicate<C>(
451 pb_connection: &PbConnection,
452 db: &C,
453) -> MetaResult<()>
454where
455 C: ConnectionTrait,
456{
457 let count = Connection::find()
458 .inner_join(Object)
459 .filter(
460 object::Column::DatabaseId
461 .eq(pb_connection.database_id)
462 .and(object::Column::SchemaId.eq(pb_connection.schema_id))
463 .and(connection::Column::Name.eq(&pb_connection.name)),
464 )
465 .count(db)
466 .await?;
467 if count > 0 {
468 assert_eq!(count, 1);
469 return Err(MetaError::catalog_duplicated(
470 "connection",
471 &pb_connection.name,
472 ));
473 }
474 Ok(())
475}
476
477pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
478where
479 C: ConnectionTrait,
480{
481 let count = Secret::find()
482 .inner_join(Object)
483 .filter(
484 object::Column::DatabaseId
485 .eq(pb_secret.database_id)
486 .and(object::Column::SchemaId.eq(pb_secret.schema_id))
487 .and(secret::Column::Name.eq(&pb_secret.name)),
488 )
489 .count(db)
490 .await?;
491 if count > 0 {
492 assert_eq!(count, 1);
493 return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
494 }
495 Ok(())
496}
497
498pub async fn check_subscription_name_duplicate<C>(
499 pb_subscription: &PbSubscription,
500 db: &C,
501) -> MetaResult<()>
502where
503 C: ConnectionTrait,
504{
505 let count = Subscription::find()
506 .inner_join(Object)
507 .filter(
508 object::Column::DatabaseId
509 .eq(pb_subscription.database_id)
510 .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
511 .and(subscription::Column::Name.eq(&pb_subscription.name)),
512 )
513 .count(db)
514 .await?;
515 if count > 0 {
516 assert_eq!(count, 1);
517 return Err(MetaError::catalog_duplicated(
518 "subscription",
519 &pb_subscription.name,
520 ));
521 }
522 Ok(())
523}
524
525pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
527where
528 C: ConnectionTrait,
529{
530 let count = User::find()
531 .filter(user::Column::Name.eq(name))
532 .count(db)
533 .await?;
534 if count > 0 {
535 assert_eq!(count, 1);
536 return Err(MetaError::catalog_duplicated("user", name));
537 }
538 Ok(())
539}
540
/// Check that no relation (table, source, sink, index or view) with the given
/// name already exists under `(database_id, schema_id)`.
///
/// On conflict, returns `catalog_under_creation` when the existing relation is
/// a streaming job that has not reached `Created` status yet, and
/// `catalog_duplicated` otherwise.
pub async fn check_relation_name_duplicate<C>(
    name: &str,
    database_id: DatabaseId,
    schema_id: SchemaId,
    db: &C,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    // Expanded once per relation kind below; each expansion early-returns an
    // error if a same-named relation of that kind exists.
    macro_rules! check_duplicated {
        ($obj_type:expr, $entity:ident, $table:ident) => {
            let object_id = Object::find()
                .select_only()
                .column(object::Column::Oid)
                .inner_join($entity)
                .filter(
                    object::Column::DatabaseId
                        .eq(Some(database_id))
                        .and(object::Column::SchemaId.eq(Some(schema_id)))
                        .and($table::Column::Name.eq(name)),
                )
                .into_tuple::<ObjectId>()
                .one(db)
                .await?;
            if let Some(oid) = object_id {
                // Whether to also inspect the streaming-job status: views are
                // not streaming jobs, and only *shared* sources are.
                let check_creation = if $obj_type == ObjectType::View {
                    false
                } else if $obj_type == ObjectType::Source {
                    let source_info = Source::find_by_id(oid.as_source_id())
                        .select_only()
                        .column(source::Column::SourceInfo)
                        .into_tuple::<Option<StreamSourceInfo>>()
                        .one(db)
                        .await?
                        .unwrap();
                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
                } else {
                    true
                };
                let job_id = oid.as_job_id();
                // A job that exists but is not `Created` yet is still being
                // built, which deserves a distinct error message.
                return if check_creation
                    && !matches!(
                        StreamingJob::find_by_id(job_id)
                            .select_only()
                            .column(streaming_job::Column::JobStatus)
                            .into_tuple::<JobStatus>()
                            .one(db)
                            .await?,
                        Some(JobStatus::Created)
                    ) {
                    Err(MetaError::catalog_under_creation(
                        $obj_type.as_str(),
                        name,
                        job_id,
                    ))
                } else {
                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
                };
            }
        };
    }
    check_duplicated!(ObjectType::Table, Table, table);
    check_duplicated!(ObjectType::Source, Source, source);
    check_duplicated!(ObjectType::Sink, Sink, sink);
    check_duplicated!(ObjectType::Index, Index, index);
    check_duplicated!(ObjectType::View, View, view);

    Ok(())
}
611
612pub async fn check_schema_name_duplicate<C>(
614 name: &str,
615 database_id: DatabaseId,
616 db: &C,
617) -> MetaResult<()>
618where
619 C: ConnectionTrait,
620{
621 let count = Object::find()
622 .inner_join(Schema)
623 .filter(
624 object::Column::ObjType
625 .eq(ObjectType::Schema)
626 .and(object::Column::DatabaseId.eq(Some(database_id)))
627 .and(schema::Column::Name.eq(name)),
628 )
629 .count(db)
630 .await?;
631 if count != 0 {
632 return Err(MetaError::catalog_duplicated("schema", name));
633 }
634
635 Ok(())
636}
637
/// Reject dropping an object (without CASCADE) that other objects still refer
/// to, producing a human-readable list of the referring relations.
///
/// Indexes are excluded from the check: they are dropped together with their
/// primary table. For iceberg-engine tables, a single referring sink is
/// tolerated as well (presumably the table's auto-created iceberg sink — see
/// `get_iceberg_related_object_ids`).
pub async fn check_object_refer_for_drop<C>(
    object_type: ObjectType,
    object_id: ObjectId,
    db: &C,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    // Count direct dependents; for tables, ignore dependents that are indexes.
    let count = if object_type == ObjectType::Table {
        ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .filter(
                object_dependency::Column::Oid
                    .eq(object_id)
                    .and(object::Column::ObjType.ne(ObjectType::Index)),
            )
            .count(db)
            .await?
    } else {
        ObjectDependency::find()
            .filter(object_dependency::Column::Oid.eq(object_id))
            .count(db)
            .await?
    };
    if count != 0 {
        // Resolve the referring objects and group by type so each group can
        // be rendered with schema-qualified names.
        let referring_objects = get_referring_objects(object_id, db).await?;
        let referring_objs_map = referring_objects
            .into_iter()
            .filter(|o| o.obj_type != ObjectType::Index)
            .into_group_map_by(|o| o.obj_type);
        let mut details = vec![];
        for (obj_type, objs) in referring_objs_map {
            match obj_type {
                // Referring "tables" are materialized views built on this object.
                ObjectType::Table => {
                    let tables: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Table.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(table::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(tables.into_iter().map(|(schema_name, table_name)| {
                        format!(
                            "materialized view {}.{} depends on it",
                            schema_name, table_name
                        )
                    }));
                }
                ObjectType::Sink => {
                    let sinks: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Sink.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(sink::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    // Iceberg-engine tables have a companion sink writing into
                    // them; if that is the only referring sink, don't report it.
                    if object_type == ObjectType::Table {
                        let engine = Table::find_by_id(object_id.as_table_id())
                            .select_only()
                            .column(table::Column::Engine)
                            .into_tuple::<table::Engine>()
                            .one(db)
                            .await?;
                        if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
                            continue;
                        }
                    }
                    details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
                        format!("sink {}.{} depends on it", schema_name, sink_name)
                    }));
                }
                ObjectType::View => {
                    let views: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::View.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(view::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(views.into_iter().map(|(schema_name, view_name)| {
                        format!("view {}.{} depends on it", schema_name, view_name)
                    }));
                }
                ObjectType::Subscription => {
                    let subscriptions: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Subscription.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(subscription::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(subscriptions.into_iter().map(
                        |(schema_name, subscription_name)| {
                            format!(
                                "subscription {}.{} depends on it",
                                schema_name, subscription_name
                            )
                        },
                    ));
                }
                ObjectType::Source => {
                    let sources: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Source.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(source::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(sources.into_iter().map(|(schema_name, view_name)| {
                        format!("source {}.{} depends on it", schema_name, view_name)
                    }));
                }
                ObjectType::Connection => {
                    let connections: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Connection.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(connection::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(connections.into_iter().map(|(schema_name, view_name)| {
                        format!("connection {}.{} depends on it", schema_name, view_name)
                    }));
                }
                _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
            }
        }
        // All referring objects were tolerated (e.g. only the iceberg sink).
        if details.is_empty() {
            return Ok(());
        }

        return Err(MetaError::permission_denied(format!(
            "{} used by {} other objects. \nDETAIL: {}\n\
            {}",
            object_type.as_str(),
            details.len(),
            details.join("\n"),
            match object_type {
                ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
                    "HINT: DROP the dependent objects first.",
                // Databases and schemas go through `ensure_schema_empty` /
                // their own paths and never reach this check.
                ObjectType::Database | ObjectType::Schema => unreachable!(),
                _ => "HINT: Use DROP ... CASCADE to drop the dependent objects too.",
            }
        )));
    }
    Ok(())
}
817
818pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
820where
821 C: ConnectionTrait,
822{
823 let objs = ObjectDependency::find()
824 .filter(object_dependency::Column::Oid.eq(object_id))
825 .join(
826 JoinType::InnerJoin,
827 object_dependency::Relation::Object1.def(),
828 )
829 .into_partial_model()
830 .all(db)
831 .await?;
832
833 Ok(objs)
834}
835
836pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
838where
839 C: ConnectionTrait,
840{
841 let count = Object::find()
842 .filter(object::Column::SchemaId.eq(Some(schema_id)))
843 .count(db)
844 .await?;
845 if count != 0 {
846 return Err(MetaError::permission_denied("schema is not empty"));
847 }
848
849 Ok(())
850}
851
852pub async fn list_user_info_by_ids<C>(
854 user_ids: impl IntoIterator<Item = UserId>,
855 db: &C,
856) -> MetaResult<Vec<PbUserInfo>>
857where
858 C: ConnectionTrait,
859{
860 let mut user_infos = vec![];
861 for user_id in user_ids {
862 let user = User::find_by_id(user_id)
863 .one(db)
864 .await?
865 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
866 let mut user_info: PbUserInfo = user.into();
867 user_info.grant_privileges = get_user_privilege(user_id, db).await?;
868 user_infos.push(user_info);
869 }
870 Ok(user_infos)
871}
872
873pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
875where
876 C: ConnectionTrait,
877{
878 let obj_owner: UserId = Object::find_by_id(object_id)
879 .select_only()
880 .column(object::Column::OwnerId)
881 .into_tuple()
882 .one(db)
883 .await?
884 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
885 Ok(obj_owner)
886}
887
/// Build a recursive CTE query that returns `(id, user_id)` for the given
/// privileges and every privilege transitively re-granted from them (via
/// `user_privilege.dependent_id`).
pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
    let cte_alias = Alias::new("granted_privilege_ids");
    let cte_return_privilege_alias = Alias::new("id");
    let cte_return_user_alias = Alias::new("user_id");

    // Seed of the recursion: the given privilege ids themselves.
    let mut base_query = SelectStatement::new()
        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
        .from(UserPrivilege)
        .and_where(user_privilege::Column::Id.is_in(ids))
        .to_owned();

    // Recursive step: privileges whose `dependent_id` is already in the CTE.
    let cte_referencing = Query::select()
        .columns([
            (UserPrivilege, user_privilege::Column::Id),
            (UserPrivilege, user_privilege::Column::UserId),
        ])
        .from(UserPrivilege)
        .inner_join(
            cte_alias.clone(),
            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
                .equals(user_privilege::Column::DependentId),
        )
        .to_owned();

    let mut common_table_expr = CommonTableExpression::new();
    common_table_expr
        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
        .columns([
            cte_return_privilege_alias.clone(),
            cte_return_user_alias.clone(),
        ])
        .table_name(cte_alias.clone());

    SelectStatement::new()
        .columns([cte_return_privilege_alias, cte_return_user_alias])
        .from(cte_alias)
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
}
956
957pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
958where
959 C: ConnectionTrait,
960{
961 let table_ids: Vec<TableId> = Table::find()
962 .select_only()
963 .column(table::Column::TableId)
964 .filter(
965 table::Column::TableType
966 .eq(TableType::Internal)
967 .and(table::Column::BelongsToJobId.eq(job_id)),
968 )
969 .into_tuple()
970 .all(db)
971 .await?;
972 Ok(table_ids)
973}
974
975pub async fn get_index_state_tables_by_table_id<C>(
976 table_id: TableId,
977 db: &C,
978) -> MetaResult<Vec<TableId>>
979where
980 C: ConnectionTrait,
981{
982 let mut index_table_ids: Vec<TableId> = Index::find()
983 .select_only()
984 .column(index::Column::IndexTableId)
985 .filter(index::Column::PrimaryTableId.eq(table_id))
986 .into_tuple()
987 .all(db)
988 .await?;
989
990 if !index_table_ids.is_empty() {
991 let internal_table_ids: Vec<TableId> = Table::find()
992 .select_only()
993 .column(table::Column::TableId)
994 .filter(
995 table::Column::TableType
996 .eq(TableType::Internal)
997 .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
998 )
999 .into_tuple()
1000 .all(db)
1001 .await?;
1002
1003 index_table_ids.extend(internal_table_ids.into_iter());
1004 }
1005
1006 Ok(index_table_ids)
1007}
1008
/// For an iceberg-engine table, collect the object ids of its auto-created
/// companion objects: the hidden iceberg sink (plus that sink's internal
/// state tables) and the hidden iceberg source, both looked up by their
/// prefixed names in the same `(database, schema)`.
///
/// Returns an empty list when `object_id` is not a table or not an
/// iceberg-engine table. Missing companions are logged as warnings rather
/// than treated as errors.
pub async fn get_iceberg_related_object_ids<C>(
    object_id: ObjectId,
    db: &C,
) -> MetaResult<Vec<ObjectId>>
where
    C: ConnectionTrait,
{
    let object = Object::find_by_id(object_id)
        .one(db)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
    if object.obj_type != ObjectType::Table {
        return Ok(vec![]);
    }

    let table = Table::find_by_id(object_id.as_table_id())
        .one(db)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
    if !matches!(table.engine, Some(table::Engine::Iceberg)) {
        return Ok(vec![]);
    }

    // Tables always live inside a database and schema, so both are present.
    let database_id = object.database_id.unwrap();
    let schema_id = object.schema_id.unwrap();

    let mut related_objects = vec![];

    // The companion sink is named by convention: prefix + table name.
    let iceberg_sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name);
    let iceberg_sink_id = Sink::find()
        .inner_join(Object)
        .select_only()
        .column(sink::Column::SinkId)
        .filter(
            object::Column::DatabaseId
                .eq(database_id)
                .and(object::Column::SchemaId.eq(schema_id))
                .and(sink::Column::Name.eq(&iceberg_sink_name)),
        )
        .into_tuple::<SinkId>()
        .one(db)
        .await?;
    if let Some(sink_id) = iceberg_sink_id {
        related_objects.push(sink_id.as_object_id());
        // The sink is a streaming job, so include its internal state tables.
        let sink_internal_tables = get_internal_tables_by_id(sink_id.as_job_id(), db).await?;
        related_objects.extend(
            sink_internal_tables
                .into_iter()
                .map(|tid| tid.as_object_id()),
        );
    } else {
        warn!(
            "iceberg table {} missing sink {}",
            table.name, iceberg_sink_name
        );
    }

    // The companion source follows the same naming convention.
    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
    let iceberg_source_id = Source::find()
        .inner_join(Object)
        .select_only()
        .column(source::Column::SourceId)
        .filter(
            object::Column::DatabaseId
                .eq(database_id)
                .and(object::Column::SchemaId.eq(schema_id))
                .and(source::Column::Name.eq(&iceberg_source_name)),
        )
        .into_tuple::<SourceId>()
        .one(db)
        .await?;
    if let Some(source_id) = iceberg_source_id {
        related_objects.push(source_id.as_object_id());
    } else {
        warn!(
            "iceberg table {} missing source {}",
            table.name, iceberg_source_name
        );
    }

    Ok(related_objects)
}
1092
1093pub(crate) async fn load_streaming_jobs_by_ids<C>(
1095 txn: &C,
1096 job_ids: impl IntoIterator<Item = JobId>,
1097) -> MetaResult<HashMap<JobId, streaming_job::Model>>
1098where
1099 C: ConnectionTrait,
1100{
1101 let job_ids: HashSet<JobId> = job_ids.into_iter().collect();
1102 if job_ids.is_empty() {
1103 return Ok(HashMap::new());
1104 }
1105 let jobs = streaming_job::Entity::find()
1106 .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
1107 .all(txn)
1108 .await?;
1109 Ok(jobs.into_iter().map(|job| (job.job_id, job)).collect())
1110}
1111
/// Projection of a `user_privilege` row down to its id and the grantee user.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "UserPrivilege")]
pub struct PartialUserPrivilege {
    pub id: PrivilegeId,
    pub user_id: UserId,
}
1118
1119pub async fn get_referring_privileges_cascade<C>(
1120 ids: Vec<PrivilegeId>,
1121 db: &C,
1122) -> MetaResult<Vec<PartialUserPrivilege>>
1123where
1124 C: ConnectionTrait,
1125{
1126 let query = construct_privilege_dependency_query(ids);
1127 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1128 let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1129 db.get_database_backend(),
1130 sql,
1131 values,
1132 ))
1133 .all(db)
1134 .await?;
1135
1136 Ok(privileges)
1137}
1138
1139pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1141where
1142 C: ConnectionTrait,
1143{
1144 let count = UserPrivilege::find()
1145 .filter(user_privilege::Column::DependentId.is_in(ids))
1146 .count(db)
1147 .await?;
1148 if count != 0 {
1149 return Err(MetaError::permission_denied(format!(
1150 "privileges granted to {} other ones.",
1151 count
1152 )));
1153 }
1154 Ok(())
1155}
1156
1157pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1159where
1160 C: ConnectionTrait,
1161{
1162 let user_privileges = UserPrivilege::find()
1163 .find_also_related(Object)
1164 .filter(user_privilege::Column::UserId.eq(user_id))
1165 .all(db)
1166 .await?;
1167 Ok(user_privileges
1168 .into_iter()
1169 .map(|(privilege, object)| {
1170 let object = object.unwrap();
1171 let oid = object.oid.as_raw_id();
1172 let obj = match object.obj_type {
1173 ObjectType::Database => PbGrantObject::DatabaseId(oid),
1174 ObjectType::Schema => PbGrantObject::SchemaId(oid),
1175 ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
1176 ObjectType::Source => PbGrantObject::SourceId(oid),
1177 ObjectType::Sink => PbGrantObject::SinkId(oid),
1178 ObjectType::View => PbGrantObject::ViewId(oid),
1179 ObjectType::Function => PbGrantObject::FunctionId(oid),
1180 ObjectType::Connection => PbGrantObject::ConnectionId(oid),
1181 ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
1182 ObjectType::Secret => PbGrantObject::SecretId(oid),
1183 };
1184 PbGrantPrivilege {
1185 action_with_opts: vec![PbActionWithGrantOption {
1186 action: PbAction::from(privilege.action) as _,
1187 with_grant_option: privilege.with_grant_option,
1188 granted_by: privilege.granted_by as _,
1189 }],
1190 object: Some(obj),
1191 }
1192 })
1193 .collect())
1194}
1195
1196pub async fn get_table_columns(
1197 txn: &impl ConnectionTrait,
1198 id: TableId,
1199) -> MetaResult<ColumnCatalogArray> {
1200 let columns = Table::find_by_id(id)
1201 .select_only()
1202 .columns([table::Column::Columns])
1203 .into_tuple::<ColumnCatalogArray>()
1204 .one(txn)
1205 .await?
1206 .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1207 Ok(columns)
1208}
1209
1210pub async fn grant_default_privileges_automatically<C>(
1213 db: &C,
1214 object_id: impl Into<ObjectId>,
1215) -> MetaResult<Vec<PbUserInfo>>
1216where
1217 C: ConnectionTrait,
1218{
1219 let object_id = object_id.into();
1220 let object = Object::find_by_id(object_id)
1221 .one(db)
1222 .await?
1223 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1224 assert_ne!(object.obj_type, ObjectType::Database);
1225
1226 let for_mview_filter = if object.obj_type == ObjectType::Table {
1227 let table_type = Table::find_by_id(object_id.as_table_id())
1228 .select_only()
1229 .column(table::Column::TableType)
1230 .into_tuple::<TableType>()
1231 .one(db)
1232 .await?
1233 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1234 user_default_privilege::Column::ForMaterializedView
1235 .eq(table_type == TableType::MaterializedView)
1236 } else {
1237 user_default_privilege::Column::ForMaterializedView.eq(false)
1238 };
1239 let schema_filter = if let Some(schema_id) = &object.schema_id {
1240 user_default_privilege::Column::SchemaId.eq(*schema_id)
1241 } else {
1242 user_default_privilege::Column::SchemaId.is_null()
1243 };
1244
1245 let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1246 .select_only()
1247 .columns([
1248 user_default_privilege::Column::Grantee,
1249 user_default_privilege::Column::GrantedBy,
1250 user_default_privilege::Column::Action,
1251 user_default_privilege::Column::WithGrantOption,
1252 ])
1253 .filter(
1254 user_default_privilege::Column::DatabaseId
1255 .eq(object.database_id.unwrap())
1256 .and(schema_filter)
1257 .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1258 .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1259 .and(for_mview_filter),
1260 )
1261 .into_tuple()
1262 .all(db)
1263 .await?;
1264 if default_privileges.is_empty() {
1265 return Ok(vec![]);
1266 }
1267
1268 let updated_user_ids = default_privileges
1269 .iter()
1270 .map(|(grantee, _, _, _)| *grantee)
1271 .collect::<HashSet<_>>();
1272
1273 for (grantee, granted_by, action, with_grant_option) in default_privileges {
1274 UserPrivilege::insert(user_privilege::ActiveModel {
1275 user_id: Set(grantee),
1276 oid: Set(object_id),
1277 granted_by: Set(granted_by),
1278 action: Set(action),
1279 with_grant_option: Set(with_grant_option),
1280 ..Default::default()
1281 })
1282 .exec(db)
1283 .await?;
1284 if action == Action::Select {
1285 let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1287 if !internal_table_ids.is_empty() {
1288 for internal_table_id in &internal_table_ids {
1289 UserPrivilege::insert(user_privilege::ActiveModel {
1290 user_id: Set(grantee),
1291 oid: Set(internal_table_id.as_object_id()),
1292 granted_by: Set(granted_by),
1293 action: Set(Action::Select),
1294 with_grant_option: Set(with_grant_option),
1295 ..Default::default()
1296 })
1297 .exec(db)
1298 .await?;
1299 }
1300 }
1301
1302 let iceberg_privilege_object_ids =
1304 get_iceberg_related_object_ids(object_id, db).await?;
1305 if !iceberg_privilege_object_ids.is_empty() {
1306 for iceberg_object_id in &iceberg_privilege_object_ids {
1307 UserPrivilege::insert(user_privilege::ActiveModel {
1308 user_id: Set(grantee),
1309 oid: Set(*iceberg_object_id),
1310 granted_by: Set(granted_by),
1311 action: Set(action),
1312 with_grant_option: Set(with_grant_option),
1313 ..Default::default()
1314 })
1315 .exec(db)
1316 .await?;
1317 }
1318 }
1319 }
1320 }
1321
1322 let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1323 Ok(updated_user_infos)
1324}
1325
1326pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1328 match object {
1329 PbGrantObject::DatabaseId(id)
1330 | PbGrantObject::SchemaId(id)
1331 | PbGrantObject::TableId(id)
1332 | PbGrantObject::SourceId(id)
1333 | PbGrantObject::SinkId(id)
1334 | PbGrantObject::ViewId(id)
1335 | PbGrantObject::FunctionId(id)
1336 | PbGrantObject::SubscriptionId(id)
1337 | PbGrantObject::ConnectionId(id)
1338 | PbGrantObject::SecretId(id) => (*id).into(),
1339 }
1340}
1341
1342pub async fn insert_fragment_relations(
1343 db: &impl ConnectionTrait,
1344 downstream_fragment_relations: &FragmentDownstreamRelation,
1345) -> MetaResult<()> {
1346 let mut relations = vec![];
1347 for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1348 for downstream in downstreams {
1349 relations.push(
1350 fragment_relation::Model {
1351 source_fragment_id: *upstream_fragment_id as _,
1352 target_fragment_id: downstream.downstream_fragment_id as _,
1353 dispatcher_type: downstream.dispatcher_type,
1354 dist_key_indices: downstream
1355 .dist_key_indices
1356 .iter()
1357 .map(|idx| *idx as i32)
1358 .collect_vec()
1359 .into(),
1360 output_indices: downstream
1361 .output_mapping
1362 .indices
1363 .iter()
1364 .map(|idx| *idx as i32)
1365 .collect_vec()
1366 .into(),
1367 output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1368 }
1369 .into_active_model(),
1370 );
1371 }
1372 }
1373 if !relations.is_empty() {
1374 FragmentRelation::insert_many(relations).exec(db).await?;
1375 }
1376 Ok(())
1377}
1378
1379pub fn compose_dispatchers(
1380 source_fragment_distribution: DistributionType,
1381 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1382 target_fragment_id: crate::model::FragmentId,
1383 target_fragment_distribution: DistributionType,
1384 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1385 dispatcher_type: DispatcherType,
1386 dist_key_indices: Vec<u32>,
1387 output_mapping: PbDispatchOutputMapping,
1388) -> HashMap<crate::model::ActorId, PbDispatcher> {
1389 match dispatcher_type {
1390 DispatcherType::Hash => {
1391 let dispatcher = PbDispatcher {
1392 r#type: PbDispatcherType::from(dispatcher_type) as _,
1393 dist_key_indices,
1394 output_mapping: output_mapping.into(),
1395 hash_mapping: Some(
1396 ActorMapping::from_bitmaps(
1397 &target_fragment_actors
1398 .iter()
1399 .map(|(actor_id, bitmap)| {
1400 (
1401 *actor_id as _,
1402 bitmap
1403 .clone()
1404 .expect("downstream hash dispatch must have distribution"),
1405 )
1406 })
1407 .collect(),
1408 )
1409 .to_protobuf(),
1410 ),
1411 dispatcher_id: target_fragment_id,
1412 downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1413 };
1414 source_fragment_actors
1415 .keys()
1416 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1417 .collect()
1418 }
1419 DispatcherType::Broadcast | DispatcherType::Simple => {
1420 let dispatcher = PbDispatcher {
1421 r#type: PbDispatcherType::from(dispatcher_type) as _,
1422 dist_key_indices,
1423 output_mapping: output_mapping.into(),
1424 hash_mapping: None,
1425 dispatcher_id: target_fragment_id,
1426 downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1427 };
1428 source_fragment_actors
1429 .keys()
1430 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1431 .collect()
1432 }
1433 DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1434 source_fragment_distribution,
1435 source_fragment_actors,
1436 target_fragment_distribution,
1437 target_fragment_actors,
1438 )
1439 .into_iter()
1440 .map(|(upstream_actor_id, downstream_actor_id)| {
1441 (
1442 upstream_actor_id,
1443 PbDispatcher {
1444 r#type: PbDispatcherType::NoShuffle as _,
1445 dist_key_indices: dist_key_indices.clone(),
1446 output_mapping: output_mapping.clone().into(),
1447 hash_mapping: None,
1448 dispatcher_id: target_fragment_id,
1449 downstream_actor_id: vec![downstream_actor_id],
1450 },
1451 )
1452 })
1453 .collect(),
1454 }
1455}
1456
/// Pair up upstream and downstream actors for a no-shuffle dispatch edge.
///
/// Both fragments must have the same distribution type and actor count.
/// For `Single` distribution the single actors of both sides are paired; for
/// `Hash` distribution actors are matched by the first vnode of their bitmap,
/// and the matched pair's bitmaps must be identical.
///
/// Panics (assert/expect) on any violation of these invariants — this is
/// meta-internal consistency checking, not user-facing validation.
pub fn resolve_no_shuffle_actor_dispatcher(
    source_fragment_distribution: DistributionType,
    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
    target_fragment_distribution: DistributionType,
    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
    // No-shuffle requires both sides to agree on distribution and actor count.
    assert_eq!(source_fragment_distribution, target_fragment_distribution);
    assert_eq!(
        source_fragment_actors.len(),
        target_fragment_actors.len(),
        "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
        source_fragment_actors,
        target_fragment_actors
    );
    match source_fragment_distribution {
        DistributionType::Single => {
            // A singleton actor either has no bitmap or a bitmap with all vnodes set.
            let assert_singleton = |bitmap: &Option<Bitmap>| {
                assert!(
                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
                    "not singleton: {:?}",
                    bitmap
                );
            };
            assert_eq!(
                source_fragment_actors.len(),
                1,
                "singleton distribution actor count not 1: {:?}",
                source_fragment_distribution
            );
            assert_eq!(
                target_fragment_actors.len(),
                1,
                "singleton distribution actor count not 1: {:?}",
                target_fragment_distribution
            );
            let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
            assert_singleton(bitmap);
            let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
            assert_singleton(bitmap);
            vec![(*source_actor_id, *target_actor_id)]
        }
        DistributionType::Hash => {
            // Index target actors by the first vnode of their bitmap. NOTE: if
            // two target actors shared a first vnode, one entry would be
            // silently overwritten here — the code assumes that cannot happen.
            let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
                .iter()
                .map(|(actor_id, bitmap)| {
                    let bitmap = bitmap
                        .as_ref()
                        .expect("hash distribution should have bitmap");
                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
                    (first_vnode, (*actor_id, bitmap))
                })
                .collect();
            source_fragment_actors
                .iter()
                .map(|(source_actor_id, bitmap)| {
                    let bitmap = bitmap
                        .as_ref()
                        .expect("hash distribution should have bitmap");
                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
                    // `remove` ensures each target actor is matched at most once.
                    let (target_actor_id, target_bitmap) =
                        target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
                            panic!(
                                "cannot find matched target actor: {} {:?} {:?} {:?}",
                                source_actor_id,
                                first_vnode,
                                source_fragment_actors,
                                target_fragment_actors
                            );
                        });
                    // A matched pair must cover exactly the same vnodes.
                    assert_eq!(
                        bitmap,
                        target_bitmap,
                        "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
                        source_actor_id,
                        first_vnode,
                        source_fragment_actors,
                        target_fragment_actors
                    );
                    (*source_actor_id, target_actor_id)
                }).collect()
        }
    }
}
1541
1542pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1543 let fragment_worker_slot_mapping = match fragment.distribution_type {
1544 DistributionType::Single => {
1545 let actor = fragment.actors.values().exactly_one().unwrap();
1546 WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1547 }
1548 DistributionType::Hash => {
1549 let actor_bitmaps: HashMap<_, _> = fragment
1550 .actors
1551 .iter()
1552 .map(|(actor_id, actor_info)| {
1553 let vnode_bitmap = actor_info
1554 .vnode_bitmap
1555 .as_ref()
1556 .cloned()
1557 .expect("actor bitmap shouldn't be none in hash fragment");
1558
1559 (*actor_id as hash::ActorId, vnode_bitmap)
1560 })
1561 .collect();
1562
1563 let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1564
1565 let actor_locations = fragment
1566 .actors
1567 .iter()
1568 .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
1569 .collect();
1570
1571 actor_mapping.to_worker_slot(&actor_locations)
1572 }
1573 };
1574
1575 PbFragmentWorkerSlotMapping {
1576 fragment_id: fragment.fragment_id,
1577 mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1578 }
1579}
1580
1581pub async fn get_fragments_for_jobs<C>(
1587 db: &C,
1588 actor_info: &SharedActorInfos,
1589 streaming_jobs: Vec<JobId>,
1590) -> MetaResult<(
1591 HashMap<SourceId, BTreeSet<FragmentId>>,
1592 HashSet<FragmentId>,
1593 HashSet<ActorId>,
1594 HashSet<FragmentId>,
1595)>
1596where
1597 C: ConnectionTrait,
1598{
1599 if streaming_jobs.is_empty() {
1600 return Ok((
1601 HashMap::default(),
1602 HashSet::default(),
1603 HashSet::default(),
1604 HashSet::default(),
1605 ));
1606 }
1607
1608 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1609 .select_only()
1610 .columns([
1611 fragment::Column::FragmentId,
1612 fragment::Column::FragmentTypeMask,
1613 fragment::Column::StreamNode,
1614 ])
1615 .filter(fragment::Column::JobId.is_in(streaming_jobs))
1616 .into_tuple()
1617 .all(db)
1618 .await?;
1619
1620 let fragment_ids: HashSet<_> = fragments
1621 .iter()
1622 .map(|(fragment_id, _, _)| *fragment_id)
1623 .collect();
1624
1625 let actors = {
1626 let guard = actor_info.read_guard();
1627 fragment_ids
1628 .iter()
1629 .flat_map(|id| guard.get_fragment(*id as _))
1630 .flat_map(|f| f.actors.keys().cloned().map(|id| id as _))
1631 .collect::<HashSet<_>>()
1632 };
1633
1634 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1635 let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1636 for (fragment_id, mask, stream_node) in fragments {
1637 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1638 && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1639 {
1640 source_fragment_ids
1641 .entry(source_id)
1642 .or_default()
1643 .insert(fragment_id);
1644 }
1645 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1646 sink_fragment_ids.insert(fragment_id);
1647 }
1648 }
1649
1650 Ok((
1651 source_fragment_ids,
1652 sink_fragment_ids,
1653 actors.into_iter().collect(),
1654 fragment_ids,
1655 ))
1656}
1657
/// Build a deletion notification payload for the given catalog objects.
///
/// Each `PartialObject` becomes a `PbObject` carrying only the identifiers
/// (id, and schema/database id where applicable) that subscribers need to
/// remove the object from their local catalog; all other fields are left at
/// their defaults. An `Index` contributes two entries: the index itself and a
/// `Table` entry derived from the same raw oid.
pub(crate) fn build_object_group_for_delete(
    partial_objects: Vec<PartialObject>,
) -> NotificationInfo {
    let mut objects = vec![];
    for obj in partial_objects {
        match obj.obj_type {
            // Databases carry no parent ids.
            ObjectType::Database => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Database(PbDatabase {
                    id: obj.oid.as_database_id(),
                    ..Default::default()
                })),
            }),
            ObjectType::Schema => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Schema(PbSchema {
                    id: obj.oid.as_schema_id(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Table => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Table(PbTable {
                    id: obj.oid.as_table_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Source => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(PbSource {
                    id: obj.oid.as_source_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Sink => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Sink(PbSink {
                    id: obj.oid.as_sink_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Subscription => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
                    id: obj.oid.as_subscription_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::View => objects.push(PbObject {
                object_info: Some(PbObjectInfo::View(PbView {
                    id: obj.oid.as_view_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            // An index is notified both as an index and as a table with the
            // same raw oid, so subscribers can drop both representations.
            ObjectType::Index => {
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(PbIndex {
                        id: obj.oid.as_index_id(),
                        schema_id: obj.schema_id.unwrap(),
                        database_id: obj.database_id.unwrap(),
                        ..Default::default()
                    })),
                });
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(PbTable {
                        id: obj.oid.as_table_id(),
                        schema_id: obj.schema_id.unwrap(),
                        database_id: obj.database_id.unwrap(),
                        ..Default::default()
                    })),
                });
            }
            ObjectType::Function => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Function(PbFunction {
                    id: obj.oid.as_function_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Connection => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Connection(PbConnection {
                    id: obj.oid.as_connection_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
            ObjectType::Secret => objects.push(PbObject {
                object_info: Some(PbObjectInfo::Secret(PbSecret {
                    id: obj.oid.as_secret_id(),
                    schema_id: obj.schema_id.unwrap(),
                    database_id: obj.database_id.unwrap(),
                    ..Default::default()
                })),
            }),
        }
    }
    NotificationInfo::ObjectGroup(PbObjectGroup { objects })
}
1767
1768pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1769 let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1770 .context("unable to parse table definition")
1771 .inspect_err(|e| {
1772 tracing::error!(
1773 target: "auto_schema_change",
1774 error = %e.as_report(),
1775 "failed to parse table definition")
1776 })
1777 .unwrap()
1778 .try_into()
1779 .unwrap();
1780 if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1781 cdc_table_info
1782 .clone()
1783 .map(|cdc_table_info| cdc_table_info.external_table_name)
1784 } else {
1785 None
1786 }
1787}
1788
/// Rename a relation (table, source, sink, subscription, view or index) and
/// update its own SQL definition accordingly.
///
/// Returns the protobuf objects to notify subscribers with, together with the
/// relation's old name (needed by `rename_relation_refer` to rewrite
/// referencing definitions). A table with an associated source renames the
/// source first; an index renames its backing index table and then updates
/// the index row itself.
pub async fn rename_relation(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
) -> MetaResult<(Vec<PbObject>, String)> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename;

    let mut to_update_relations = vec![];
    // Shared rename flow: load the row + object, set the new name, rewrite the
    // definition (views keep theirs), persist, and queue a notification.
    // Evaluates to the old name.
    macro_rules! rename_relation {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let obj = obj.unwrap();
            let old_name = relation.name.clone();
            relation.name = object_name.into();
            // Views are not rewritten here; their definition is the view body.
            if obj.obj_type != ObjectType::View {
                relation.definition = alter_relation_rename(&relation.definition, object_name);
            }
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                name: Set(object_name.into()),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
                .one(txn)
                .await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj, streaming_job).into(),
                )),
            });
            old_name
        }};
    }
    let old_name = match object_type {
        ObjectType::Table => {
            // A table created from a source (`CREATE TABLE ... WITH source`)
            // renames the associated source as well.
            let associated_source_id: Option<SourceId> = Source::find()
                .select_only()
                .column(source::Column::SourceId)
                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
                .into_tuple()
                .one(txn)
                .await?;
            if let Some(source_id) = associated_source_id {
                rename_relation!(Source, source, source_id, source_id);
            }
            rename_relation!(Table, table, table_id, object_id.as_table_id())
        }
        ObjectType::Source => {
            rename_relation!(Source, source, source_id, object_id.as_source_id())
        }
        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
        ObjectType::Subscription => {
            rename_relation!(
                Subscription,
                subscription,
                subscription_id,
                object_id.as_subscription_id()
            )
        }
        ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
        ObjectType::Index => {
            let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let streaming_job = streaming_job::Entity::find_by_id(index.index_id.as_job_id())
                .one(txn)
                .await?;
            index.name = object_name.into();
            let index_table_id = index.index_table_id;
            // Rename the backing index table first; it holds the definition.
            let old_name = rename_relation!(Table, table, table_id, index_table_id);

            // Then update the index row's own name and notify it separately.
            let active_model = index::ActiveModel {
                index_id: sea_orm::ActiveValue::Set(index.index_id),
                name: sea_orm::ActiveValue::Set(object_name.into()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::Index(
                    ObjectModel(index, obj.unwrap(), streaming_job).into(),
                )),
            });
            old_name
        }
        _ => unreachable!("only relation name can be altered."),
    };

    Ok((to_update_relations, old_name))
}
1894
1895pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
1896where
1897 C: ConnectionTrait,
1898{
1899 let database_resource_group: Option<String> = Database::find_by_id(database_id)
1900 .select_only()
1901 .column(database::Column::ResourceGroup)
1902 .into_tuple()
1903 .one(txn)
1904 .await?
1905 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1906
1907 Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1908}
1909
1910pub async fn get_existing_job_resource_group<C>(
1911 txn: &C,
1912 streaming_job_id: JobId,
1913) -> MetaResult<String>
1914where
1915 C: ConnectionTrait,
1916{
1917 let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1918 StreamingJob::find_by_id(streaming_job_id)
1919 .select_only()
1920 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1921 .join(JoinType::InnerJoin, object::Relation::Database2.def())
1922 .column(streaming_job::Column::SpecificResourceGroup)
1923 .column(database::Column::ResourceGroup)
1924 .into_tuple()
1925 .one(txn)
1926 .await?
1927 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1928
1929 Ok(job_specific_resource_group.unwrap_or_else(|| {
1930 database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1931 }))
1932}
1933
1934pub fn filter_workers_by_resource_group(
1935 workers: &HashMap<WorkerId, WorkerNode>,
1936 resource_group: &str,
1937) -> BTreeSet<WorkerId> {
1938 workers
1939 .iter()
1940 .filter(|&(_, worker)| {
1941 worker
1942 .resource_group()
1943 .map(|node_label| node_label.as_str() == resource_group)
1944 .unwrap_or(false)
1945 })
1946 .map(|(id, _)| *id)
1947 .collect()
1948}
1949
/// Rewrite the SQL definitions of all objects that reference a renamed
/// relation, replacing `old_name` with `object_name`.
///
/// Returns the protobuf objects to notify subscribers with. Only tables,
/// sinks, subscriptions, views and indexes may refer to other relations;
/// anything else in the dependency set is an error.
pub async fn rename_relation_refer(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
    old_name: &str,
) -> MetaResult<Vec<PbObject>> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename_refs;

    let mut to_update_relations = vec![];
    // Shared flow: load the referring row, rewrite references to the old name
    // inside its definition, persist, and queue a notification.
    macro_rules! rename_relation_ref {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            relation.definition =
                alter_relation_rename_refs(&relation.definition, old_name, object_name);
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
                .one(txn)
                .await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj.unwrap(), streaming_job).into(),
                )),
            });
        }};
    }
    let mut objs = get_referring_objects(object_id, txn).await?;
    if object_type == ObjectType::Table {
        // Sinks writing INTO this table are collected separately here —
        // presumably not covered by `get_referring_objects`; verify if that
        // helper's semantics change.
        let incoming_sinks: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.eq(object_id))
            .into_tuple()
            .all(txn)
            .await?;

        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
            oid: id.as_object_id(),
            obj_type: ObjectType::Sink,
            schema_id: None,
            database_id: None,
        }));
    }

    for obj in objs {
        match obj.obj_type {
            ObjectType::Table => {
                rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
            }
            ObjectType::Sink => {
                rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
            }
            ObjectType::Subscription => {
                rename_relation_ref!(
                    Subscription,
                    subscription,
                    subscription_id,
                    obj.oid.as_subscription_id()
                )
            }
            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
            ObjectType::Index => {
                // An index's definition lives on its backing index table, so
                // the rewrite is applied to that table row instead.
                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
                    .select_only()
                    .column(index::Column::IndexTableId)
                    .into_tuple()
                    .one(txn)
                    .await?;
                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
            }
            _ => {
                bail!(
                    "only the table, sink, subscription, view and index will depend on other objects."
                )
            }
        }
    }

    Ok(to_update_relations)
}
2043
/// Guard against dropping the last subscription on a table that objects from
/// other databases still depend on.
///
/// If other subscriptions on the same upstream table remain, the drop is
/// always allowed. Otherwise the drop is rejected (permission denied) when
/// any dependency on the upstream table comes from an object in a different
/// database than the table itself.
pub async fn validate_subscription_deletion<C>(
    txn: &C,
    subscription_id: SubscriptionId,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
        .select_only()
        .column(subscription::Column::DependentTableId)
        .into_tuple()
        .one(txn)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;

    let cnt = Subscription::find()
        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
        .count(txn)
        .await?;
    if cnt > 1 {
        // Other subscriptions on the same upstream table remain; this one is
        // not the last, so dropping it is safe.
        return Ok(());
    }

    // This is the last subscription on the table: count dependencies on the
    // upstream table whose dependent object lives in a *different* database.
    // `o1` = the depended-on object (the table), `o2` = the dependent object.
    let obj_alias = Alias::new("o1");
    let used_by_alias = Alias::new("o2");
    let count = ObjectDependency::find()
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object2.def(),
            obj_alias.clone(),
        )
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object1.def(),
            used_by_alias.clone(),
        )
        .filter(
            object_dependency::Column::Oid
                .eq(upstream_table_id)
                .and(object_dependency::Column::UsedBy.ne(subscription_id))
                .and(
                    Expr::col((obj_alias, object::Column::DatabaseId))
                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
                ),
        )
        .count(txn)
        .await?;

    if count != 0 {
        return Err(MetaError::permission_denied(format!(
            "Referenced by {} cross-db objects.",
            count
        )));
    }

    Ok(())
}
2107
2108pub async fn fetch_target_fragments<C>(
2109 txn: &C,
2110 src_fragment_id: impl IntoIterator<Item = FragmentId>,
2111) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
2112where
2113 C: ConnectionTrait,
2114{
2115 let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
2116 .select_only()
2117 .columns([
2118 fragment_relation::Column::SourceFragmentId,
2119 fragment_relation::Column::TargetFragmentId,
2120 ])
2121 .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
2122 .into_tuple()
2123 .all(txn)
2124 .await?;
2125
2126 let source_target_fragments = source_target_fragments.into_iter().into_group_map();
2127
2128 Ok(source_target_fragments)
2129}
2130
2131pub async fn get_sink_fragment_by_ids<C>(
2132 txn: &C,
2133 sink_ids: Vec<SinkId>,
2134) -> MetaResult<HashMap<SinkId, FragmentId>>
2135where
2136 C: ConnectionTrait,
2137{
2138 let sink_num = sink_ids.len();
2139 let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
2140 .select_only()
2141 .columns([fragment::Column::JobId, fragment::Column::FragmentId])
2142 .filter(
2143 fragment::Column::JobId
2144 .is_in(sink_ids)
2145 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2146 )
2147 .into_tuple()
2148 .all(txn)
2149 .await?;
2150
2151 if sink_fragment_ids.len() != sink_num {
2152 return Err(anyhow::anyhow!(
2153 "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2154 sink_fragment_ids.len(),
2155 sink_num
2156 )
2157 .into());
2158 }
2159
2160 Ok(sink_fragment_ids.into_iter().collect())
2161}
2162
2163pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2164where
2165 C: ConnectionTrait,
2166{
2167 let mview_fragment: Vec<i32> = Fragment::find()
2168 .select_only()
2169 .column(fragment::Column::FragmentTypeMask)
2170 .filter(
2171 fragment::Column::JobId
2172 .eq(table_id)
2173 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2174 )
2175 .into_tuple()
2176 .all(txn)
2177 .await?;
2178
2179 let mview_fragment_len = mview_fragment.len();
2180 if mview_fragment_len != 1 {
2181 bail!(
2182 "expected exactly one mview fragment for table {}, found {}",
2183 table_id,
2184 mview_fragment_len
2185 );
2186 }
2187
2188 let mview_fragment = mview_fragment.into_iter().next().unwrap();
2189 let migrated =
2190 FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2191
2192 Ok(migrated)
2193}
2194
/// If the given sink is the hidden sink of an iceberg-engine table, return the
/// id of that table.
///
/// The sink must exist, its name must start with `ICEBERG_SINK_PREFIX`, and
/// among the objects the sink depends on there must be exactly one table using
/// the `Iceberg` engine; in every other case `None` is returned.
pub async fn try_get_iceberg_table_by_downstream_sink<C>(
    txn: &C,
    sink_id: SinkId,
) -> MetaResult<Option<TableId>>
where
    C: ConnectionTrait,
{
    let sink = Sink::find_by_id(sink_id).one(txn).await?;
    let Some(sink) = sink else {
        return Ok(None);
    };

    if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
        // Objects this sink depends on; candidates for the backing table.
        let object_ids: Vec<ObjectId> = ObjectDependency::find()
            .select_only()
            .column(object_dependency::Column::Oid)
            .filter(object_dependency::Column::UsedBy.eq(sink_id))
            .into_tuple()
            .all(txn)
            .await?;
        let mut iceberg_table_ids = vec![];
        for object_id in object_ids {
            let table_id = object_id.as_table_id();
            // Non-table dependencies simply yield no row here.
            if let Some(table_engine) = Table::find_by_id(table_id)
                .select_only()
                .column(table::Column::Engine)
                .into_tuple::<table::Engine>()
                .one(txn)
                .await?
                && table_engine == table::Engine::Iceberg
            {
                iceberg_table_ids.push(table_id);
            }
        }
        // Only an unambiguous (single) match is returned.
        if iceberg_table_ids.len() == 1 {
            return Ok(Some(iceberg_table_ids[0]));
        }
    }
    Ok(None)
}
2235
/// Whether the given streaming job belongs to an iceberg-engine table: either
/// its mview table uses the `Iceberg` engine, or it is a sink whose name
/// starts with `ICEBERG_SINK_PREFIX`.
pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
where
    C: ConnectionTrait,
{
    // Case 1: the job's mview table is backed by the iceberg engine.
    if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
        .select_only()
        .column(table::Column::Engine)
        .into_tuple::<table::Engine>()
        .one(txn)
        .await?
        && engine == table::Engine::Iceberg
    {
        return Ok(true);
    }
    // Case 2: the job is a sink identified as iceberg-related by name prefix.
    if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
        .select_only()
        .column(sink::Column::Name)
        .into_tuple::<String>()
        .one(txn)
        .await?
        && sink_name.starts_with(ICEBERG_SINK_PREFIX)
    {
        return Ok(true);
    }
    Ok(false)
}
2262
/// Find "dirty" iceberg-table jobs: background table/sink streaming jobs that
/// have not reached `Created` state and belong to an iceberg-engine table.
///
/// When `database_id` is given, the search is restricted to that database.
pub async fn find_dirty_iceberg_table_jobs<C>(
    txn: &C,
    database_id: Option<DatabaseId>,
) -> MetaResult<Vec<PartialObject>>
where
    C: ConnectionTrait,
{
    // Candidate set: unfinished background table/sink jobs.
    let mut filter_condition = streaming_job::Column::JobStatus
        .ne(JobStatus::Created)
        .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
        .and(streaming_job::Column::CreateType.eq(CreateType::Background));
    if let Some(database_id) = database_id {
        filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
    }
    let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
        .select_only()
        .columns([
            object::Column::Oid,
            object::Column::ObjType,
            object::Column::SchemaId,
            object::Column::DatabaseId,
        ])
        .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
        .filter(filter_condition)
        .into_partial_model()
        .all(txn)
        .await?;

    // Keep only the candidates that are actually iceberg-related.
    let mut dirty_iceberg_table_jobs = vec![];
    for job in creating_table_sink_jobs {
        if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
            tracing::info!("Found dirty iceberg job with id: {}", job.oid);
            dirty_iceberg_table_jobs.push(job);
        }
    }

    Ok(dirty_iceberg_table_jobs)
}
2301
2302pub fn build_select_node_list(
2303 from: &[ColumnCatalog],
2304 to: &[ColumnCatalog],
2305) -> MetaResult<Vec<PbExprNode>> {
2306 let mut exprs = Vec::with_capacity(to.len());
2307 let idx_by_col_id = from
2308 .iter()
2309 .enumerate()
2310 .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2311 .collect::<HashMap<_, _>>();
2312
2313 for to_col in to {
2314 let to_col = to_col.column_desc.as_ref().unwrap();
2315 let to_col_type_ref = to_col.column_type.as_ref().unwrap();
2316 let to_col_type = DataType::from(to_col_type_ref);
2317 if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2318 let from_col_type = DataType::from(
2319 from[*from_idx]
2320 .column_desc
2321 .as_ref()
2322 .unwrap()
2323 .column_type
2324 .as_ref()
2325 .unwrap(),
2326 );
2327 if !to_col_type.equals_datatype(&from_col_type) {
2328 return Err(anyhow!(
2329 "Column type mismatch: {:?} != {:?}",
2330 from_col_type,
2331 to_col_type
2332 )
2333 .into());
2334 }
2335 exprs.push(PbExprNode {
2336 function_type: expr_node::Type::Unspecified.into(),
2337 return_type: Some(to_col_type_ref.clone()),
2338 rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2339 });
2340 } else {
2341 let to_default_node =
2342 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2343 expr,
2344 ..
2345 })) = &to_col.generated_or_default_column
2346 {
2347 expr.clone().unwrap()
2348 } else {
2349 let null = Datum::None.to_protobuf();
2350 PbExprNode {
2351 function_type: expr_node::Type::Unspecified.into(),
2352 return_type: Some(to_col_type_ref.clone()),
2353 rex_node: Some(expr_node::RexNode::Constant(null)),
2354 }
2355 };
2356 exprs.push(to_default_node);
2357 }
2358 }
2359
2360 Ok(exprs)
2361}
2362
/// Per-job auxiliary metadata loaded from the `streaming_job` table, plus the
/// resolved SQL definition.
#[derive(Clone, Debug, Default)]
pub struct StreamingJobExtraInfo {
    /// Session timezone stored for the job, if any.
    pub timezone: Option<String>,
    /// Config override applied to the job; an empty string when unset.
    pub config_override: Arc<str>,
    /// Parsed adaptive-parallelism strategy, if one was stored for this job.
    pub adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
    /// SQL definition of the job; empty when it could not be resolved.
    pub job_definition: String,
    /// Backfill ordering constraints, if configured for the job.
    pub backfill_orders: Option<BackfillOrders>,
}
2371
2372impl StreamingJobExtraInfo {
2373 pub fn stream_context(&self) -> StreamContext {
2374 StreamContext {
2375 timezone: self.timezone.clone(),
2376 config_override: self.config_override.clone(),
2377 adaptive_parallelism_strategy: self.adaptive_parallelism_strategy,
2378 }
2379 }
2380}
2381
/// Raw row shape selected from `streaming_job`:
/// `(job_id, timezone, config_override, adaptive_parallelism_strategy, backfill_orders)`.
type StreamingJobExtraInfoRow = (
    JobId,
    Option<String>,
    Option<String>,
    Option<String>,
    Option<BackfillOrders>,
);
2390
2391pub async fn get_streaming_job_extra_info<C>(
2392 txn: &C,
2393 job_ids: Vec<JobId>,
2394) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2395where
2396 C: ConnectionTrait,
2397{
2398 let pairs: Vec<StreamingJobExtraInfoRow> = StreamingJob::find()
2399 .select_only()
2400 .columns([
2401 streaming_job::Column::JobId,
2402 streaming_job::Column::Timezone,
2403 streaming_job::Column::ConfigOverride,
2404 streaming_job::Column::AdaptiveParallelismStrategy,
2405 streaming_job::Column::BackfillOrders,
2406 ])
2407 .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2408 .into_tuple()
2409 .all(txn)
2410 .await?;
2411
2412 let job_ids = job_ids.into_iter().collect();
2413
2414 let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2415
2416 let result = pairs
2417 .into_iter()
2418 .map(
2419 |(job_id, timezone, config_override, strategy, backfill_orders)| {
2420 let job_definition = definitions.remove(&job_id).unwrap_or_default();
2421 let adaptive_parallelism_strategy = strategy.as_deref().map(|s| {
2422 parse_strategy(s).expect("strategy should be validated before storing")
2423 });
2424 (
2425 job_id,
2426 StreamingJobExtraInfo {
2427 timezone,
2428 config_override: config_override.unwrap_or_default().into(),
2429 adaptive_parallelism_strategy,
2430 job_definition,
2431 backfill_orders,
2432 },
2433 )
2434 },
2435 )
2436 .collect();
2437
2438 Ok(result)
2439}
2440
#[cfg(test)]
mod tests {
    use super::*;

    // Verifies that the external (upstream) table name is extracted from the
    // `TABLE '...'` clause of a CDC `CREATE TABLE ... FROM source` definition,
    // both with and without an explicit column list.
    #[test]
    fn test_extract_cdc_table_name() {
        let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
        let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
        assert_eq!(
            extract_external_table_name_from_definition(ddl1),
            Some("public.t1".into())
        );
        assert_eq!(
            extract_external_table_name_from_definition(ddl2),
            Some("mydb.t2".into())
        );
    }
}
2458}