1use std::collections::{BTreeSet, HashMap, HashSet};
16
17use anyhow::{Context, anyhow};
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
21use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
22use risingwave_common::id::{JobId, SubscriptionId};
23use risingwave_common::types::Datum;
24use risingwave_common::util::value_encoding::DatumToProtoExt;
25use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
26use risingwave_common::{bail, hash};
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::object::ObjectType;
29use risingwave_meta_model::prelude::*;
30use risingwave_meta_model::table::TableType;
31use risingwave_meta_model::user_privilege::Action;
32use risingwave_meta_model::{
33 ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
34 JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
35 TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
36 function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
37 subscription, table, user, user_default_privilege, user_privilege, view,
38};
39use risingwave_meta_model_migration::WithQuery;
40use risingwave_pb::catalog::{
41 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
42 PbSubscription, PbTable, PbView,
43};
44use risingwave_pb::common::WorkerNode;
45use risingwave_pb::expr::{PbExprNode, expr_node};
46use risingwave_pb::meta::object::PbObjectInfo;
47use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
48use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
51use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
52use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
53use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
54use risingwave_sqlparser::ast::Statement as SqlStatement;
55use risingwave_sqlparser::parser::Parser;
56use sea_orm::sea_query::{
57 Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
58 WithClause,
59};
60use sea_orm::{
61 ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
62 FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
63 RelationTrait, Set, Statement,
64};
65use thiserror_ext::AsReport;
66
67use crate::barrier::{SharedActorInfos, SharedFragmentInfo};
68use crate::controller::ObjectModel;
69use crate::controller::fragment::FragmentTypeMaskExt;
70use crate::controller::scale::resolve_streaming_job_definition;
71use crate::model::FragmentDownstreamRelation;
72use crate::{MetaError, MetaResult};
73
74pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
99 let cte_alias = Alias::new("used_by_object_ids");
100 let cte_return_alias = Alias::new("used_by");
101
102 let mut base_query = SelectStatement::new()
103 .column(object_dependency::Column::UsedBy)
104 .from(ObjectDependency)
105 .and_where(object_dependency::Column::Oid.eq(obj_id))
106 .to_owned();
107
108 let belonged_obj_query = SelectStatement::new()
109 .column(object::Column::Oid)
110 .from(Object)
111 .and_where(
112 object::Column::DatabaseId
113 .eq(obj_id)
114 .or(object::Column::SchemaId.eq(obj_id)),
115 )
116 .to_owned();
117
118 let cte_referencing = Query::select()
119 .column((ObjectDependency, object_dependency::Column::UsedBy))
120 .from(ObjectDependency)
121 .inner_join(
122 cte_alias.clone(),
123 Expr::col((cte_alias.clone(), cte_return_alias.clone()))
124 .equals(object_dependency::Column::Oid),
125 )
126 .to_owned();
127
128 let mut common_table_expr = CommonTableExpression::new();
129 common_table_expr
130 .query(
131 base_query
132 .union(UnionType::All, belonged_obj_query)
133 .union(UnionType::All, cte_referencing)
134 .to_owned(),
135 )
136 .column(cte_return_alias.clone())
137 .table_name(cte_alias.clone());
138
139 SelectStatement::new()
140 .distinct()
141 .columns([
142 object::Column::Oid,
143 object::Column::ObjType,
144 object::Column::SchemaId,
145 object::Column::DatabaseId,
146 ])
147 .from(cte_alias.clone())
148 .inner_join(
149 Object,
150 Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
151 )
152 .order_by(object::Column::Oid, Order::Desc)
153 .to_owned()
154 .with(
155 WithClause::new()
156 .recursive(true)
157 .cte(common_table_expr)
158 .to_owned(),
159 )
160}
161
162pub fn construct_sink_cycle_check_query(
187 target_table: ObjectId,
188 dependent_objects: Vec<ObjectId>,
189) -> WithQuery {
190 let cte_alias = Alias::new("used_by_object_ids_with_sink");
191 let depend_alias = Alias::new("obj_dependency_with_sink");
192
193 let mut base_query = SelectStatement::new()
194 .columns([
195 object_dependency::Column::Oid,
196 object_dependency::Column::UsedBy,
197 ])
198 .from(ObjectDependency)
199 .and_where(object_dependency::Column::Oid.eq(target_table))
200 .to_owned();
201
202 let query_sink_deps = SelectStatement::new()
203 .columns([sink::Column::SinkId, sink::Column::TargetTable])
204 .from(Sink)
205 .and_where(sink::Column::TargetTable.is_not_null())
206 .to_owned();
207
208 let cte_referencing = Query::select()
209 .column((depend_alias.clone(), object_dependency::Column::Oid))
210 .column((depend_alias.clone(), object_dependency::Column::UsedBy))
211 .from_subquery(
212 SelectStatement::new()
213 .columns([
214 object_dependency::Column::Oid,
215 object_dependency::Column::UsedBy,
216 ])
217 .from(ObjectDependency)
218 .union(UnionType::All, query_sink_deps)
219 .to_owned(),
220 depend_alias.clone(),
221 )
222 .inner_join(
223 cte_alias.clone(),
224 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
225 .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
226 )
227 .and_where(
228 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
229 cte_alias.clone(),
230 object_dependency::Column::Oid,
231 ))),
232 )
233 .to_owned();
234
235 let mut common_table_expr = CommonTableExpression::new();
236 common_table_expr
237 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
238 .columns([
239 object_dependency::Column::Oid,
240 object_dependency::Column::UsedBy,
241 ])
242 .table_name(cte_alias.clone());
243
244 SelectStatement::new()
245 .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
246 .from(cte_alias.clone())
247 .and_where(
248 Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
249 )
250 .to_owned()
251 .with(
252 WithClause::new()
253 .recursive(true)
254 .cte(common_table_expr)
255 .to_owned(),
256 )
257}
258
259#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
260#[sea_orm(entity = "Object")]
261pub struct PartialObject {
262 pub oid: ObjectId,
263 pub obj_type: ObjectType,
264 pub schema_id: Option<SchemaId>,
265 pub database_id: Option<DatabaseId>,
266}
267
268#[derive(Clone, DerivePartialModel, FromQueryResult)]
269#[sea_orm(entity = "Fragment")]
270pub struct PartialFragmentStateTables {
271 pub fragment_id: FragmentId,
272 pub job_id: ObjectId,
273 pub state_table_ids: TableIdArray,
274}
275
276#[derive(Clone, Eq, PartialEq, Debug)]
277pub struct PartialActorLocation {
278 pub actor_id: ActorId,
279 pub fragment_id: FragmentId,
280 pub worker_id: WorkerId,
281}
282
283#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
284pub struct FragmentDesc {
285 pub fragment_id: FragmentId,
286 pub job_id: JobId,
287 pub fragment_type_mask: i32,
288 pub distribution_type: DistributionType,
289 pub state_table_ids: TableIdArray,
290 pub parallelism: i64,
291 pub vnode_count: i32,
292 pub stream_node: StreamNode,
293 pub parallelism_policy: String,
294}
295
296pub async fn get_referring_objects_cascade<C>(
298 obj_id: ObjectId,
299 db: &C,
300) -> MetaResult<Vec<PartialObject>>
301where
302 C: ConnectionTrait,
303{
304 let query = construct_obj_dependency_query(obj_id);
305 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
306 let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
307 db.get_database_backend(),
308 sql,
309 values,
310 ))
311 .all(db)
312 .await?;
313 Ok(objects)
314}
315
316pub async fn check_sink_into_table_cycle<C>(
318 target_table: ObjectId,
319 dependent_objs: Vec<ObjectId>,
320 db: &C,
321) -> MetaResult<bool>
322where
323 C: ConnectionTrait,
324{
325 if dependent_objs.is_empty() {
326 return Ok(false);
327 }
328
329 if dependent_objs.contains(&target_table) {
331 return Ok(true);
332 }
333
334 let query = construct_sink_cycle_check_query(target_table, dependent_objs);
335 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
336
337 let res = db
338 .query_one(Statement::from_sql_and_values(
339 db.get_database_backend(),
340 sql,
341 values,
342 ))
343 .await?
344 .unwrap();
345
346 let cnt: i64 = res.try_get_by(0)?;
347
348 Ok(cnt != 0)
349}
350
351pub async fn ensure_object_id<C>(
353 object_type: ObjectType,
354 obj_id: impl Into<ObjectId>,
355 db: &C,
356) -> MetaResult<()>
357where
358 C: ConnectionTrait,
359{
360 let obj_id = obj_id.into();
361 let count = Object::find_by_id(obj_id).count(db).await?;
362 if count == 0 {
363 return Err(MetaError::catalog_id_not_found(
364 object_type.as_str(),
365 obj_id,
366 ));
367 }
368 Ok(())
369}
370
371pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
372where
373 C: ConnectionTrait,
374{
375 let count = Object::find_by_id(job_id).count(db).await?;
376 if count == 0 {
377 return Err(MetaError::cancelled(format!(
378 "job {} might be cancelled manually or by recovery",
379 job_id
380 )));
381 }
382 Ok(())
383}
384
385pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
387where
388 C: ConnectionTrait,
389{
390 let count = User::find_by_id(user_id).count(db).await?;
391 if count == 0 {
392 return Err(anyhow!("user {} was concurrently dropped", user_id).into());
393 }
394 Ok(())
395}
396
397pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
399where
400 C: ConnectionTrait,
401{
402 let count = Database::find()
403 .filter(database::Column::Name.eq(name))
404 .count(db)
405 .await?;
406 if count > 0 {
407 assert_eq!(count, 1);
408 return Err(MetaError::catalog_duplicated("database", name));
409 }
410 Ok(())
411}
412
413pub async fn check_function_signature_duplicate<C>(
415 pb_function: &PbFunction,
416 db: &C,
417) -> MetaResult<()>
418where
419 C: ConnectionTrait,
420{
421 let count = Function::find()
422 .inner_join(Object)
423 .filter(
424 object::Column::DatabaseId
425 .eq(pb_function.database_id)
426 .and(object::Column::SchemaId.eq(pb_function.schema_id))
427 .and(function::Column::Name.eq(&pb_function.name))
428 .and(
429 function::Column::ArgTypes
430 .eq(DataTypeArray::from(pb_function.arg_types.clone())),
431 ),
432 )
433 .count(db)
434 .await?;
435 if count > 0 {
436 assert_eq!(count, 1);
437 return Err(MetaError::catalog_duplicated("function", &pb_function.name));
438 }
439 Ok(())
440}
441
442pub async fn check_connection_name_duplicate<C>(
444 pb_connection: &PbConnection,
445 db: &C,
446) -> MetaResult<()>
447where
448 C: ConnectionTrait,
449{
450 let count = Connection::find()
451 .inner_join(Object)
452 .filter(
453 object::Column::DatabaseId
454 .eq(pb_connection.database_id)
455 .and(object::Column::SchemaId.eq(pb_connection.schema_id))
456 .and(connection::Column::Name.eq(&pb_connection.name)),
457 )
458 .count(db)
459 .await?;
460 if count > 0 {
461 assert_eq!(count, 1);
462 return Err(MetaError::catalog_duplicated(
463 "connection",
464 &pb_connection.name,
465 ));
466 }
467 Ok(())
468}
469
470pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
471where
472 C: ConnectionTrait,
473{
474 let count = Secret::find()
475 .inner_join(Object)
476 .filter(
477 object::Column::DatabaseId
478 .eq(pb_secret.database_id)
479 .and(object::Column::SchemaId.eq(pb_secret.schema_id))
480 .and(secret::Column::Name.eq(&pb_secret.name)),
481 )
482 .count(db)
483 .await?;
484 if count > 0 {
485 assert_eq!(count, 1);
486 return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
487 }
488 Ok(())
489}
490
491pub async fn check_subscription_name_duplicate<C>(
492 pb_subscription: &PbSubscription,
493 db: &C,
494) -> MetaResult<()>
495where
496 C: ConnectionTrait,
497{
498 let count = Subscription::find()
499 .inner_join(Object)
500 .filter(
501 object::Column::DatabaseId
502 .eq(pb_subscription.database_id)
503 .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
504 .and(subscription::Column::Name.eq(&pb_subscription.name)),
505 )
506 .count(db)
507 .await?;
508 if count > 0 {
509 assert_eq!(count, 1);
510 return Err(MetaError::catalog_duplicated(
511 "subscription",
512 &pb_subscription.name,
513 ));
514 }
515 Ok(())
516}
517
518pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
520where
521 C: ConnectionTrait,
522{
523 let count = User::find()
524 .filter(user::Column::Name.eq(name))
525 .count(db)
526 .await?;
527 if count > 0 {
528 assert_eq!(count, 1);
529 return Err(MetaError::catalog_duplicated("user", name));
530 }
531 Ok(())
532}
533
534pub async fn check_relation_name_duplicate<C>(
536 name: &str,
537 database_id: DatabaseId,
538 schema_id: SchemaId,
539 db: &C,
540) -> MetaResult<()>
541where
542 C: ConnectionTrait,
543{
544 macro_rules! check_duplicated {
545 ($obj_type:expr, $entity:ident, $table:ident) => {
546 let object_id = Object::find()
547 .select_only()
548 .column(object::Column::Oid)
549 .inner_join($entity)
550 .filter(
551 object::Column::DatabaseId
552 .eq(Some(database_id))
553 .and(object::Column::SchemaId.eq(Some(schema_id)))
554 .and($table::Column::Name.eq(name)),
555 )
556 .into_tuple::<ObjectId>()
557 .one(db)
558 .await?;
559 if let Some(oid) = object_id {
560 let check_creation = if $obj_type == ObjectType::View {
561 false
562 } else if $obj_type == ObjectType::Source {
563 let source_info = Source::find_by_id(oid.as_source_id())
564 .select_only()
565 .column(source::Column::SourceInfo)
566 .into_tuple::<Option<StreamSourceInfo>>()
567 .one(db)
568 .await?
569 .unwrap();
570 source_info.map_or(false, |info| info.to_protobuf().is_shared())
571 } else {
572 true
573 };
574 let job_id = oid.as_job_id();
575 return if check_creation
576 && !matches!(
577 StreamingJob::find_by_id(job_id)
578 .select_only()
579 .column(streaming_job::Column::JobStatus)
580 .into_tuple::<JobStatus>()
581 .one(db)
582 .await?,
583 Some(JobStatus::Created)
584 ) {
585 Err(MetaError::catalog_under_creation(
586 $obj_type.as_str(),
587 name,
588 job_id,
589 ))
590 } else {
591 Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
592 };
593 }
594 };
595 }
596 check_duplicated!(ObjectType::Table, Table, table);
597 check_duplicated!(ObjectType::Source, Source, source);
598 check_duplicated!(ObjectType::Sink, Sink, sink);
599 check_duplicated!(ObjectType::Index, Index, index);
600 check_duplicated!(ObjectType::View, View, view);
601
602 Ok(())
603}
604
605pub async fn check_schema_name_duplicate<C>(
607 name: &str,
608 database_id: DatabaseId,
609 db: &C,
610) -> MetaResult<()>
611where
612 C: ConnectionTrait,
613{
614 let count = Object::find()
615 .inner_join(Schema)
616 .filter(
617 object::Column::ObjType
618 .eq(ObjectType::Schema)
619 .and(object::Column::DatabaseId.eq(Some(database_id)))
620 .and(schema::Column::Name.eq(name)),
621 )
622 .count(db)
623 .await?;
624 if count != 0 {
625 return Err(MetaError::catalog_duplicated("schema", name));
626 }
627
628 Ok(())
629}
630
631pub async fn check_object_refer_for_drop<C>(
634 object_type: ObjectType,
635 object_id: ObjectId,
636 db: &C,
637) -> MetaResult<()>
638where
639 C: ConnectionTrait,
640{
641 let count = if object_type == ObjectType::Table {
643 ObjectDependency::find()
644 .join(
645 JoinType::InnerJoin,
646 object_dependency::Relation::Object1.def(),
647 )
648 .filter(
649 object_dependency::Column::Oid
650 .eq(object_id)
651 .and(object::Column::ObjType.ne(ObjectType::Index)),
652 )
653 .count(db)
654 .await?
655 } else {
656 ObjectDependency::find()
657 .filter(object_dependency::Column::Oid.eq(object_id))
658 .count(db)
659 .await?
660 };
661 if count != 0 {
662 let referring_objects = get_referring_objects(object_id, db).await?;
664 let referring_objs_map = referring_objects
665 .into_iter()
666 .filter(|o| o.obj_type != ObjectType::Index)
667 .into_group_map_by(|o| o.obj_type);
668 let mut details = vec![];
669 for (obj_type, objs) in referring_objs_map {
670 match obj_type {
671 ObjectType::Table => {
672 let tables: Vec<(String, String)> = Object::find()
673 .join(JoinType::InnerJoin, object::Relation::Table.def())
674 .join(JoinType::InnerJoin, object::Relation::Database2.def())
675 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
676 .select_only()
677 .column(schema::Column::Name)
678 .column(table::Column::Name)
679 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
680 .into_tuple()
681 .all(db)
682 .await?;
683 details.extend(tables.into_iter().map(|(schema_name, table_name)| {
684 format!(
685 "materialized view {}.{} depends on it",
686 schema_name, table_name
687 )
688 }));
689 }
690 ObjectType::Sink => {
691 let sinks: Vec<(String, String)> = Object::find()
692 .join(JoinType::InnerJoin, object::Relation::Sink.def())
693 .join(JoinType::InnerJoin, object::Relation::Database2.def())
694 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
695 .select_only()
696 .column(schema::Column::Name)
697 .column(sink::Column::Name)
698 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
699 .into_tuple()
700 .all(db)
701 .await?;
702 if object_type == ObjectType::Table {
703 let engine = Table::find_by_id(object_id.as_table_id())
704 .select_only()
705 .column(table::Column::Engine)
706 .into_tuple::<table::Engine>()
707 .one(db)
708 .await?;
709 if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
710 continue;
711 }
712 }
713 details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
714 format!("sink {}.{} depends on it", schema_name, sink_name)
715 }));
716 }
717 ObjectType::View => {
718 let views: Vec<(String, String)> = Object::find()
719 .join(JoinType::InnerJoin, object::Relation::View.def())
720 .join(JoinType::InnerJoin, object::Relation::Database2.def())
721 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
722 .select_only()
723 .column(schema::Column::Name)
724 .column(view::Column::Name)
725 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
726 .into_tuple()
727 .all(db)
728 .await?;
729 details.extend(views.into_iter().map(|(schema_name, view_name)| {
730 format!("view {}.{} depends on it", schema_name, view_name)
731 }));
732 }
733 ObjectType::Subscription => {
734 let subscriptions: Vec<(String, String)> = Object::find()
735 .join(JoinType::InnerJoin, object::Relation::Subscription.def())
736 .join(JoinType::InnerJoin, object::Relation::Database2.def())
737 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
738 .select_only()
739 .column(schema::Column::Name)
740 .column(subscription::Column::Name)
741 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
742 .into_tuple()
743 .all(db)
744 .await?;
745 details.extend(subscriptions.into_iter().map(
746 |(schema_name, subscription_name)| {
747 format!(
748 "subscription {}.{} depends on it",
749 schema_name, subscription_name
750 )
751 },
752 ));
753 }
754 ObjectType::Source => {
755 let sources: Vec<(String, String)> = Object::find()
756 .join(JoinType::InnerJoin, object::Relation::Source.def())
757 .join(JoinType::InnerJoin, object::Relation::Database2.def())
758 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
759 .select_only()
760 .column(schema::Column::Name)
761 .column(source::Column::Name)
762 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
763 .into_tuple()
764 .all(db)
765 .await?;
766 details.extend(sources.into_iter().map(|(schema_name, view_name)| {
767 format!("source {}.{} depends on it", schema_name, view_name)
768 }));
769 }
770 ObjectType::Connection => {
771 let connections: Vec<(String, String)> = Object::find()
772 .join(JoinType::InnerJoin, object::Relation::Connection.def())
773 .join(JoinType::InnerJoin, object::Relation::Database2.def())
774 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
775 .select_only()
776 .column(schema::Column::Name)
777 .column(connection::Column::Name)
778 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
779 .into_tuple()
780 .all(db)
781 .await?;
782 details.extend(connections.into_iter().map(|(schema_name, view_name)| {
783 format!("connection {}.{} depends on it", schema_name, view_name)
784 }));
785 }
786 _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
788 }
789 }
790 if details.is_empty() {
791 return Ok(());
792 }
793
794 return Err(MetaError::permission_denied(format!(
795 "{} used by {} other objects. \nDETAIL: {}\n\
796 {}",
797 object_type.as_str(),
798 details.len(),
799 details.join("\n"),
800 match object_type {
801 ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
802 "HINT: DROP the dependent objects first.",
803 ObjectType::Database | ObjectType::Schema => unreachable!(),
804 _ => "HINT: Use DROP ... CASCADE to drop the dependent objects too.",
805 }
806 )));
807 }
808 Ok(())
809}
810
811pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
813where
814 C: ConnectionTrait,
815{
816 let objs = ObjectDependency::find()
817 .filter(object_dependency::Column::Oid.eq(object_id))
818 .join(
819 JoinType::InnerJoin,
820 object_dependency::Relation::Object1.def(),
821 )
822 .into_partial_model()
823 .all(db)
824 .await?;
825
826 Ok(objs)
827}
828
829pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
831where
832 C: ConnectionTrait,
833{
834 let count = Object::find()
835 .filter(object::Column::SchemaId.eq(Some(schema_id)))
836 .count(db)
837 .await?;
838 if count != 0 {
839 return Err(MetaError::permission_denied("schema is not empty"));
840 }
841
842 Ok(())
843}
844
845pub async fn list_user_info_by_ids<C>(
847 user_ids: impl IntoIterator<Item = UserId>,
848 db: &C,
849) -> MetaResult<Vec<PbUserInfo>>
850where
851 C: ConnectionTrait,
852{
853 let mut user_infos = vec![];
854 for user_id in user_ids {
855 let user = User::find_by_id(user_id)
856 .one(db)
857 .await?
858 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
859 let mut user_info: PbUserInfo = user.into();
860 user_info.grant_privileges = get_user_privilege(user_id, db).await?;
861 user_infos.push(user_info);
862 }
863 Ok(user_infos)
864}
865
866pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
868where
869 C: ConnectionTrait,
870{
871 let obj_owner: UserId = Object::find_by_id(object_id)
872 .select_only()
873 .column(object::Column::OwnerId)
874 .into_tuple()
875 .one(db)
876 .await?
877 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
878 Ok(obj_owner)
879}
880
881pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
906 let cte_alias = Alias::new("granted_privilege_ids");
907 let cte_return_privilege_alias = Alias::new("id");
908 let cte_return_user_alias = Alias::new("user_id");
909
910 let mut base_query = SelectStatement::new()
911 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
912 .from(UserPrivilege)
913 .and_where(user_privilege::Column::Id.is_in(ids))
914 .to_owned();
915
916 let cte_referencing = Query::select()
917 .columns([
918 (UserPrivilege, user_privilege::Column::Id),
919 (UserPrivilege, user_privilege::Column::UserId),
920 ])
921 .from(UserPrivilege)
922 .inner_join(
923 cte_alias.clone(),
924 Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
925 .equals(user_privilege::Column::DependentId),
926 )
927 .to_owned();
928
929 let mut common_table_expr = CommonTableExpression::new();
930 common_table_expr
931 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
932 .columns([
933 cte_return_privilege_alias.clone(),
934 cte_return_user_alias.clone(),
935 ])
936 .table_name(cte_alias.clone());
937
938 SelectStatement::new()
939 .columns([cte_return_privilege_alias, cte_return_user_alias])
940 .from(cte_alias)
941 .to_owned()
942 .with(
943 WithClause::new()
944 .recursive(true)
945 .cte(common_table_expr)
946 .to_owned(),
947 )
948}
949
950pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
951where
952 C: ConnectionTrait,
953{
954 let table_ids: Vec<TableId> = Table::find()
955 .select_only()
956 .column(table::Column::TableId)
957 .filter(
958 table::Column::TableType
959 .eq(TableType::Internal)
960 .and(table::Column::BelongsToJobId.eq(job_id)),
961 )
962 .into_tuple()
963 .all(db)
964 .await?;
965 Ok(table_ids)
966}
967
968pub async fn get_index_state_tables_by_table_id<C>(
969 table_id: TableId,
970 db: &C,
971) -> MetaResult<Vec<TableId>>
972where
973 C: ConnectionTrait,
974{
975 let mut index_table_ids: Vec<TableId> = Index::find()
976 .select_only()
977 .column(index::Column::IndexTableId)
978 .filter(index::Column::PrimaryTableId.eq(table_id))
979 .into_tuple()
980 .all(db)
981 .await?;
982
983 if !index_table_ids.is_empty() {
984 let internal_table_ids: Vec<TableId> = Table::find()
985 .select_only()
986 .column(table::Column::TableId)
987 .filter(
988 table::Column::TableType
989 .eq(TableType::Internal)
990 .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
991 )
992 .into_tuple()
993 .all(db)
994 .await?;
995
996 index_table_ids.extend(internal_table_ids.into_iter());
997 }
998
999 Ok(index_table_ids)
1000}
1001
1002#[derive(Clone, DerivePartialModel, FromQueryResult)]
1003#[sea_orm(entity = "UserPrivilege")]
1004pub struct PartialUserPrivilege {
1005 pub id: PrivilegeId,
1006 pub user_id: UserId,
1007}
1008
1009pub async fn get_referring_privileges_cascade<C>(
1010 ids: Vec<PrivilegeId>,
1011 db: &C,
1012) -> MetaResult<Vec<PartialUserPrivilege>>
1013where
1014 C: ConnectionTrait,
1015{
1016 let query = construct_privilege_dependency_query(ids);
1017 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1018 let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1019 db.get_database_backend(),
1020 sql,
1021 values,
1022 ))
1023 .all(db)
1024 .await?;
1025
1026 Ok(privileges)
1027}
1028
1029pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1031where
1032 C: ConnectionTrait,
1033{
1034 let count = UserPrivilege::find()
1035 .filter(user_privilege::Column::DependentId.is_in(ids))
1036 .count(db)
1037 .await?;
1038 if count != 0 {
1039 return Err(MetaError::permission_denied(format!(
1040 "privileges granted to {} other ones.",
1041 count
1042 )));
1043 }
1044 Ok(())
1045}
1046
1047pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1049where
1050 C: ConnectionTrait,
1051{
1052 let user_privileges = UserPrivilege::find()
1053 .find_also_related(Object)
1054 .filter(user_privilege::Column::UserId.eq(user_id))
1055 .all(db)
1056 .await?;
1057 Ok(user_privileges
1058 .into_iter()
1059 .map(|(privilege, object)| {
1060 let object = object.unwrap();
1061 let oid = object.oid.as_raw_id();
1062 let obj = match object.obj_type {
1063 ObjectType::Database => PbGrantObject::DatabaseId(oid),
1064 ObjectType::Schema => PbGrantObject::SchemaId(oid),
1065 ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
1066 ObjectType::Source => PbGrantObject::SourceId(oid),
1067 ObjectType::Sink => PbGrantObject::SinkId(oid),
1068 ObjectType::View => PbGrantObject::ViewId(oid),
1069 ObjectType::Function => PbGrantObject::FunctionId(oid),
1070 ObjectType::Connection => PbGrantObject::ConnectionId(oid),
1071 ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
1072 ObjectType::Secret => PbGrantObject::SecretId(oid),
1073 };
1074 PbGrantPrivilege {
1075 action_with_opts: vec![PbActionWithGrantOption {
1076 action: PbAction::from(privilege.action) as _,
1077 with_grant_option: privilege.with_grant_option,
1078 granted_by: privilege.granted_by as _,
1079 }],
1080 object: Some(obj),
1081 }
1082 })
1083 .collect())
1084}
1085
1086pub async fn get_table_columns(
1087 txn: &impl ConnectionTrait,
1088 id: TableId,
1089) -> MetaResult<ColumnCatalogArray> {
1090 let columns = Table::find_by_id(id)
1091 .select_only()
1092 .columns([table::Column::Columns])
1093 .into_tuple::<ColumnCatalogArray>()
1094 .one(txn)
1095 .await?
1096 .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1097 Ok(columns)
1098}
1099
1100pub async fn grant_default_privileges_automatically<C>(
1103 db: &C,
1104 object_id: impl Into<ObjectId>,
1105) -> MetaResult<Vec<PbUserInfo>>
1106where
1107 C: ConnectionTrait,
1108{
1109 let object_id = object_id.into();
1110 let object = Object::find_by_id(object_id)
1111 .one(db)
1112 .await?
1113 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1114 assert_ne!(object.obj_type, ObjectType::Database);
1115
1116 let for_mview_filter = if object.obj_type == ObjectType::Table {
1117 let table_type = Table::find_by_id(object_id.as_table_id())
1118 .select_only()
1119 .column(table::Column::TableType)
1120 .into_tuple::<TableType>()
1121 .one(db)
1122 .await?
1123 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1124 user_default_privilege::Column::ForMaterializedView
1125 .eq(table_type == TableType::MaterializedView)
1126 } else {
1127 user_default_privilege::Column::ForMaterializedView.eq(false)
1128 };
1129 let schema_filter = if let Some(schema_id) = &object.schema_id {
1130 user_default_privilege::Column::SchemaId.eq(*schema_id)
1131 } else {
1132 user_default_privilege::Column::SchemaId.is_null()
1133 };
1134
1135 let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1136 .select_only()
1137 .columns([
1138 user_default_privilege::Column::Grantee,
1139 user_default_privilege::Column::GrantedBy,
1140 user_default_privilege::Column::Action,
1141 user_default_privilege::Column::WithGrantOption,
1142 ])
1143 .filter(
1144 user_default_privilege::Column::DatabaseId
1145 .eq(object.database_id.unwrap())
1146 .and(schema_filter)
1147 .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1148 .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1149 .and(for_mview_filter),
1150 )
1151 .into_tuple()
1152 .all(db)
1153 .await?;
1154 if default_privileges.is_empty() {
1155 return Ok(vec![]);
1156 }
1157
1158 let updated_user_ids = default_privileges
1159 .iter()
1160 .map(|(grantee, _, _, _)| *grantee)
1161 .collect::<HashSet<_>>();
1162
1163 let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1164
1165 for (grantee, granted_by, action, with_grant_option) in default_privileges {
1166 UserPrivilege::insert(user_privilege::ActiveModel {
1167 user_id: Set(grantee),
1168 oid: Set(object_id),
1169 granted_by: Set(granted_by),
1170 action: Set(action),
1171 with_grant_option: Set(with_grant_option),
1172 ..Default::default()
1173 })
1174 .exec(db)
1175 .await?;
1176 if action == Action::Select && !internal_table_ids.is_empty() {
1177 for internal_table_id in &internal_table_ids {
1179 UserPrivilege::insert(user_privilege::ActiveModel {
1180 user_id: Set(grantee),
1181 oid: Set(internal_table_id.as_object_id()),
1182 granted_by: Set(granted_by),
1183 action: Set(Action::Select),
1184 with_grant_option: Set(with_grant_option),
1185 ..Default::default()
1186 })
1187 .exec(db)
1188 .await?;
1189 }
1190 }
1191 }
1192
1193 let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1194 Ok(updated_user_infos)
1195}
1196
1197pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1199 match object {
1200 PbGrantObject::DatabaseId(id)
1201 | PbGrantObject::SchemaId(id)
1202 | PbGrantObject::TableId(id)
1203 | PbGrantObject::SourceId(id)
1204 | PbGrantObject::SinkId(id)
1205 | PbGrantObject::ViewId(id)
1206 | PbGrantObject::FunctionId(id)
1207 | PbGrantObject::SubscriptionId(id)
1208 | PbGrantObject::ConnectionId(id)
1209 | PbGrantObject::SecretId(id) => (*id).into(),
1210 }
1211}
1212
1213pub async fn insert_fragment_relations(
1214 db: &impl ConnectionTrait,
1215 downstream_fragment_relations: &FragmentDownstreamRelation,
1216) -> MetaResult<()> {
1217 for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1218 for downstream in downstreams {
1219 let relation = fragment_relation::Model {
1220 source_fragment_id: *upstream_fragment_id as _,
1221 target_fragment_id: downstream.downstream_fragment_id as _,
1222 dispatcher_type: downstream.dispatcher_type,
1223 dist_key_indices: downstream
1224 .dist_key_indices
1225 .iter()
1226 .map(|idx| *idx as i32)
1227 .collect_vec()
1228 .into(),
1229 output_indices: downstream
1230 .output_mapping
1231 .indices
1232 .iter()
1233 .map(|idx| *idx as i32)
1234 .collect_vec()
1235 .into(),
1236 output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1237 };
1238 FragmentRelation::insert(relation.into_active_model())
1239 .exec(db)
1240 .await?;
1241 }
1242 }
1243 Ok(())
1244}
1245
1246pub fn compose_dispatchers(
1247 source_fragment_distribution: DistributionType,
1248 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1249 target_fragment_id: crate::model::FragmentId,
1250 target_fragment_distribution: DistributionType,
1251 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1252 dispatcher_type: DispatcherType,
1253 dist_key_indices: Vec<u32>,
1254 output_mapping: PbDispatchOutputMapping,
1255) -> HashMap<crate::model::ActorId, PbDispatcher> {
1256 match dispatcher_type {
1257 DispatcherType::Hash => {
1258 let dispatcher = PbDispatcher {
1259 r#type: PbDispatcherType::from(dispatcher_type) as _,
1260 dist_key_indices,
1261 output_mapping: output_mapping.into(),
1262 hash_mapping: Some(
1263 ActorMapping::from_bitmaps(
1264 &target_fragment_actors
1265 .iter()
1266 .map(|(actor_id, bitmap)| {
1267 (
1268 *actor_id as _,
1269 bitmap
1270 .clone()
1271 .expect("downstream hash dispatch must have distribution"),
1272 )
1273 })
1274 .collect(),
1275 )
1276 .to_protobuf(),
1277 ),
1278 dispatcher_id: target_fragment_id.as_raw_id() as _,
1279 downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1280 };
1281 source_fragment_actors
1282 .keys()
1283 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1284 .collect()
1285 }
1286 DispatcherType::Broadcast | DispatcherType::Simple => {
1287 let dispatcher = PbDispatcher {
1288 r#type: PbDispatcherType::from(dispatcher_type) as _,
1289 dist_key_indices,
1290 output_mapping: output_mapping.into(),
1291 hash_mapping: None,
1292 dispatcher_id: target_fragment_id.as_raw_id() as _,
1293 downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1294 };
1295 source_fragment_actors
1296 .keys()
1297 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1298 .collect()
1299 }
1300 DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1301 source_fragment_distribution,
1302 source_fragment_actors,
1303 target_fragment_distribution,
1304 target_fragment_actors,
1305 )
1306 .into_iter()
1307 .map(|(upstream_actor_id, downstream_actor_id)| {
1308 (
1309 upstream_actor_id,
1310 PbDispatcher {
1311 r#type: PbDispatcherType::NoShuffle as _,
1312 dist_key_indices: dist_key_indices.clone(),
1313 output_mapping: output_mapping.clone().into(),
1314 hash_mapping: None,
1315 dispatcher_id: target_fragment_id.as_raw_id() as _,
1316 downstream_actor_id: vec![downstream_actor_id],
1317 },
1318 )
1319 })
1320 .collect(),
1321 }
1322}
1323
1324pub fn resolve_no_shuffle_actor_dispatcher(
1326 source_fragment_distribution: DistributionType,
1327 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1328 target_fragment_distribution: DistributionType,
1329 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1330) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1331 assert_eq!(source_fragment_distribution, target_fragment_distribution);
1332 assert_eq!(
1333 source_fragment_actors.len(),
1334 target_fragment_actors.len(),
1335 "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1336 source_fragment_actors,
1337 target_fragment_actors
1338 );
1339 match source_fragment_distribution {
1340 DistributionType::Single => {
1341 let assert_singleton = |bitmap: &Option<Bitmap>| {
1342 assert!(
1343 bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1344 "not singleton: {:?}",
1345 bitmap
1346 );
1347 };
1348 assert_eq!(
1349 source_fragment_actors.len(),
1350 1,
1351 "singleton distribution actor count not 1: {:?}",
1352 source_fragment_distribution
1353 );
1354 assert_eq!(
1355 target_fragment_actors.len(),
1356 1,
1357 "singleton distribution actor count not 1: {:?}",
1358 target_fragment_distribution
1359 );
1360 let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1361 assert_singleton(bitmap);
1362 let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1363 assert_singleton(bitmap);
1364 vec![(*source_actor_id, *target_actor_id)]
1365 }
1366 DistributionType::Hash => {
1367 let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1368 .iter()
1369 .map(|(actor_id, bitmap)| {
1370 let bitmap = bitmap
1371 .as_ref()
1372 .expect("hash distribution should have bitmap");
1373 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1374 (first_vnode, (*actor_id, bitmap))
1375 })
1376 .collect();
1377 source_fragment_actors
1378 .iter()
1379 .map(|(source_actor_id, bitmap)| {
1380 let bitmap = bitmap
1381 .as_ref()
1382 .expect("hash distribution should have bitmap");
1383 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1384 let (target_actor_id, target_bitmap) =
1385 target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1386 panic!(
1387 "cannot find matched target actor: {} {:?} {:?} {:?}",
1388 source_actor_id,
1389 first_vnode,
1390 source_fragment_actors,
1391 target_fragment_actors
1392 );
1393 });
1394 assert_eq!(
1395 bitmap,
1396 target_bitmap,
1397 "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1398 source_actor_id,
1399 first_vnode,
1400 source_fragment_actors,
1401 target_fragment_actors
1402 );
1403 (*source_actor_id, target_actor_id)
1404 }).collect()
1405 }
1406 }
1407}
1408
1409pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1410 let fragment_worker_slot_mapping = match fragment.distribution_type {
1411 DistributionType::Single => {
1412 let actor = fragment.actors.values().exactly_one().unwrap();
1413 WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1414 }
1415 DistributionType::Hash => {
1416 let actor_bitmaps: HashMap<_, _> = fragment
1417 .actors
1418 .iter()
1419 .map(|(actor_id, actor_info)| {
1420 let vnode_bitmap = actor_info
1421 .vnode_bitmap
1422 .as_ref()
1423 .cloned()
1424 .expect("actor bitmap shouldn't be none in hash fragment");
1425
1426 (*actor_id as hash::ActorId, vnode_bitmap)
1427 })
1428 .collect();
1429
1430 let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1431
1432 let actor_locations = fragment
1433 .actors
1434 .iter()
1435 .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
1436 .collect();
1437
1438 actor_mapping.to_worker_slot(&actor_locations)
1439 }
1440 };
1441
1442 PbFragmentWorkerSlotMapping {
1443 fragment_id: fragment.fragment_id,
1444 mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1445 }
1446}
1447
1448pub async fn get_fragments_for_jobs<C>(
1454 db: &C,
1455 actor_info: &SharedActorInfos,
1456 streaming_jobs: Vec<JobId>,
1457) -> MetaResult<(
1458 HashMap<SourceId, BTreeSet<FragmentId>>,
1459 HashSet<FragmentId>,
1460 HashSet<ActorId>,
1461 HashSet<FragmentId>,
1462)>
1463where
1464 C: ConnectionTrait,
1465{
1466 if streaming_jobs.is_empty() {
1467 return Ok((
1468 HashMap::default(),
1469 HashSet::default(),
1470 HashSet::default(),
1471 HashSet::default(),
1472 ));
1473 }
1474
1475 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1476 .select_only()
1477 .columns([
1478 fragment::Column::FragmentId,
1479 fragment::Column::FragmentTypeMask,
1480 fragment::Column::StreamNode,
1481 ])
1482 .filter(fragment::Column::JobId.is_in(streaming_jobs))
1483 .into_tuple()
1484 .all(db)
1485 .await?;
1486
1487 let fragment_ids: HashSet<_> = fragments
1488 .iter()
1489 .map(|(fragment_id, _, _)| *fragment_id)
1490 .collect();
1491
1492 let actors = {
1493 let guard = actor_info.read_guard();
1494 fragment_ids
1495 .iter()
1496 .flat_map(|id| guard.get_fragment(*id as _))
1497 .flat_map(|f| f.actors.keys().cloned().map(|id| id as _))
1498 .collect::<HashSet<_>>()
1499 };
1500
1501 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1502 let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1503 for (fragment_id, mask, stream_node) in fragments {
1504 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1505 && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1506 {
1507 source_fragment_ids
1508 .entry(source_id)
1509 .or_default()
1510 .insert(fragment_id);
1511 }
1512 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1513 sink_fragment_ids.insert(fragment_id);
1514 }
1515 }
1516
1517 Ok((
1518 source_fragment_ids,
1519 sink_fragment_ids,
1520 actors.into_iter().collect(),
1521 fragment_ids,
1522 ))
1523}
1524
1525pub(crate) fn build_object_group_for_delete(
1530 partial_objects: Vec<PartialObject>,
1531) -> NotificationInfo {
1532 let mut objects = vec![];
1533 for obj in partial_objects {
1534 match obj.obj_type {
1535 ObjectType::Database => objects.push(PbObject {
1536 object_info: Some(PbObjectInfo::Database(PbDatabase {
1537 id: obj.oid.as_database_id(),
1538 ..Default::default()
1539 })),
1540 }),
1541 ObjectType::Schema => objects.push(PbObject {
1542 object_info: Some(PbObjectInfo::Schema(PbSchema {
1543 id: obj.oid.as_schema_id(),
1544 database_id: obj.database_id.unwrap(),
1545 ..Default::default()
1546 })),
1547 }),
1548 ObjectType::Table => objects.push(PbObject {
1549 object_info: Some(PbObjectInfo::Table(PbTable {
1550 id: obj.oid.as_table_id(),
1551 schema_id: obj.schema_id.unwrap(),
1552 database_id: obj.database_id.unwrap(),
1553 ..Default::default()
1554 })),
1555 }),
1556 ObjectType::Source => objects.push(PbObject {
1557 object_info: Some(PbObjectInfo::Source(PbSource {
1558 id: obj.oid.as_source_id(),
1559 schema_id: obj.schema_id.unwrap(),
1560 database_id: obj.database_id.unwrap(),
1561 ..Default::default()
1562 })),
1563 }),
1564 ObjectType::Sink => objects.push(PbObject {
1565 object_info: Some(PbObjectInfo::Sink(PbSink {
1566 id: obj.oid.as_sink_id(),
1567 schema_id: obj.schema_id.unwrap(),
1568 database_id: obj.database_id.unwrap(),
1569 ..Default::default()
1570 })),
1571 }),
1572 ObjectType::Subscription => objects.push(PbObject {
1573 object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1574 id: obj.oid.as_subscription_id(),
1575 schema_id: obj.schema_id.unwrap(),
1576 database_id: obj.database_id.unwrap(),
1577 ..Default::default()
1578 })),
1579 }),
1580 ObjectType::View => objects.push(PbObject {
1581 object_info: Some(PbObjectInfo::View(PbView {
1582 id: obj.oid.as_view_id(),
1583 schema_id: obj.schema_id.unwrap(),
1584 database_id: obj.database_id.unwrap(),
1585 ..Default::default()
1586 })),
1587 }),
1588 ObjectType::Index => {
1589 objects.push(PbObject {
1590 object_info: Some(PbObjectInfo::Index(PbIndex {
1591 id: obj.oid.as_index_id(),
1592 schema_id: obj.schema_id.unwrap(),
1593 database_id: obj.database_id.unwrap(),
1594 ..Default::default()
1595 })),
1596 });
1597 objects.push(PbObject {
1598 object_info: Some(PbObjectInfo::Table(PbTable {
1599 id: obj.oid.as_table_id(),
1600 schema_id: obj.schema_id.unwrap(),
1601 database_id: obj.database_id.unwrap(),
1602 ..Default::default()
1603 })),
1604 });
1605 }
1606 ObjectType::Function => objects.push(PbObject {
1607 object_info: Some(PbObjectInfo::Function(PbFunction {
1608 id: obj.oid.as_function_id(),
1609 schema_id: obj.schema_id.unwrap(),
1610 database_id: obj.database_id.unwrap(),
1611 ..Default::default()
1612 })),
1613 }),
1614 ObjectType::Connection => objects.push(PbObject {
1615 object_info: Some(PbObjectInfo::Connection(PbConnection {
1616 id: obj.oid.as_connection_id(),
1617 schema_id: obj.schema_id.unwrap(),
1618 database_id: obj.database_id.unwrap(),
1619 ..Default::default()
1620 })),
1621 }),
1622 ObjectType::Secret => objects.push(PbObject {
1623 object_info: Some(PbObjectInfo::Secret(PbSecret {
1624 id: obj.oid.as_secret_id(),
1625 schema_id: obj.schema_id.unwrap(),
1626 database_id: obj.database_id.unwrap(),
1627 ..Default::default()
1628 })),
1629 }),
1630 }
1631 }
1632 NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1633}
1634
1635pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1636 let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1637 .context("unable to parse table definition")
1638 .inspect_err(|e| {
1639 tracing::error!(
1640 target: "auto_schema_change",
1641 error = %e.as_report(),
1642 "failed to parse table definition")
1643 })
1644 .unwrap()
1645 .try_into()
1646 .unwrap();
1647 if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1648 cdc_table_info
1649 .clone()
1650 .map(|cdc_table_info| cdc_table_info.external_table_name)
1651 } else {
1652 None
1653 }
1654}
1655
1656pub async fn rename_relation(
1659 txn: &DatabaseTransaction,
1660 object_type: ObjectType,
1661 object_id: ObjectId,
1662 object_name: &str,
1663) -> MetaResult<(Vec<PbObject>, String)> {
1664 use sea_orm::ActiveModelTrait;
1665
1666 use crate::controller::rename::alter_relation_rename;
1667
1668 let mut to_update_relations = vec![];
1669 macro_rules! rename_relation {
1671 ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1672 let (mut relation, obj) = $entity::find_by_id($object_id)
1673 .find_also_related(Object)
1674 .one(txn)
1675 .await?
1676 .unwrap();
1677 let obj = obj.unwrap();
1678 let old_name = relation.name.clone();
1679 relation.name = object_name.into();
1680 if obj.obj_type != ObjectType::View {
1681 relation.definition = alter_relation_rename(&relation.definition, object_name);
1682 }
1683 let active_model = $table::ActiveModel {
1684 $identity: Set(relation.$identity),
1685 name: Set(object_name.into()),
1686 definition: Set(relation.definition.clone()),
1687 ..Default::default()
1688 };
1689 active_model.update(txn).await?;
1690 to_update_relations.push(PbObject {
1691 object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
1692 });
1693 old_name
1694 }};
1695 }
1696 let old_name = match object_type {
1698 ObjectType::Table => {
1699 let associated_source_id: Option<SourceId> = Source::find()
1700 .select_only()
1701 .column(source::Column::SourceId)
1702 .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
1703 .into_tuple()
1704 .one(txn)
1705 .await?;
1706 if let Some(source_id) = associated_source_id {
1707 rename_relation!(Source, source, source_id, source_id);
1708 }
1709 rename_relation!(Table, table, table_id, object_id.as_table_id())
1710 }
1711 ObjectType::Source => {
1712 rename_relation!(Source, source, source_id, object_id.as_source_id())
1713 }
1714 ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
1715 ObjectType::Subscription => {
1716 rename_relation!(
1717 Subscription,
1718 subscription,
1719 subscription_id,
1720 object_id.as_subscription_id()
1721 )
1722 }
1723 ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
1724 ObjectType::Index => {
1725 let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
1726 .find_also_related(Object)
1727 .one(txn)
1728 .await?
1729 .unwrap();
1730 index.name = object_name.into();
1731 let index_table_id = index.index_table_id;
1732 let old_name = rename_relation!(Table, table, table_id, index_table_id);
1733
1734 let active_model = index::ActiveModel {
1736 index_id: sea_orm::ActiveValue::Set(index.index_id),
1737 name: sea_orm::ActiveValue::Set(object_name.into()),
1738 ..Default::default()
1739 };
1740 active_model.update(txn).await?;
1741 to_update_relations.push(PbObject {
1742 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1743 });
1744 old_name
1745 }
1746 _ => unreachable!("only relation name can be altered."),
1747 };
1748
1749 Ok((to_update_relations, old_name))
1750}
1751
1752pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
1753where
1754 C: ConnectionTrait,
1755{
1756 let database_resource_group: Option<String> = Database::find_by_id(database_id)
1757 .select_only()
1758 .column(database::Column::ResourceGroup)
1759 .into_tuple()
1760 .one(txn)
1761 .await?
1762 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1763
1764 Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1765}
1766
1767pub async fn get_existing_job_resource_group<C>(
1768 txn: &C,
1769 streaming_job_id: JobId,
1770) -> MetaResult<String>
1771where
1772 C: ConnectionTrait,
1773{
1774 let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1775 StreamingJob::find_by_id(streaming_job_id)
1776 .select_only()
1777 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1778 .join(JoinType::InnerJoin, object::Relation::Database2.def())
1779 .column(streaming_job::Column::SpecificResourceGroup)
1780 .column(database::Column::ResourceGroup)
1781 .into_tuple()
1782 .one(txn)
1783 .await?
1784 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1785
1786 Ok(job_specific_resource_group.unwrap_or_else(|| {
1787 database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1788 }))
1789}
1790
1791pub fn filter_workers_by_resource_group(
1792 workers: &HashMap<WorkerId, WorkerNode>,
1793 resource_group: &str,
1794) -> BTreeSet<WorkerId> {
1795 workers
1796 .iter()
1797 .filter(|&(_, worker)| {
1798 worker
1799 .resource_group()
1800 .map(|node_label| node_label.as_str() == resource_group)
1801 .unwrap_or(false)
1802 })
1803 .map(|(id, _)| *id)
1804 .collect()
1805}
1806
1807pub async fn rename_relation_refer(
1810 txn: &DatabaseTransaction,
1811 object_type: ObjectType,
1812 object_id: ObjectId,
1813 object_name: &str,
1814 old_name: &str,
1815) -> MetaResult<Vec<PbObject>> {
1816 use sea_orm::ActiveModelTrait;
1817
1818 use crate::controller::rename::alter_relation_rename_refs;
1819
1820 let mut to_update_relations = vec![];
1821 macro_rules! rename_relation_ref {
1822 ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1823 let (mut relation, obj) = $entity::find_by_id($object_id)
1824 .find_also_related(Object)
1825 .one(txn)
1826 .await?
1827 .unwrap();
1828 relation.definition =
1829 alter_relation_rename_refs(&relation.definition, old_name, object_name);
1830 let active_model = $table::ActiveModel {
1831 $identity: Set(relation.$identity),
1832 definition: Set(relation.definition.clone()),
1833 ..Default::default()
1834 };
1835 active_model.update(txn).await?;
1836 to_update_relations.push(PbObject {
1837 object_info: Some(PbObjectInfo::$entity(
1838 ObjectModel(relation, obj.unwrap()).into(),
1839 )),
1840 });
1841 }};
1842 }
1843 let mut objs = get_referring_objects(object_id, txn).await?;
1844 if object_type == ObjectType::Table {
1845 let incoming_sinks: Vec<SinkId> = Sink::find()
1846 .select_only()
1847 .column(sink::Column::SinkId)
1848 .filter(sink::Column::TargetTable.eq(object_id))
1849 .into_tuple()
1850 .all(txn)
1851 .await?;
1852
1853 objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
1854 oid: id.as_object_id(),
1855 obj_type: ObjectType::Sink,
1856 schema_id: None,
1857 database_id: None,
1858 }));
1859 }
1860
1861 for obj in objs {
1862 match obj.obj_type {
1863 ObjectType::Table => {
1864 rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
1865 }
1866 ObjectType::Sink => {
1867 rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
1868 }
1869 ObjectType::Subscription => {
1870 rename_relation_ref!(
1871 Subscription,
1872 subscription,
1873 subscription_id,
1874 obj.oid.as_subscription_id()
1875 )
1876 }
1877 ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
1878 ObjectType::Index => {
1879 let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
1880 .select_only()
1881 .column(index::Column::IndexTableId)
1882 .into_tuple()
1883 .one(txn)
1884 .await?;
1885 rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
1886 }
1887 _ => {
1888 bail!(
1889 "only the table, sink, subscription, view and index will depend on other objects."
1890 )
1891 }
1892 }
1893 }
1894
1895 Ok(to_update_relations)
1896}
1897
1898pub async fn validate_subscription_deletion<C>(
1902 txn: &C,
1903 subscription_id: SubscriptionId,
1904) -> MetaResult<()>
1905where
1906 C: ConnectionTrait,
1907{
1908 let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1909 .select_only()
1910 .column(subscription::Column::DependentTableId)
1911 .into_tuple()
1912 .one(txn)
1913 .await?
1914 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1915
1916 let cnt = Subscription::find()
1917 .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1918 .count(txn)
1919 .await?;
1920 if cnt > 1 {
1921 return Ok(());
1924 }
1925
1926 let obj_alias = Alias::new("o1");
1928 let used_by_alias = Alias::new("o2");
1929 let count = ObjectDependency::find()
1930 .join_as(
1931 JoinType::InnerJoin,
1932 object_dependency::Relation::Object2.def(),
1933 obj_alias.clone(),
1934 )
1935 .join_as(
1936 JoinType::InnerJoin,
1937 object_dependency::Relation::Object1.def(),
1938 used_by_alias.clone(),
1939 )
1940 .filter(
1941 object_dependency::Column::Oid
1942 .eq(upstream_table_id)
1943 .and(object_dependency::Column::UsedBy.ne(subscription_id))
1944 .and(
1945 Expr::col((obj_alias, object::Column::DatabaseId))
1946 .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1947 ),
1948 )
1949 .count(txn)
1950 .await?;
1951
1952 if count != 0 {
1953 return Err(MetaError::permission_denied(format!(
1954 "Referenced by {} cross-db objects.",
1955 count
1956 )));
1957 }
1958
1959 Ok(())
1960}
1961
1962pub async fn fetch_target_fragments<C>(
1963 txn: &C,
1964 src_fragment_id: impl IntoIterator<Item = FragmentId>,
1965) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
1966where
1967 C: ConnectionTrait,
1968{
1969 let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
1970 .select_only()
1971 .columns([
1972 fragment_relation::Column::SourceFragmentId,
1973 fragment_relation::Column::TargetFragmentId,
1974 ])
1975 .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
1976 .into_tuple()
1977 .all(txn)
1978 .await?;
1979
1980 let source_target_fragments = source_target_fragments.into_iter().into_group_map();
1981
1982 Ok(source_target_fragments)
1983}
1984
1985pub async fn get_sink_fragment_by_ids<C>(
1986 txn: &C,
1987 sink_ids: Vec<SinkId>,
1988) -> MetaResult<HashMap<SinkId, FragmentId>>
1989where
1990 C: ConnectionTrait,
1991{
1992 let sink_num = sink_ids.len();
1993 let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
1994 .select_only()
1995 .columns([fragment::Column::JobId, fragment::Column::FragmentId])
1996 .filter(
1997 fragment::Column::JobId
1998 .is_in(sink_ids)
1999 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2000 )
2001 .into_tuple()
2002 .all(txn)
2003 .await?;
2004
2005 if sink_fragment_ids.len() != sink_num {
2006 return Err(anyhow::anyhow!(
2007 "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2008 sink_fragment_ids.len(),
2009 sink_num
2010 )
2011 .into());
2012 }
2013
2014 Ok(sink_fragment_ids.into_iter().collect())
2015}
2016
2017pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2018where
2019 C: ConnectionTrait,
2020{
2021 let mview_fragment: Vec<i32> = Fragment::find()
2022 .select_only()
2023 .column(fragment::Column::FragmentTypeMask)
2024 .filter(
2025 fragment::Column::JobId
2026 .eq(table_id)
2027 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2028 )
2029 .into_tuple()
2030 .all(txn)
2031 .await?;
2032
2033 let mview_fragment_len = mview_fragment.len();
2034 if mview_fragment_len != 1 {
2035 bail!(
2036 "expected exactly one mview fragment for table {}, found {}",
2037 table_id,
2038 mview_fragment_len
2039 );
2040 }
2041
2042 let mview_fragment = mview_fragment.into_iter().next().unwrap();
2043 let migrated =
2044 FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2045
2046 Ok(migrated)
2047}
2048
2049pub async fn try_get_iceberg_table_by_downstream_sink<C>(
2050 txn: &C,
2051 sink_id: SinkId,
2052) -> MetaResult<Option<TableId>>
2053where
2054 C: ConnectionTrait,
2055{
2056 let sink = Sink::find_by_id(sink_id).one(txn).await?;
2057 let Some(sink) = sink else {
2058 return Ok(None);
2059 };
2060
2061 if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
2062 let object_ids: Vec<ObjectId> = ObjectDependency::find()
2063 .select_only()
2064 .column(object_dependency::Column::Oid)
2065 .filter(object_dependency::Column::UsedBy.eq(sink_id))
2066 .into_tuple()
2067 .all(txn)
2068 .await?;
2069 let mut iceberg_table_ids = vec![];
2070 for object_id in object_ids {
2071 let table_id = object_id.as_table_id();
2072 if let Some(table_engine) = Table::find_by_id(table_id)
2073 .select_only()
2074 .column(table::Column::Engine)
2075 .into_tuple::<table::Engine>()
2076 .one(txn)
2077 .await?
2078 && table_engine == table::Engine::Iceberg
2079 {
2080 iceberg_table_ids.push(table_id);
2081 }
2082 }
2083 if iceberg_table_ids.len() == 1 {
2084 return Ok(Some(iceberg_table_ids[0]));
2085 }
2086 }
2087 Ok(None)
2088}
2089
2090pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
2091where
2092 C: ConnectionTrait,
2093{
2094 if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
2095 .select_only()
2096 .column(table::Column::Engine)
2097 .into_tuple::<table::Engine>()
2098 .one(txn)
2099 .await?
2100 && engine == table::Engine::Iceberg
2101 {
2102 return Ok(true);
2103 }
2104 if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
2105 .select_only()
2106 .column(sink::Column::Name)
2107 .into_tuple::<String>()
2108 .one(txn)
2109 .await?
2110 && sink_name.starts_with(ICEBERG_SINK_PREFIX)
2111 {
2112 return Ok(true);
2113 }
2114 Ok(false)
2115}
2116
2117pub async fn find_dirty_iceberg_table_jobs<C>(
2118 txn: &C,
2119 database_id: Option<DatabaseId>,
2120) -> MetaResult<Vec<PartialObject>>
2121where
2122 C: ConnectionTrait,
2123{
2124 let mut filter_condition = streaming_job::Column::JobStatus
2125 .ne(JobStatus::Created)
2126 .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
2127 .and(streaming_job::Column::CreateType.eq(CreateType::Background));
2128 if let Some(database_id) = database_id {
2129 filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
2130 }
2131 let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
2132 .select_only()
2133 .columns([
2134 object::Column::Oid,
2135 object::Column::ObjType,
2136 object::Column::SchemaId,
2137 object::Column::DatabaseId,
2138 ])
2139 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2140 .filter(filter_condition)
2141 .into_partial_model()
2142 .all(txn)
2143 .await?;
2144
2145 let mut dirty_iceberg_table_jobs = vec![];
2146 for job in creating_table_sink_jobs {
2147 if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
2148 tracing::info!("Found dirty iceberg job with id: {}", job.oid);
2149 dirty_iceberg_table_jobs.push(job);
2150 }
2151 }
2152
2153 Ok(dirty_iceberg_table_jobs)
2154}
2155
2156pub fn build_select_node_list(
2157 from: &[ColumnCatalog],
2158 to: &[ColumnCatalog],
2159) -> MetaResult<Vec<PbExprNode>> {
2160 let mut exprs = Vec::with_capacity(to.len());
2161 let idx_by_col_id = from
2162 .iter()
2163 .enumerate()
2164 .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2165 .collect::<HashMap<_, _>>();
2166
2167 for to_col in to {
2168 let to_col = to_col.column_desc.as_ref().unwrap();
2169 let to_col_type = to_col.column_type.clone();
2170 if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2171 let from_col_type = from[*from_idx]
2172 .column_desc
2173 .as_ref()
2174 .unwrap()
2175 .column_type
2176 .clone();
2177 if to_col_type != from_col_type {
2178 return Err(anyhow!(
2179 "Column type mismatch: {:?} != {:?}",
2180 from_col_type,
2181 to_col_type
2182 )
2183 .into());
2184 }
2185 exprs.push(PbExprNode {
2186 function_type: expr_node::Type::Unspecified.into(),
2187 return_type: to_col_type,
2188 rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2189 });
2190 } else {
2191 let to_default_node =
2192 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2193 expr,
2194 ..
2195 })) = &to_col.generated_or_default_column
2196 {
2197 expr.clone().unwrap()
2198 } else {
2199 let null = Datum::None.to_protobuf();
2200 PbExprNode {
2201 function_type: expr_node::Type::Unspecified.into(),
2202 return_type: to_col_type,
2203 rex_node: Some(expr_node::RexNode::Constant(null)),
2204 }
2205 };
2206 exprs.push(to_default_node);
2207 }
2208 }
2209
2210 Ok(exprs)
2211}
2212
/// Extra per-job information returned by `get_streaming_job_extra_info`.
#[derive(Clone, Debug, Default)]
pub struct StreamingJobExtraInfo {
    /// Value of the job's `timezone` column in the `streaming_job` table, if set.
    pub timezone: Option<String>,
    /// Definition resolved for the job via `resolve_streaming_job_definition`;
    /// empty when no definition was resolved.
    pub job_definition: String,
}
2218
2219pub async fn get_streaming_job_extra_info<C>(
2220 txn: &C,
2221 job_ids: Vec<JobId>,
2222) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2223where
2224 C: ConnectionTrait,
2225{
2226 let timezone_pairs: Vec<(JobId, Option<String>)> = StreamingJob::find()
2227 .select_only()
2228 .columns([
2229 streaming_job::Column::JobId,
2230 streaming_job::Column::Timezone,
2231 ])
2232 .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2233 .into_tuple()
2234 .all(txn)
2235 .await?;
2236
2237 let job_ids = job_ids.into_iter().collect();
2238
2239 let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2240
2241 let result = timezone_pairs
2242 .into_iter()
2243 .map(|(job_id, timezone)| {
2244 let job_definition = definitions.remove(&job_id).unwrap_or_default();
2245 (
2246 job_id,
2247 StreamingJobExtraInfo {
2248 timezone,
2249 job_definition,
2250 },
2251 )
2252 })
2253 .collect();
2254
2255 Ok(result)
2256}
2257
#[cfg(test)]
mod tests {
    use super::*;

    /// The external table name is read from the `TABLE '...'` clause of a CDC
    /// `CREATE TABLE` definition, with or without declared columns.
    #[test]
    fn test_extract_cdc_table_name() {
        let cases = [
            (
                "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'",
                "public.t1",
            ),
            (
                "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'",
                "mydb.t2",
            ),
        ];

        for (ddl, expected) in cases {
            assert_eq!(
                extract_external_table_name_from_definition(ddl),
                Some(expected.to_owned())
            );
        }
    }
}