1use std::collections::{BTreeSet, HashMap, HashSet};
16
17use anyhow::{Context, anyhow};
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
21use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
22use risingwave_common::id::JobId;
23use risingwave_common::types::Datum;
24use risingwave_common::util::value_encoding::DatumToProtoExt;
25use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
26use risingwave_common::{bail, hash};
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::object::ObjectType;
29use risingwave_meta_model::prelude::*;
30use risingwave_meta_model::table::TableType;
31use risingwave_meta_model::user_privilege::Action;
32use risingwave_meta_model::{
33 ActorId, ColumnCatalogArray, DataTypeArray, DatabaseId, DispatcherType, FragmentId, JobStatus,
34 ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo, TableId,
35 TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation, function,
36 index, object, object_dependency, schema, secret, sink, source, streaming_job, subscription,
37 table, user, user_default_privilege, user_privilege, view,
38};
39use risingwave_meta_model_migration::WithQuery;
40use risingwave_pb::catalog::{
41 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
42 PbSubscription, PbTable, PbView,
43};
44use risingwave_pb::common::WorkerNode;
45use risingwave_pb::expr::{PbExprNode, expr_node};
46use risingwave_pb::meta::object::PbObjectInfo;
47use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
48use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
51use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
52use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
53use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
54use risingwave_sqlparser::ast::Statement as SqlStatement;
55use risingwave_sqlparser::parser::Parser;
56use sea_orm::sea_query::{
57 Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
58 WithClause,
59};
60use sea_orm::{
61 ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
62 FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
63 RelationTrait, Set, Statement,
64};
65use thiserror_ext::AsReport;
66
67use crate::barrier::{SharedActorInfos, SharedFragmentInfo};
68use crate::controller::ObjectModel;
69use crate::controller::fragment::FragmentTypeMaskExt;
70use crate::controller::scale::resolve_streaming_job_definition;
71use crate::model::FragmentDownstreamRelation;
72use crate::{MetaError, MetaResult};
73
74pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
99 let cte_alias = Alias::new("used_by_object_ids");
100 let cte_return_alias = Alias::new("used_by");
101
102 let mut base_query = SelectStatement::new()
103 .column(object_dependency::Column::UsedBy)
104 .from(ObjectDependency)
105 .and_where(object_dependency::Column::Oid.eq(obj_id))
106 .to_owned();
107
108 let belonged_obj_query = SelectStatement::new()
109 .column(object::Column::Oid)
110 .from(Object)
111 .and_where(
112 object::Column::DatabaseId
113 .eq(obj_id)
114 .or(object::Column::SchemaId.eq(obj_id)),
115 )
116 .to_owned();
117
118 let cte_referencing = Query::select()
119 .column((ObjectDependency, object_dependency::Column::UsedBy))
120 .from(ObjectDependency)
121 .inner_join(
122 cte_alias.clone(),
123 Expr::col((cte_alias.clone(), cte_return_alias.clone()))
124 .equals(object_dependency::Column::Oid),
125 )
126 .to_owned();
127
128 let mut common_table_expr = CommonTableExpression::new();
129 common_table_expr
130 .query(
131 base_query
132 .union(UnionType::All, belonged_obj_query)
133 .union(UnionType::All, cte_referencing)
134 .to_owned(),
135 )
136 .column(cte_return_alias.clone())
137 .table_name(cte_alias.clone());
138
139 SelectStatement::new()
140 .distinct()
141 .columns([
142 object::Column::Oid,
143 object::Column::ObjType,
144 object::Column::SchemaId,
145 object::Column::DatabaseId,
146 ])
147 .from(cte_alias.clone())
148 .inner_join(
149 Object,
150 Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
151 )
152 .order_by(object::Column::Oid, Order::Desc)
153 .to_owned()
154 .with(
155 WithClause::new()
156 .recursive(true)
157 .cte(common_table_expr)
158 .to_owned(),
159 )
160}
161
162pub fn construct_sink_cycle_check_query(
187 target_table: ObjectId,
188 dependent_objects: Vec<ObjectId>,
189) -> WithQuery {
190 let cte_alias = Alias::new("used_by_object_ids_with_sink");
191 let depend_alias = Alias::new("obj_dependency_with_sink");
192
193 let mut base_query = SelectStatement::new()
194 .columns([
195 object_dependency::Column::Oid,
196 object_dependency::Column::UsedBy,
197 ])
198 .from(ObjectDependency)
199 .and_where(object_dependency::Column::Oid.eq(target_table))
200 .to_owned();
201
202 let query_sink_deps = SelectStatement::new()
203 .columns([sink::Column::SinkId, sink::Column::TargetTable])
204 .from(Sink)
205 .and_where(sink::Column::TargetTable.is_not_null())
206 .to_owned();
207
208 let cte_referencing = Query::select()
209 .column((depend_alias.clone(), object_dependency::Column::Oid))
210 .column((depend_alias.clone(), object_dependency::Column::UsedBy))
211 .from_subquery(
212 SelectStatement::new()
213 .columns([
214 object_dependency::Column::Oid,
215 object_dependency::Column::UsedBy,
216 ])
217 .from(ObjectDependency)
218 .union(UnionType::All, query_sink_deps)
219 .to_owned(),
220 depend_alias.clone(),
221 )
222 .inner_join(
223 cte_alias.clone(),
224 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
225 .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
226 )
227 .and_where(
228 Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
229 cte_alias.clone(),
230 object_dependency::Column::Oid,
231 ))),
232 )
233 .to_owned();
234
235 let mut common_table_expr = CommonTableExpression::new();
236 common_table_expr
237 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
238 .columns([
239 object_dependency::Column::Oid,
240 object_dependency::Column::UsedBy,
241 ])
242 .table_name(cte_alias.clone());
243
244 SelectStatement::new()
245 .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
246 .from(cte_alias.clone())
247 .and_where(
248 Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
249 )
250 .to_owned()
251 .with(
252 WithClause::new()
253 .recursive(true)
254 .cte(common_table_expr)
255 .to_owned(),
256 )
257}
258
259#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
260#[sea_orm(entity = "Object")]
261pub struct PartialObject {
262 pub oid: ObjectId,
263 pub obj_type: ObjectType,
264 pub schema_id: Option<SchemaId>,
265 pub database_id: Option<DatabaseId>,
266}
267
268#[derive(Clone, DerivePartialModel, FromQueryResult)]
269#[sea_orm(entity = "Fragment")]
270pub struct PartialFragmentStateTables {
271 pub fragment_id: FragmentId,
272 pub job_id: ObjectId,
273 pub state_table_ids: TableIdArray,
274}
275
276#[derive(Clone, Eq, PartialEq, Debug)]
277pub struct PartialActorLocation {
278 pub actor_id: ActorId,
279 pub fragment_id: FragmentId,
280 pub worker_id: WorkerId,
281}
282
283#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
284pub struct FragmentDesc {
285 pub fragment_id: FragmentId,
286 pub job_id: JobId,
287 pub fragment_type_mask: i32,
288 pub distribution_type: DistributionType,
289 pub state_table_ids: TableIdArray,
290 pub parallelism: i64,
291 pub vnode_count: i32,
292 pub stream_node: StreamNode,
293 pub parallelism_policy: String,
294}
295
296pub async fn get_referring_objects_cascade<C>(
298 obj_id: ObjectId,
299 db: &C,
300) -> MetaResult<Vec<PartialObject>>
301where
302 C: ConnectionTrait,
303{
304 let query = construct_obj_dependency_query(obj_id);
305 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
306 let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
307 db.get_database_backend(),
308 sql,
309 values,
310 ))
311 .all(db)
312 .await?;
313 Ok(objects)
314}
315
316pub async fn check_sink_into_table_cycle<C>(
318 target_table: ObjectId,
319 dependent_objs: Vec<ObjectId>,
320 db: &C,
321) -> MetaResult<bool>
322where
323 C: ConnectionTrait,
324{
325 if dependent_objs.is_empty() {
326 return Ok(false);
327 }
328
329 if dependent_objs.contains(&target_table) {
331 return Ok(true);
332 }
333
334 let query = construct_sink_cycle_check_query(target_table, dependent_objs);
335 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
336
337 let res = db
338 .query_one(Statement::from_sql_and_values(
339 db.get_database_backend(),
340 sql,
341 values,
342 ))
343 .await?
344 .unwrap();
345
346 let cnt: i64 = res.try_get_by(0)?;
347
348 Ok(cnt != 0)
349}
350
351pub async fn ensure_object_id<C>(
353 object_type: ObjectType,
354 obj_id: ObjectId,
355 db: &C,
356) -> MetaResult<()>
357where
358 C: ConnectionTrait,
359{
360 let count = Object::find_by_id(obj_id).count(db).await?;
361 if count == 0 {
362 return Err(MetaError::catalog_id_not_found(
363 object_type.as_str(),
364 obj_id,
365 ));
366 }
367 Ok(())
368}
369
370pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
371where
372 C: ConnectionTrait,
373{
374 let count = Object::find_by_id(job_id.as_raw_id() as ObjectId)
375 .count(db)
376 .await?;
377 if count == 0 {
378 return Err(MetaError::cancelled(format!(
379 "job {} might be cancelled manually or by recovery",
380 job_id
381 )));
382 }
383 Ok(())
384}
385
386pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
388where
389 C: ConnectionTrait,
390{
391 let count = User::find_by_id(user_id).count(db).await?;
392 if count == 0 {
393 return Err(anyhow!("user {} was concurrently dropped", user_id).into());
394 }
395 Ok(())
396}
397
398pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
400where
401 C: ConnectionTrait,
402{
403 let count = Database::find()
404 .filter(database::Column::Name.eq(name))
405 .count(db)
406 .await?;
407 if count > 0 {
408 assert_eq!(count, 1);
409 return Err(MetaError::catalog_duplicated("database", name));
410 }
411 Ok(())
412}
413
414pub async fn check_function_signature_duplicate<C>(
416 pb_function: &PbFunction,
417 db: &C,
418) -> MetaResult<()>
419where
420 C: ConnectionTrait,
421{
422 let count = Function::find()
423 .inner_join(Object)
424 .filter(
425 object::Column::DatabaseId
426 .eq(pb_function.database_id)
427 .and(object::Column::SchemaId.eq(pb_function.schema_id))
428 .and(function::Column::Name.eq(&pb_function.name))
429 .and(
430 function::Column::ArgTypes
431 .eq(DataTypeArray::from(pb_function.arg_types.clone())),
432 ),
433 )
434 .count(db)
435 .await?;
436 if count > 0 {
437 assert_eq!(count, 1);
438 return Err(MetaError::catalog_duplicated("function", &pb_function.name));
439 }
440 Ok(())
441}
442
443pub async fn check_connection_name_duplicate<C>(
445 pb_connection: &PbConnection,
446 db: &C,
447) -> MetaResult<()>
448where
449 C: ConnectionTrait,
450{
451 let count = Connection::find()
452 .inner_join(Object)
453 .filter(
454 object::Column::DatabaseId
455 .eq(pb_connection.database_id)
456 .and(object::Column::SchemaId.eq(pb_connection.schema_id))
457 .and(connection::Column::Name.eq(&pb_connection.name)),
458 )
459 .count(db)
460 .await?;
461 if count > 0 {
462 assert_eq!(count, 1);
463 return Err(MetaError::catalog_duplicated(
464 "connection",
465 &pb_connection.name,
466 ));
467 }
468 Ok(())
469}
470
471pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
472where
473 C: ConnectionTrait,
474{
475 let count = Secret::find()
476 .inner_join(Object)
477 .filter(
478 object::Column::DatabaseId
479 .eq(pb_secret.database_id)
480 .and(object::Column::SchemaId.eq(pb_secret.schema_id))
481 .and(secret::Column::Name.eq(&pb_secret.name)),
482 )
483 .count(db)
484 .await?;
485 if count > 0 {
486 assert_eq!(count, 1);
487 return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
488 }
489 Ok(())
490}
491
492pub async fn check_subscription_name_duplicate<C>(
493 pb_subscription: &PbSubscription,
494 db: &C,
495) -> MetaResult<()>
496where
497 C: ConnectionTrait,
498{
499 let count = Subscription::find()
500 .inner_join(Object)
501 .filter(
502 object::Column::DatabaseId
503 .eq(pb_subscription.database_id)
504 .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
505 .and(subscription::Column::Name.eq(&pb_subscription.name)),
506 )
507 .count(db)
508 .await?;
509 if count > 0 {
510 assert_eq!(count, 1);
511 return Err(MetaError::catalog_duplicated(
512 "subscription",
513 &pb_subscription.name,
514 ));
515 }
516 Ok(())
517}
518
519pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
521where
522 C: ConnectionTrait,
523{
524 let count = User::find()
525 .filter(user::Column::Name.eq(name))
526 .count(db)
527 .await?;
528 if count > 0 {
529 assert_eq!(count, 1);
530 return Err(MetaError::catalog_duplicated("user", name));
531 }
532 Ok(())
533}
534
535pub async fn check_relation_name_duplicate<C>(
537 name: &str,
538 database_id: DatabaseId,
539 schema_id: SchemaId,
540 db: &C,
541) -> MetaResult<()>
542where
543 C: ConnectionTrait,
544{
545 macro_rules! check_duplicated {
546 ($obj_type:expr, $entity:ident, $table:ident) => {
547 let object_id = Object::find()
548 .select_only()
549 .column(object::Column::Oid)
550 .inner_join($entity)
551 .filter(
552 object::Column::DatabaseId
553 .eq(Some(database_id))
554 .and(object::Column::SchemaId.eq(Some(schema_id)))
555 .and($table::Column::Name.eq(name)),
556 )
557 .into_tuple::<ObjectId>()
558 .one(db)
559 .await?;
560 if let Some(oid) = object_id {
561 let check_creation = if $obj_type == ObjectType::View {
562 false
563 } else if $obj_type == ObjectType::Source {
564 let source_info = Source::find_by_id(oid)
565 .select_only()
566 .column(source::Column::SourceInfo)
567 .into_tuple::<Option<StreamSourceInfo>>()
568 .one(db)
569 .await?
570 .unwrap();
571 source_info.map_or(false, |info| info.to_protobuf().is_shared())
572 } else {
573 true
574 };
575 let job_id = JobId::new(oid as u32);
576 return if check_creation
577 && !matches!(
578 StreamingJob::find_by_id(job_id)
579 .select_only()
580 .column(streaming_job::Column::JobStatus)
581 .into_tuple::<JobStatus>()
582 .one(db)
583 .await?,
584 Some(JobStatus::Created)
585 ) {
586 Err(MetaError::catalog_under_creation(
587 $obj_type.as_str(),
588 name,
589 job_id,
590 ))
591 } else {
592 Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
593 };
594 }
595 };
596 }
597 check_duplicated!(ObjectType::Table, Table, table);
598 check_duplicated!(ObjectType::Source, Source, source);
599 check_duplicated!(ObjectType::Sink, Sink, sink);
600 check_duplicated!(ObjectType::Index, Index, index);
601 check_duplicated!(ObjectType::View, View, view);
602
603 Ok(())
604}
605
606pub async fn check_schema_name_duplicate<C>(
608 name: &str,
609 database_id: DatabaseId,
610 db: &C,
611) -> MetaResult<()>
612where
613 C: ConnectionTrait,
614{
615 let count = Object::find()
616 .inner_join(Schema)
617 .filter(
618 object::Column::ObjType
619 .eq(ObjectType::Schema)
620 .and(object::Column::DatabaseId.eq(Some(database_id)))
621 .and(schema::Column::Name.eq(name)),
622 )
623 .count(db)
624 .await?;
625 if count != 0 {
626 return Err(MetaError::catalog_duplicated("schema", name));
627 }
628
629 Ok(())
630}
631
632pub async fn check_object_refer_for_drop<C>(
635 object_type: ObjectType,
636 object_id: ObjectId,
637 db: &C,
638) -> MetaResult<()>
639where
640 C: ConnectionTrait,
641{
642 let count = if object_type == ObjectType::Table {
644 ObjectDependency::find()
645 .join(
646 JoinType::InnerJoin,
647 object_dependency::Relation::Object1.def(),
648 )
649 .filter(
650 object_dependency::Column::Oid
651 .eq(object_id)
652 .and(object::Column::ObjType.ne(ObjectType::Index)),
653 )
654 .count(db)
655 .await?
656 } else {
657 ObjectDependency::find()
658 .filter(object_dependency::Column::Oid.eq(object_id))
659 .count(db)
660 .await?
661 };
662 if count != 0 {
663 let referring_objects = get_referring_objects(object_id, db).await?;
665 let referring_objs_map = referring_objects
666 .into_iter()
667 .filter(|o| o.obj_type != ObjectType::Index)
668 .into_group_map_by(|o| o.obj_type);
669 let mut details = vec![];
670 for (obj_type, objs) in referring_objs_map {
671 match obj_type {
672 ObjectType::Table => {
673 let tables: Vec<(String, String)> = Object::find()
674 .join(JoinType::InnerJoin, object::Relation::Table.def())
675 .join(JoinType::InnerJoin, object::Relation::Database2.def())
676 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
677 .select_only()
678 .column(schema::Column::Name)
679 .column(table::Column::Name)
680 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
681 .into_tuple()
682 .all(db)
683 .await?;
684 details.extend(tables.into_iter().map(|(schema_name, table_name)| {
685 format!(
686 "materialized view {}.{} depends on it",
687 schema_name, table_name
688 )
689 }));
690 }
691 ObjectType::Sink => {
692 let sinks: Vec<(String, String)> = Object::find()
693 .join(JoinType::InnerJoin, object::Relation::Sink.def())
694 .join(JoinType::InnerJoin, object::Relation::Database2.def())
695 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
696 .select_only()
697 .column(schema::Column::Name)
698 .column(sink::Column::Name)
699 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
700 .into_tuple()
701 .all(db)
702 .await?;
703 details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
704 format!("sink {}.{} depends on it", schema_name, sink_name)
705 }));
706 }
707 ObjectType::View => {
708 let views: Vec<(String, String)> = Object::find()
709 .join(JoinType::InnerJoin, object::Relation::View.def())
710 .join(JoinType::InnerJoin, object::Relation::Database2.def())
711 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
712 .select_only()
713 .column(schema::Column::Name)
714 .column(view::Column::Name)
715 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
716 .into_tuple()
717 .all(db)
718 .await?;
719 details.extend(views.into_iter().map(|(schema_name, view_name)| {
720 format!("view {}.{} depends on it", schema_name, view_name)
721 }));
722 }
723 ObjectType::Subscription => {
724 let subscriptions: Vec<(String, String)> = Object::find()
725 .join(JoinType::InnerJoin, object::Relation::Subscription.def())
726 .join(JoinType::InnerJoin, object::Relation::Database2.def())
727 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
728 .select_only()
729 .column(schema::Column::Name)
730 .column(subscription::Column::Name)
731 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
732 .into_tuple()
733 .all(db)
734 .await?;
735 details.extend(subscriptions.into_iter().map(
736 |(schema_name, subscription_name)| {
737 format!(
738 "subscription {}.{} depends on it",
739 schema_name, subscription_name
740 )
741 },
742 ));
743 }
744 ObjectType::Source => {
745 let sources: Vec<(String, String)> = Object::find()
746 .join(JoinType::InnerJoin, object::Relation::Source.def())
747 .join(JoinType::InnerJoin, object::Relation::Database2.def())
748 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
749 .select_only()
750 .column(schema::Column::Name)
751 .column(source::Column::Name)
752 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
753 .into_tuple()
754 .all(db)
755 .await?;
756 details.extend(sources.into_iter().map(|(schema_name, view_name)| {
757 format!("source {}.{} depends on it", schema_name, view_name)
758 }));
759 }
760 ObjectType::Connection => {
761 let connections: Vec<(String, String)> = Object::find()
762 .join(JoinType::InnerJoin, object::Relation::Connection.def())
763 .join(JoinType::InnerJoin, object::Relation::Database2.def())
764 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
765 .select_only()
766 .column(schema::Column::Name)
767 .column(connection::Column::Name)
768 .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
769 .into_tuple()
770 .all(db)
771 .await?;
772 details.extend(connections.into_iter().map(|(schema_name, view_name)| {
773 format!("connection {}.{} depends on it", schema_name, view_name)
774 }));
775 }
776 _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
778 }
779 }
780
781 return Err(MetaError::permission_denied(format!(
782 "{} used by {} other objects. \nDETAIL: {}\n\
783 {}",
784 object_type.as_str(),
785 count,
786 details.join("\n"),
787 match object_type {
788 ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
789 "HINT: DROP the dependent objects first.",
790 ObjectType::Database | ObjectType::Schema => unreachable!(),
791 _ => "HINT: Use DROP ... CASCADE to drop the dependent objects too.",
792 }
793 )));
794 }
795 Ok(())
796}
797
798pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
800where
801 C: ConnectionTrait,
802{
803 let objs = ObjectDependency::find()
804 .filter(object_dependency::Column::Oid.eq(object_id))
805 .join(
806 JoinType::InnerJoin,
807 object_dependency::Relation::Object1.def(),
808 )
809 .into_partial_model()
810 .all(db)
811 .await?;
812
813 Ok(objs)
814}
815
816pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
818where
819 C: ConnectionTrait,
820{
821 let count = Object::find()
822 .filter(object::Column::SchemaId.eq(Some(schema_id)))
823 .count(db)
824 .await?;
825 if count != 0 {
826 return Err(MetaError::permission_denied("schema is not empty"));
827 }
828
829 Ok(())
830}
831
832pub async fn list_user_info_by_ids<C>(
834 user_ids: impl IntoIterator<Item = UserId>,
835 db: &C,
836) -> MetaResult<Vec<PbUserInfo>>
837where
838 C: ConnectionTrait,
839{
840 let mut user_infos = vec![];
841 for user_id in user_ids {
842 let user = User::find_by_id(user_id)
843 .one(db)
844 .await?
845 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
846 let mut user_info: PbUserInfo = user.into();
847 user_info.grant_privileges = get_user_privilege(user_id, db).await?;
848 user_infos.push(user_info);
849 }
850 Ok(user_infos)
851}
852
853pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
855where
856 C: ConnectionTrait,
857{
858 let obj_owner: UserId = Object::find_by_id(object_id)
859 .select_only()
860 .column(object::Column::OwnerId)
861 .into_tuple()
862 .one(db)
863 .await?
864 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
865 Ok(obj_owner)
866}
867
868pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
893 let cte_alias = Alias::new("granted_privilege_ids");
894 let cte_return_privilege_alias = Alias::new("id");
895 let cte_return_user_alias = Alias::new("user_id");
896
897 let mut base_query = SelectStatement::new()
898 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
899 .from(UserPrivilege)
900 .and_where(user_privilege::Column::Id.is_in(ids))
901 .to_owned();
902
903 let cte_referencing = Query::select()
904 .columns([
905 (UserPrivilege, user_privilege::Column::Id),
906 (UserPrivilege, user_privilege::Column::UserId),
907 ])
908 .from(UserPrivilege)
909 .inner_join(
910 cte_alias.clone(),
911 Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
912 .equals(user_privilege::Column::DependentId),
913 )
914 .to_owned();
915
916 let mut common_table_expr = CommonTableExpression::new();
917 common_table_expr
918 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
919 .columns([
920 cte_return_privilege_alias.clone(),
921 cte_return_user_alias.clone(),
922 ])
923 .table_name(cte_alias.clone());
924
925 SelectStatement::new()
926 .columns([cte_return_privilege_alias, cte_return_user_alias])
927 .from(cte_alias)
928 .to_owned()
929 .with(
930 WithClause::new()
931 .recursive(true)
932 .cte(common_table_expr)
933 .to_owned(),
934 )
935}
936
937pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
938where
939 C: ConnectionTrait,
940{
941 let table_ids: Vec<TableId> = Table::find()
942 .select_only()
943 .column(table::Column::TableId)
944 .filter(
945 table::Column::TableType
946 .eq(TableType::Internal)
947 .and(table::Column::BelongsToJobId.eq(job_id)),
948 )
949 .into_tuple()
950 .all(db)
951 .await?;
952 Ok(table_ids)
953}
954
955pub async fn get_index_state_tables_by_table_id<C>(
956 table_id: TableId,
957 db: &C,
958) -> MetaResult<Vec<TableId>>
959where
960 C: ConnectionTrait,
961{
962 let mut index_table_ids: Vec<TableId> = Index::find()
963 .select_only()
964 .column(index::Column::IndexTableId)
965 .filter(index::Column::PrimaryTableId.eq(table_id))
966 .into_tuple()
967 .all(db)
968 .await?;
969
970 if !index_table_ids.is_empty() {
971 let internal_table_ids: Vec<TableId> = Table::find()
972 .select_only()
973 .column(table::Column::TableId)
974 .filter(
975 table::Column::TableType
976 .eq(TableType::Internal)
977 .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
978 )
979 .into_tuple()
980 .all(db)
981 .await?;
982
983 index_table_ids.extend(internal_table_ids.into_iter());
984 }
985
986 Ok(index_table_ids)
987}
988
989#[derive(Clone, DerivePartialModel, FromQueryResult)]
990#[sea_orm(entity = "UserPrivilege")]
991pub struct PartialUserPrivilege {
992 pub id: PrivilegeId,
993 pub user_id: UserId,
994}
995
996pub async fn get_referring_privileges_cascade<C>(
997 ids: Vec<PrivilegeId>,
998 db: &C,
999) -> MetaResult<Vec<PartialUserPrivilege>>
1000where
1001 C: ConnectionTrait,
1002{
1003 let query = construct_privilege_dependency_query(ids);
1004 let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1005 let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1006 db.get_database_backend(),
1007 sql,
1008 values,
1009 ))
1010 .all(db)
1011 .await?;
1012
1013 Ok(privileges)
1014}
1015
1016pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1018where
1019 C: ConnectionTrait,
1020{
1021 let count = UserPrivilege::find()
1022 .filter(user_privilege::Column::DependentId.is_in(ids))
1023 .count(db)
1024 .await?;
1025 if count != 0 {
1026 return Err(MetaError::permission_denied(format!(
1027 "privileges granted to {} other ones.",
1028 count
1029 )));
1030 }
1031 Ok(())
1032}
1033
1034pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1036where
1037 C: ConnectionTrait,
1038{
1039 let user_privileges = UserPrivilege::find()
1040 .find_also_related(Object)
1041 .filter(user_privilege::Column::UserId.eq(user_id))
1042 .all(db)
1043 .await?;
1044 Ok(user_privileges
1045 .into_iter()
1046 .map(|(privilege, object)| {
1047 let object = object.unwrap();
1048 let oid = object.oid as _;
1049 let obj = match object.obj_type {
1050 ObjectType::Database => PbGrantObject::DatabaseId(oid),
1051 ObjectType::Schema => PbGrantObject::SchemaId(oid),
1052 ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
1053 ObjectType::Source => PbGrantObject::SourceId(oid),
1054 ObjectType::Sink => PbGrantObject::SinkId(oid),
1055 ObjectType::View => PbGrantObject::ViewId(oid),
1056 ObjectType::Function => PbGrantObject::FunctionId(oid),
1057 ObjectType::Connection => PbGrantObject::ConnectionId(oid),
1058 ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
1059 ObjectType::Secret => PbGrantObject::SecretId(oid),
1060 };
1061 PbGrantPrivilege {
1062 action_with_opts: vec![PbActionWithGrantOption {
1063 action: PbAction::from(privilege.action) as _,
1064 with_grant_option: privilege.with_grant_option,
1065 granted_by: privilege.granted_by as _,
1066 }],
1067 object: Some(obj),
1068 }
1069 })
1070 .collect())
1071}
1072
1073pub async fn get_table_columns(
1074 txn: &impl ConnectionTrait,
1075 id: TableId,
1076) -> MetaResult<ColumnCatalogArray> {
1077 let columns = Table::find_by_id(id)
1078 .select_only()
1079 .columns([table::Column::Columns])
1080 .into_tuple::<ColumnCatalogArray>()
1081 .one(txn)
1082 .await?
1083 .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1084 Ok(columns)
1085}
1086
1087pub async fn grant_default_privileges_automatically<C>(
1090 db: &C,
1091 object_id: ObjectId,
1092) -> MetaResult<Vec<PbUserInfo>>
1093where
1094 C: ConnectionTrait,
1095{
1096 let object = Object::find_by_id(object_id)
1097 .one(db)
1098 .await?
1099 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1100 assert_ne!(object.obj_type, ObjectType::Database);
1101
1102 let for_mview_filter = if object.obj_type == ObjectType::Table {
1103 let table_type = Table::find_by_id(TableId::new(object_id as _))
1104 .select_only()
1105 .column(table::Column::TableType)
1106 .into_tuple::<TableType>()
1107 .one(db)
1108 .await?
1109 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1110 user_default_privilege::Column::ForMaterializedView
1111 .eq(table_type == TableType::MaterializedView)
1112 } else {
1113 user_default_privilege::Column::ForMaterializedView.eq(false)
1114 };
1115 let schema_filter = if let Some(schema_id) = &object.schema_id {
1116 user_default_privilege::Column::SchemaId.eq(*schema_id)
1117 } else {
1118 user_default_privilege::Column::SchemaId.is_null()
1119 };
1120
1121 let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1122 .select_only()
1123 .columns([
1124 user_default_privilege::Column::Grantee,
1125 user_default_privilege::Column::GrantedBy,
1126 user_default_privilege::Column::Action,
1127 user_default_privilege::Column::WithGrantOption,
1128 ])
1129 .filter(
1130 user_default_privilege::Column::DatabaseId
1131 .eq(object.database_id.unwrap())
1132 .and(schema_filter)
1133 .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1134 .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1135 .and(for_mview_filter),
1136 )
1137 .into_tuple()
1138 .all(db)
1139 .await?;
1140 if default_privileges.is_empty() {
1141 return Ok(vec![]);
1142 }
1143
1144 let updated_user_ids = default_privileges
1145 .iter()
1146 .map(|(grantee, _, _, _)| *grantee)
1147 .collect::<HashSet<_>>();
1148
1149 let internal_table_ids = get_internal_tables_by_id(JobId::new(object_id as _), db).await?;
1150
1151 for (grantee, granted_by, action, with_grant_option) in default_privileges {
1152 UserPrivilege::insert(user_privilege::ActiveModel {
1153 user_id: Set(grantee),
1154 oid: Set(object_id),
1155 granted_by: Set(granted_by),
1156 action: Set(action),
1157 with_grant_option: Set(with_grant_option),
1158 ..Default::default()
1159 })
1160 .exec(db)
1161 .await?;
1162 if action == Action::Select && !internal_table_ids.is_empty() {
1163 for internal_table_id in &internal_table_ids {
1165 UserPrivilege::insert(user_privilege::ActiveModel {
1166 user_id: Set(grantee),
1167 oid: Set(internal_table_id.as_raw_id() as _),
1168 granted_by: Set(granted_by),
1169 action: Set(Action::Select),
1170 with_grant_option: Set(with_grant_option),
1171 ..Default::default()
1172 })
1173 .exec(db)
1174 .await?;
1175 }
1176 }
1177 }
1178
1179 let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1180 Ok(updated_user_infos)
1181}
1182
1183pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1185 match object {
1186 PbGrantObject::DatabaseId(id)
1187 | PbGrantObject::SchemaId(id)
1188 | PbGrantObject::TableId(id)
1189 | PbGrantObject::SourceId(id)
1190 | PbGrantObject::SinkId(id)
1191 | PbGrantObject::ViewId(id)
1192 | PbGrantObject::FunctionId(id)
1193 | PbGrantObject::SubscriptionId(id)
1194 | PbGrantObject::ConnectionId(id)
1195 | PbGrantObject::SecretId(id) => *id as _,
1196 }
1197}
1198
1199pub async fn insert_fragment_relations(
1200 db: &impl ConnectionTrait,
1201 downstream_fragment_relations: &FragmentDownstreamRelation,
1202) -> MetaResult<()> {
1203 for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1204 for downstream in downstreams {
1205 let relation = fragment_relation::Model {
1206 source_fragment_id: *upstream_fragment_id as _,
1207 target_fragment_id: downstream.downstream_fragment_id as _,
1208 dispatcher_type: downstream.dispatcher_type,
1209 dist_key_indices: downstream
1210 .dist_key_indices
1211 .iter()
1212 .map(|idx| *idx as i32)
1213 .collect_vec()
1214 .into(),
1215 output_indices: downstream
1216 .output_mapping
1217 .indices
1218 .iter()
1219 .map(|idx| *idx as i32)
1220 .collect_vec()
1221 .into(),
1222 output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1223 };
1224 FragmentRelation::insert(relation.into_active_model())
1225 .exec(db)
1226 .await?;
1227 }
1228 }
1229 Ok(())
1230}
1231
1232pub fn compose_dispatchers(
1233 source_fragment_distribution: DistributionType,
1234 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1235 target_fragment_id: crate::model::FragmentId,
1236 target_fragment_distribution: DistributionType,
1237 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1238 dispatcher_type: DispatcherType,
1239 dist_key_indices: Vec<u32>,
1240 output_mapping: PbDispatchOutputMapping,
1241) -> HashMap<crate::model::ActorId, PbDispatcher> {
1242 match dispatcher_type {
1243 DispatcherType::Hash => {
1244 let dispatcher = PbDispatcher {
1245 r#type: PbDispatcherType::from(dispatcher_type) as _,
1246 dist_key_indices,
1247 output_mapping: output_mapping.into(),
1248 hash_mapping: Some(
1249 ActorMapping::from_bitmaps(
1250 &target_fragment_actors
1251 .iter()
1252 .map(|(actor_id, bitmap)| {
1253 (
1254 *actor_id as _,
1255 bitmap
1256 .clone()
1257 .expect("downstream hash dispatch must have distribution"),
1258 )
1259 })
1260 .collect(),
1261 )
1262 .to_protobuf(),
1263 ),
1264 dispatcher_id: target_fragment_id as _,
1265 downstream_actor_id: target_fragment_actors
1266 .keys()
1267 .map(|actor_id| *actor_id as _)
1268 .collect(),
1269 };
1270 source_fragment_actors
1271 .keys()
1272 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1273 .collect()
1274 }
1275 DispatcherType::Broadcast | DispatcherType::Simple => {
1276 let dispatcher = PbDispatcher {
1277 r#type: PbDispatcherType::from(dispatcher_type) as _,
1278 dist_key_indices,
1279 output_mapping: output_mapping.into(),
1280 hash_mapping: None,
1281 dispatcher_id: target_fragment_id as _,
1282 downstream_actor_id: target_fragment_actors
1283 .keys()
1284 .map(|actor_id| *actor_id as _)
1285 .collect(),
1286 };
1287 source_fragment_actors
1288 .keys()
1289 .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1290 .collect()
1291 }
1292 DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1293 source_fragment_distribution,
1294 source_fragment_actors,
1295 target_fragment_distribution,
1296 target_fragment_actors,
1297 )
1298 .into_iter()
1299 .map(|(upstream_actor_id, downstream_actor_id)| {
1300 (
1301 upstream_actor_id,
1302 PbDispatcher {
1303 r#type: PbDispatcherType::NoShuffle as _,
1304 dist_key_indices: dist_key_indices.clone(),
1305 output_mapping: output_mapping.clone().into(),
1306 hash_mapping: None,
1307 dispatcher_id: target_fragment_id as _,
1308 downstream_actor_id: vec![downstream_actor_id as _],
1309 },
1310 )
1311 })
1312 .collect(),
1313 }
1314}
1315
1316pub fn resolve_no_shuffle_actor_dispatcher(
1318 source_fragment_distribution: DistributionType,
1319 source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1320 target_fragment_distribution: DistributionType,
1321 target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1322) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1323 assert_eq!(source_fragment_distribution, target_fragment_distribution);
1324 assert_eq!(
1325 source_fragment_actors.len(),
1326 target_fragment_actors.len(),
1327 "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1328 source_fragment_actors,
1329 target_fragment_actors
1330 );
1331 match source_fragment_distribution {
1332 DistributionType::Single => {
1333 let assert_singleton = |bitmap: &Option<Bitmap>| {
1334 assert!(
1335 bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1336 "not singleton: {:?}",
1337 bitmap
1338 );
1339 };
1340 assert_eq!(
1341 source_fragment_actors.len(),
1342 1,
1343 "singleton distribution actor count not 1: {:?}",
1344 source_fragment_distribution
1345 );
1346 assert_eq!(
1347 target_fragment_actors.len(),
1348 1,
1349 "singleton distribution actor count not 1: {:?}",
1350 target_fragment_distribution
1351 );
1352 let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1353 assert_singleton(bitmap);
1354 let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1355 assert_singleton(bitmap);
1356 vec![(*source_actor_id, *target_actor_id)]
1357 }
1358 DistributionType::Hash => {
1359 let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1360 .iter()
1361 .map(|(actor_id, bitmap)| {
1362 let bitmap = bitmap
1363 .as_ref()
1364 .expect("hash distribution should have bitmap");
1365 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1366 (first_vnode, (*actor_id, bitmap))
1367 })
1368 .collect();
1369 source_fragment_actors
1370 .iter()
1371 .map(|(source_actor_id, bitmap)| {
1372 let bitmap = bitmap
1373 .as_ref()
1374 .expect("hash distribution should have bitmap");
1375 let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1376 let (target_actor_id, target_bitmap) =
1377 target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1378 panic!(
1379 "cannot find matched target actor: {} {:?} {:?} {:?}",
1380 source_actor_id,
1381 first_vnode,
1382 source_fragment_actors,
1383 target_fragment_actors
1384 );
1385 });
1386 assert_eq!(
1387 bitmap,
1388 target_bitmap,
1389 "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1390 source_actor_id,
1391 first_vnode,
1392 source_fragment_actors,
1393 target_fragment_actors
1394 );
1395 (*source_actor_id, target_actor_id)
1396 }).collect()
1397 }
1398 }
1399}
1400
1401pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1402 let fragment_worker_slot_mapping = match fragment.distribution_type {
1403 DistributionType::Single => {
1404 let actor = fragment.actors.values().exactly_one().unwrap();
1405 WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1406 }
1407 DistributionType::Hash => {
1408 let actor_bitmaps: HashMap<_, _> = fragment
1409 .actors
1410 .iter()
1411 .map(|(actor_id, actor_info)| {
1412 let vnode_bitmap = actor_info
1413 .vnode_bitmap
1414 .as_ref()
1415 .cloned()
1416 .expect("actor bitmap shouldn't be none in hash fragment");
1417
1418 (*actor_id as hash::ActorId, vnode_bitmap)
1419 })
1420 .collect();
1421
1422 let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1423
1424 let actor_locations = fragment
1425 .actors
1426 .iter()
1427 .map(|(actor_id, actor_info)| {
1428 (*actor_id as hash::ActorId, actor_info.worker_id as u32)
1429 })
1430 .collect();
1431
1432 actor_mapping.to_worker_slot(&actor_locations)
1433 }
1434 };
1435
1436 PbFragmentWorkerSlotMapping {
1437 fragment_id: fragment.fragment_id,
1438 mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1439 }
1440}
1441
1442pub async fn get_fragments_for_jobs<C>(
1448 db: &C,
1449 actor_info: &SharedActorInfos,
1450 streaming_jobs: Vec<JobId>,
1451) -> MetaResult<(
1452 HashMap<SourceId, BTreeSet<FragmentId>>,
1453 HashSet<FragmentId>,
1454 HashSet<ActorId>,
1455 HashSet<FragmentId>,
1456)>
1457where
1458 C: ConnectionTrait,
1459{
1460 if streaming_jobs.is_empty() {
1461 return Ok((
1462 HashMap::default(),
1463 HashSet::default(),
1464 HashSet::default(),
1465 HashSet::default(),
1466 ));
1467 }
1468
1469 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1470 .select_only()
1471 .columns([
1472 fragment::Column::FragmentId,
1473 fragment::Column::FragmentTypeMask,
1474 fragment::Column::StreamNode,
1475 ])
1476 .filter(fragment::Column::JobId.is_in(streaming_jobs))
1477 .into_tuple()
1478 .all(db)
1479 .await?;
1480
1481 let fragment_ids: HashSet<_> = fragments
1482 .iter()
1483 .map(|(fragment_id, _, _)| *fragment_id)
1484 .collect();
1485
1486 let actors = {
1487 let guard = actor_info.read_guard();
1488 fragment_ids
1489 .iter()
1490 .flat_map(|id| guard.get_fragment(*id as _))
1491 .flat_map(|f| f.actors.keys().cloned().map(|id| id as _))
1492 .collect::<HashSet<_>>()
1493 };
1494
1495 let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1496 let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1497 for (fragment_id, mask, stream_node) in fragments {
1498 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1499 && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1500 {
1501 source_fragment_ids
1502 .entry(source_id as _)
1503 .or_default()
1504 .insert(fragment_id);
1505 }
1506 if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1507 sink_fragment_ids.insert(fragment_id);
1508 }
1509 }
1510
1511 Ok((
1512 source_fragment_ids,
1513 sink_fragment_ids,
1514 actors.into_iter().collect(),
1515 fragment_ids,
1516 ))
1517}
1518
1519pub(crate) fn build_object_group_for_delete(
1524 partial_objects: Vec<PartialObject>,
1525) -> NotificationInfo {
1526 let mut objects = vec![];
1527 for obj in partial_objects {
1528 match obj.obj_type {
1529 ObjectType::Database => objects.push(PbObject {
1530 object_info: Some(PbObjectInfo::Database(PbDatabase {
1531 id: (obj.oid as u32).into(),
1532 ..Default::default()
1533 })),
1534 }),
1535 ObjectType::Schema => objects.push(PbObject {
1536 object_info: Some(PbObjectInfo::Schema(PbSchema {
1537 id: (obj.oid as u32).into(),
1538 database_id: obj.database_id.unwrap(),
1539 ..Default::default()
1540 })),
1541 }),
1542 ObjectType::Table => objects.push(PbObject {
1543 object_info: Some(PbObjectInfo::Table(PbTable {
1544 id: (obj.oid as u32).into(),
1545 schema_id: obj.schema_id.unwrap(),
1546 database_id: obj.database_id.unwrap(),
1547 ..Default::default()
1548 })),
1549 }),
1550 ObjectType::Source => objects.push(PbObject {
1551 object_info: Some(PbObjectInfo::Source(PbSource {
1552 id: obj.oid as _,
1553 schema_id: obj.schema_id.unwrap(),
1554 database_id: obj.database_id.unwrap(),
1555 ..Default::default()
1556 })),
1557 }),
1558 ObjectType::Sink => objects.push(PbObject {
1559 object_info: Some(PbObjectInfo::Sink(PbSink {
1560 id: obj.oid as _,
1561 schema_id: obj.schema_id.unwrap(),
1562 database_id: obj.database_id.unwrap(),
1563 ..Default::default()
1564 })),
1565 }),
1566 ObjectType::Subscription => objects.push(PbObject {
1567 object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1568 id: obj.oid as _,
1569 schema_id: obj.schema_id.unwrap(),
1570 database_id: obj.database_id.unwrap(),
1571 ..Default::default()
1572 })),
1573 }),
1574 ObjectType::View => objects.push(PbObject {
1575 object_info: Some(PbObjectInfo::View(PbView {
1576 id: obj.oid as _,
1577 schema_id: obj.schema_id.unwrap(),
1578 database_id: obj.database_id.unwrap(),
1579 ..Default::default()
1580 })),
1581 }),
1582 ObjectType::Index => {
1583 objects.push(PbObject {
1584 object_info: Some(PbObjectInfo::Index(PbIndex {
1585 id: obj.oid as _,
1586 schema_id: obj.schema_id.unwrap(),
1587 database_id: obj.database_id.unwrap(),
1588 ..Default::default()
1589 })),
1590 });
1591 objects.push(PbObject {
1592 object_info: Some(PbObjectInfo::Table(PbTable {
1593 id: (obj.oid as u32).into(),
1594 schema_id: obj.schema_id.unwrap(),
1595 database_id: obj.database_id.unwrap(),
1596 ..Default::default()
1597 })),
1598 });
1599 }
1600 ObjectType::Function => objects.push(PbObject {
1601 object_info: Some(PbObjectInfo::Function(PbFunction {
1602 id: obj.oid as _,
1603 schema_id: obj.schema_id.unwrap(),
1604 database_id: obj.database_id.unwrap(),
1605 ..Default::default()
1606 })),
1607 }),
1608 ObjectType::Connection => objects.push(PbObject {
1609 object_info: Some(PbObjectInfo::Connection(PbConnection {
1610 id: obj.oid as _,
1611 schema_id: obj.schema_id.unwrap(),
1612 database_id: obj.database_id.unwrap(),
1613 ..Default::default()
1614 })),
1615 }),
1616 ObjectType::Secret => objects.push(PbObject {
1617 object_info: Some(PbObjectInfo::Secret(PbSecret {
1618 id: obj.oid as _,
1619 schema_id: obj.schema_id.unwrap(),
1620 database_id: obj.database_id.unwrap(),
1621 ..Default::default()
1622 })),
1623 }),
1624 }
1625 }
1626 NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1627}
1628
1629pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1630 let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1631 .context("unable to parse table definition")
1632 .inspect_err(|e| {
1633 tracing::error!(
1634 target: "auto_schema_change",
1635 error = %e.as_report(),
1636 "failed to parse table definition")
1637 })
1638 .unwrap()
1639 .try_into()
1640 .unwrap();
1641 if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1642 cdc_table_info
1643 .clone()
1644 .map(|cdc_table_info| cdc_table_info.external_table_name)
1645 } else {
1646 None
1647 }
1648}
1649
/// Renames the relation `object_id` of kind `object_type` to `object_name` inside `txn`.
///
/// Returns the catalog objects that were updated (for notifying subscribers) together with the
/// relation's previous name.
///
/// * `Table`: if a source is associated with the table, the source is renamed as well.
/// * `Index`: both the index's table and the index row itself are renamed; the returned old
///   name is the one of the index table.
/// * For every relation except views, the stored definition is rewritten through
///   `alter_relation_rename`.
///
/// # Panics
/// Panics if the relation or its `Object` row cannot be found, or if `object_type` is not one
/// of table, source, sink, subscription, view or index.
pub async fn rename_relation(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
) -> MetaResult<(Vec<PbObject>, String)> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename;

    let mut to_update_relations = vec![];
    // Renames one relation of entity `$entity` (model module `$table`, primary key field
    // `$identity`) identified by `$object_id`, records the updated object in
    // `to_update_relations`, and evaluates to the relation's old name.
    macro_rules! rename_relation {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let obj = obj.unwrap();
            let old_name = relation.name.clone();
            relation.name = object_name.into();
            // The definition of a view is left untouched; only its name changes.
            if obj.obj_type != ObjectType::View {
                relation.definition = alter_relation_rename(&relation.definition, object_name);
            }
            // Only the key, name and definition are set, so only those columns are updated.
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                name: Set(object_name.into()),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
            });
            old_name
        }};
    }
    let old_name = match object_type {
        ObjectType::Table => {
            // A table may have an associated source, which is renamed together with it.
            let associated_source_id: Option<SourceId> = Source::find()
                .select_only()
                .column(source::Column::SourceId)
                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
                .into_tuple()
                .one(txn)
                .await?;
            if let Some(source_id) = associated_source_id {
                rename_relation!(Source, source, source_id, source_id);
            }
            rename_relation!(Table, table, table_id, TableId::new(object_id as _))
        }
        ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id),
        ObjectType::Subscription => {
            rename_relation!(Subscription, subscription, subscription_id, object_id)
        }
        ObjectType::View => rename_relation!(View, view, view_id, object_id),
        ObjectType::Index => {
            let (mut index, obj) = Index::find_by_id(object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            index.name = object_name.into();
            let index_table_id = index.index_table_id;
            // Rename the table backing the index first; its old name is what gets returned.
            let old_name = rename_relation!(Table, table, table_id, index_table_id);

            // Then update the name on the index row itself.
            let active_model = index::ActiveModel {
                index_id: sea_orm::ActiveValue::Set(index.index_id),
                name: sea_orm::ActiveValue::Set(object_name.into()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
            });
            old_name
        }
        _ => unreachable!("only relation name can be altered."),
    };

    Ok((to_update_relations, old_name))
}
1738
1739pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
1740where
1741 C: ConnectionTrait,
1742{
1743 let database_resource_group: Option<String> = Database::find_by_id(database_id)
1744 .select_only()
1745 .column(database::Column::ResourceGroup)
1746 .into_tuple()
1747 .one(txn)
1748 .await?
1749 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1750
1751 Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1752}
1753
1754pub async fn get_existing_job_resource_group<C>(
1755 txn: &C,
1756 streaming_job_id: JobId,
1757) -> MetaResult<String>
1758where
1759 C: ConnectionTrait,
1760{
1761 let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1762 StreamingJob::find_by_id(streaming_job_id)
1763 .select_only()
1764 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1765 .join(JoinType::InnerJoin, object::Relation::Database2.def())
1766 .column(streaming_job::Column::SpecificResourceGroup)
1767 .column(database::Column::ResourceGroup)
1768 .into_tuple()
1769 .one(txn)
1770 .await?
1771 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1772
1773 Ok(job_specific_resource_group.unwrap_or_else(|| {
1774 database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1775 }))
1776}
1777
1778pub fn filter_workers_by_resource_group(
1779 workers: &HashMap<u32, WorkerNode>,
1780 resource_group: &str,
1781) -> BTreeSet<WorkerId> {
1782 workers
1783 .iter()
1784 .filter(|&(_, worker)| {
1785 worker
1786 .resource_group()
1787 .map(|node_label| node_label.as_str() == resource_group)
1788 .unwrap_or(false)
1789 })
1790 .map(|(id, _)| *id as WorkerId)
1791 .collect()
1792}
1793
/// Rewrites the definitions of all objects that refer to a renamed relation.
///
/// `object_id` / `object_type` identify the renamed relation, `old_name` is its previous name
/// and `object_name` its new one. The referring objects are those returned by
/// `get_referring_objects`; when the renamed relation is a table, sinks whose target table is
/// that table are included as well. Each definition is rewritten through
/// `alter_relation_rename_refs` and persisted in `txn`.
///
/// Returns the updated catalog objects.
///
/// # Errors
/// Fails if a referring object is not a table, sink, subscription, view or index, or when a
/// database operation fails.
///
/// # Panics
/// Panics if a referring relation, its `Object` row, or a referring index cannot be found.
pub async fn rename_relation_refer(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
    old_name: &str,
) -> MetaResult<Vec<PbObject>> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename_refs;

    let mut to_update_relations = vec![];
    // Rewrites the definition of one relation of entity `$entity` (model module `$table`,
    // primary key field `$identity`) identified by `$object_id`, and records the updated object
    // in `to_update_relations`.
    macro_rules! rename_relation_ref {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            relation.definition =
                alter_relation_rename_refs(&relation.definition, old_name, object_name);
            // Only the key and the definition are set, so only the definition is updated.
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj.unwrap()).into(),
                )),
            });
        }};
    }
    let mut objs = get_referring_objects(object_id, txn).await?;
    if object_type == ObjectType::Table {
        // Sinks that write into the renamed table are added to the referring objects.
        let incoming_sinks: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.eq(object_id))
            .into_tuple()
            .all(txn)
            .await?;

        // Only `oid` and `obj_type` are read by the loop below, so the ids can stay unset.
        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
            oid: id,
            obj_type: ObjectType::Sink,
            schema_id: None,
            database_id: None,
        }));
    }

    for obj in objs {
        match obj.obj_type {
            ObjectType::Table => {
                rename_relation_ref!(Table, table, table_id, TableId::new(obj.oid as _))
            }
            ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid),
            ObjectType::Subscription => {
                rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid)
            }
            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid),
            ObjectType::Index => {
                // For an index, the definition that gets rewritten is the one of its table.
                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid)
                    .select_only()
                    .column(index::Column::IndexTableId)
                    .into_tuple()
                    .one(txn)
                    .await?;
                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
            }
            _ => {
                bail!(
                    "only the table, sink, subscription, view and index will depend on other objects."
                )
            }
        }
    }

    Ok(to_update_relations)
}
1877
1878pub async fn validate_subscription_deletion<C>(txn: &C, subscription_id: ObjectId) -> MetaResult<()>
1882where
1883 C: ConnectionTrait,
1884{
1885 let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1886 .select_only()
1887 .column(subscription::Column::DependentTableId)
1888 .into_tuple()
1889 .one(txn)
1890 .await?
1891 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1892
1893 let cnt = Subscription::find()
1894 .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1895 .count(txn)
1896 .await?;
1897 if cnt > 1 {
1898 return Ok(());
1901 }
1902
1903 let obj_alias = Alias::new("o1");
1905 let used_by_alias = Alias::new("o2");
1906 let count = ObjectDependency::find()
1907 .join_as(
1908 JoinType::InnerJoin,
1909 object_dependency::Relation::Object2.def(),
1910 obj_alias.clone(),
1911 )
1912 .join_as(
1913 JoinType::InnerJoin,
1914 object_dependency::Relation::Object1.def(),
1915 used_by_alias.clone(),
1916 )
1917 .filter(
1918 object_dependency::Column::Oid
1919 .eq(upstream_table_id)
1920 .and(object_dependency::Column::UsedBy.ne(subscription_id))
1921 .and(
1922 Expr::col((obj_alias, object::Column::DatabaseId))
1923 .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1924 ),
1925 )
1926 .count(txn)
1927 .await?;
1928
1929 if count != 0 {
1930 return Err(MetaError::permission_denied(format!(
1931 "Referenced by {} cross-db objects.",
1932 count
1933 )));
1934 }
1935
1936 Ok(())
1937}
1938
1939pub async fn fetch_target_fragments<C>(
1940 txn: &C,
1941 src_fragment_id: impl IntoIterator<Item = FragmentId>,
1942) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
1943where
1944 C: ConnectionTrait,
1945{
1946 let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
1947 .select_only()
1948 .columns([
1949 fragment_relation::Column::SourceFragmentId,
1950 fragment_relation::Column::TargetFragmentId,
1951 ])
1952 .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
1953 .into_tuple()
1954 .all(txn)
1955 .await?;
1956
1957 let source_target_fragments = source_target_fragments.into_iter().into_group_map();
1958
1959 Ok(source_target_fragments)
1960}
1961
1962pub async fn get_sink_fragment_by_ids<C>(
1963 txn: &C,
1964 sink_ids: Vec<SinkId>,
1965) -> MetaResult<HashMap<SinkId, FragmentId>>
1966where
1967 C: ConnectionTrait,
1968{
1969 let sink_num = sink_ids.len();
1970 let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
1971 .select_only()
1972 .columns([fragment::Column::JobId, fragment::Column::FragmentId])
1973 .filter(
1974 fragment::Column::JobId
1975 .is_in(sink_ids)
1976 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
1977 )
1978 .into_tuple()
1979 .all(txn)
1980 .await?;
1981
1982 if sink_fragment_ids.len() != sink_num {
1983 return Err(anyhow::anyhow!(
1984 "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
1985 sink_fragment_ids.len(),
1986 sink_num
1987 )
1988 .into());
1989 }
1990
1991 Ok(sink_fragment_ids.into_iter().collect())
1992}
1993
1994pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
1995where
1996 C: ConnectionTrait,
1997{
1998 let mview_fragment: Vec<i32> = Fragment::find()
1999 .select_only()
2000 .column(fragment::Column::FragmentTypeMask)
2001 .filter(
2002 fragment::Column::JobId
2003 .eq(table_id)
2004 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2005 )
2006 .into_tuple()
2007 .all(txn)
2008 .await?;
2009
2010 let mview_fragment_len = mview_fragment.len();
2011 if mview_fragment_len != 1 {
2012 bail!(
2013 "expected exactly one mview fragment for table {}, found {}",
2014 table_id,
2015 mview_fragment_len
2016 );
2017 }
2018
2019 let mview_fragment = mview_fragment.into_iter().next().unwrap();
2020 let migrated =
2021 FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2022
2023 Ok(migrated)
2024}
2025
2026pub fn build_select_node_list(
2027 from: &[ColumnCatalog],
2028 to: &[ColumnCatalog],
2029) -> MetaResult<Vec<PbExprNode>> {
2030 let mut exprs = Vec::with_capacity(to.len());
2031 let idx_by_col_id = from
2032 .iter()
2033 .enumerate()
2034 .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2035 .collect::<HashMap<_, _>>();
2036
2037 for to_col in to {
2038 let to_col = to_col.column_desc.as_ref().unwrap();
2039 let to_col_type = to_col.column_type.clone();
2040 if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2041 let from_col_type = from[*from_idx]
2042 .column_desc
2043 .as_ref()
2044 .unwrap()
2045 .column_type
2046 .clone();
2047 if to_col_type != from_col_type {
2048 return Err(anyhow!(
2049 "Column type mismatch: {:?} != {:?}",
2050 from_col_type,
2051 to_col_type
2052 )
2053 .into());
2054 }
2055 exprs.push(PbExprNode {
2056 function_type: expr_node::Type::Unspecified.into(),
2057 return_type: to_col_type,
2058 rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2059 });
2060 } else {
2061 let to_default_node =
2062 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2063 expr,
2064 ..
2065 })) = &to_col.generated_or_default_column
2066 {
2067 expr.clone().unwrap()
2068 } else {
2069 let null = Datum::None.to_protobuf();
2070 PbExprNode {
2071 function_type: expr_node::Type::Unspecified.into(),
2072 return_type: to_col_type,
2073 rex_node: Some(expr_node::RexNode::Constant(null)),
2074 }
2075 };
2076 exprs.push(to_default_node);
2077 }
2078 }
2079
2080 Ok(exprs)
2081}
2082
/// Per-job extra information, as assembled by `get_streaming_job_extra_info`.
#[derive(Clone, Debug, Default)]
pub struct StreamingJobExtraInfo {
    /// Value of the job's `timezone` column; `None` when the column is unset.
    pub timezone: Option<String>,
    /// Definition resolved for the job; an empty string when none could be resolved.
    pub job_definition: String,
}
2088
2089pub async fn get_streaming_job_extra_info<C>(
2090 txn: &C,
2091 job_ids: Vec<JobId>,
2092) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2093where
2094 C: ConnectionTrait,
2095{
2096 let timezone_pairs: Vec<(JobId, Option<String>)> = StreamingJob::find()
2097 .select_only()
2098 .columns([
2099 streaming_job::Column::JobId,
2100 streaming_job::Column::Timezone,
2101 ])
2102 .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2103 .into_tuple()
2104 .all(txn)
2105 .await?;
2106
2107 let job_ids = job_ids.into_iter().collect();
2108
2109 let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2110
2111 let result = timezone_pairs
2112 .into_iter()
2113 .map(|(job_id, timezone)| {
2114 let job_definition = definitions.remove(&job_id).unwrap_or_default();
2115 (
2116 job_id,
2117 StreamingJobExtraInfo {
2118 timezone,
2119 job_definition,
2120 },
2121 )
2122 })
2123 .collect();
2124
2125 Ok(result)
2126}
2127
#[cfg(test)]
mod tests {
    use super::*;

    /// The external table name is taken from the `TABLE '...'` clause of a CDC table DDL.
    #[test]
    fn test_extract_cdc_table_name() {
        // (DDL, expected external table name)
        let cases = [
            (
                "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'",
                "public.t1",
            ),
            (
                "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'",
                "mydb.t2",
            ),
        ];

        for (ddl, expected) in cases {
            assert_eq!(
                extract_external_table_name_from_definition(ddl),
                Some(expected.into())
            );
        }
    }
}