1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
22use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
23use risingwave_common::id::{JobId, SubscriptionId};
24use risingwave_common::types::{DataType, Datum};
25use risingwave_common::util::value_encoding::DatumToProtoExt;
26use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
27use risingwave_common::{bail, hash};
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::object::ObjectType;
30use risingwave_meta_model::prelude::*;
31use risingwave_meta_model::table::TableType;
32use risingwave_meta_model::user_privilege::Action;
33use risingwave_meta_model::{
34 ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
35 JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
36 TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
37 function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
38 subscription, table, user, user_default_privilege, user_privilege, view,
39};
40use risingwave_meta_model_migration::WithQuery;
41use risingwave_pb::catalog::{
42 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
43 PbSubscription, PbTable, PbView,
44};
45use risingwave_pb::common::WorkerNode;
46use risingwave_pb::expr::{PbExprNode, expr_node};
47use risingwave_pb::meta::object::PbObjectInfo;
48use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
49use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
50use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
51use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
52use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
53use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
54use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
55use risingwave_sqlparser::ast::Statement as SqlStatement;
56use risingwave_sqlparser::parser::Parser;
57use sea_orm::sea_query::{
58 Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
59 WithClause,
60};
61use sea_orm::{
62 ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
63 FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
64 RelationTrait, Set, Statement,
65};
66use thiserror_ext::AsReport;
67
68use crate::barrier::{SharedActorInfos, SharedFragmentInfo};
69use crate::controller::ObjectModel;
70use crate::controller::fragment::FragmentTypeMaskExt;
71use crate::controller::scale::resolve_streaming_job_definition;
72use crate::model::{FragmentDownstreamRelation, StreamContext};
73use crate::{MetaError, MetaResult};
74
75pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
100 let cte_alias = Alias::new("used_by_object_ids");
101 let cte_return_alias = Alias::new("used_by");
102
103 let mut base_query = SelectStatement::new()
104 .column(object_dependency::Column::UsedBy)
105 .from(ObjectDependency)
106 .and_where(object_dependency::Column::Oid.eq(obj_id))
107 .to_owned();
108
109 let belonged_obj_query = SelectStatement::new()
110 .column(object::Column::Oid)
111 .from(Object)
112 .and_where(
113 object::Column::DatabaseId
114 .eq(obj_id)
115 .or(object::Column::SchemaId.eq(obj_id)),
116 )
117 .to_owned();
118
119 let cte_referencing = Query::select()
120 .column((ObjectDependency, object_dependency::Column::UsedBy))
121 .from(ObjectDependency)
122 .inner_join(
123 cte_alias.clone(),
124 Expr::col((cte_alias.clone(), cte_return_alias.clone()))
125 .equals(object_dependency::Column::Oid),
126 )
127 .to_owned();
128
129 let mut common_table_expr = CommonTableExpression::new();
130 common_table_expr
131 .query(
132 base_query
133 .union(UnionType::All, belonged_obj_query)
134 .union(UnionType::All, cte_referencing)
135 .to_owned(),
136 )
137 .column(cte_return_alias.clone())
138 .table_name(cte_alias.clone());
139
140 SelectStatement::new()
141 .distinct()
142 .columns([
143 object::Column::Oid,
144 object::Column::ObjType,
145 object::Column::SchemaId,
146 object::Column::DatabaseId,
147 ])
148 .from(cte_alias.clone())
149 .inner_join(
150 Object,
151 Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
152 )
153 .order_by(object::Column::Oid, Order::Desc)
154 .to_owned()
155 .with(
156 WithClause::new()
157 .recursive(true)
158 .cte(common_table_expr)
159 .to_owned(),
160 )
161}
162
163pub fn construct_sink_cycle_check_query(
188 target_table: ObjectId,
189 dependent_objects: Vec<ObjectId>,
190) -> WithQuery {
191 let cte_alias = Alias::new("used_by_object_ids_with_sink");
192 let depend_alias = Alias::new("obj_dependency_with_sink");
193
194 let mut base_query = SelectStatement::new()
195 .columns([
196 object_dependency::Column::Oid,
197 object_dependency::Column::UsedBy,
198 ])
199 .from(ObjectDependency)
200 .and_where(object_dependency::Column::Oid.eq(target_table))
201 .to_owned();
202
203 let query_sink_deps = SelectStatement::new()
204 .columns([sink::Column::SinkId, sink::Column::TargetTable])
205 .from(Sink)
206 .and_where(sink::Column::TargetTable.is_not_null())
207 .to_owned();
208
209 let cte_referencing = Query::select()
210 .column((depend_alias.clone(), object_dependency::Column::Oid))
211 .column((depend_alias.clone(), object_dependency::Column::UsedBy))
212 .from_subquery(
213 SelectStatement::new()
214 .columns([
215 object_dependency::Column::Oid,
216 object_dependency::Column::UsedBy,
217 ])
218 .from(ObjectDependency)
219 .union(UnionType::All, query_sink_deps)
220 .to_owned(),
221 depend_alias.clone(),
222 )
223 .inner_join(
224 cte_alias.clone(),
225 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
226 .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
227 )
228 .and_where(
229 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
230 cte_alias.clone(),
231 object_dependency::Column::Oid,
232 ))),
233 )
234 .to_owned();
235
236 let mut common_table_expr = CommonTableExpression::new();
237 common_table_expr
238 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
239 .columns([
240 object_dependency::Column::Oid,
241 object_dependency::Column::UsedBy,
242 ])
243 .table_name(cte_alias.clone());
244
245 SelectStatement::new()
246 .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
247 .from(cte_alias.clone())
248 .and_where(
249 Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
250 )
251 .to_owned()
252 .with(
253 WithClause::new()
254 .recursive(true)
255 .cte(common_table_expr)
256 .to_owned(),
257 )
258}
259
260#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
261#[sea_orm(entity = "Object")]
262pub struct PartialObject {
263 pub oid: ObjectId,
264 pub obj_type: ObjectType,
265 pub schema_id: Option<SchemaId>,
266 pub database_id: Option<DatabaseId>,
267}
268
269#[derive(Clone, DerivePartialModel, FromQueryResult)]
270#[sea_orm(entity = "Fragment")]
271pub struct PartialFragmentStateTables {
272 pub fragment_id: FragmentId,
273 pub job_id: ObjectId,
274 pub state_table_ids: TableIdArray,
275}
276
277#[derive(Clone, Eq, PartialEq, Debug)]
278pub struct PartialActorLocation {
279 pub actor_id: ActorId,
280 pub fragment_id: FragmentId,
281 pub worker_id: WorkerId,
282}
283
284#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
285pub struct FragmentDesc {
286 pub fragment_id: FragmentId,
287 pub job_id: JobId,
288 pub fragment_type_mask: i32,
289 pub distribution_type: DistributionType,
290 pub state_table_ids: TableIdArray,
291 pub parallelism: i64,
292 pub vnode_count: i32,
293 pub stream_node: StreamNode,
294 pub parallelism_policy: String,
295}
296
297pub async fn get_referring_objects_cascade<C>(
299 obj_id: ObjectId,
300 db: &C,
301) -> MetaResult<Vec<PartialObject>>
302where
303 C: ConnectionTrait,
304{
305 let query = construct_obj_dependency_query(obj_id);
306 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
307 let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
308 db.get_database_backend(),
309 sql,
310 values,
311 ))
312 .all(db)
313 .await?;
314 Ok(objects)
315}
316
317pub async fn check_sink_into_table_cycle<C>(
319 target_table: ObjectId,
320 dependent_objs: Vec<ObjectId>,
321 db: &C,
322) -> MetaResult<bool>
323where
324 C: ConnectionTrait,
325{
326 if dependent_objs.is_empty() {
327 return Ok(false);
328 }
329
330 if dependent_objs.contains(&target_table) {
332 return Ok(true);
333 }
334
335 let query = construct_sink_cycle_check_query(target_table, dependent_objs);
336 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
337
338 let res = db
339 .query_one(Statement::from_sql_and_values(
340 db.get_database_backend(),
341 sql,
342 values,
343 ))
344 .await?
345 .unwrap();
346
347 let cnt: i64 = res.try_get_by(0)?;
348
349 Ok(cnt != 0)
350}
351
352pub async fn ensure_object_id<C>(
354 object_type: ObjectType,
355 obj_id: impl Into<ObjectId>,
356 db: &C,
357) -> MetaResult<()>
358where
359 C: ConnectionTrait,
360{
361 let obj_id = obj_id.into();
362 let count = Object::find_by_id(obj_id).count(db).await?;
363 if count == 0 {
364 return Err(MetaError::catalog_id_not_found(
365 object_type.as_str(),
366 obj_id,
367 ));
368 }
369 Ok(())
370}
371
372pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
373where
374 C: ConnectionTrait,
375{
376 let count = Object::find_by_id(job_id).count(db).await?;
377 if count == 0 {
378 return Err(MetaError::cancelled(format!(
379 "job {} might be cancelled manually or by recovery",
380 job_id
381 )));
382 }
383 Ok(())
384}
385
386pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
388where
389 C: ConnectionTrait,
390{
391 let count = User::find_by_id(user_id).count(db).await?;
392 if count == 0 {
393 return Err(anyhow!("user {} was concurrently dropped", user_id).into());
394 }
395 Ok(())
396}
397
398pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
400where
401 C: ConnectionTrait,
402{
403 let count = Database::find()
404 .filter(database::Column::Name.eq(name))
405 .count(db)
406 .await?;
407 if count > 0 {
408 assert_eq!(count, 1);
409 return Err(MetaError::catalog_duplicated("database", name));
410 }
411 Ok(())
412}
413
414pub async fn check_function_signature_duplicate<C>(
416 pb_function: &PbFunction,
417 db: &C,
418) -> MetaResult<()>
419where
420 C: ConnectionTrait,
421{
422 let count = Function::find()
423 .inner_join(Object)
424 .filter(
425 object::Column::DatabaseId
426 .eq(pb_function.database_id)
427 .and(object::Column::SchemaId.eq(pb_function.schema_id))
428 .and(function::Column::Name.eq(&pb_function.name))
429 .and(
430 function::Column::ArgTypes
431 .eq(DataTypeArray::from(pb_function.arg_types.clone())),
432 ),
433 )
434 .count(db)
435 .await?;
436 if count > 0 {
437 assert_eq!(count, 1);
438 return Err(MetaError::catalog_duplicated("function", &pb_function.name));
439 }
440 Ok(())
441}
442
443pub async fn check_connection_name_duplicate<C>(
445 pb_connection: &PbConnection,
446 db: &C,
447) -> MetaResult<()>
448where
449 C: ConnectionTrait,
450{
451 let count = Connection::find()
452 .inner_join(Object)
453 .filter(
454 object::Column::DatabaseId
455 .eq(pb_connection.database_id)
456 .and(object::Column::SchemaId.eq(pb_connection.schema_id))
457 .and(connection::Column::Name.eq(&pb_connection.name)),
458 )
459 .count(db)
460 .await?;
461 if count > 0 {
462 assert_eq!(count, 1);
463 return Err(MetaError::catalog_duplicated(
464 "connection",
465 &pb_connection.name,
466 ));
467 }
468 Ok(())
469}
470
471pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
472where
473 C: ConnectionTrait,
474{
475 let count = Secret::find()
476 .inner_join(Object)
477 .filter(
478 object::Column::DatabaseId
479 .eq(pb_secret.database_id)
480 .and(object::Column::SchemaId.eq(pb_secret.schema_id))
481 .and(secret::Column::Name.eq(&pb_secret.name)),
482 )
483 .count(db)
484 .await?;
485 if count > 0 {
486 assert_eq!(count, 1);
487 return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
488 }
489 Ok(())
490}
491
492pub async fn check_subscription_name_duplicate<C>(
493 pb_subscription: &PbSubscription,
494 db: &C,
495) -> MetaResult<()>
496where
497 C: ConnectionTrait,
498{
499 let count = Subscription::find()
500 .inner_join(Object)
501 .filter(
502 object::Column::DatabaseId
503 .eq(pb_subscription.database_id)
504 .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
505 .and(subscription::Column::Name.eq(&pb_subscription.name)),
506 )
507 .count(db)
508 .await?;
509 if count > 0 {
510 assert_eq!(count, 1);
511 return Err(MetaError::catalog_duplicated(
512 "subscription",
513 &pb_subscription.name,
514 ));
515 }
516 Ok(())
517}
518
519pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
521where
522 C: ConnectionTrait,
523{
524 let count = User::find()
525 .filter(user::Column::Name.eq(name))
526 .count(db)
527 .await?;
528 if count > 0 {
529 assert_eq!(count, 1);
530 return Err(MetaError::catalog_duplicated("user", name));
531 }
532 Ok(())
533}
534
535pub async fn check_relation_name_duplicate<C>(
537 name: &str,
538 database_id: DatabaseId,
539 schema_id: SchemaId,
540 db: &C,
541) -> MetaResult<()>
542where
543 C: ConnectionTrait,
544{
545 macro_rules! check_duplicated {
546 ($obj_type:expr, $entity:ident, $table:ident) => {
547 let object_id = Object::find()
548 .select_only()
549 .column(object::Column::Oid)
550 .inner_join($entity)
551 .filter(
552 object::Column::DatabaseId
553 .eq(Some(database_id))
554 .and(object::Column::SchemaId.eq(Some(schema_id)))
555 .and($table::Column::Name.eq(name)),
556 )
557 .into_tuple::<ObjectId>()
558 .one(db)
559 .await?;
560 if let Some(oid) = object_id {
561 let check_creation = if $obj_type == ObjectType::View {
562 false
563 } else if $obj_type == ObjectType::Source {
564 let source_info = Source::find_by_id(oid.as_source_id())
565 .select_only()
566 .column(source::Column::SourceInfo)
567 .into_tuple::<Option<StreamSourceInfo>>()
568 .one(db)
569 .await?
570 .unwrap();
571 source_info.map_or(false, |info| info.to_protobuf().is_shared())
572 } else {
573 true
574 };
575 let job_id = oid.as_job_id();
576 return if check_creation
577 && !matches!(
578 StreamingJob::find_by_id(job_id)
579 .select_only()
580 .column(streaming_job::Column::JobStatus)
581 .into_tuple::<JobStatus>()
582 .one(db)
583 .await?,
584 Some(JobStatus::Created)
585 ) {
586 Err(MetaError::catalog_under_creation(
587 $obj_type.as_str(),
588 name,
589 job_id,
590 ))
591 } else {
592 Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
593 };
594 }
595 };
596 }
597 check_duplicated!(ObjectType::Table, Table, table);
598 check_duplicated!(ObjectType::Source, Source, source);
599 check_duplicated!(ObjectType::Sink, Sink, sink);
600 check_duplicated!(ObjectType::Index, Index, index);
601 check_duplicated!(ObjectType::View, View, view);
602
603 Ok(())
604}
605
606pub async fn check_schema_name_duplicate<C>(
608 name: &str,
609 database_id: DatabaseId,
610 db: &C,
611) -> MetaResult<()>
612where
613 C: ConnectionTrait,
614{
615 let count = Object::find()
616 .inner_join(Schema)
617 .filter(
618 object::Column::ObjType
619 .eq(ObjectType::Schema)
620 .and(object::Column::DatabaseId.eq(Some(database_id)))
621 .and(schema::Column::Name.eq(name)),
622 )
623 .count(db)
624 .await?;
625 if count != 0 {
626 return Err(MetaError::catalog_duplicated("schema", name));
627 }
628
629 Ok(())
630}
631
632pub async fn check_object_refer_for_drop<C>(
635 object_type: ObjectType,
636 object_id: ObjectId,
637 db: &C,
638) -> MetaResult<()>
639where
640 C: ConnectionTrait,
641{
642 let count = if object_type == ObjectType::Table {
644 ObjectDependency::find()
645 .join(
646 JoinType::InnerJoin,
647 object_dependency::Relation::Object1.def(),
648 )
649 .filter(
650 object_dependency::Column::Oid
651 .eq(object_id)
652 .and(object::Column::ObjType.ne(ObjectType::Index)),
653 )
654 .count(db)
655 .await?
656 } else {
657 ObjectDependency::find()
658 .filter(object_dependency::Column::Oid.eq(object_id))
659 .count(db)
660 .await?
661 };
662 if count != 0 {
663 let referring_objects = get_referring_objects(object_id, db).await?;
665 let referring_objs_map = referring_objects
666 .into_iter()
667 .filter(|o| o.obj_type != ObjectType::Index)
668 .into_group_map_by(|o| o.obj_type);
669 let mut details = vec![];
670 for (obj_type, objs) in referring_objs_map {
671 match obj_type {
672 ObjectType::Table => {
673 let tables: Vec<(String, String)> = Object::find()
674 .join(JoinType::InnerJoin, object::Relation::Table.def())
675 .join(JoinType::InnerJoin, object::Relation::Database2.def())
676 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
677 .select_only()
678 .column(schema::Column::Name)
679 .column(table::Column::Name)
680 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
681 .into_tuple()
682 .all(db)
683 .await?;
684 details.extend(tables.into_iter().map(|(schema_name, table_name)| {
685 format!(
686 "materialized view {}.{} depends on it",
687 schema_name, table_name
688 )
689 }));
690 }
691 ObjectType::Sink => {
692 let sinks: Vec<(String, String)> = Object::find()
693 .join(JoinType::InnerJoin, object::Relation::Sink.def())
694 .join(JoinType::InnerJoin, object::Relation::Database2.def())
695 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
696 .select_only()
697 .column(schema::Column::Name)
698 .column(sink::Column::Name)
699 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
700 .into_tuple()
701 .all(db)
702 .await?;
703 if object_type == ObjectType::Table {
704 let engine = Table::find_by_id(object_id.as_table_id())
705 .select_only()
706 .column(table::Column::Engine)
707 .into_tuple::<table::Engine>()
708 .one(db)
709 .await?;
710 if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
711 continue;
712 }
713 }
714 details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
715 format!("sink {}.{} depends on it", schema_name, sink_name)
716 }));
717 }
718 ObjectType::View => {
719 let views: Vec<(String, String)> = Object::find()
720 .join(JoinType::InnerJoin, object::Relation::View.def())
721 .join(JoinType::InnerJoin, object::Relation::Database2.def())
722 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
723 .select_only()
724 .column(schema::Column::Name)
725 .column(view::Column::Name)
726 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
727 .into_tuple()
728 .all(db)
729 .await?;
730 details.extend(views.into_iter().map(|(schema_name, view_name)| {
731 format!("view {}.{} depends on it", schema_name, view_name)
732 }));
733 }
734 ObjectType::Subscription => {
735 let subscriptions: Vec<(String, String)> = Object::find()
736 .join(JoinType::InnerJoin, object::Relation::Subscription.def())
737 .join(JoinType::InnerJoin, object::Relation::Database2.def())
738 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
739 .select_only()
740 .column(schema::Column::Name)
741 .column(subscription::Column::Name)
742 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
743 .into_tuple()
744 .all(db)
745 .await?;
746 details.extend(subscriptions.into_iter().map(
747 |(schema_name, subscription_name)| {
748 format!(
749 "subscription {}.{} depends on it",
750 schema_name, subscription_name
751 )
752 },
753 ));
754 }
755 ObjectType::Source => {
756 let sources: Vec<(String, String)> = Object::find()
757 .join(JoinType::InnerJoin, object::Relation::Source.def())
758 .join(JoinType::InnerJoin, object::Relation::Database2.def())
759 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
760 .select_only()
761 .column(schema::Column::Name)
762 .column(source::Column::Name)
763 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
764 .into_tuple()
765 .all(db)
766 .await?;
767 details.extend(sources.into_iter().map(|(schema_name, view_name)| {
768 format!("source {}.{} depends on it", schema_name, view_name)
769 }));
770 }
771 ObjectType::Connection => {
772 let connections: Vec<(String, String)> = Object::find()
773 .join(JoinType::InnerJoin, object::Relation::Connection.def())
774 .join(JoinType::InnerJoin, object::Relation::Database2.def())
775 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
776 .select_only()
777 .column(schema::Column::Name)
778 .column(connection::Column::Name)
779 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
780 .into_tuple()
781 .all(db)
782 .await?;
783 details.extend(connections.into_iter().map(|(schema_name, view_name)| {
784 format!("connection {}.{} depends on it", schema_name, view_name)
785 }));
786 }
787 _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
789 }
790 }
791 if details.is_empty() {
792 return Ok(());
793 }
794
795 return Err(MetaError::permission_denied(format!(
796 "{} used by {} other objects. \nDETAIL: {}\n\
797 {}",
798 object_type.as_str(),
799 details.len(),
800 details.join("\n"),
801 match object_type {
802 ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
803 "HINT: DROP the dependent objects first.",
804 ObjectType::Database | ObjectType::Schema => unreachable!(),
805 _ => "HINT: Use DROP ... CASCADE to drop the dependent objects too.",
806 }
807 )));
808 }
809 Ok(())
810}
811
812pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
814where
815 C: ConnectionTrait,
816{
817 let objs = ObjectDependency::find()
818 .filter(object_dependency::Column::Oid.eq(object_id))
819 .join(
820 JoinType::InnerJoin,
821 object_dependency::Relation::Object1.def(),
822 )
823 .into_partial_model()
824 .all(db)
825 .await?;
826
827 Ok(objs)
828}
829
830pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
832where
833 C: ConnectionTrait,
834{
835 let count = Object::find()
836 .filter(object::Column::SchemaId.eq(Some(schema_id)))
837 .count(db)
838 .await?;
839 if count != 0 {
840 return Err(MetaError::permission_denied("schema is not empty"));
841 }
842
843 Ok(())
844}
845
846pub async fn list_user_info_by_ids<C>(
848 user_ids: impl IntoIterator<Item = UserId>,
849 db: &C,
850) -> MetaResult<Vec<PbUserInfo>>
851where
852 C: ConnectionTrait,
853{
854 let mut user_infos = vec![];
855 for user_id in user_ids {
856 let user = User::find_by_id(user_id)
857 .one(db)
858 .await?
859 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
860 let mut user_info: PbUserInfo = user.into();
861 user_info.grant_privileges = get_user_privilege(user_id, db).await?;
862 user_infos.push(user_info);
863 }
864 Ok(user_infos)
865}
866
867pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
869where
870 C: ConnectionTrait,
871{
872 let obj_owner: UserId = Object::find_by_id(object_id)
873 .select_only()
874 .column(object::Column::OwnerId)
875 .into_tuple()
876 .one(db)
877 .await?
878 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
879 Ok(obj_owner)
880}
881
882pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
907 let cte_alias = Alias::new("granted_privilege_ids");
908 let cte_return_privilege_alias = Alias::new("id");
909 let cte_return_user_alias = Alias::new("user_id");
910
911 let mut base_query = SelectStatement::new()
912 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
913 .from(UserPrivilege)
914 .and_where(user_privilege::Column::Id.is_in(ids))
915 .to_owned();
916
917 let cte_referencing = Query::select()
918 .columns([
919 (UserPrivilege, user_privilege::Column::Id),
920 (UserPrivilege, user_privilege::Column::UserId),
921 ])
922 .from(UserPrivilege)
923 .inner_join(
924 cte_alias.clone(),
925 Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
926 .equals(user_privilege::Column::DependentId),
927 )
928 .to_owned();
929
930 let mut common_table_expr = CommonTableExpression::new();
931 common_table_expr
932 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
933 .columns([
934 cte_return_privilege_alias.clone(),
935 cte_return_user_alias.clone(),
936 ])
937 .table_name(cte_alias.clone());
938
939 SelectStatement::new()
940 .columns([cte_return_privilege_alias, cte_return_user_alias])
941 .from(cte_alias)
942 .to_owned()
943 .with(
944 WithClause::new()
945 .recursive(true)
946 .cte(common_table_expr)
947 .to_owned(),
948 )
949}
950
951pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
952where
953 C: ConnectionTrait,
954{
955 let table_ids: Vec<TableId> = Table::find()
956 .select_only()
957 .column(table::Column::TableId)
958 .filter(
959 table::Column::TableType
960 .eq(TableType::Internal)
961 .and(table::Column::BelongsToJobId.eq(job_id)),
962 )
963 .into_tuple()
964 .all(db)
965 .await?;
966 Ok(table_ids)
967}
968
969pub async fn get_index_state_tables_by_table_id<C>(
970 table_id: TableId,
971 db: &C,
972) -> MetaResult<Vec<TableId>>
973where
974 C: ConnectionTrait,
975{
976 let mut index_table_ids: Vec<TableId> = Index::find()
977 .select_only()
978 .column(index::Column::IndexTableId)
979 .filter(index::Column::PrimaryTableId.eq(table_id))
980 .into_tuple()
981 .all(db)
982 .await?;
983
984 if !index_table_ids.is_empty() {
985 let internal_table_ids: Vec<TableId> = Table::find()
986 .select_only()
987 .column(table::Column::TableId)
988 .filter(
989 table::Column::TableType
990 .eq(TableType::Internal)
991 .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
992 )
993 .into_tuple()
994 .all(db)
995 .await?;
996
997 index_table_ids.extend(internal_table_ids.into_iter());
998 }
999
1000 Ok(index_table_ids)
1001}
1002
1003#[derive(Clone, DerivePartialModel, FromQueryResult)]
1004#[sea_orm(entity = "UserPrivilege")]
1005pub struct PartialUserPrivilege {
1006 pub id: PrivilegeId,
1007 pub user_id: UserId,
1008}
1009
1010pub async fn get_referring_privileges_cascade<C>(
1011 ids: Vec<PrivilegeId>,
1012 db: &C,
1013) -> MetaResult<Vec<PartialUserPrivilege>>
1014where
1015 C: ConnectionTrait,
1016{
1017 let query = construct_privilege_dependency_query(ids);
1018 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1019 let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1020 db.get_database_backend(),
1021 sql,
1022 values,
1023 ))
1024 .all(db)
1025 .await?;
1026
1027 Ok(privileges)
1028}
1029
1030pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1032where
1033 C: ConnectionTrait,
1034{
1035 let count = UserPrivilege::find()
1036 .filter(user_privilege::Column::DependentId.is_in(ids))
1037 .count(db)
1038 .await?;
1039 if count != 0 {
1040 return Err(MetaError::permission_denied(format!(
1041 "privileges granted to {} other ones.",
1042 count
1043 )));
1044 }
1045 Ok(())
1046}
1047
1048pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1050where
1051 C: ConnectionTrait,
1052{
1053 let user_privileges = UserPrivilege::find()
1054 .find_also_related(Object)
1055 .filter(user_privilege::Column::UserId.eq(user_id))
1056 .all(db)
1057 .await?;
1058 Ok(user_privileges
1059 .into_iter()
1060 .map(|(privilege, object)| {
1061 let object = object.unwrap();
1062 let oid = object.oid.as_raw_id();
1063 let obj = match object.obj_type {
1064 ObjectType::Database => PbGrantObject::DatabaseId(oid),
1065 ObjectType::Schema => PbGrantObject::SchemaId(oid),
1066 ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
1067 ObjectType::Source => PbGrantObject::SourceId(oid),
1068 ObjectType::Sink => PbGrantObject::SinkId(oid),
1069 ObjectType::View => PbGrantObject::ViewId(oid),
1070 ObjectType::Function => PbGrantObject::FunctionId(oid),
1071 ObjectType::Connection => PbGrantObject::ConnectionId(oid),
1072 ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
1073 ObjectType::Secret => PbGrantObject::SecretId(oid),
1074 };
1075 PbGrantPrivilege {
1076 action_with_opts: vec![PbActionWithGrantOption {
1077 action: PbAction::from(privilege.action) as _,
1078 with_grant_option: privilege.with_grant_option,
1079 granted_by: privilege.granted_by as _,
1080 }],
1081 object: Some(obj),
1082 }
1083 })
1084 .collect())
1085}
1086
1087pub async fn get_table_columns(
1088 txn: &impl ConnectionTrait,
1089 id: TableId,
1090) -> MetaResult<ColumnCatalogArray> {
1091 let columns = Table::find_by_id(id)
1092 .select_only()
1093 .columns([table::Column::Columns])
1094 .into_tuple::<ColumnCatalogArray>()
1095 .one(txn)
1096 .await?
1097 .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1098 Ok(columns)
1099}
1100
1101pub async fn grant_default_privileges_automatically<C>(
1104 db: &C,
1105 object_id: impl Into<ObjectId>,
1106) -> MetaResult<Vec<PbUserInfo>>
1107where
1108 C: ConnectionTrait,
1109{
1110 let object_id = object_id.into();
1111 let object = Object::find_by_id(object_id)
1112 .one(db)
1113 .await?
1114 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1115 assert_ne!(object.obj_type, ObjectType::Database);
1116
1117 let for_mview_filter = if object.obj_type == ObjectType::Table {
1118 let table_type = Table::find_by_id(object_id.as_table_id())
1119 .select_only()
1120 .column(table::Column::TableType)
1121 .into_tuple::<TableType>()
1122 .one(db)
1123 .await?
1124 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1125 user_default_privilege::Column::ForMaterializedView
1126 .eq(table_type == TableType::MaterializedView)
1127 } else {
1128 user_default_privilege::Column::ForMaterializedView.eq(false)
1129 };
1130 let schema_filter = if let Some(schema_id) = &object.schema_id {
1131 user_default_privilege::Column::SchemaId.eq(*schema_id)
1132 } else {
1133 user_default_privilege::Column::SchemaId.is_null()
1134 };
1135
1136 let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1137 .select_only()
1138 .columns([
1139 user_default_privilege::Column::Grantee,
1140 user_default_privilege::Column::GrantedBy,
1141 user_default_privilege::Column::Action,
1142 user_default_privilege::Column::WithGrantOption,
1143 ])
1144 .filter(
1145 user_default_privilege::Column::DatabaseId
1146 .eq(object.database_id.unwrap())
1147 .and(schema_filter)
1148 .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1149 .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1150 .and(for_mview_filter),
1151 )
1152 .into_tuple()
1153 .all(db)
1154 .await?;
1155 if default_privileges.is_empty() {
1156 return Ok(vec![]);
1157 }
1158
1159 let updated_user_ids = default_privileges
1160 .iter()
1161 .map(|(grantee, _, _, _)| *grantee)
1162 .collect::<HashSet<_>>();
1163
1164 let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1165
1166 for (grantee, granted_by, action, with_grant_option) in default_privileges {
1167 UserPrivilege::insert(user_privilege::ActiveModel {
1168 user_id: Set(grantee),
1169 oid: Set(object_id),
1170 granted_by: Set(granted_by),
1171 action: Set(action),
1172 with_grant_option: Set(with_grant_option),
1173 ..Default::default()
1174 })
1175 .exec(db)
1176 .await?;
1177 if action == Action::Select && !internal_table_ids.is_empty() {
1178 for internal_table_id in &internal_table_ids {
1180 UserPrivilege::insert(user_privilege::ActiveModel {
1181 user_id: Set(grantee),
1182 oid: Set(internal_table_id.as_object_id()),
1183 granted_by: Set(granted_by),
1184 action: Set(Action::Select),
1185 with_grant_option: Set(with_grant_option),
1186 ..Default::default()
1187 })
1188 .exec(db)
1189 .await?;
1190 }
1191 }
1192 }
1193
1194 let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1195 Ok(updated_user_infos)
1196}
1197
1198pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1200 match object {
1201 PbGrantObject::DatabaseId(id)
1202 | PbGrantObject::SchemaId(id)
1203 | PbGrantObject::TableId(id)
1204 | PbGrantObject::SourceId(id)
1205 | PbGrantObject::SinkId(id)
1206 | PbGrantObject::ViewId(id)
1207 | PbGrantObject::FunctionId(id)
1208 | PbGrantObject::SubscriptionId(id)
1209 | PbGrantObject::ConnectionId(id)
1210 | PbGrantObject::SecretId(id) => (*id).into(),
1211 }
1212}
1213
1214pub async fn insert_fragment_relations(
1215 db: &impl ConnectionTrait,
1216 downstream_fragment_relations: &FragmentDownstreamRelation,
1217) -> MetaResult<()> {
1218 for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1219 for downstream in downstreams {
1220 let relation = fragment_relation::Model {
1221 source_fragment_id: *upstream_fragment_id as _,
1222 target_fragment_id: downstream.downstream_fragment_id as _,
1223 dispatcher_type: downstream.dispatcher_type,
1224 dist_key_indices: downstream
1225 .dist_key_indices
1226 .iter()
1227 .map(|idx| *idx as i32)
1228 .collect_vec()
1229 .into(),
1230 output_indices: downstream
1231 .output_mapping
1232 .indices
1233 .iter()
1234 .map(|idx| *idx as i32)
1235 .collect_vec()
1236 .into(),
1237 output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1238 };
1239 FragmentRelation::insert(relation.into_active_model())
1240 .exec(db)
1241 .await?;
1242 }
1243 }
1244 Ok(())
1245}
1246
1247pub fn compose_dispatchers(
1248 source_fragment_distribution: DistributionType,
1249 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1250 target_fragment_id: crate::model::FragmentId,
1251 target_fragment_distribution: DistributionType,
1252 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1253 dispatcher_type: DispatcherType,
1254 dist_key_indices: Vec<u32>,
1255 output_mapping: PbDispatchOutputMapping,
1256) -> HashMap<crate::model::ActorId, PbDispatcher> {
1257 match dispatcher_type {
1258 DispatcherType::Hash => {
1259 let dispatcher = PbDispatcher {
1260 r#type: PbDispatcherType::from(dispatcher_type) as _,
1261 dist_key_indices,
1262 output_mapping: output_mapping.into(),
1263 hash_mapping: Some(
1264 ActorMapping::from_bitmaps(
1265 &target_fragment_actors
1266 .iter()
1267 .map(|(actor_id, bitmap)| {
1268 (
1269 *actor_id as _,
1270 bitmap
1271 .clone()
1272 .expect("downstream hash dispatch must have distribution"),
1273 )
1274 })
1275 .collect(),
1276 )
1277 .to_protobuf(),
1278 ),
1279 dispatcher_id: target_fragment_id.as_raw_id() as _,
1280 downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1281 };
1282 source_fragment_actors
1283 .keys()
1284 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1285 .collect()
1286 }
1287 DispatcherType::Broadcast | DispatcherType::Simple => {
1288 let dispatcher = PbDispatcher {
1289 r#type: PbDispatcherType::from(dispatcher_type) as _,
1290 dist_key_indices,
1291 output_mapping: output_mapping.into(),
1292 hash_mapping: None,
1293 dispatcher_id: target_fragment_id.as_raw_id() as _,
1294 downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1295 };
1296 source_fragment_actors
1297 .keys()
1298 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1299 .collect()
1300 }
1301 DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1302 source_fragment_distribution,
1303 source_fragment_actors,
1304 target_fragment_distribution,
1305 target_fragment_actors,
1306 )
1307 .into_iter()
1308 .map(|(upstream_actor_id, downstream_actor_id)| {
1309 (
1310 upstream_actor_id,
1311 PbDispatcher {
1312 r#type: PbDispatcherType::NoShuffle as _,
1313 dist_key_indices: dist_key_indices.clone(),
1314 output_mapping: output_mapping.clone().into(),
1315 hash_mapping: None,
1316 dispatcher_id: target_fragment_id.as_raw_id() as _,
1317 downstream_actor_id: vec![downstream_actor_id],
1318 },
1319 )
1320 })
1321 .collect(),
1322 }
1323}
1324
1325pub fn resolve_no_shuffle_actor_dispatcher(
1327 source_fragment_distribution: DistributionType,
1328 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1329 target_fragment_distribution: DistributionType,
1330 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1331) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1332 assert_eq!(source_fragment_distribution, target_fragment_distribution);
1333 assert_eq!(
1334 source_fragment_actors.len(),
1335 target_fragment_actors.len(),
1336 "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1337 source_fragment_actors,
1338 target_fragment_actors
1339 );
1340 match source_fragment_distribution {
1341 DistributionType::Single => {
1342 let assert_singleton = |bitmap: &Option<Bitmap>| {
1343 assert!(
1344 bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1345 "not singleton: {:?}",
1346 bitmap
1347 );
1348 };
1349 assert_eq!(
1350 source_fragment_actors.len(),
1351 1,
1352 "singleton distribution actor count not 1: {:?}",
1353 source_fragment_distribution
1354 );
1355 assert_eq!(
1356 target_fragment_actors.len(),
1357 1,
1358 "singleton distribution actor count not 1: {:?}",
1359 target_fragment_distribution
1360 );
1361 let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1362 assert_singleton(bitmap);
1363 let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1364 assert_singleton(bitmap);
1365 vec![(*source_actor_id, *target_actor_id)]
1366 }
1367 DistributionType::Hash => {
1368 let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1369 .iter()
1370 .map(|(actor_id, bitmap)| {
1371 let bitmap = bitmap
1372 .as_ref()
1373 .expect("hash distribution should have bitmap");
1374 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1375 (first_vnode, (*actor_id, bitmap))
1376 })
1377 .collect();
1378 source_fragment_actors
1379 .iter()
1380 .map(|(source_actor_id, bitmap)| {
1381 let bitmap = bitmap
1382 .as_ref()
1383 .expect("hash distribution should have bitmap");
1384 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1385 let (target_actor_id, target_bitmap) =
1386 target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1387 panic!(
1388 "cannot find matched target actor: {} {:?} {:?} {:?}",
1389 source_actor_id,
1390 first_vnode,
1391 source_fragment_actors,
1392 target_fragment_actors
1393 );
1394 });
1395 assert_eq!(
1396 bitmap,
1397 target_bitmap,
1398 "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1399 source_actor_id,
1400 first_vnode,
1401 source_fragment_actors,
1402 target_fragment_actors
1403 );
1404 (*source_actor_id, target_actor_id)
1405 }).collect()
1406 }
1407 }
1408}
1409
1410pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1411 let fragment_worker_slot_mapping = match fragment.distribution_type {
1412 DistributionType::Single => {
1413 let actor = fragment.actors.values().exactly_one().unwrap();
1414 WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1415 }
1416 DistributionType::Hash => {
1417 let actor_bitmaps: HashMap<_, _> = fragment
1418 .actors
1419 .iter()
1420 .map(|(actor_id, actor_info)| {
1421 let vnode_bitmap = actor_info
1422 .vnode_bitmap
1423 .as_ref()
1424 .cloned()
1425 .expect("actor bitmap shouldn't be none in hash fragment");
1426
1427 (*actor_id as hash::ActorId, vnode_bitmap)
1428 })
1429 .collect();
1430
1431 let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1432
1433 let actor_locations = fragment
1434 .actors
1435 .iter()
1436 .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
1437 .collect();
1438
1439 actor_mapping.to_worker_slot(&actor_locations)
1440 }
1441 };
1442
1443 PbFragmentWorkerSlotMapping {
1444 fragment_id: fragment.fragment_id,
1445 mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1446 }
1447}
1448
1449pub async fn get_fragments_for_jobs<C>(
1455 db: &C,
1456 actor_info: &SharedActorInfos,
1457 streaming_jobs: Vec<JobId>,
1458) -> MetaResult<(
1459 HashMap<SourceId, BTreeSet<FragmentId>>,
1460 HashSet<FragmentId>,
1461 HashSet<ActorId>,
1462 HashSet<FragmentId>,
1463)>
1464where
1465 C: ConnectionTrait,
1466{
1467 if streaming_jobs.is_empty() {
1468 return Ok((
1469 HashMap::default(),
1470 HashSet::default(),
1471 HashSet::default(),
1472 HashSet::default(),
1473 ));
1474 }
1475
1476 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1477 .select_only()
1478 .columns([
1479 fragment::Column::FragmentId,
1480 fragment::Column::FragmentTypeMask,
1481 fragment::Column::StreamNode,
1482 ])
1483 .filter(fragment::Column::JobId.is_in(streaming_jobs))
1484 .into_tuple()
1485 .all(db)
1486 .await?;
1487
1488 let fragment_ids: HashSet<_> = fragments
1489 .iter()
1490 .map(|(fragment_id, _, _)| *fragment_id)
1491 .collect();
1492
1493 let actors = {
1494 let guard = actor_info.read_guard();
1495 fragment_ids
1496 .iter()
1497 .flat_map(|id| guard.get_fragment(*id as _))
1498 .flat_map(|f| f.actors.keys().cloned().map(|id| id as _))
1499 .collect::<HashSet<_>>()
1500 };
1501
1502 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1503 let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1504 for (fragment_id, mask, stream_node) in fragments {
1505 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1506 && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1507 {
1508 source_fragment_ids
1509 .entry(source_id)
1510 .or_default()
1511 .insert(fragment_id);
1512 }
1513 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1514 sink_fragment_ids.insert(fragment_id);
1515 }
1516 }
1517
1518 Ok((
1519 source_fragment_ids,
1520 sink_fragment_ids,
1521 actors.into_iter().collect(),
1522 fragment_ids,
1523 ))
1524}
1525
1526pub(crate) fn build_object_group_for_delete(
1531 partial_objects: Vec<PartialObject>,
1532) -> NotificationInfo {
1533 let mut objects = vec![];
1534 for obj in partial_objects {
1535 match obj.obj_type {
1536 ObjectType::Database => objects.push(PbObject {
1537 object_info: Some(PbObjectInfo::Database(PbDatabase {
1538 id: obj.oid.as_database_id(),
1539 ..Default::default()
1540 })),
1541 }),
1542 ObjectType::Schema => objects.push(PbObject {
1543 object_info: Some(PbObjectInfo::Schema(PbSchema {
1544 id: obj.oid.as_schema_id(),
1545 database_id: obj.database_id.unwrap(),
1546 ..Default::default()
1547 })),
1548 }),
1549 ObjectType::Table => objects.push(PbObject {
1550 object_info: Some(PbObjectInfo::Table(PbTable {
1551 id: obj.oid.as_table_id(),
1552 schema_id: obj.schema_id.unwrap(),
1553 database_id: obj.database_id.unwrap(),
1554 ..Default::default()
1555 })),
1556 }),
1557 ObjectType::Source => objects.push(PbObject {
1558 object_info: Some(PbObjectInfo::Source(PbSource {
1559 id: obj.oid.as_source_id(),
1560 schema_id: obj.schema_id.unwrap(),
1561 database_id: obj.database_id.unwrap(),
1562 ..Default::default()
1563 })),
1564 }),
1565 ObjectType::Sink => objects.push(PbObject {
1566 object_info: Some(PbObjectInfo::Sink(PbSink {
1567 id: obj.oid.as_sink_id(),
1568 schema_id: obj.schema_id.unwrap(),
1569 database_id: obj.database_id.unwrap(),
1570 ..Default::default()
1571 })),
1572 }),
1573 ObjectType::Subscription => objects.push(PbObject {
1574 object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1575 id: obj.oid.as_subscription_id(),
1576 schema_id: obj.schema_id.unwrap(),
1577 database_id: obj.database_id.unwrap(),
1578 ..Default::default()
1579 })),
1580 }),
1581 ObjectType::View => objects.push(PbObject {
1582 object_info: Some(PbObjectInfo::View(PbView {
1583 id: obj.oid.as_view_id(),
1584 schema_id: obj.schema_id.unwrap(),
1585 database_id: obj.database_id.unwrap(),
1586 ..Default::default()
1587 })),
1588 }),
1589 ObjectType::Index => {
1590 objects.push(PbObject {
1591 object_info: Some(PbObjectInfo::Index(PbIndex {
1592 id: obj.oid.as_index_id(),
1593 schema_id: obj.schema_id.unwrap(),
1594 database_id: obj.database_id.unwrap(),
1595 ..Default::default()
1596 })),
1597 });
1598 objects.push(PbObject {
1599 object_info: Some(PbObjectInfo::Table(PbTable {
1600 id: obj.oid.as_table_id(),
1601 schema_id: obj.schema_id.unwrap(),
1602 database_id: obj.database_id.unwrap(),
1603 ..Default::default()
1604 })),
1605 });
1606 }
1607 ObjectType::Function => objects.push(PbObject {
1608 object_info: Some(PbObjectInfo::Function(PbFunction {
1609 id: obj.oid.as_function_id(),
1610 schema_id: obj.schema_id.unwrap(),
1611 database_id: obj.database_id.unwrap(),
1612 ..Default::default()
1613 })),
1614 }),
1615 ObjectType::Connection => objects.push(PbObject {
1616 object_info: Some(PbObjectInfo::Connection(PbConnection {
1617 id: obj.oid.as_connection_id(),
1618 schema_id: obj.schema_id.unwrap(),
1619 database_id: obj.database_id.unwrap(),
1620 ..Default::default()
1621 })),
1622 }),
1623 ObjectType::Secret => objects.push(PbObject {
1624 object_info: Some(PbObjectInfo::Secret(PbSecret {
1625 id: obj.oid.as_secret_id(),
1626 schema_id: obj.schema_id.unwrap(),
1627 database_id: obj.database_id.unwrap(),
1628 ..Default::default()
1629 })),
1630 }),
1631 }
1632 }
1633 NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1634}
1635
1636pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1637 let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1638 .context("unable to parse table definition")
1639 .inspect_err(|e| {
1640 tracing::error!(
1641 target: "auto_schema_change",
1642 error = %e.as_report(),
1643 "failed to parse table definition")
1644 })
1645 .unwrap()
1646 .try_into()
1647 .unwrap();
1648 if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1649 cdc_table_info
1650 .clone()
1651 .map(|cdc_table_info| cdc_table_info.external_table_name)
1652 } else {
1653 None
1654 }
1655}
1656
1657pub async fn rename_relation(
1660 txn: &DatabaseTransaction,
1661 object_type: ObjectType,
1662 object_id: ObjectId,
1663 object_name: &str,
1664) -> MetaResult<(Vec<PbObject>, String)> {
1665 use sea_orm::ActiveModelTrait;
1666
1667 use crate::controller::rename::alter_relation_rename;
1668
1669 let mut to_update_relations = vec![];
1670 macro_rules! rename_relation {
1672 ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1673 let (mut relation, obj) = $entity::find_by_id($object_id)
1674 .find_also_related(Object)
1675 .one(txn)
1676 .await?
1677 .unwrap();
1678 let obj = obj.unwrap();
1679 let old_name = relation.name.clone();
1680 relation.name = object_name.into();
1681 if obj.obj_type != ObjectType::View {
1682 relation.definition = alter_relation_rename(&relation.definition, object_name);
1683 }
1684 let active_model = $table::ActiveModel {
1685 $identity: Set(relation.$identity),
1686 name: Set(object_name.into()),
1687 definition: Set(relation.definition.clone()),
1688 ..Default::default()
1689 };
1690 active_model.update(txn).await?;
1691 to_update_relations.push(PbObject {
1692 object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
1693 });
1694 old_name
1695 }};
1696 }
1697 let old_name = match object_type {
1699 ObjectType::Table => {
1700 let associated_source_id: Option<SourceId> = Source::find()
1701 .select_only()
1702 .column(source::Column::SourceId)
1703 .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
1704 .into_tuple()
1705 .one(txn)
1706 .await?;
1707 if let Some(source_id) = associated_source_id {
1708 rename_relation!(Source, source, source_id, source_id);
1709 }
1710 rename_relation!(Table, table, table_id, object_id.as_table_id())
1711 }
1712 ObjectType::Source => {
1713 rename_relation!(Source, source, source_id, object_id.as_source_id())
1714 }
1715 ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
1716 ObjectType::Subscription => {
1717 rename_relation!(
1718 Subscription,
1719 subscription,
1720 subscription_id,
1721 object_id.as_subscription_id()
1722 )
1723 }
1724 ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
1725 ObjectType::Index => {
1726 let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
1727 .find_also_related(Object)
1728 .one(txn)
1729 .await?
1730 .unwrap();
1731 index.name = object_name.into();
1732 let index_table_id = index.index_table_id;
1733 let old_name = rename_relation!(Table, table, table_id, index_table_id);
1734
1735 let active_model = index::ActiveModel {
1737 index_id: sea_orm::ActiveValue::Set(index.index_id),
1738 name: sea_orm::ActiveValue::Set(object_name.into()),
1739 ..Default::default()
1740 };
1741 active_model.update(txn).await?;
1742 to_update_relations.push(PbObject {
1743 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1744 });
1745 old_name
1746 }
1747 _ => unreachable!("only relation name can be altered."),
1748 };
1749
1750 Ok((to_update_relations, old_name))
1751}
1752
1753pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
1754where
1755 C: ConnectionTrait,
1756{
1757 let database_resource_group: Option<String> = Database::find_by_id(database_id)
1758 .select_only()
1759 .column(database::Column::ResourceGroup)
1760 .into_tuple()
1761 .one(txn)
1762 .await?
1763 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1764
1765 Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1766}
1767
1768pub async fn get_existing_job_resource_group<C>(
1769 txn: &C,
1770 streaming_job_id: JobId,
1771) -> MetaResult<String>
1772where
1773 C: ConnectionTrait,
1774{
1775 let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1776 StreamingJob::find_by_id(streaming_job_id)
1777 .select_only()
1778 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1779 .join(JoinType::InnerJoin, object::Relation::Database2.def())
1780 .column(streaming_job::Column::SpecificResourceGroup)
1781 .column(database::Column::ResourceGroup)
1782 .into_tuple()
1783 .one(txn)
1784 .await?
1785 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1786
1787 Ok(job_specific_resource_group.unwrap_or_else(|| {
1788 database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1789 }))
1790}
1791
1792pub fn filter_workers_by_resource_group(
1793 workers: &HashMap<WorkerId, WorkerNode>,
1794 resource_group: &str,
1795) -> BTreeSet<WorkerId> {
1796 workers
1797 .iter()
1798 .filter(|&(_, worker)| {
1799 worker
1800 .resource_group()
1801 .map(|node_label| node_label.as_str() == resource_group)
1802 .unwrap_or(false)
1803 })
1804 .map(|(id, _)| *id)
1805 .collect()
1806}
1807
1808pub async fn rename_relation_refer(
1811 txn: &DatabaseTransaction,
1812 object_type: ObjectType,
1813 object_id: ObjectId,
1814 object_name: &str,
1815 old_name: &str,
1816) -> MetaResult<Vec<PbObject>> {
1817 use sea_orm::ActiveModelTrait;
1818
1819 use crate::controller::rename::alter_relation_rename_refs;
1820
1821 let mut to_update_relations = vec![];
1822 macro_rules! rename_relation_ref {
1823 ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1824 let (mut relation, obj) = $entity::find_by_id($object_id)
1825 .find_also_related(Object)
1826 .one(txn)
1827 .await?
1828 .unwrap();
1829 relation.definition =
1830 alter_relation_rename_refs(&relation.definition, old_name, object_name);
1831 let active_model = $table::ActiveModel {
1832 $identity: Set(relation.$identity),
1833 definition: Set(relation.definition.clone()),
1834 ..Default::default()
1835 };
1836 active_model.update(txn).await?;
1837 to_update_relations.push(PbObject {
1838 object_info: Some(PbObjectInfo::$entity(
1839 ObjectModel(relation, obj.unwrap()).into(),
1840 )),
1841 });
1842 }};
1843 }
1844 let mut objs = get_referring_objects(object_id, txn).await?;
1845 if object_type == ObjectType::Table {
1846 let incoming_sinks: Vec<SinkId> = Sink::find()
1847 .select_only()
1848 .column(sink::Column::SinkId)
1849 .filter(sink::Column::TargetTable.eq(object_id))
1850 .into_tuple()
1851 .all(txn)
1852 .await?;
1853
1854 objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
1855 oid: id.as_object_id(),
1856 obj_type: ObjectType::Sink,
1857 schema_id: None,
1858 database_id: None,
1859 }));
1860 }
1861
1862 for obj in objs {
1863 match obj.obj_type {
1864 ObjectType::Table => {
1865 rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
1866 }
1867 ObjectType::Sink => {
1868 rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
1869 }
1870 ObjectType::Subscription => {
1871 rename_relation_ref!(
1872 Subscription,
1873 subscription,
1874 subscription_id,
1875 obj.oid.as_subscription_id()
1876 )
1877 }
1878 ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
1879 ObjectType::Index => {
1880 let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
1881 .select_only()
1882 .column(index::Column::IndexTableId)
1883 .into_tuple()
1884 .one(txn)
1885 .await?;
1886 rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
1887 }
1888 _ => {
1889 bail!(
1890 "only the table, sink, subscription, view and index will depend on other objects."
1891 )
1892 }
1893 }
1894 }
1895
1896 Ok(to_update_relations)
1897}
1898
1899pub async fn validate_subscription_deletion<C>(
1903 txn: &C,
1904 subscription_id: SubscriptionId,
1905) -> MetaResult<()>
1906where
1907 C: ConnectionTrait,
1908{
1909 let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1910 .select_only()
1911 .column(subscription::Column::DependentTableId)
1912 .into_tuple()
1913 .one(txn)
1914 .await?
1915 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1916
1917 let cnt = Subscription::find()
1918 .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1919 .count(txn)
1920 .await?;
1921 if cnt > 1 {
1922 return Ok(());
1925 }
1926
1927 let obj_alias = Alias::new("o1");
1929 let used_by_alias = Alias::new("o2");
1930 let count = ObjectDependency::find()
1931 .join_as(
1932 JoinType::InnerJoin,
1933 object_dependency::Relation::Object2.def(),
1934 obj_alias.clone(),
1935 )
1936 .join_as(
1937 JoinType::InnerJoin,
1938 object_dependency::Relation::Object1.def(),
1939 used_by_alias.clone(),
1940 )
1941 .filter(
1942 object_dependency::Column::Oid
1943 .eq(upstream_table_id)
1944 .and(object_dependency::Column::UsedBy.ne(subscription_id))
1945 .and(
1946 Expr::col((obj_alias, object::Column::DatabaseId))
1947 .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1948 ),
1949 )
1950 .count(txn)
1951 .await?;
1952
1953 if count != 0 {
1954 return Err(MetaError::permission_denied(format!(
1955 "Referenced by {} cross-db objects.",
1956 count
1957 )));
1958 }
1959
1960 Ok(())
1961}
1962
1963pub async fn fetch_target_fragments<C>(
1964 txn: &C,
1965 src_fragment_id: impl IntoIterator<Item = FragmentId>,
1966) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
1967where
1968 C: ConnectionTrait,
1969{
1970 let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
1971 .select_only()
1972 .columns([
1973 fragment_relation::Column::SourceFragmentId,
1974 fragment_relation::Column::TargetFragmentId,
1975 ])
1976 .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
1977 .into_tuple()
1978 .all(txn)
1979 .await?;
1980
1981 let source_target_fragments = source_target_fragments.into_iter().into_group_map();
1982
1983 Ok(source_target_fragments)
1984}
1985
1986pub async fn get_sink_fragment_by_ids<C>(
1987 txn: &C,
1988 sink_ids: Vec<SinkId>,
1989) -> MetaResult<HashMap<SinkId, FragmentId>>
1990where
1991 C: ConnectionTrait,
1992{
1993 let sink_num = sink_ids.len();
1994 let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
1995 .select_only()
1996 .columns([fragment::Column::JobId, fragment::Column::FragmentId])
1997 .filter(
1998 fragment::Column::JobId
1999 .is_in(sink_ids)
2000 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2001 )
2002 .into_tuple()
2003 .all(txn)
2004 .await?;
2005
2006 if sink_fragment_ids.len() != sink_num {
2007 return Err(anyhow::anyhow!(
2008 "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2009 sink_fragment_ids.len(),
2010 sink_num
2011 )
2012 .into());
2013 }
2014
2015 Ok(sink_fragment_ids.into_iter().collect())
2016}
2017
2018pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2019where
2020 C: ConnectionTrait,
2021{
2022 let mview_fragment: Vec<i32> = Fragment::find()
2023 .select_only()
2024 .column(fragment::Column::FragmentTypeMask)
2025 .filter(
2026 fragment::Column::JobId
2027 .eq(table_id)
2028 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2029 )
2030 .into_tuple()
2031 .all(txn)
2032 .await?;
2033
2034 let mview_fragment_len = mview_fragment.len();
2035 if mview_fragment_len != 1 {
2036 bail!(
2037 "expected exactly one mview fragment for table {}, found {}",
2038 table_id,
2039 mview_fragment_len
2040 );
2041 }
2042
2043 let mview_fragment = mview_fragment.into_iter().next().unwrap();
2044 let migrated =
2045 FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2046
2047 Ok(migrated)
2048}
2049
2050pub async fn try_get_iceberg_table_by_downstream_sink<C>(
2051 txn: &C,
2052 sink_id: SinkId,
2053) -> MetaResult<Option<TableId>>
2054where
2055 C: ConnectionTrait,
2056{
2057 let sink = Sink::find_by_id(sink_id).one(txn).await?;
2058 let Some(sink) = sink else {
2059 return Ok(None);
2060 };
2061
2062 if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
2063 let object_ids: Vec<ObjectId> = ObjectDependency::find()
2064 .select_only()
2065 .column(object_dependency::Column::Oid)
2066 .filter(object_dependency::Column::UsedBy.eq(sink_id))
2067 .into_tuple()
2068 .all(txn)
2069 .await?;
2070 let mut iceberg_table_ids = vec![];
2071 for object_id in object_ids {
2072 let table_id = object_id.as_table_id();
2073 if let Some(table_engine) = Table::find_by_id(table_id)
2074 .select_only()
2075 .column(table::Column::Engine)
2076 .into_tuple::<table::Engine>()
2077 .one(txn)
2078 .await?
2079 && table_engine == table::Engine::Iceberg
2080 {
2081 iceberg_table_ids.push(table_id);
2082 }
2083 }
2084 if iceberg_table_ids.len() == 1 {
2085 return Ok(Some(iceberg_table_ids[0]));
2086 }
2087 }
2088 Ok(None)
2089}
2090
2091pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
2092where
2093 C: ConnectionTrait,
2094{
2095 if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
2096 .select_only()
2097 .column(table::Column::Engine)
2098 .into_tuple::<table::Engine>()
2099 .one(txn)
2100 .await?
2101 && engine == table::Engine::Iceberg
2102 {
2103 return Ok(true);
2104 }
2105 if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
2106 .select_only()
2107 .column(sink::Column::Name)
2108 .into_tuple::<String>()
2109 .one(txn)
2110 .await?
2111 && sink_name.starts_with(ICEBERG_SINK_PREFIX)
2112 {
2113 return Ok(true);
2114 }
2115 Ok(false)
2116}
2117
2118pub async fn find_dirty_iceberg_table_jobs<C>(
2119 txn: &C,
2120 database_id: Option<DatabaseId>,
2121) -> MetaResult<Vec<PartialObject>>
2122where
2123 C: ConnectionTrait,
2124{
2125 let mut filter_condition = streaming_job::Column::JobStatus
2126 .ne(JobStatus::Created)
2127 .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
2128 .and(streaming_job::Column::CreateType.eq(CreateType::Background));
2129 if let Some(database_id) = database_id {
2130 filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
2131 }
2132 let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
2133 .select_only()
2134 .columns([
2135 object::Column::Oid,
2136 object::Column::ObjType,
2137 object::Column::SchemaId,
2138 object::Column::DatabaseId,
2139 ])
2140 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2141 .filter(filter_condition)
2142 .into_partial_model()
2143 .all(txn)
2144 .await?;
2145
2146 let mut dirty_iceberg_table_jobs = vec![];
2147 for job in creating_table_sink_jobs {
2148 if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
2149 tracing::info!("Found dirty iceberg job with id: {}", job.oid);
2150 dirty_iceberg_table_jobs.push(job);
2151 }
2152 }
2153
2154 Ok(dirty_iceberg_table_jobs)
2155}
2156
2157pub fn build_select_node_list(
2158 from: &[ColumnCatalog],
2159 to: &[ColumnCatalog],
2160) -> MetaResult<Vec<PbExprNode>> {
2161 let mut exprs = Vec::with_capacity(to.len());
2162 let idx_by_col_id = from
2163 .iter()
2164 .enumerate()
2165 .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2166 .collect::<HashMap<_, _>>();
2167
2168 for to_col in to {
2169 let to_col = to_col.column_desc.as_ref().unwrap();
2170 let to_col_type_ref = to_col.column_type.as_ref().unwrap();
2171 let to_col_type = DataType::from(to_col_type_ref);
2172 if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2173 let from_col_type = DataType::from(
2174 from[*from_idx]
2175 .column_desc
2176 .as_ref()
2177 .unwrap()
2178 .column_type
2179 .as_ref()
2180 .unwrap(),
2181 );
2182 if !to_col_type.equals_datatype(&from_col_type) {
2183 return Err(anyhow!(
2184 "Column type mismatch: {:?} != {:?}",
2185 from_col_type,
2186 to_col_type
2187 )
2188 .into());
2189 }
2190 exprs.push(PbExprNode {
2191 function_type: expr_node::Type::Unspecified.into(),
2192 return_type: Some(to_col_type_ref.clone()),
2193 rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2194 });
2195 } else {
2196 let to_default_node =
2197 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2198 expr,
2199 ..
2200 })) = &to_col.generated_or_default_column
2201 {
2202 expr.clone().unwrap()
2203 } else {
2204 let null = Datum::None.to_protobuf();
2205 PbExprNode {
2206 function_type: expr_node::Type::Unspecified.into(),
2207 return_type: Some(to_col_type_ref.clone()),
2208 rex_node: Some(expr_node::RexNode::Constant(null)),
2209 }
2210 };
2211 exprs.push(to_default_node);
2212 }
2213 }
2214
2215 Ok(exprs)
2216}
2217
2218#[derive(Clone, Debug, Default)]
2219pub struct StreamingJobExtraInfo {
2220 pub timezone: Option<String>,
2221 pub config_override: Arc<str>,
2222 pub job_definition: String,
2223}
2224
2225impl StreamingJobExtraInfo {
2226 pub fn stream_context(&self) -> StreamContext {
2227 StreamContext {
2228 timezone: self.timezone.clone(),
2229 config_override: self.config_override.clone(),
2230 }
2231 }
2232}
2233
2234pub async fn get_streaming_job_extra_info<C>(
2235 txn: &C,
2236 job_ids: Vec<JobId>,
2237) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2238where
2239 C: ConnectionTrait,
2240{
2241 let pairs: Vec<(JobId, Option<String>, Option<String>)> = StreamingJob::find()
2242 .select_only()
2243 .columns([
2244 streaming_job::Column::JobId,
2245 streaming_job::Column::Timezone,
2246 streaming_job::Column::ConfigOverride,
2247 ])
2248 .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2249 .into_tuple()
2250 .all(txn)
2251 .await?;
2252
2253 let job_ids = job_ids.into_iter().collect();
2254
2255 let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2256
2257 let result = pairs
2258 .into_iter()
2259 .map(|(job_id, timezone, config_override)| {
2260 let job_definition = definitions.remove(&job_id).unwrap_or_default();
2261 (
2262 job_id,
2263 StreamingJobExtraInfo {
2264 timezone,
2265 config_override: config_override.unwrap_or_default().into(),
2266 job_definition,
2267 },
2268 )
2269 })
2270 .collect();
2271
2272 Ok(result)
2273}
2274
2275#[cfg(test)]
2276mod tests {
2277 use super::*;
2278
2279 #[test]
2280 fn test_extract_cdc_table_name() {
2281 let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
2282 let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
2283 assert_eq!(
2284 extract_external_table_name_from_definition(ddl1),
2285 Some("public.t1".into())
2286 );
2287 assert_eq!(
2288 extract_external_table_name_from_definition(ddl2),
2289 Some("mydb.t2".into())
2290 );
2291 }
2292}