risingwave_meta/controller/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16
17use anyhow::{Context, anyhow};
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
21use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
22use risingwave_common::id::JobId;
23use risingwave_common::types::Datum;
24use risingwave_common::util::value_encoding::DatumToProtoExt;
25use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
26use risingwave_common::{bail, hash};
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::object::ObjectType;
29use risingwave_meta_model::prelude::*;
30use risingwave_meta_model::table::TableType;
31use risingwave_meta_model::user_privilege::Action;
32use risingwave_meta_model::{
33    ActorId, ColumnCatalogArray, DataTypeArray, DatabaseId, DispatcherType, FragmentId, JobStatus,
34    ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo, TableId,
35    TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation, function,
36    index, object, object_dependency, schema, secret, sink, source, streaming_job, subscription,
37    table, user, user_default_privilege, user_privilege, view,
38};
39use risingwave_meta_model_migration::WithQuery;
40use risingwave_pb::catalog::{
41    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
42    PbSubscription, PbTable, PbView,
43};
44use risingwave_pb::common::WorkerNode;
45use risingwave_pb::expr::{PbExprNode, expr_node};
46use risingwave_pb::meta::object::PbObjectInfo;
47use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
48use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
51use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
52use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
53use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
54use risingwave_sqlparser::ast::Statement as SqlStatement;
55use risingwave_sqlparser::parser::Parser;
56use sea_orm::sea_query::{
57    Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
58    WithClause,
59};
60use sea_orm::{
61    ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
62    FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
63    RelationTrait, Set, Statement,
64};
65use thiserror_ext::AsReport;
66
67use crate::barrier::{SharedActorInfos, SharedFragmentInfo};
68use crate::controller::ObjectModel;
69use crate::controller::fragment::FragmentTypeMaskExt;
70use crate::controller::scale::resolve_streaming_job_definition;
71use crate::model::FragmentDownstreamRelation;
72use crate::{MetaError, MetaResult};
73
74/// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object.
75///
76/// # Examples
77///
78/// ```
79/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
80/// use sea_orm::sea_query::*;
81/// use sea_orm::*;
82///
83/// let query = construct_obj_dependency_query(1);
84///
85/// assert_eq!(
86///     query.to_string(MysqlQueryBuilder),
87///     r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `oid` FROM `object` WHERE `object`.`database_id` = 1 OR `object`.`schema_id` = 1) UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `oid`, `obj_type`, `schema_id`, `database_id` FROM `used_by_object_ids` INNER JOIN `object` ON `used_by_object_ids`.`used_by` = `oid` ORDER BY `oid` DESC"#
88/// );
89/// assert_eq!(
90///     query.to_string(PostgresQueryBuilder),
91///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1) UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
92/// );
93/// assert_eq!(
94///     query.to_string(SqliteQueryBuilder),
95///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
96/// );
97/// ```
98pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
99    let cte_alias = Alias::new("used_by_object_ids");
100    let cte_return_alias = Alias::new("used_by");
101
102    let mut base_query = SelectStatement::new()
103        .column(object_dependency::Column::UsedBy)
104        .from(ObjectDependency)
105        .and_where(object_dependency::Column::Oid.eq(obj_id))
106        .to_owned();
107
108    let belonged_obj_query = SelectStatement::new()
109        .column(object::Column::Oid)
110        .from(Object)
111        .and_where(
112            object::Column::DatabaseId
113                .eq(obj_id)
114                .or(object::Column::SchemaId.eq(obj_id)),
115        )
116        .to_owned();
117
118    let cte_referencing = Query::select()
119        .column((ObjectDependency, object_dependency::Column::UsedBy))
120        .from(ObjectDependency)
121        .inner_join(
122            cte_alias.clone(),
123            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
124                .equals(object_dependency::Column::Oid),
125        )
126        .to_owned();
127
128    let mut common_table_expr = CommonTableExpression::new();
129    common_table_expr
130        .query(
131            base_query
132                .union(UnionType::All, belonged_obj_query)
133                .union(UnionType::All, cte_referencing)
134                .to_owned(),
135        )
136        .column(cte_return_alias.clone())
137        .table_name(cte_alias.clone());
138
139    SelectStatement::new()
140        .distinct()
141        .columns([
142            object::Column::Oid,
143            object::Column::ObjType,
144            object::Column::SchemaId,
145            object::Column::DatabaseId,
146        ])
147        .from(cte_alias.clone())
148        .inner_join(
149            Object,
150            Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
151        )
152        .order_by(object::Column::Oid, Order::Desc)
153        .to_owned()
154        .with(
155            WithClause::new()
156                .recursive(true)
157                .cte(common_table_expr)
158                .to_owned(),
159        )
160}
161
162/// This function will construct a query using recursive cte to find if dependent objects are already relying on the target table.
163///
164/// # Examples
165///
166/// ```
167/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
168/// use sea_orm::sea_query::*;
169/// use sea_orm::*;
170///
171/// let query = construct_sink_cycle_check_query(1, vec![2, 3]);
172///
173/// assert_eq!(
174///     query.to_string(MysqlQueryBuilder),
175///     r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
176/// );
177/// assert_eq!(
178///     query.to_string(PostgresQueryBuilder),
179///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
180/// );
181/// assert_eq!(
182///     query.to_string(SqliteQueryBuilder),
183///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
184/// );
185/// ```
186pub fn construct_sink_cycle_check_query(
187    target_table: ObjectId,
188    dependent_objects: Vec<ObjectId>,
189) -> WithQuery {
190    let cte_alias = Alias::new("used_by_object_ids_with_sink");
191    let depend_alias = Alias::new("obj_dependency_with_sink");
192
193    let mut base_query = SelectStatement::new()
194        .columns([
195            object_dependency::Column::Oid,
196            object_dependency::Column::UsedBy,
197        ])
198        .from(ObjectDependency)
199        .and_where(object_dependency::Column::Oid.eq(target_table))
200        .to_owned();
201
202    let query_sink_deps = SelectStatement::new()
203        .columns([sink::Column::SinkId, sink::Column::TargetTable])
204        .from(Sink)
205        .and_where(sink::Column::TargetTable.is_not_null())
206        .to_owned();
207
208    let cte_referencing = Query::select()
209        .column((depend_alias.clone(), object_dependency::Column::Oid))
210        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
211        .from_subquery(
212            SelectStatement::new()
213                .columns([
214                    object_dependency::Column::Oid,
215                    object_dependency::Column::UsedBy,
216                ])
217                .from(ObjectDependency)
218                .union(UnionType::All, query_sink_deps)
219                .to_owned(),
220            depend_alias.clone(),
221        )
222        .inner_join(
223            cte_alias.clone(),
224            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
225                .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
226        )
227        .and_where(
228            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
229                cte_alias.clone(),
230                object_dependency::Column::Oid,
231            ))),
232        )
233        .to_owned();
234
235    let mut common_table_expr = CommonTableExpression::new();
236    common_table_expr
237        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
238        .columns([
239            object_dependency::Column::Oid,
240            object_dependency::Column::UsedBy,
241        ])
242        .table_name(cte_alias.clone());
243
244    SelectStatement::new()
245        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
246        .from(cte_alias.clone())
247        .and_where(
248            Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
249        )
250        .to_owned()
251        .with(
252            WithClause::new()
253                .recursive(true)
254                .cte(common_table_expr)
255                .to_owned(),
256        )
257}
258
259#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
260#[sea_orm(entity = "Object")]
261pub struct PartialObject {
262    pub oid: ObjectId,
263    pub obj_type: ObjectType,
264    pub schema_id: Option<SchemaId>,
265    pub database_id: Option<DatabaseId>,
266}
267
268#[derive(Clone, DerivePartialModel, FromQueryResult)]
269#[sea_orm(entity = "Fragment")]
270pub struct PartialFragmentStateTables {
271    pub fragment_id: FragmentId,
272    pub job_id: ObjectId,
273    pub state_table_ids: TableIdArray,
274}
275
276#[derive(Clone, Eq, PartialEq, Debug)]
277pub struct PartialActorLocation {
278    pub actor_id: ActorId,
279    pub fragment_id: FragmentId,
280    pub worker_id: WorkerId,
281}
282
283#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
284pub struct FragmentDesc {
285    pub fragment_id: FragmentId,
286    pub job_id: JobId,
287    pub fragment_type_mask: i32,
288    pub distribution_type: DistributionType,
289    pub state_table_ids: TableIdArray,
290    pub parallelism: i64,
291    pub vnode_count: i32,
292    pub stream_node: StreamNode,
293    pub parallelism_policy: String,
294}
295
296/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
297pub async fn get_referring_objects_cascade<C>(
298    obj_id: ObjectId,
299    db: &C,
300) -> MetaResult<Vec<PartialObject>>
301where
302    C: ConnectionTrait,
303{
304    let query = construct_obj_dependency_query(obj_id);
305    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
306    let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
307        db.get_database_backend(),
308        sql,
309        values,
310    ))
311    .all(db)
312    .await?;
313    Ok(objects)
314}
315
316/// Check if create a sink with given dependent objects into the target table will cause a cycle, return true if it will.
317pub async fn check_sink_into_table_cycle<C>(
318    target_table: ObjectId,
319    dependent_objs: Vec<ObjectId>,
320    db: &C,
321) -> MetaResult<bool>
322where
323    C: ConnectionTrait,
324{
325    if dependent_objs.is_empty() {
326        return Ok(false);
327    }
328
329    // special check for self referencing
330    if dependent_objs.contains(&target_table) {
331        return Ok(true);
332    }
333
334    let query = construct_sink_cycle_check_query(target_table, dependent_objs);
335    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
336
337    let res = db
338        .query_one(Statement::from_sql_and_values(
339            db.get_database_backend(),
340            sql,
341            values,
342        ))
343        .await?
344        .unwrap();
345
346    let cnt: i64 = res.try_get_by(0)?;
347
348    Ok(cnt != 0)
349}
350
351/// `ensure_object_id` ensures the existence of target object in the cluster.
352pub async fn ensure_object_id<C>(
353    object_type: ObjectType,
354    obj_id: ObjectId,
355    db: &C,
356) -> MetaResult<()>
357where
358    C: ConnectionTrait,
359{
360    let count = Object::find_by_id(obj_id).count(db).await?;
361    if count == 0 {
362        return Err(MetaError::catalog_id_not_found(
363            object_type.as_str(),
364            obj_id,
365        ));
366    }
367    Ok(())
368}
369
370pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
371where
372    C: ConnectionTrait,
373{
374    let count = Object::find_by_id(job_id.as_raw_id() as ObjectId)
375        .count(db)
376        .await?;
377    if count == 0 {
378        return Err(MetaError::cancelled(format!(
379            "job {} might be cancelled manually or by recovery",
380            job_id
381        )));
382    }
383    Ok(())
384}
385
386/// `ensure_user_id` ensures the existence of target user in the cluster.
387pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
388where
389    C: ConnectionTrait,
390{
391    let count = User::find_by_id(user_id).count(db).await?;
392    if count == 0 {
393        return Err(anyhow!("user {} was concurrently dropped", user_id).into());
394    }
395    Ok(())
396}
397
398/// `check_database_name_duplicate` checks whether the database name is already used in the cluster.
399pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
400where
401    C: ConnectionTrait,
402{
403    let count = Database::find()
404        .filter(database::Column::Name.eq(name))
405        .count(db)
406        .await?;
407    if count > 0 {
408        assert_eq!(count, 1);
409        return Err(MetaError::catalog_duplicated("database", name));
410    }
411    Ok(())
412}
413
414/// `check_function_signature_duplicate` checks whether the function name and its signature is already used in the target namespace.
415pub async fn check_function_signature_duplicate<C>(
416    pb_function: &PbFunction,
417    db: &C,
418) -> MetaResult<()>
419where
420    C: ConnectionTrait,
421{
422    let count = Function::find()
423        .inner_join(Object)
424        .filter(
425            object::Column::DatabaseId
426                .eq(pb_function.database_id)
427                .and(object::Column::SchemaId.eq(pb_function.schema_id))
428                .and(function::Column::Name.eq(&pb_function.name))
429                .and(
430                    function::Column::ArgTypes
431                        .eq(DataTypeArray::from(pb_function.arg_types.clone())),
432                ),
433        )
434        .count(db)
435        .await?;
436    if count > 0 {
437        assert_eq!(count, 1);
438        return Err(MetaError::catalog_duplicated("function", &pb_function.name));
439    }
440    Ok(())
441}
442
443/// `check_connection_name_duplicate` checks whether the connection name is already used in the target namespace.
444pub async fn check_connection_name_duplicate<C>(
445    pb_connection: &PbConnection,
446    db: &C,
447) -> MetaResult<()>
448where
449    C: ConnectionTrait,
450{
451    let count = Connection::find()
452        .inner_join(Object)
453        .filter(
454            object::Column::DatabaseId
455                .eq(pb_connection.database_id)
456                .and(object::Column::SchemaId.eq(pb_connection.schema_id))
457                .and(connection::Column::Name.eq(&pb_connection.name)),
458        )
459        .count(db)
460        .await?;
461    if count > 0 {
462        assert_eq!(count, 1);
463        return Err(MetaError::catalog_duplicated(
464            "connection",
465            &pb_connection.name,
466        ));
467    }
468    Ok(())
469}
470
471pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
472where
473    C: ConnectionTrait,
474{
475    let count = Secret::find()
476        .inner_join(Object)
477        .filter(
478            object::Column::DatabaseId
479                .eq(pb_secret.database_id)
480                .and(object::Column::SchemaId.eq(pb_secret.schema_id))
481                .and(secret::Column::Name.eq(&pb_secret.name)),
482        )
483        .count(db)
484        .await?;
485    if count > 0 {
486        assert_eq!(count, 1);
487        return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
488    }
489    Ok(())
490}
491
492pub async fn check_subscription_name_duplicate<C>(
493    pb_subscription: &PbSubscription,
494    db: &C,
495) -> MetaResult<()>
496where
497    C: ConnectionTrait,
498{
499    let count = Subscription::find()
500        .inner_join(Object)
501        .filter(
502            object::Column::DatabaseId
503                .eq(pb_subscription.database_id)
504                .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
505                .and(subscription::Column::Name.eq(&pb_subscription.name)),
506        )
507        .count(db)
508        .await?;
509    if count > 0 {
510        assert_eq!(count, 1);
511        return Err(MetaError::catalog_duplicated(
512            "subscription",
513            &pb_subscription.name,
514        ));
515    }
516    Ok(())
517}
518
519/// `check_user_name_duplicate` checks whether the user is already existed in the cluster.
520pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
521where
522    C: ConnectionTrait,
523{
524    let count = User::find()
525        .filter(user::Column::Name.eq(name))
526        .count(db)
527        .await?;
528    if count > 0 {
529        assert_eq!(count, 1);
530        return Err(MetaError::catalog_duplicated("user", name));
531    }
532    Ok(())
533}
534
535/// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace.
536pub async fn check_relation_name_duplicate<C>(
537    name: &str,
538    database_id: DatabaseId,
539    schema_id: SchemaId,
540    db: &C,
541) -> MetaResult<()>
542where
543    C: ConnectionTrait,
544{
545    macro_rules! check_duplicated {
546        ($obj_type:expr, $entity:ident, $table:ident) => {
547            let object_id = Object::find()
548                .select_only()
549                .column(object::Column::Oid)
550                .inner_join($entity)
551                .filter(
552                    object::Column::DatabaseId
553                        .eq(Some(database_id))
554                        .and(object::Column::SchemaId.eq(Some(schema_id)))
555                        .and($table::Column::Name.eq(name)),
556                )
557                .into_tuple::<ObjectId>()
558                .one(db)
559                .await?;
560            if let Some(oid) = object_id {
561                let check_creation = if $obj_type == ObjectType::View {
562                    false
563                } else if $obj_type == ObjectType::Source {
564                    let source_info = Source::find_by_id(oid)
565                        .select_only()
566                        .column(source::Column::SourceInfo)
567                        .into_tuple::<Option<StreamSourceInfo>>()
568                        .one(db)
569                        .await?
570                        .unwrap();
571                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
572                } else {
573                    true
574                };
575                let job_id = JobId::new(oid as u32);
576                return if check_creation
577                    && !matches!(
578                        StreamingJob::find_by_id(job_id)
579                            .select_only()
580                            .column(streaming_job::Column::JobStatus)
581                            .into_tuple::<JobStatus>()
582                            .one(db)
583                            .await?,
584                        Some(JobStatus::Created)
585                    ) {
586                    Err(MetaError::catalog_under_creation(
587                        $obj_type.as_str(),
588                        name,
589                        job_id,
590                    ))
591                } else {
592                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
593                };
594            }
595        };
596    }
597    check_duplicated!(ObjectType::Table, Table, table);
598    check_duplicated!(ObjectType::Source, Source, source);
599    check_duplicated!(ObjectType::Sink, Sink, sink);
600    check_duplicated!(ObjectType::Index, Index, index);
601    check_duplicated!(ObjectType::View, View, view);
602
603    Ok(())
604}
605
606/// `check_schema_name_duplicate` checks whether the schema name is already used in the target database.
607pub async fn check_schema_name_duplicate<C>(
608    name: &str,
609    database_id: DatabaseId,
610    db: &C,
611) -> MetaResult<()>
612where
613    C: ConnectionTrait,
614{
615    let count = Object::find()
616        .inner_join(Schema)
617        .filter(
618            object::Column::ObjType
619                .eq(ObjectType::Schema)
620                .and(object::Column::DatabaseId.eq(Some(database_id)))
621                .and(schema::Column::Name.eq(name)),
622        )
623        .count(db)
624        .await?;
625    if count != 0 {
626        return Err(MetaError::catalog_duplicated("schema", name));
627    }
628
629    Ok(())
630}
631
632/// `check_object_refer_for_drop` checks whether the object is used by other objects except indexes.
633/// It returns an error that contains the details of the referring objects if it is used by others.
634pub async fn check_object_refer_for_drop<C>(
635    object_type: ObjectType,
636    object_id: ObjectId,
637    db: &C,
638) -> MetaResult<()>
639where
640    C: ConnectionTrait,
641{
642    // Ignore indexes.
643    let count = if object_type == ObjectType::Table {
644        ObjectDependency::find()
645            .join(
646                JoinType::InnerJoin,
647                object_dependency::Relation::Object1.def(),
648            )
649            .filter(
650                object_dependency::Column::Oid
651                    .eq(object_id)
652                    .and(object::Column::ObjType.ne(ObjectType::Index)),
653            )
654            .count(db)
655            .await?
656    } else {
657        ObjectDependency::find()
658            .filter(object_dependency::Column::Oid.eq(object_id))
659            .count(db)
660            .await?
661    };
662    if count != 0 {
663        // find the name of all objects that are using the given one.
664        let referring_objects = get_referring_objects(object_id, db).await?;
665        let referring_objs_map = referring_objects
666            .into_iter()
667            .filter(|o| o.obj_type != ObjectType::Index)
668            .into_group_map_by(|o| o.obj_type);
669        let mut details = vec![];
670        for (obj_type, objs) in referring_objs_map {
671            match obj_type {
672                ObjectType::Table => {
673                    let tables: Vec<(String, String)> = Object::find()
674                        .join(JoinType::InnerJoin, object::Relation::Table.def())
675                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
676                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
677                        .select_only()
678                        .column(schema::Column::Name)
679                        .column(table::Column::Name)
680                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
681                        .into_tuple()
682                        .all(db)
683                        .await?;
684                    details.extend(tables.into_iter().map(|(schema_name, table_name)| {
685                        format!(
686                            "materialized view {}.{} depends on it",
687                            schema_name, table_name
688                        )
689                    }));
690                }
691                ObjectType::Sink => {
692                    let sinks: Vec<(String, String)> = Object::find()
693                        .join(JoinType::InnerJoin, object::Relation::Sink.def())
694                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
695                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
696                        .select_only()
697                        .column(schema::Column::Name)
698                        .column(sink::Column::Name)
699                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
700                        .into_tuple()
701                        .all(db)
702                        .await?;
703                    details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
704                        format!("sink {}.{} depends on it", schema_name, sink_name)
705                    }));
706                }
707                ObjectType::View => {
708                    let views: Vec<(String, String)> = Object::find()
709                        .join(JoinType::InnerJoin, object::Relation::View.def())
710                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
711                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
712                        .select_only()
713                        .column(schema::Column::Name)
714                        .column(view::Column::Name)
715                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
716                        .into_tuple()
717                        .all(db)
718                        .await?;
719                    details.extend(views.into_iter().map(|(schema_name, view_name)| {
720                        format!("view {}.{} depends on it", schema_name, view_name)
721                    }));
722                }
723                ObjectType::Subscription => {
724                    let subscriptions: Vec<(String, String)> = Object::find()
725                        .join(JoinType::InnerJoin, object::Relation::Subscription.def())
726                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
727                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
728                        .select_only()
729                        .column(schema::Column::Name)
730                        .column(subscription::Column::Name)
731                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
732                        .into_tuple()
733                        .all(db)
734                        .await?;
735                    details.extend(subscriptions.into_iter().map(
736                        |(schema_name, subscription_name)| {
737                            format!(
738                                "subscription {}.{} depends on it",
739                                schema_name, subscription_name
740                            )
741                        },
742                    ));
743                }
744                ObjectType::Source => {
745                    let sources: Vec<(String, String)> = Object::find()
746                        .join(JoinType::InnerJoin, object::Relation::Source.def())
747                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
748                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
749                        .select_only()
750                        .column(schema::Column::Name)
751                        .column(source::Column::Name)
752                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
753                        .into_tuple()
754                        .all(db)
755                        .await?;
756                    details.extend(sources.into_iter().map(|(schema_name, view_name)| {
757                        format!("source {}.{} depends on it", schema_name, view_name)
758                    }));
759                }
760                ObjectType::Connection => {
761                    let connections: Vec<(String, String)> = Object::find()
762                        .join(JoinType::InnerJoin, object::Relation::Connection.def())
763                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
764                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
765                        .select_only()
766                        .column(schema::Column::Name)
767                        .column(connection::Column::Name)
768                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
769                        .into_tuple()
770                        .all(db)
771                        .await?;
772                    details.extend(connections.into_iter().map(|(schema_name, view_name)| {
773                        format!("connection {}.{} depends on it", schema_name, view_name)
774                    }));
775                }
776                // only the table, source, sink, subscription, view, connection and index will depend on other objects.
777                _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
778            }
779        }
780
781        return Err(MetaError::permission_denied(format!(
782            "{} used by {} other objects. \nDETAIL: {}\n\
783            {}",
784            object_type.as_str(),
785            count,
786            details.join("\n"),
787            match object_type {
788                ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
789                    "HINT: DROP the dependent objects first.",
790                ObjectType::Database | ObjectType::Schema => unreachable!(),
791                _ => "HINT:  Use DROP ... CASCADE to drop the dependent objects too.",
792            }
793        )));
794    }
795    Ok(())
796}
797
798/// List all objects that are using the given one.
799pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
800where
801    C: ConnectionTrait,
802{
803    let objs = ObjectDependency::find()
804        .filter(object_dependency::Column::Oid.eq(object_id))
805        .join(
806            JoinType::InnerJoin,
807            object_dependency::Relation::Object1.def(),
808        )
809        .into_partial_model()
810        .all(db)
811        .await?;
812
813    Ok(objs)
814}
815
816/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
817pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
818where
819    C: ConnectionTrait,
820{
821    let count = Object::find()
822        .filter(object::Column::SchemaId.eq(Some(schema_id)))
823        .count(db)
824        .await?;
825    if count != 0 {
826        return Err(MetaError::permission_denied("schema is not empty"));
827    }
828
829    Ok(())
830}
831
832/// `list_user_info_by_ids` lists all users' info by their ids.
833pub async fn list_user_info_by_ids<C>(
834    user_ids: impl IntoIterator<Item = UserId>,
835    db: &C,
836) -> MetaResult<Vec<PbUserInfo>>
837where
838    C: ConnectionTrait,
839{
840    let mut user_infos = vec![];
841    for user_id in user_ids {
842        let user = User::find_by_id(user_id)
843            .one(db)
844            .await?
845            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
846        let mut user_info: PbUserInfo = user.into();
847        user_info.grant_privileges = get_user_privilege(user_id, db).await?;
848        user_infos.push(user_info);
849    }
850    Ok(user_infos)
851}
852
853/// `get_object_owner` returns the owner of the given object.
854pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
855where
856    C: ConnectionTrait,
857{
858    let obj_owner: UserId = Object::find_by_id(object_id)
859        .select_only()
860        .column(object::Column::OwnerId)
861        .into_tuple()
862        .one(db)
863        .await?
864        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
865    Ok(obj_owner)
866}
867
868/// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given one.
869///
870/// # Examples
871///
872/// ```
873/// use risingwave_meta::controller::utils::construct_privilege_dependency_query;
874/// use sea_orm::sea_query::*;
875/// use sea_orm::*;
876///
877/// let query = construct_privilege_dependency_query(vec![1, 2, 3]);
878///
879/// assert_eq!(
880///    query.to_string(MysqlQueryBuilder),
881///   r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"#
882/// );
883/// assert_eq!(
884///   query.to_string(PostgresQueryBuilder),
885///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""#
886/// );
887/// assert_eq!(
888///  query.to_string(SqliteQueryBuilder),
889///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""#
890/// );
891/// ```
892pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
893    let cte_alias = Alias::new("granted_privilege_ids");
894    let cte_return_privilege_alias = Alias::new("id");
895    let cte_return_user_alias = Alias::new("user_id");
896
897    let mut base_query = SelectStatement::new()
898        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
899        .from(UserPrivilege)
900        .and_where(user_privilege::Column::Id.is_in(ids))
901        .to_owned();
902
903    let cte_referencing = Query::select()
904        .columns([
905            (UserPrivilege, user_privilege::Column::Id),
906            (UserPrivilege, user_privilege::Column::UserId),
907        ])
908        .from(UserPrivilege)
909        .inner_join(
910            cte_alias.clone(),
911            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
912                .equals(user_privilege::Column::DependentId),
913        )
914        .to_owned();
915
916    let mut common_table_expr = CommonTableExpression::new();
917    common_table_expr
918        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
919        .columns([
920            cte_return_privilege_alias.clone(),
921            cte_return_user_alias.clone(),
922        ])
923        .table_name(cte_alias.clone());
924
925    SelectStatement::new()
926        .columns([cte_return_privilege_alias, cte_return_user_alias])
927        .from(cte_alias)
928        .to_owned()
929        .with(
930            WithClause::new()
931                .recursive(true)
932                .cte(common_table_expr)
933                .to_owned(),
934        )
935}
936
937pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
938where
939    C: ConnectionTrait,
940{
941    let table_ids: Vec<TableId> = Table::find()
942        .select_only()
943        .column(table::Column::TableId)
944        .filter(
945            table::Column::TableType
946                .eq(TableType::Internal)
947                .and(table::Column::BelongsToJobId.eq(job_id)),
948        )
949        .into_tuple()
950        .all(db)
951        .await?;
952    Ok(table_ids)
953}
954
955pub async fn get_index_state_tables_by_table_id<C>(
956    table_id: TableId,
957    db: &C,
958) -> MetaResult<Vec<TableId>>
959where
960    C: ConnectionTrait,
961{
962    let mut index_table_ids: Vec<TableId> = Index::find()
963        .select_only()
964        .column(index::Column::IndexTableId)
965        .filter(index::Column::PrimaryTableId.eq(table_id))
966        .into_tuple()
967        .all(db)
968        .await?;
969
970    if !index_table_ids.is_empty() {
971        let internal_table_ids: Vec<TableId> = Table::find()
972            .select_only()
973            .column(table::Column::TableId)
974            .filter(
975                table::Column::TableType
976                    .eq(TableType::Internal)
977                    .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
978            )
979            .into_tuple()
980            .all(db)
981            .await?;
982
983        index_table_ids.extend(internal_table_ids.into_iter());
984    }
985
986    Ok(index_table_ids)
987}
988
989#[derive(Clone, DerivePartialModel, FromQueryResult)]
990#[sea_orm(entity = "UserPrivilege")]
991pub struct PartialUserPrivilege {
992    pub id: PrivilegeId,
993    pub user_id: UserId,
994}
995
996pub async fn get_referring_privileges_cascade<C>(
997    ids: Vec<PrivilegeId>,
998    db: &C,
999) -> MetaResult<Vec<PartialUserPrivilege>>
1000where
1001    C: ConnectionTrait,
1002{
1003    let query = construct_privilege_dependency_query(ids);
1004    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1005    let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1006        db.get_database_backend(),
1007        sql,
1008        values,
1009    ))
1010    .all(db)
1011    .await?;
1012
1013    Ok(privileges)
1014}
1015
1016/// `ensure_privileges_not_referred` ensures that the privileges are not granted to any other users.
1017pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1018where
1019    C: ConnectionTrait,
1020{
1021    let count = UserPrivilege::find()
1022        .filter(user_privilege::Column::DependentId.is_in(ids))
1023        .count(db)
1024        .await?;
1025    if count != 0 {
1026        return Err(MetaError::permission_denied(format!(
1027            "privileges granted to {} other ones.",
1028            count
1029        )));
1030    }
1031    Ok(())
1032}
1033
1034/// `get_user_privilege` returns the privileges of the given user.
1035pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1036where
1037    C: ConnectionTrait,
1038{
1039    let user_privileges = UserPrivilege::find()
1040        .find_also_related(Object)
1041        .filter(user_privilege::Column::UserId.eq(user_id))
1042        .all(db)
1043        .await?;
1044    Ok(user_privileges
1045        .into_iter()
1046        .map(|(privilege, object)| {
1047            let object = object.unwrap();
1048            let oid = object.oid as _;
1049            let obj = match object.obj_type {
1050                ObjectType::Database => PbGrantObject::DatabaseId(oid),
1051                ObjectType::Schema => PbGrantObject::SchemaId(oid),
1052                ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
1053                ObjectType::Source => PbGrantObject::SourceId(oid),
1054                ObjectType::Sink => PbGrantObject::SinkId(oid),
1055                ObjectType::View => PbGrantObject::ViewId(oid),
1056                ObjectType::Function => PbGrantObject::FunctionId(oid),
1057                ObjectType::Connection => PbGrantObject::ConnectionId(oid),
1058                ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
1059                ObjectType::Secret => PbGrantObject::SecretId(oid),
1060            };
1061            PbGrantPrivilege {
1062                action_with_opts: vec![PbActionWithGrantOption {
1063                    action: PbAction::from(privilege.action) as _,
1064                    with_grant_option: privilege.with_grant_option,
1065                    granted_by: privilege.granted_by as _,
1066                }],
1067                object: Some(obj),
1068            }
1069        })
1070        .collect())
1071}
1072
1073pub async fn get_table_columns(
1074    txn: &impl ConnectionTrait,
1075    id: TableId,
1076) -> MetaResult<ColumnCatalogArray> {
1077    let columns = Table::find_by_id(id)
1078        .select_only()
1079        .columns([table::Column::Columns])
1080        .into_tuple::<ColumnCatalogArray>()
1081        .one(txn)
1082        .await?
1083        .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1084    Ok(columns)
1085}
1086
1087/// `grant_default_privileges_automatically` grants default privileges automatically
1088/// for the given new object. It returns the list of user infos whose privileges are updated.
1089pub async fn grant_default_privileges_automatically<C>(
1090    db: &C,
1091    object_id: ObjectId,
1092) -> MetaResult<Vec<PbUserInfo>>
1093where
1094    C: ConnectionTrait,
1095{
1096    let object = Object::find_by_id(object_id)
1097        .one(db)
1098        .await?
1099        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1100    assert_ne!(object.obj_type, ObjectType::Database);
1101
1102    let for_mview_filter = if object.obj_type == ObjectType::Table {
1103        let table_type = Table::find_by_id(TableId::new(object_id as _))
1104            .select_only()
1105            .column(table::Column::TableType)
1106            .into_tuple::<TableType>()
1107            .one(db)
1108            .await?
1109            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1110        user_default_privilege::Column::ForMaterializedView
1111            .eq(table_type == TableType::MaterializedView)
1112    } else {
1113        user_default_privilege::Column::ForMaterializedView.eq(false)
1114    };
1115    let schema_filter = if let Some(schema_id) = &object.schema_id {
1116        user_default_privilege::Column::SchemaId.eq(*schema_id)
1117    } else {
1118        user_default_privilege::Column::SchemaId.is_null()
1119    };
1120
1121    let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1122        .select_only()
1123        .columns([
1124            user_default_privilege::Column::Grantee,
1125            user_default_privilege::Column::GrantedBy,
1126            user_default_privilege::Column::Action,
1127            user_default_privilege::Column::WithGrantOption,
1128        ])
1129        .filter(
1130            user_default_privilege::Column::DatabaseId
1131                .eq(object.database_id.unwrap())
1132                .and(schema_filter)
1133                .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1134                .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1135                .and(for_mview_filter),
1136        )
1137        .into_tuple()
1138        .all(db)
1139        .await?;
1140    if default_privileges.is_empty() {
1141        return Ok(vec![]);
1142    }
1143
1144    let updated_user_ids = default_privileges
1145        .iter()
1146        .map(|(grantee, _, _, _)| *grantee)
1147        .collect::<HashSet<_>>();
1148
1149    let internal_table_ids = get_internal_tables_by_id(JobId::new(object_id as _), db).await?;
1150
1151    for (grantee, granted_by, action, with_grant_option) in default_privileges {
1152        UserPrivilege::insert(user_privilege::ActiveModel {
1153            user_id: Set(grantee),
1154            oid: Set(object_id),
1155            granted_by: Set(granted_by),
1156            action: Set(action),
1157            with_grant_option: Set(with_grant_option),
1158            ..Default::default()
1159        })
1160        .exec(db)
1161        .await?;
1162        if action == Action::Select && !internal_table_ids.is_empty() {
1163            // Grant SELECT privilege for internal tables if the action is SELECT.
1164            for internal_table_id in &internal_table_ids {
1165                UserPrivilege::insert(user_privilege::ActiveModel {
1166                    user_id: Set(grantee),
1167                    oid: Set(internal_table_id.as_raw_id() as _),
1168                    granted_by: Set(granted_by),
1169                    action: Set(Action::Select),
1170                    with_grant_option: Set(with_grant_option),
1171                    ..Default::default()
1172                })
1173                .exec(db)
1174                .await?;
1175            }
1176        }
1177    }
1178
1179    let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1180    Ok(updated_user_infos)
1181}
1182
1183// todo: remove it after migrated to sql backend.
1184pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1185    match object {
1186        PbGrantObject::DatabaseId(id)
1187        | PbGrantObject::SchemaId(id)
1188        | PbGrantObject::TableId(id)
1189        | PbGrantObject::SourceId(id)
1190        | PbGrantObject::SinkId(id)
1191        | PbGrantObject::ViewId(id)
1192        | PbGrantObject::FunctionId(id)
1193        | PbGrantObject::SubscriptionId(id)
1194        | PbGrantObject::ConnectionId(id)
1195        | PbGrantObject::SecretId(id) => *id as _,
1196    }
1197}
1198
1199pub async fn insert_fragment_relations(
1200    db: &impl ConnectionTrait,
1201    downstream_fragment_relations: &FragmentDownstreamRelation,
1202) -> MetaResult<()> {
1203    for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1204        for downstream in downstreams {
1205            let relation = fragment_relation::Model {
1206                source_fragment_id: *upstream_fragment_id as _,
1207                target_fragment_id: downstream.downstream_fragment_id as _,
1208                dispatcher_type: downstream.dispatcher_type,
1209                dist_key_indices: downstream
1210                    .dist_key_indices
1211                    .iter()
1212                    .map(|idx| *idx as i32)
1213                    .collect_vec()
1214                    .into(),
1215                output_indices: downstream
1216                    .output_mapping
1217                    .indices
1218                    .iter()
1219                    .map(|idx| *idx as i32)
1220                    .collect_vec()
1221                    .into(),
1222                output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1223            };
1224            FragmentRelation::insert(relation.into_active_model())
1225                .exec(db)
1226                .await?;
1227        }
1228    }
1229    Ok(())
1230}
1231
1232pub fn compose_dispatchers(
1233    source_fragment_distribution: DistributionType,
1234    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1235    target_fragment_id: crate::model::FragmentId,
1236    target_fragment_distribution: DistributionType,
1237    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1238    dispatcher_type: DispatcherType,
1239    dist_key_indices: Vec<u32>,
1240    output_mapping: PbDispatchOutputMapping,
1241) -> HashMap<crate::model::ActorId, PbDispatcher> {
1242    match dispatcher_type {
1243        DispatcherType::Hash => {
1244            let dispatcher = PbDispatcher {
1245                r#type: PbDispatcherType::from(dispatcher_type) as _,
1246                dist_key_indices,
1247                output_mapping: output_mapping.into(),
1248                hash_mapping: Some(
1249                    ActorMapping::from_bitmaps(
1250                        &target_fragment_actors
1251                            .iter()
1252                            .map(|(actor_id, bitmap)| {
1253                                (
1254                                    *actor_id as _,
1255                                    bitmap
1256                                        .clone()
1257                                        .expect("downstream hash dispatch must have distribution"),
1258                                )
1259                            })
1260                            .collect(),
1261                    )
1262                    .to_protobuf(),
1263                ),
1264                dispatcher_id: target_fragment_id as _,
1265                downstream_actor_id: target_fragment_actors
1266                    .keys()
1267                    .map(|actor_id| *actor_id as _)
1268                    .collect(),
1269            };
1270            source_fragment_actors
1271                .keys()
1272                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1273                .collect()
1274        }
1275        DispatcherType::Broadcast | DispatcherType::Simple => {
1276            let dispatcher = PbDispatcher {
1277                r#type: PbDispatcherType::from(dispatcher_type) as _,
1278                dist_key_indices,
1279                output_mapping: output_mapping.into(),
1280                hash_mapping: None,
1281                dispatcher_id: target_fragment_id as _,
1282                downstream_actor_id: target_fragment_actors
1283                    .keys()
1284                    .map(|actor_id| *actor_id as _)
1285                    .collect(),
1286            };
1287            source_fragment_actors
1288                .keys()
1289                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1290                .collect()
1291        }
1292        DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1293            source_fragment_distribution,
1294            source_fragment_actors,
1295            target_fragment_distribution,
1296            target_fragment_actors,
1297        )
1298        .into_iter()
1299        .map(|(upstream_actor_id, downstream_actor_id)| {
1300            (
1301                upstream_actor_id,
1302                PbDispatcher {
1303                    r#type: PbDispatcherType::NoShuffle as _,
1304                    dist_key_indices: dist_key_indices.clone(),
1305                    output_mapping: output_mapping.clone().into(),
1306                    hash_mapping: None,
1307                    dispatcher_id: target_fragment_id as _,
1308                    downstream_actor_id: vec![downstream_actor_id as _],
1309                },
1310            )
1311        })
1312        .collect(),
1313    }
1314}
1315
1316/// return (`upstream_actor_id` -> `downstream_actor_id`)
1317pub fn resolve_no_shuffle_actor_dispatcher(
1318    source_fragment_distribution: DistributionType,
1319    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1320    target_fragment_distribution: DistributionType,
1321    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1322) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1323    assert_eq!(source_fragment_distribution, target_fragment_distribution);
1324    assert_eq!(
1325        source_fragment_actors.len(),
1326        target_fragment_actors.len(),
1327        "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1328        source_fragment_actors,
1329        target_fragment_actors
1330    );
1331    match source_fragment_distribution {
1332        DistributionType::Single => {
1333            let assert_singleton = |bitmap: &Option<Bitmap>| {
1334                assert!(
1335                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1336                    "not singleton: {:?}",
1337                    bitmap
1338                );
1339            };
1340            assert_eq!(
1341                source_fragment_actors.len(),
1342                1,
1343                "singleton distribution actor count not 1: {:?}",
1344                source_fragment_distribution
1345            );
1346            assert_eq!(
1347                target_fragment_actors.len(),
1348                1,
1349                "singleton distribution actor count not 1: {:?}",
1350                target_fragment_distribution
1351            );
1352            let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1353            assert_singleton(bitmap);
1354            let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1355            assert_singleton(bitmap);
1356            vec![(*source_actor_id, *target_actor_id)]
1357        }
1358        DistributionType::Hash => {
1359            let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1360                .iter()
1361                .map(|(actor_id, bitmap)| {
1362                    let bitmap = bitmap
1363                        .as_ref()
1364                        .expect("hash distribution should have bitmap");
1365                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1366                    (first_vnode, (*actor_id, bitmap))
1367                })
1368                .collect();
1369            source_fragment_actors
1370                .iter()
1371                .map(|(source_actor_id, bitmap)| {
1372                    let bitmap = bitmap
1373                        .as_ref()
1374                        .expect("hash distribution should have bitmap");
1375                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1376                    let (target_actor_id, target_bitmap) =
1377                        target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1378                            panic!(
1379                                "cannot find matched target actor: {} {:?} {:?} {:?}",
1380                                source_actor_id,
1381                                first_vnode,
1382                                source_fragment_actors,
1383                                target_fragment_actors
1384                            );
1385                        });
1386                    assert_eq!(
1387                        bitmap,
1388                        target_bitmap,
1389                        "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1390                        source_actor_id,
1391                        first_vnode,
1392                        source_fragment_actors,
1393                        target_fragment_actors
1394                    );
1395                    (*source_actor_id, target_actor_id)
1396                }).collect()
1397        }
1398    }
1399}
1400
1401pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1402    let fragment_worker_slot_mapping = match fragment.distribution_type {
1403        DistributionType::Single => {
1404            let actor = fragment.actors.values().exactly_one().unwrap();
1405            WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1406        }
1407        DistributionType::Hash => {
1408            let actor_bitmaps: HashMap<_, _> = fragment
1409                .actors
1410                .iter()
1411                .map(|(actor_id, actor_info)| {
1412                    let vnode_bitmap = actor_info
1413                        .vnode_bitmap
1414                        .as_ref()
1415                        .cloned()
1416                        .expect("actor bitmap shouldn't be none in hash fragment");
1417
1418                    (*actor_id as hash::ActorId, vnode_bitmap)
1419                })
1420                .collect();
1421
1422            let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1423
1424            let actor_locations = fragment
1425                .actors
1426                .iter()
1427                .map(|(actor_id, actor_info)| {
1428                    (*actor_id as hash::ActorId, actor_info.worker_id as u32)
1429                })
1430                .collect();
1431
1432            actor_mapping.to_worker_slot(&actor_locations)
1433        }
1434    };
1435
1436    PbFragmentWorkerSlotMapping {
1437        fragment_id: fragment.fragment_id,
1438        mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1439    }
1440}
1441
1442/// For the given streaming jobs, returns
1443/// - All source fragments
1444/// - All sink fragments
1445/// - All actors
1446/// - All fragments
1447pub async fn get_fragments_for_jobs<C>(
1448    db: &C,
1449    actor_info: &SharedActorInfos,
1450    streaming_jobs: Vec<JobId>,
1451) -> MetaResult<(
1452    HashMap<SourceId, BTreeSet<FragmentId>>,
1453    HashSet<FragmentId>,
1454    HashSet<ActorId>,
1455    HashSet<FragmentId>,
1456)>
1457where
1458    C: ConnectionTrait,
1459{
1460    if streaming_jobs.is_empty() {
1461        return Ok((
1462            HashMap::default(),
1463            HashSet::default(),
1464            HashSet::default(),
1465            HashSet::default(),
1466        ));
1467    }
1468
1469    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1470        .select_only()
1471        .columns([
1472            fragment::Column::FragmentId,
1473            fragment::Column::FragmentTypeMask,
1474            fragment::Column::StreamNode,
1475        ])
1476        .filter(fragment::Column::JobId.is_in(streaming_jobs))
1477        .into_tuple()
1478        .all(db)
1479        .await?;
1480
1481    let fragment_ids: HashSet<_> = fragments
1482        .iter()
1483        .map(|(fragment_id, _, _)| *fragment_id)
1484        .collect();
1485
1486    let actors = {
1487        let guard = actor_info.read_guard();
1488        fragment_ids
1489            .iter()
1490            .flat_map(|id| guard.get_fragment(*id as _))
1491            .flat_map(|f| f.actors.keys().cloned().map(|id| id as _))
1492            .collect::<HashSet<_>>()
1493    };
1494
1495    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1496    let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1497    for (fragment_id, mask, stream_node) in fragments {
1498        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1499            && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1500        {
1501            source_fragment_ids
1502                .entry(source_id as _)
1503                .or_default()
1504                .insert(fragment_id);
1505        }
1506        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1507            sink_fragment_ids.insert(fragment_id);
1508        }
1509    }
1510
1511    Ok((
1512        source_fragment_ids,
1513        sink_fragment_ids,
1514        actors.into_iter().collect(),
1515        fragment_ids,
1516    ))
1517}
1518
1519/// Build a object group for notifying the deletion of the given objects.
1520///
1521/// Note that only id fields are filled in the object info, as the arguments are partial objects.
1522/// As a result, the returned notification info should only be used for deletion.
1523pub(crate) fn build_object_group_for_delete(
1524    partial_objects: Vec<PartialObject>,
1525) -> NotificationInfo {
1526    let mut objects = vec![];
1527    for obj in partial_objects {
1528        match obj.obj_type {
1529            ObjectType::Database => objects.push(PbObject {
1530                object_info: Some(PbObjectInfo::Database(PbDatabase {
1531                    id: (obj.oid as u32).into(),
1532                    ..Default::default()
1533                })),
1534            }),
1535            ObjectType::Schema => objects.push(PbObject {
1536                object_info: Some(PbObjectInfo::Schema(PbSchema {
1537                    id: (obj.oid as u32).into(),
1538                    database_id: obj.database_id.unwrap(),
1539                    ..Default::default()
1540                })),
1541            }),
1542            ObjectType::Table => objects.push(PbObject {
1543                object_info: Some(PbObjectInfo::Table(PbTable {
1544                    id: (obj.oid as u32).into(),
1545                    schema_id: obj.schema_id.unwrap(),
1546                    database_id: obj.database_id.unwrap(),
1547                    ..Default::default()
1548                })),
1549            }),
1550            ObjectType::Source => objects.push(PbObject {
1551                object_info: Some(PbObjectInfo::Source(PbSource {
1552                    id: obj.oid as _,
1553                    schema_id: obj.schema_id.unwrap(),
1554                    database_id: obj.database_id.unwrap(),
1555                    ..Default::default()
1556                })),
1557            }),
1558            ObjectType::Sink => objects.push(PbObject {
1559                object_info: Some(PbObjectInfo::Sink(PbSink {
1560                    id: obj.oid as _,
1561                    schema_id: obj.schema_id.unwrap(),
1562                    database_id: obj.database_id.unwrap(),
1563                    ..Default::default()
1564                })),
1565            }),
1566            ObjectType::Subscription => objects.push(PbObject {
1567                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1568                    id: obj.oid as _,
1569                    schema_id: obj.schema_id.unwrap(),
1570                    database_id: obj.database_id.unwrap(),
1571                    ..Default::default()
1572                })),
1573            }),
1574            ObjectType::View => objects.push(PbObject {
1575                object_info: Some(PbObjectInfo::View(PbView {
1576                    id: obj.oid as _,
1577                    schema_id: obj.schema_id.unwrap(),
1578                    database_id: obj.database_id.unwrap(),
1579                    ..Default::default()
1580                })),
1581            }),
1582            ObjectType::Index => {
1583                objects.push(PbObject {
1584                    object_info: Some(PbObjectInfo::Index(PbIndex {
1585                        id: obj.oid as _,
1586                        schema_id: obj.schema_id.unwrap(),
1587                        database_id: obj.database_id.unwrap(),
1588                        ..Default::default()
1589                    })),
1590                });
1591                objects.push(PbObject {
1592                    object_info: Some(PbObjectInfo::Table(PbTable {
1593                        id: (obj.oid as u32).into(),
1594                        schema_id: obj.schema_id.unwrap(),
1595                        database_id: obj.database_id.unwrap(),
1596                        ..Default::default()
1597                    })),
1598                });
1599            }
1600            ObjectType::Function => objects.push(PbObject {
1601                object_info: Some(PbObjectInfo::Function(PbFunction {
1602                    id: obj.oid as _,
1603                    schema_id: obj.schema_id.unwrap(),
1604                    database_id: obj.database_id.unwrap(),
1605                    ..Default::default()
1606                })),
1607            }),
1608            ObjectType::Connection => objects.push(PbObject {
1609                object_info: Some(PbObjectInfo::Connection(PbConnection {
1610                    id: obj.oid as _,
1611                    schema_id: obj.schema_id.unwrap(),
1612                    database_id: obj.database_id.unwrap(),
1613                    ..Default::default()
1614                })),
1615            }),
1616            ObjectType::Secret => objects.push(PbObject {
1617                object_info: Some(PbObjectInfo::Secret(PbSecret {
1618                    id: obj.oid as _,
1619                    schema_id: obj.schema_id.unwrap(),
1620                    database_id: obj.database_id.unwrap(),
1621                    ..Default::default()
1622                })),
1623            }),
1624        }
1625    }
1626    NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1627}
1628
1629pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1630    let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1631        .context("unable to parse table definition")
1632        .inspect_err(|e| {
1633            tracing::error!(
1634                target: "auto_schema_change",
1635                error = %e.as_report(),
1636                "failed to parse table definition")
1637        })
1638        .unwrap()
1639        .try_into()
1640        .unwrap();
1641    if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1642        cdc_table_info
1643            .clone()
1644            .map(|cdc_table_info| cdc_table_info.external_table_name)
1645    } else {
1646        None
1647    }
1648}
1649
1650/// `rename_relation` renames the target relation and its definition,
1651/// it commits the changes to the transaction and returns the updated relations and the old name.
1652pub async fn rename_relation(
1653    txn: &DatabaseTransaction,
1654    object_type: ObjectType,
1655    object_id: ObjectId,
1656    object_name: &str,
1657) -> MetaResult<(Vec<PbObject>, String)> {
1658    use sea_orm::ActiveModelTrait;
1659
1660    use crate::controller::rename::alter_relation_rename;
1661
1662    let mut to_update_relations = vec![];
1663    // rename relation.
1664    macro_rules! rename_relation {
1665        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1666            let (mut relation, obj) = $entity::find_by_id($object_id)
1667                .find_also_related(Object)
1668                .one(txn)
1669                .await?
1670                .unwrap();
1671            let obj = obj.unwrap();
1672            let old_name = relation.name.clone();
1673            relation.name = object_name.into();
1674            if obj.obj_type != ObjectType::View {
1675                relation.definition = alter_relation_rename(&relation.definition, object_name);
1676            }
1677            let active_model = $table::ActiveModel {
1678                $identity: Set(relation.$identity),
1679                name: Set(object_name.into()),
1680                definition: Set(relation.definition.clone()),
1681                ..Default::default()
1682            };
1683            active_model.update(txn).await?;
1684            to_update_relations.push(PbObject {
1685                object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
1686            });
1687            old_name
1688        }};
1689    }
1690    // TODO: check is there any thing to change for shared source?
1691    let old_name = match object_type {
1692        ObjectType::Table => {
1693            let associated_source_id: Option<SourceId> = Source::find()
1694                .select_only()
1695                .column(source::Column::SourceId)
1696                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
1697                .into_tuple()
1698                .one(txn)
1699                .await?;
1700            if let Some(source_id) = associated_source_id {
1701                rename_relation!(Source, source, source_id, source_id);
1702            }
1703            rename_relation!(Table, table, table_id, TableId::new(object_id as _))
1704        }
1705        ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
1706        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id),
1707        ObjectType::Subscription => {
1708            rename_relation!(Subscription, subscription, subscription_id, object_id)
1709        }
1710        ObjectType::View => rename_relation!(View, view, view_id, object_id),
1711        ObjectType::Index => {
1712            let (mut index, obj) = Index::find_by_id(object_id)
1713                .find_also_related(Object)
1714                .one(txn)
1715                .await?
1716                .unwrap();
1717            index.name = object_name.into();
1718            let index_table_id = index.index_table_id;
1719            let old_name = rename_relation!(Table, table, table_id, index_table_id);
1720
1721            // the name of index and its associated table is the same.
1722            let active_model = index::ActiveModel {
1723                index_id: sea_orm::ActiveValue::Set(index.index_id),
1724                name: sea_orm::ActiveValue::Set(object_name.into()),
1725                ..Default::default()
1726            };
1727            active_model.update(txn).await?;
1728            to_update_relations.push(PbObject {
1729                object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1730            });
1731            old_name
1732        }
1733        _ => unreachable!("only relation name can be altered."),
1734    };
1735
1736    Ok((to_update_relations, old_name))
1737}
1738
1739pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
1740where
1741    C: ConnectionTrait,
1742{
1743    let database_resource_group: Option<String> = Database::find_by_id(database_id)
1744        .select_only()
1745        .column(database::Column::ResourceGroup)
1746        .into_tuple()
1747        .one(txn)
1748        .await?
1749        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1750
1751    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1752}
1753
1754pub async fn get_existing_job_resource_group<C>(
1755    txn: &C,
1756    streaming_job_id: JobId,
1757) -> MetaResult<String>
1758where
1759    C: ConnectionTrait,
1760{
1761    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1762        StreamingJob::find_by_id(streaming_job_id)
1763            .select_only()
1764            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1765            .join(JoinType::InnerJoin, object::Relation::Database2.def())
1766            .column(streaming_job::Column::SpecificResourceGroup)
1767            .column(database::Column::ResourceGroup)
1768            .into_tuple()
1769            .one(txn)
1770            .await?
1771            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1772
1773    Ok(job_specific_resource_group.unwrap_or_else(|| {
1774        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1775    }))
1776}
1777
1778pub fn filter_workers_by_resource_group(
1779    workers: &HashMap<u32, WorkerNode>,
1780    resource_group: &str,
1781) -> BTreeSet<WorkerId> {
1782    workers
1783        .iter()
1784        .filter(|&(_, worker)| {
1785            worker
1786                .resource_group()
1787                .map(|node_label| node_label.as_str() == resource_group)
1788                .unwrap_or(false)
1789        })
1790        .map(|(id, _)| *id as WorkerId)
1791        .collect()
1792}
1793
1794/// `rename_relation_refer` updates the definition of relations that refer to the target one,
1795/// it commits the changes to the transaction and returns all the updated relations.
1796pub async fn rename_relation_refer(
1797    txn: &DatabaseTransaction,
1798    object_type: ObjectType,
1799    object_id: ObjectId,
1800    object_name: &str,
1801    old_name: &str,
1802) -> MetaResult<Vec<PbObject>> {
1803    use sea_orm::ActiveModelTrait;
1804
1805    use crate::controller::rename::alter_relation_rename_refs;
1806
1807    let mut to_update_relations = vec![];
1808    macro_rules! rename_relation_ref {
1809        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1810            let (mut relation, obj) = $entity::find_by_id($object_id)
1811                .find_also_related(Object)
1812                .one(txn)
1813                .await?
1814                .unwrap();
1815            relation.definition =
1816                alter_relation_rename_refs(&relation.definition, old_name, object_name);
1817            let active_model = $table::ActiveModel {
1818                $identity: Set(relation.$identity),
1819                definition: Set(relation.definition.clone()),
1820                ..Default::default()
1821            };
1822            active_model.update(txn).await?;
1823            to_update_relations.push(PbObject {
1824                object_info: Some(PbObjectInfo::$entity(
1825                    ObjectModel(relation, obj.unwrap()).into(),
1826                )),
1827            });
1828        }};
1829    }
1830    let mut objs = get_referring_objects(object_id, txn).await?;
1831    if object_type == ObjectType::Table {
1832        let incoming_sinks: Vec<SinkId> = Sink::find()
1833            .select_only()
1834            .column(sink::Column::SinkId)
1835            .filter(sink::Column::TargetTable.eq(object_id))
1836            .into_tuple()
1837            .all(txn)
1838            .await?;
1839
1840        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
1841            oid: id,
1842            obj_type: ObjectType::Sink,
1843            schema_id: None,
1844            database_id: None,
1845        }));
1846    }
1847
1848    for obj in objs {
1849        match obj.obj_type {
1850            ObjectType::Table => {
1851                rename_relation_ref!(Table, table, table_id, TableId::new(obj.oid as _))
1852            }
1853            ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid),
1854            ObjectType::Subscription => {
1855                rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid)
1856            }
1857            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid),
1858            ObjectType::Index => {
1859                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid)
1860                    .select_only()
1861                    .column(index::Column::IndexTableId)
1862                    .into_tuple()
1863                    .one(txn)
1864                    .await?;
1865                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
1866            }
1867            _ => {
1868                bail!(
1869                    "only the table, sink, subscription, view and index will depend on other objects."
1870                )
1871            }
1872        }
1873    }
1874
1875    Ok(to_update_relations)
1876}
1877
1878/// Validate that subscription can be safely deleted, meeting any of the following conditions:
1879/// 1. The upstream table is not referred to by any cross-db mv.
1880/// 2. After deleting the subscription, the upstream table still has at least one subscription.
1881pub async fn validate_subscription_deletion<C>(txn: &C, subscription_id: ObjectId) -> MetaResult<()>
1882where
1883    C: ConnectionTrait,
1884{
1885    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1886        .select_only()
1887        .column(subscription::Column::DependentTableId)
1888        .into_tuple()
1889        .one(txn)
1890        .await?
1891        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1892
1893    let cnt = Subscription::find()
1894        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1895        .count(txn)
1896        .await?;
1897    if cnt > 1 {
1898        // Ensure that at least one subscription is remained for the upstream table
1899        // once the subscription is dropped.
1900        return Ok(());
1901    }
1902
1903    // Ensure that the upstream table is not referred by any cross-db mv.
1904    let obj_alias = Alias::new("o1");
1905    let used_by_alias = Alias::new("o2");
1906    let count = ObjectDependency::find()
1907        .join_as(
1908            JoinType::InnerJoin,
1909            object_dependency::Relation::Object2.def(),
1910            obj_alias.clone(),
1911        )
1912        .join_as(
1913            JoinType::InnerJoin,
1914            object_dependency::Relation::Object1.def(),
1915            used_by_alias.clone(),
1916        )
1917        .filter(
1918            object_dependency::Column::Oid
1919                .eq(upstream_table_id)
1920                .and(object_dependency::Column::UsedBy.ne(subscription_id))
1921                .and(
1922                    Expr::col((obj_alias, object::Column::DatabaseId))
1923                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1924                ),
1925        )
1926        .count(txn)
1927        .await?;
1928
1929    if count != 0 {
1930        return Err(MetaError::permission_denied(format!(
1931            "Referenced by {} cross-db objects.",
1932            count
1933        )));
1934    }
1935
1936    Ok(())
1937}
1938
1939pub async fn fetch_target_fragments<C>(
1940    txn: &C,
1941    src_fragment_id: impl IntoIterator<Item = FragmentId>,
1942) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
1943where
1944    C: ConnectionTrait,
1945{
1946    let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
1947        .select_only()
1948        .columns([
1949            fragment_relation::Column::SourceFragmentId,
1950            fragment_relation::Column::TargetFragmentId,
1951        ])
1952        .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
1953        .into_tuple()
1954        .all(txn)
1955        .await?;
1956
1957    let source_target_fragments = source_target_fragments.into_iter().into_group_map();
1958
1959    Ok(source_target_fragments)
1960}
1961
1962pub async fn get_sink_fragment_by_ids<C>(
1963    txn: &C,
1964    sink_ids: Vec<SinkId>,
1965) -> MetaResult<HashMap<SinkId, FragmentId>>
1966where
1967    C: ConnectionTrait,
1968{
1969    let sink_num = sink_ids.len();
1970    let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
1971        .select_only()
1972        .columns([fragment::Column::JobId, fragment::Column::FragmentId])
1973        .filter(
1974            fragment::Column::JobId
1975                .is_in(sink_ids)
1976                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
1977        )
1978        .into_tuple()
1979        .all(txn)
1980        .await?;
1981
1982    if sink_fragment_ids.len() != sink_num {
1983        return Err(anyhow::anyhow!(
1984            "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
1985            sink_fragment_ids.len(),
1986            sink_num
1987        )
1988        .into());
1989    }
1990
1991    Ok(sink_fragment_ids.into_iter().collect())
1992}
1993
1994pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
1995where
1996    C: ConnectionTrait,
1997{
1998    let mview_fragment: Vec<i32> = Fragment::find()
1999        .select_only()
2000        .column(fragment::Column::FragmentTypeMask)
2001        .filter(
2002            fragment::Column::JobId
2003                .eq(table_id)
2004                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2005        )
2006        .into_tuple()
2007        .all(txn)
2008        .await?;
2009
2010    let mview_fragment_len = mview_fragment.len();
2011    if mview_fragment_len != 1 {
2012        bail!(
2013            "expected exactly one mview fragment for table {}, found {}",
2014            table_id,
2015            mview_fragment_len
2016        );
2017    }
2018
2019    let mview_fragment = mview_fragment.into_iter().next().unwrap();
2020    let migrated =
2021        FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2022
2023    Ok(migrated)
2024}
2025
2026pub fn build_select_node_list(
2027    from: &[ColumnCatalog],
2028    to: &[ColumnCatalog],
2029) -> MetaResult<Vec<PbExprNode>> {
2030    let mut exprs = Vec::with_capacity(to.len());
2031    let idx_by_col_id = from
2032        .iter()
2033        .enumerate()
2034        .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2035        .collect::<HashMap<_, _>>();
2036
2037    for to_col in to {
2038        let to_col = to_col.column_desc.as_ref().unwrap();
2039        let to_col_type = to_col.column_type.clone();
2040        if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2041            let from_col_type = from[*from_idx]
2042                .column_desc
2043                .as_ref()
2044                .unwrap()
2045                .column_type
2046                .clone();
2047            if to_col_type != from_col_type {
2048                return Err(anyhow!(
2049                    "Column type mismatch: {:?} != {:?}",
2050                    from_col_type,
2051                    to_col_type
2052                )
2053                .into());
2054            }
2055            exprs.push(PbExprNode {
2056                function_type: expr_node::Type::Unspecified.into(),
2057                return_type: to_col_type,
2058                rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2059            });
2060        } else {
2061            let to_default_node =
2062                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2063                    expr,
2064                    ..
2065                })) = &to_col.generated_or_default_column
2066                {
2067                    expr.clone().unwrap()
2068                } else {
2069                    let null = Datum::None.to_protobuf();
2070                    PbExprNode {
2071                        function_type: expr_node::Type::Unspecified.into(),
2072                        return_type: to_col_type,
2073                        rex_node: Some(expr_node::RexNode::Constant(null)),
2074                    }
2075                };
2076            exprs.push(to_default_node);
2077        }
2078    }
2079
2080    Ok(exprs)
2081}
2082
2083#[derive(Clone, Debug, Default)]
2084pub struct StreamingJobExtraInfo {
2085    pub timezone: Option<String>,
2086    pub job_definition: String,
2087}
2088
2089pub async fn get_streaming_job_extra_info<C>(
2090    txn: &C,
2091    job_ids: Vec<JobId>,
2092) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2093where
2094    C: ConnectionTrait,
2095{
2096    let timezone_pairs: Vec<(JobId, Option<String>)> = StreamingJob::find()
2097        .select_only()
2098        .columns([
2099            streaming_job::Column::JobId,
2100            streaming_job::Column::Timezone,
2101        ])
2102        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2103        .into_tuple()
2104        .all(txn)
2105        .await?;
2106
2107    let job_ids = job_ids.into_iter().collect();
2108
2109    let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2110
2111    let result = timezone_pairs
2112        .into_iter()
2113        .map(|(job_id, timezone)| {
2114            let job_definition = definitions.remove(&job_id).unwrap_or_default();
2115            (
2116                job_id,
2117                StreamingJobExtraInfo {
2118                    timezone,
2119                    job_definition,
2120                },
2121            )
2122        })
2123        .collect();
2124
2125    Ok(result)
2126}
2127
2128#[cfg(test)]
2129mod tests {
2130    use super::*;
2131
2132    #[test]
2133    fn test_extract_cdc_table_name() {
2134        let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
2135        let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
2136        assert_eq!(
2137            extract_external_table_name_from_definition(ddl1),
2138            Some("public.t1".into())
2139        );
2140        assert_eq!(
2141            extract_external_table_name_from_definition(ddl2),
2142            Some("mydb.t2".into())
2143        );
2144    }
2145}