risingwave_meta/controller/
utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
22use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
23use risingwave_common::id::{JobId, SubscriptionId};
24use risingwave_common::types::{DataType, Datum};
25use risingwave_common::util::value_encoding::DatumToProtoExt;
26use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
27use risingwave_common::{bail, hash};
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::object::ObjectType;
30use risingwave_meta_model::prelude::*;
31use risingwave_meta_model::table::TableType;
32use risingwave_meta_model::user_privilege::Action;
33use risingwave_meta_model::{
34    ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
35    JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
36    TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
37    function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
38    subscription, table, user, user_default_privilege, user_privilege, view,
39};
40use risingwave_meta_model_migration::WithQuery;
41use risingwave_pb::catalog::{
42    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
43    PbSubscription, PbTable, PbView,
44};
45use risingwave_pb::common::WorkerNode;
46use risingwave_pb::expr::{PbExprNode, expr_node};
47use risingwave_pb::meta::object::PbObjectInfo;
48use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
49use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
50use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
51use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
52use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
53use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
54use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
55use risingwave_sqlparser::ast::Statement as SqlStatement;
56use risingwave_sqlparser::parser::Parser;
57use sea_orm::sea_query::{
58    Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
59    WithClause,
60};
61use sea_orm::{
62    ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
63    FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
64    RelationTrait, Set, Statement,
65};
66use thiserror_ext::AsReport;
67
68use crate::barrier::{SharedActorInfos, SharedFragmentInfo};
69use crate::controller::ObjectModel;
70use crate::controller::fragment::FragmentTypeMaskExt;
71use crate::controller::scale::resolve_streaming_job_definition;
72use crate::model::{FragmentDownstreamRelation, StreamContext};
73use crate::{MetaError, MetaResult};
74
75/// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object.
76///
77/// # Examples
78///
79/// ```
80/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
81/// use sea_orm::sea_query::*;
82/// use sea_orm::*;
83///
84/// let query = construct_obj_dependency_query(1.into());
85///
86/// assert_eq!(
87///     query.to_string(MysqlQueryBuilder),
88///     r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `oid` FROM `object` WHERE `object`.`database_id` = 1 OR `object`.`schema_id` = 1) UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `oid`, `obj_type`, `schema_id`, `database_id` FROM `used_by_object_ids` INNER JOIN `object` ON `used_by_object_ids`.`used_by` = `oid` ORDER BY `oid` DESC"#
89/// );
90/// assert_eq!(
91///     query.to_string(PostgresQueryBuilder),
92///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1) UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
93/// );
94/// assert_eq!(
95///     query.to_string(SqliteQueryBuilder),
96///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
97/// );
98/// ```
99pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
100    let cte_alias = Alias::new("used_by_object_ids");
101    let cte_return_alias = Alias::new("used_by");
102
103    let mut base_query = SelectStatement::new()
104        .column(object_dependency::Column::UsedBy)
105        .from(ObjectDependency)
106        .and_where(object_dependency::Column::Oid.eq(obj_id))
107        .to_owned();
108
109    let belonged_obj_query = SelectStatement::new()
110        .column(object::Column::Oid)
111        .from(Object)
112        .and_where(
113            object::Column::DatabaseId
114                .eq(obj_id)
115                .or(object::Column::SchemaId.eq(obj_id)),
116        )
117        .to_owned();
118
119    let cte_referencing = Query::select()
120        .column((ObjectDependency, object_dependency::Column::UsedBy))
121        .from(ObjectDependency)
122        .inner_join(
123            cte_alias.clone(),
124            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
125                .equals(object_dependency::Column::Oid),
126        )
127        .to_owned();
128
129    let mut common_table_expr = CommonTableExpression::new();
130    common_table_expr
131        .query(
132            base_query
133                .union(UnionType::All, belonged_obj_query)
134                .union(UnionType::All, cte_referencing)
135                .to_owned(),
136        )
137        .column(cte_return_alias.clone())
138        .table_name(cte_alias.clone());
139
140    SelectStatement::new()
141        .distinct()
142        .columns([
143            object::Column::Oid,
144            object::Column::ObjType,
145            object::Column::SchemaId,
146            object::Column::DatabaseId,
147        ])
148        .from(cte_alias.clone())
149        .inner_join(
150            Object,
151            Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
152        )
153        .order_by(object::Column::Oid, Order::Desc)
154        .to_owned()
155        .with(
156            WithClause::new()
157                .recursive(true)
158                .cte(common_table_expr)
159                .to_owned(),
160        )
161}
162
163/// This function will construct a query using recursive cte to find if dependent objects are already relying on the target table.
164///
165/// # Examples
166///
167/// ```
168/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
169/// use sea_orm::sea_query::*;
170/// use sea_orm::*;
171///
172/// let query = construct_sink_cycle_check_query(1.into(), vec![2.into(), 3.into()]);
173///
174/// assert_eq!(
175///     query.to_string(MysqlQueryBuilder),
176///     r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
177/// );
178/// assert_eq!(
179///     query.to_string(PostgresQueryBuilder),
180///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
181/// );
182/// assert_eq!(
183///     query.to_string(SqliteQueryBuilder),
184///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
185/// );
186/// ```
187pub fn construct_sink_cycle_check_query(
188    target_table: ObjectId,
189    dependent_objects: Vec<ObjectId>,
190) -> WithQuery {
191    let cte_alias = Alias::new("used_by_object_ids_with_sink");
192    let depend_alias = Alias::new("obj_dependency_with_sink");
193
194    let mut base_query = SelectStatement::new()
195        .columns([
196            object_dependency::Column::Oid,
197            object_dependency::Column::UsedBy,
198        ])
199        .from(ObjectDependency)
200        .and_where(object_dependency::Column::Oid.eq(target_table))
201        .to_owned();
202
203    let query_sink_deps = SelectStatement::new()
204        .columns([sink::Column::SinkId, sink::Column::TargetTable])
205        .from(Sink)
206        .and_where(sink::Column::TargetTable.is_not_null())
207        .to_owned();
208
209    let cte_referencing = Query::select()
210        .column((depend_alias.clone(), object_dependency::Column::Oid))
211        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
212        .from_subquery(
213            SelectStatement::new()
214                .columns([
215                    object_dependency::Column::Oid,
216                    object_dependency::Column::UsedBy,
217                ])
218                .from(ObjectDependency)
219                .union(UnionType::All, query_sink_deps)
220                .to_owned(),
221            depend_alias.clone(),
222        )
223        .inner_join(
224            cte_alias.clone(),
225            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
226                .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
227        )
228        .and_where(
229            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
230                cte_alias.clone(),
231                object_dependency::Column::Oid,
232            ))),
233        )
234        .to_owned();
235
236    let mut common_table_expr = CommonTableExpression::new();
237    common_table_expr
238        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
239        .columns([
240            object_dependency::Column::Oid,
241            object_dependency::Column::UsedBy,
242        ])
243        .table_name(cte_alias.clone());
244
245    SelectStatement::new()
246        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
247        .from(cte_alias.clone())
248        .and_where(
249            Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
250        )
251        .to_owned()
252        .with(
253            WithClause::new()
254                .recursive(true)
255                .cte(common_table_expr)
256                .to_owned(),
257        )
258}
259
260#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
261#[sea_orm(entity = "Object")]
262pub struct PartialObject {
263    pub oid: ObjectId,
264    pub obj_type: ObjectType,
265    pub schema_id: Option<SchemaId>,
266    pub database_id: Option<DatabaseId>,
267}
268
269#[derive(Clone, DerivePartialModel, FromQueryResult)]
270#[sea_orm(entity = "Fragment")]
271pub struct PartialFragmentStateTables {
272    pub fragment_id: FragmentId,
273    pub job_id: ObjectId,
274    pub state_table_ids: TableIdArray,
275}
276
277#[derive(Clone, Eq, PartialEq, Debug)]
278pub struct PartialActorLocation {
279    pub actor_id: ActorId,
280    pub fragment_id: FragmentId,
281    pub worker_id: WorkerId,
282}
283
284#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
285pub struct FragmentDesc {
286    pub fragment_id: FragmentId,
287    pub job_id: JobId,
288    pub fragment_type_mask: i32,
289    pub distribution_type: DistributionType,
290    pub state_table_ids: TableIdArray,
291    pub parallelism: i64,
292    pub vnode_count: i32,
293    pub stream_node: StreamNode,
294    pub parallelism_policy: String,
295}
296
297/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
298pub async fn get_referring_objects_cascade<C>(
299    obj_id: ObjectId,
300    db: &C,
301) -> MetaResult<Vec<PartialObject>>
302where
303    C: ConnectionTrait,
304{
305    let query = construct_obj_dependency_query(obj_id);
306    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
307    let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
308        db.get_database_backend(),
309        sql,
310        values,
311    ))
312    .all(db)
313    .await?;
314    Ok(objects)
315}
316
317/// Check if create a sink with given dependent objects into the target table will cause a cycle, return true if it will.
318pub async fn check_sink_into_table_cycle<C>(
319    target_table: ObjectId,
320    dependent_objs: Vec<ObjectId>,
321    db: &C,
322) -> MetaResult<bool>
323where
324    C: ConnectionTrait,
325{
326    if dependent_objs.is_empty() {
327        return Ok(false);
328    }
329
330    // special check for self referencing
331    if dependent_objs.contains(&target_table) {
332        return Ok(true);
333    }
334
335    let query = construct_sink_cycle_check_query(target_table, dependent_objs);
336    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
337
338    let res = db
339        .query_one(Statement::from_sql_and_values(
340            db.get_database_backend(),
341            sql,
342            values,
343        ))
344        .await?
345        .unwrap();
346
347    let cnt: i64 = res.try_get_by(0)?;
348
349    Ok(cnt != 0)
350}
351
352/// `ensure_object_id` ensures the existence of target object in the cluster.
353pub async fn ensure_object_id<C>(
354    object_type: ObjectType,
355    obj_id: impl Into<ObjectId>,
356    db: &C,
357) -> MetaResult<()>
358where
359    C: ConnectionTrait,
360{
361    let obj_id = obj_id.into();
362    let count = Object::find_by_id(obj_id).count(db).await?;
363    if count == 0 {
364        return Err(MetaError::catalog_id_not_found(
365            object_type.as_str(),
366            obj_id,
367        ));
368    }
369    Ok(())
370}
371
372pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
373where
374    C: ConnectionTrait,
375{
376    let count = Object::find_by_id(job_id).count(db).await?;
377    if count == 0 {
378        return Err(MetaError::cancelled(format!(
379            "job {} might be cancelled manually or by recovery",
380            job_id
381        )));
382    }
383    Ok(())
384}
385
386/// `ensure_user_id` ensures the existence of target user in the cluster.
387pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
388where
389    C: ConnectionTrait,
390{
391    let count = User::find_by_id(user_id).count(db).await?;
392    if count == 0 {
393        return Err(anyhow!("user {} was concurrently dropped", user_id).into());
394    }
395    Ok(())
396}
397
398/// `check_database_name_duplicate` checks whether the database name is already used in the cluster.
399pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
400where
401    C: ConnectionTrait,
402{
403    let count = Database::find()
404        .filter(database::Column::Name.eq(name))
405        .count(db)
406        .await?;
407    if count > 0 {
408        assert_eq!(count, 1);
409        return Err(MetaError::catalog_duplicated("database", name));
410    }
411    Ok(())
412}
413
414/// `check_function_signature_duplicate` checks whether the function name and its signature is already used in the target namespace.
415pub async fn check_function_signature_duplicate<C>(
416    pb_function: &PbFunction,
417    db: &C,
418) -> MetaResult<()>
419where
420    C: ConnectionTrait,
421{
422    let count = Function::find()
423        .inner_join(Object)
424        .filter(
425            object::Column::DatabaseId
426                .eq(pb_function.database_id)
427                .and(object::Column::SchemaId.eq(pb_function.schema_id))
428                .and(function::Column::Name.eq(&pb_function.name))
429                .and(
430                    function::Column::ArgTypes
431                        .eq(DataTypeArray::from(pb_function.arg_types.clone())),
432                ),
433        )
434        .count(db)
435        .await?;
436    if count > 0 {
437        assert_eq!(count, 1);
438        return Err(MetaError::catalog_duplicated("function", &pb_function.name));
439    }
440    Ok(())
441}
442
443/// `check_connection_name_duplicate` checks whether the connection name is already used in the target namespace.
444pub async fn check_connection_name_duplicate<C>(
445    pb_connection: &PbConnection,
446    db: &C,
447) -> MetaResult<()>
448where
449    C: ConnectionTrait,
450{
451    let count = Connection::find()
452        .inner_join(Object)
453        .filter(
454            object::Column::DatabaseId
455                .eq(pb_connection.database_id)
456                .and(object::Column::SchemaId.eq(pb_connection.schema_id))
457                .and(connection::Column::Name.eq(&pb_connection.name)),
458        )
459        .count(db)
460        .await?;
461    if count > 0 {
462        assert_eq!(count, 1);
463        return Err(MetaError::catalog_duplicated(
464            "connection",
465            &pb_connection.name,
466        ));
467    }
468    Ok(())
469}
470
471pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
472where
473    C: ConnectionTrait,
474{
475    let count = Secret::find()
476        .inner_join(Object)
477        .filter(
478            object::Column::DatabaseId
479                .eq(pb_secret.database_id)
480                .and(object::Column::SchemaId.eq(pb_secret.schema_id))
481                .and(secret::Column::Name.eq(&pb_secret.name)),
482        )
483        .count(db)
484        .await?;
485    if count > 0 {
486        assert_eq!(count, 1);
487        return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
488    }
489    Ok(())
490}
491
492pub async fn check_subscription_name_duplicate<C>(
493    pb_subscription: &PbSubscription,
494    db: &C,
495) -> MetaResult<()>
496where
497    C: ConnectionTrait,
498{
499    let count = Subscription::find()
500        .inner_join(Object)
501        .filter(
502            object::Column::DatabaseId
503                .eq(pb_subscription.database_id)
504                .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
505                .and(subscription::Column::Name.eq(&pb_subscription.name)),
506        )
507        .count(db)
508        .await?;
509    if count > 0 {
510        assert_eq!(count, 1);
511        return Err(MetaError::catalog_duplicated(
512            "subscription",
513            &pb_subscription.name,
514        ));
515    }
516    Ok(())
517}
518
519/// `check_user_name_duplicate` checks whether the user is already existed in the cluster.
520pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
521where
522    C: ConnectionTrait,
523{
524    let count = User::find()
525        .filter(user::Column::Name.eq(name))
526        .count(db)
527        .await?;
528    if count > 0 {
529        assert_eq!(count, 1);
530        return Err(MetaError::catalog_duplicated("user", name));
531    }
532    Ok(())
533}
534
535/// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace.
536pub async fn check_relation_name_duplicate<C>(
537    name: &str,
538    database_id: DatabaseId,
539    schema_id: SchemaId,
540    db: &C,
541) -> MetaResult<()>
542where
543    C: ConnectionTrait,
544{
545    macro_rules! check_duplicated {
546        ($obj_type:expr, $entity:ident, $table:ident) => {
547            let object_id = Object::find()
548                .select_only()
549                .column(object::Column::Oid)
550                .inner_join($entity)
551                .filter(
552                    object::Column::DatabaseId
553                        .eq(Some(database_id))
554                        .and(object::Column::SchemaId.eq(Some(schema_id)))
555                        .and($table::Column::Name.eq(name)),
556                )
557                .into_tuple::<ObjectId>()
558                .one(db)
559                .await?;
560            if let Some(oid) = object_id {
561                let check_creation = if $obj_type == ObjectType::View {
562                    false
563                } else if $obj_type == ObjectType::Source {
564                    let source_info = Source::find_by_id(oid.as_source_id())
565                        .select_only()
566                        .column(source::Column::SourceInfo)
567                        .into_tuple::<Option<StreamSourceInfo>>()
568                        .one(db)
569                        .await?
570                        .unwrap();
571                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
572                } else {
573                    true
574                };
575                let job_id = oid.as_job_id();
576                return if check_creation
577                    && !matches!(
578                        StreamingJob::find_by_id(job_id)
579                            .select_only()
580                            .column(streaming_job::Column::JobStatus)
581                            .into_tuple::<JobStatus>()
582                            .one(db)
583                            .await?,
584                        Some(JobStatus::Created)
585                    ) {
586                    Err(MetaError::catalog_under_creation(
587                        $obj_type.as_str(),
588                        name,
589                        job_id,
590                    ))
591                } else {
592                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
593                };
594            }
595        };
596    }
597    check_duplicated!(ObjectType::Table, Table, table);
598    check_duplicated!(ObjectType::Source, Source, source);
599    check_duplicated!(ObjectType::Sink, Sink, sink);
600    check_duplicated!(ObjectType::Index, Index, index);
601    check_duplicated!(ObjectType::View, View, view);
602
603    Ok(())
604}
605
606/// `check_schema_name_duplicate` checks whether the schema name is already used in the target database.
607pub async fn check_schema_name_duplicate<C>(
608    name: &str,
609    database_id: DatabaseId,
610    db: &C,
611) -> MetaResult<()>
612where
613    C: ConnectionTrait,
614{
615    let count = Object::find()
616        .inner_join(Schema)
617        .filter(
618            object::Column::ObjType
619                .eq(ObjectType::Schema)
620                .and(object::Column::DatabaseId.eq(Some(database_id)))
621                .and(schema::Column::Name.eq(name)),
622        )
623        .count(db)
624        .await?;
625    if count != 0 {
626        return Err(MetaError::catalog_duplicated("schema", name));
627    }
628
629    Ok(())
630}
631
632/// `check_object_refer_for_drop` checks whether the object is used by other objects except indexes.
633/// It returns an error that contains the details of the referring objects if it is used by others.
634pub async fn check_object_refer_for_drop<C>(
635    object_type: ObjectType,
636    object_id: ObjectId,
637    db: &C,
638) -> MetaResult<()>
639where
640    C: ConnectionTrait,
641{
642    // Ignore indexes.
643    let count = if object_type == ObjectType::Table {
644        ObjectDependency::find()
645            .join(
646                JoinType::InnerJoin,
647                object_dependency::Relation::Object1.def(),
648            )
649            .filter(
650                object_dependency::Column::Oid
651                    .eq(object_id)
652                    .and(object::Column::ObjType.ne(ObjectType::Index)),
653            )
654            .count(db)
655            .await?
656    } else {
657        ObjectDependency::find()
658            .filter(object_dependency::Column::Oid.eq(object_id))
659            .count(db)
660            .await?
661    };
662    if count != 0 {
663        // find the name of all objects that are using the given one.
664        let referring_objects = get_referring_objects(object_id, db).await?;
665        let referring_objs_map = referring_objects
666            .into_iter()
667            .filter(|o| o.obj_type != ObjectType::Index)
668            .into_group_map_by(|o| o.obj_type);
669        let mut details = vec![];
670        for (obj_type, objs) in referring_objs_map {
671            match obj_type {
672                ObjectType::Table => {
673                    let tables: Vec<(String, String)> = Object::find()
674                        .join(JoinType::InnerJoin, object::Relation::Table.def())
675                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
676                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
677                        .select_only()
678                        .column(schema::Column::Name)
679                        .column(table::Column::Name)
680                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
681                        .into_tuple()
682                        .all(db)
683                        .await?;
684                    details.extend(tables.into_iter().map(|(schema_name, table_name)| {
685                        format!(
686                            "materialized view {}.{} depends on it",
687                            schema_name, table_name
688                        )
689                    }));
690                }
691                ObjectType::Sink => {
692                    let sinks: Vec<(String, String)> = Object::find()
693                        .join(JoinType::InnerJoin, object::Relation::Sink.def())
694                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
695                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
696                        .select_only()
697                        .column(schema::Column::Name)
698                        .column(sink::Column::Name)
699                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
700                        .into_tuple()
701                        .all(db)
702                        .await?;
703                    if object_type == ObjectType::Table {
704                        let engine = Table::find_by_id(object_id.as_table_id())
705                            .select_only()
706                            .column(table::Column::Engine)
707                            .into_tuple::<table::Engine>()
708                            .one(db)
709                            .await?;
710                        if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
711                            continue;
712                        }
713                    }
714                    details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
715                        format!("sink {}.{} depends on it", schema_name, sink_name)
716                    }));
717                }
718                ObjectType::View => {
719                    let views: Vec<(String, String)> = Object::find()
720                        .join(JoinType::InnerJoin, object::Relation::View.def())
721                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
722                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
723                        .select_only()
724                        .column(schema::Column::Name)
725                        .column(view::Column::Name)
726                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
727                        .into_tuple()
728                        .all(db)
729                        .await?;
730                    details.extend(views.into_iter().map(|(schema_name, view_name)| {
731                        format!("view {}.{} depends on it", schema_name, view_name)
732                    }));
733                }
734                ObjectType::Subscription => {
735                    let subscriptions: Vec<(String, String)> = Object::find()
736                        .join(JoinType::InnerJoin, object::Relation::Subscription.def())
737                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
738                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
739                        .select_only()
740                        .column(schema::Column::Name)
741                        .column(subscription::Column::Name)
742                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
743                        .into_tuple()
744                        .all(db)
745                        .await?;
746                    details.extend(subscriptions.into_iter().map(
747                        |(schema_name, subscription_name)| {
748                            format!(
749                                "subscription {}.{} depends on it",
750                                schema_name, subscription_name
751                            )
752                        },
753                    ));
754                }
755                ObjectType::Source => {
756                    let sources: Vec<(String, String)> = Object::find()
757                        .join(JoinType::InnerJoin, object::Relation::Source.def())
758                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
759                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
760                        .select_only()
761                        .column(schema::Column::Name)
762                        .column(source::Column::Name)
763                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
764                        .into_tuple()
765                        .all(db)
766                        .await?;
767                    details.extend(sources.into_iter().map(|(schema_name, view_name)| {
768                        format!("source {}.{} depends on it", schema_name, view_name)
769                    }));
770                }
771                ObjectType::Connection => {
772                    let connections: Vec<(String, String)> = Object::find()
773                        .join(JoinType::InnerJoin, object::Relation::Connection.def())
774                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
775                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
776                        .select_only()
777                        .column(schema::Column::Name)
778                        .column(connection::Column::Name)
779                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
780                        .into_tuple()
781                        .all(db)
782                        .await?;
783                    details.extend(connections.into_iter().map(|(schema_name, view_name)| {
784                        format!("connection {}.{} depends on it", schema_name, view_name)
785                    }));
786                }
787                // only the table, source, sink, subscription, view, connection and index will depend on other objects.
788                _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
789            }
790        }
791        if details.is_empty() {
792            return Ok(());
793        }
794
795        return Err(MetaError::permission_denied(format!(
796            "{} used by {} other objects. \nDETAIL: {}\n\
797            {}",
798            object_type.as_str(),
799            details.len(),
800            details.join("\n"),
801            match object_type {
802                ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
803                    "HINT: DROP the dependent objects first.",
804                ObjectType::Database | ObjectType::Schema => unreachable!(),
805                _ => "HINT:  Use DROP ... CASCADE to drop the dependent objects too.",
806            }
807        )));
808    }
809    Ok(())
810}
811
812/// List all objects that are using the given one.
813pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
814where
815    C: ConnectionTrait,
816{
817    let objs = ObjectDependency::find()
818        .filter(object_dependency::Column::Oid.eq(object_id))
819        .join(
820            JoinType::InnerJoin,
821            object_dependency::Relation::Object1.def(),
822        )
823        .into_partial_model()
824        .all(db)
825        .await?;
826
827    Ok(objs)
828}
829
830/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
831pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
832where
833    C: ConnectionTrait,
834{
835    let count = Object::find()
836        .filter(object::Column::SchemaId.eq(Some(schema_id)))
837        .count(db)
838        .await?;
839    if count != 0 {
840        return Err(MetaError::permission_denied("schema is not empty"));
841    }
842
843    Ok(())
844}
845
846/// `list_user_info_by_ids` lists all users' info by their ids.
847pub async fn list_user_info_by_ids<C>(
848    user_ids: impl IntoIterator<Item = UserId>,
849    db: &C,
850) -> MetaResult<Vec<PbUserInfo>>
851where
852    C: ConnectionTrait,
853{
854    let mut user_infos = vec![];
855    for user_id in user_ids {
856        let user = User::find_by_id(user_id)
857            .one(db)
858            .await?
859            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
860        let mut user_info: PbUserInfo = user.into();
861        user_info.grant_privileges = get_user_privilege(user_id, db).await?;
862        user_infos.push(user_info);
863    }
864    Ok(user_infos)
865}
866
867/// `get_object_owner` returns the owner of the given object.
868pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
869where
870    C: ConnectionTrait,
871{
872    let obj_owner: UserId = Object::find_by_id(object_id)
873        .select_only()
874        .column(object::Column::OwnerId)
875        .into_tuple()
876        .one(db)
877        .await?
878        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
879    Ok(obj_owner)
880}
881
882/// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given one.
883///
884/// # Examples
885///
886/// ```
887/// use risingwave_meta::controller::utils::construct_privilege_dependency_query;
888/// use sea_orm::sea_query::*;
889/// use sea_orm::*;
890///
891/// let query = construct_privilege_dependency_query(vec![1, 2, 3]);
892///
893/// assert_eq!(
894///    query.to_string(MysqlQueryBuilder),
895///   r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"#
896/// );
897/// assert_eq!(
898///   query.to_string(PostgresQueryBuilder),
899///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""#
900/// );
901/// assert_eq!(
902///  query.to_string(SqliteQueryBuilder),
903///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""#
904/// );
905/// ```
906pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
907    let cte_alias = Alias::new("granted_privilege_ids");
908    let cte_return_privilege_alias = Alias::new("id");
909    let cte_return_user_alias = Alias::new("user_id");
910
911    let mut base_query = SelectStatement::new()
912        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
913        .from(UserPrivilege)
914        .and_where(user_privilege::Column::Id.is_in(ids))
915        .to_owned();
916
917    let cte_referencing = Query::select()
918        .columns([
919            (UserPrivilege, user_privilege::Column::Id),
920            (UserPrivilege, user_privilege::Column::UserId),
921        ])
922        .from(UserPrivilege)
923        .inner_join(
924            cte_alias.clone(),
925            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
926                .equals(user_privilege::Column::DependentId),
927        )
928        .to_owned();
929
930    let mut common_table_expr = CommonTableExpression::new();
931    common_table_expr
932        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
933        .columns([
934            cte_return_privilege_alias.clone(),
935            cte_return_user_alias.clone(),
936        ])
937        .table_name(cte_alias.clone());
938
939    SelectStatement::new()
940        .columns([cte_return_privilege_alias, cte_return_user_alias])
941        .from(cte_alias)
942        .to_owned()
943        .with(
944            WithClause::new()
945                .recursive(true)
946                .cte(common_table_expr)
947                .to_owned(),
948        )
949}
950
951pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
952where
953    C: ConnectionTrait,
954{
955    let table_ids: Vec<TableId> = Table::find()
956        .select_only()
957        .column(table::Column::TableId)
958        .filter(
959            table::Column::TableType
960                .eq(TableType::Internal)
961                .and(table::Column::BelongsToJobId.eq(job_id)),
962        )
963        .into_tuple()
964        .all(db)
965        .await?;
966    Ok(table_ids)
967}
968
969pub async fn get_index_state_tables_by_table_id<C>(
970    table_id: TableId,
971    db: &C,
972) -> MetaResult<Vec<TableId>>
973where
974    C: ConnectionTrait,
975{
976    let mut index_table_ids: Vec<TableId> = Index::find()
977        .select_only()
978        .column(index::Column::IndexTableId)
979        .filter(index::Column::PrimaryTableId.eq(table_id))
980        .into_tuple()
981        .all(db)
982        .await?;
983
984    if !index_table_ids.is_empty() {
985        let internal_table_ids: Vec<TableId> = Table::find()
986            .select_only()
987            .column(table::Column::TableId)
988            .filter(
989                table::Column::TableType
990                    .eq(TableType::Internal)
991                    .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
992            )
993            .into_tuple()
994            .all(db)
995            .await?;
996
997        index_table_ids.extend(internal_table_ids.into_iter());
998    }
999
1000    Ok(index_table_ids)
1001}
1002
/// Partial projection of a `UserPrivilege` row that carries only the privilege id and the id of
/// the user recorded on it.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "UserPrivilege")]
pub struct PartialUserPrivilege {
    /// Id of the privilege row.
    pub id: PrivilegeId,
    /// Id of the user stored on the privilege row.
    pub user_id: UserId,
}
1009
1010pub async fn get_referring_privileges_cascade<C>(
1011    ids: Vec<PrivilegeId>,
1012    db: &C,
1013) -> MetaResult<Vec<PartialUserPrivilege>>
1014where
1015    C: ConnectionTrait,
1016{
1017    let query = construct_privilege_dependency_query(ids);
1018    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1019    let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1020        db.get_database_backend(),
1021        sql,
1022        values,
1023    ))
1024    .all(db)
1025    .await?;
1026
1027    Ok(privileges)
1028}
1029
1030/// `ensure_privileges_not_referred` ensures that the privileges are not granted to any other users.
1031pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1032where
1033    C: ConnectionTrait,
1034{
1035    let count = UserPrivilege::find()
1036        .filter(user_privilege::Column::DependentId.is_in(ids))
1037        .count(db)
1038        .await?;
1039    if count != 0 {
1040        return Err(MetaError::permission_denied(format!(
1041            "privileges granted to {} other ones.",
1042            count
1043        )));
1044    }
1045    Ok(())
1046}
1047
1048/// `get_user_privilege` returns the privileges of the given user.
1049pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1050where
1051    C: ConnectionTrait,
1052{
1053    let user_privileges = UserPrivilege::find()
1054        .find_also_related(Object)
1055        .filter(user_privilege::Column::UserId.eq(user_id))
1056        .all(db)
1057        .await?;
1058    Ok(user_privileges
1059        .into_iter()
1060        .map(|(privilege, object)| {
1061            let object = object.unwrap();
1062            let oid = object.oid.as_raw_id();
1063            let obj = match object.obj_type {
1064                ObjectType::Database => PbGrantObject::DatabaseId(oid),
1065                ObjectType::Schema => PbGrantObject::SchemaId(oid),
1066                ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
1067                ObjectType::Source => PbGrantObject::SourceId(oid),
1068                ObjectType::Sink => PbGrantObject::SinkId(oid),
1069                ObjectType::View => PbGrantObject::ViewId(oid),
1070                ObjectType::Function => PbGrantObject::FunctionId(oid),
1071                ObjectType::Connection => PbGrantObject::ConnectionId(oid),
1072                ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
1073                ObjectType::Secret => PbGrantObject::SecretId(oid),
1074            };
1075            PbGrantPrivilege {
1076                action_with_opts: vec![PbActionWithGrantOption {
1077                    action: PbAction::from(privilege.action) as _,
1078                    with_grant_option: privilege.with_grant_option,
1079                    granted_by: privilege.granted_by as _,
1080                }],
1081                object: Some(obj),
1082            }
1083        })
1084        .collect())
1085}
1086
1087pub async fn get_table_columns(
1088    txn: &impl ConnectionTrait,
1089    id: TableId,
1090) -> MetaResult<ColumnCatalogArray> {
1091    let columns = Table::find_by_id(id)
1092        .select_only()
1093        .columns([table::Column::Columns])
1094        .into_tuple::<ColumnCatalogArray>()
1095        .one(txn)
1096        .await?
1097        .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1098    Ok(columns)
1099}
1100
1101/// `grant_default_privileges_automatically` grants default privileges automatically
1102/// for the given new object. It returns the list of user infos whose privileges are updated.
1103pub async fn grant_default_privileges_automatically<C>(
1104    db: &C,
1105    object_id: impl Into<ObjectId>,
1106) -> MetaResult<Vec<PbUserInfo>>
1107where
1108    C: ConnectionTrait,
1109{
1110    let object_id = object_id.into();
1111    let object = Object::find_by_id(object_id)
1112        .one(db)
1113        .await?
1114        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1115    assert_ne!(object.obj_type, ObjectType::Database);
1116
1117    let for_mview_filter = if object.obj_type == ObjectType::Table {
1118        let table_type = Table::find_by_id(object_id.as_table_id())
1119            .select_only()
1120            .column(table::Column::TableType)
1121            .into_tuple::<TableType>()
1122            .one(db)
1123            .await?
1124            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1125        user_default_privilege::Column::ForMaterializedView
1126            .eq(table_type == TableType::MaterializedView)
1127    } else {
1128        user_default_privilege::Column::ForMaterializedView.eq(false)
1129    };
1130    let schema_filter = if let Some(schema_id) = &object.schema_id {
1131        user_default_privilege::Column::SchemaId.eq(*schema_id)
1132    } else {
1133        user_default_privilege::Column::SchemaId.is_null()
1134    };
1135
1136    let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1137        .select_only()
1138        .columns([
1139            user_default_privilege::Column::Grantee,
1140            user_default_privilege::Column::GrantedBy,
1141            user_default_privilege::Column::Action,
1142            user_default_privilege::Column::WithGrantOption,
1143        ])
1144        .filter(
1145            user_default_privilege::Column::DatabaseId
1146                .eq(object.database_id.unwrap())
1147                .and(schema_filter)
1148                .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1149                .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1150                .and(for_mview_filter),
1151        )
1152        .into_tuple()
1153        .all(db)
1154        .await?;
1155    if default_privileges.is_empty() {
1156        return Ok(vec![]);
1157    }
1158
1159    let updated_user_ids = default_privileges
1160        .iter()
1161        .map(|(grantee, _, _, _)| *grantee)
1162        .collect::<HashSet<_>>();
1163
1164    let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1165
1166    for (grantee, granted_by, action, with_grant_option) in default_privileges {
1167        UserPrivilege::insert(user_privilege::ActiveModel {
1168            user_id: Set(grantee),
1169            oid: Set(object_id),
1170            granted_by: Set(granted_by),
1171            action: Set(action),
1172            with_grant_option: Set(with_grant_option),
1173            ..Default::default()
1174        })
1175        .exec(db)
1176        .await?;
1177        if action == Action::Select && !internal_table_ids.is_empty() {
1178            // Grant SELECT privilege for internal tables if the action is SELECT.
1179            for internal_table_id in &internal_table_ids {
1180                UserPrivilege::insert(user_privilege::ActiveModel {
1181                    user_id: Set(grantee),
1182                    oid: Set(internal_table_id.as_object_id()),
1183                    granted_by: Set(granted_by),
1184                    action: Set(Action::Select),
1185                    with_grant_option: Set(with_grant_option),
1186                    ..Default::default()
1187                })
1188                .exec(db)
1189                .await?;
1190            }
1191        }
1192    }
1193
1194    let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1195    Ok(updated_user_infos)
1196}
1197
1198// todo: remove it after migrated to sql backend.
1199pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1200    match object {
1201        PbGrantObject::DatabaseId(id)
1202        | PbGrantObject::SchemaId(id)
1203        | PbGrantObject::TableId(id)
1204        | PbGrantObject::SourceId(id)
1205        | PbGrantObject::SinkId(id)
1206        | PbGrantObject::ViewId(id)
1207        | PbGrantObject::FunctionId(id)
1208        | PbGrantObject::SubscriptionId(id)
1209        | PbGrantObject::ConnectionId(id)
1210        | PbGrantObject::SecretId(id) => (*id).into(),
1211    }
1212}
1213
1214pub async fn insert_fragment_relations(
1215    db: &impl ConnectionTrait,
1216    downstream_fragment_relations: &FragmentDownstreamRelation,
1217) -> MetaResult<()> {
1218    for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1219        for downstream in downstreams {
1220            let relation = fragment_relation::Model {
1221                source_fragment_id: *upstream_fragment_id as _,
1222                target_fragment_id: downstream.downstream_fragment_id as _,
1223                dispatcher_type: downstream.dispatcher_type,
1224                dist_key_indices: downstream
1225                    .dist_key_indices
1226                    .iter()
1227                    .map(|idx| *idx as i32)
1228                    .collect_vec()
1229                    .into(),
1230                output_indices: downstream
1231                    .output_mapping
1232                    .indices
1233                    .iter()
1234                    .map(|idx| *idx as i32)
1235                    .collect_vec()
1236                    .into(),
1237                output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1238            };
1239            FragmentRelation::insert(relation.into_active_model())
1240                .exec(db)
1241                .await?;
1242        }
1243    }
1244    Ok(())
1245}
1246
1247pub fn compose_dispatchers(
1248    source_fragment_distribution: DistributionType,
1249    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1250    target_fragment_id: crate::model::FragmentId,
1251    target_fragment_distribution: DistributionType,
1252    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1253    dispatcher_type: DispatcherType,
1254    dist_key_indices: Vec<u32>,
1255    output_mapping: PbDispatchOutputMapping,
1256) -> HashMap<crate::model::ActorId, PbDispatcher> {
1257    match dispatcher_type {
1258        DispatcherType::Hash => {
1259            let dispatcher = PbDispatcher {
1260                r#type: PbDispatcherType::from(dispatcher_type) as _,
1261                dist_key_indices,
1262                output_mapping: output_mapping.into(),
1263                hash_mapping: Some(
1264                    ActorMapping::from_bitmaps(
1265                        &target_fragment_actors
1266                            .iter()
1267                            .map(|(actor_id, bitmap)| {
1268                                (
1269                                    *actor_id as _,
1270                                    bitmap
1271                                        .clone()
1272                                        .expect("downstream hash dispatch must have distribution"),
1273                                )
1274                            })
1275                            .collect(),
1276                    )
1277                    .to_protobuf(),
1278                ),
1279                dispatcher_id: target_fragment_id.as_raw_id() as _,
1280                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1281            };
1282            source_fragment_actors
1283                .keys()
1284                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1285                .collect()
1286        }
1287        DispatcherType::Broadcast | DispatcherType::Simple => {
1288            let dispatcher = PbDispatcher {
1289                r#type: PbDispatcherType::from(dispatcher_type) as _,
1290                dist_key_indices,
1291                output_mapping: output_mapping.into(),
1292                hash_mapping: None,
1293                dispatcher_id: target_fragment_id.as_raw_id() as _,
1294                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1295            };
1296            source_fragment_actors
1297                .keys()
1298                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1299                .collect()
1300        }
1301        DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1302            source_fragment_distribution,
1303            source_fragment_actors,
1304            target_fragment_distribution,
1305            target_fragment_actors,
1306        )
1307        .into_iter()
1308        .map(|(upstream_actor_id, downstream_actor_id)| {
1309            (
1310                upstream_actor_id,
1311                PbDispatcher {
1312                    r#type: PbDispatcherType::NoShuffle as _,
1313                    dist_key_indices: dist_key_indices.clone(),
1314                    output_mapping: output_mapping.clone().into(),
1315                    hash_mapping: None,
1316                    dispatcher_id: target_fragment_id.as_raw_id() as _,
1317                    downstream_actor_id: vec![downstream_actor_id],
1318                },
1319            )
1320        })
1321        .collect(),
1322    }
1323}
1324
1325/// return (`upstream_actor_id` -> `downstream_actor_id`)
1326pub fn resolve_no_shuffle_actor_dispatcher(
1327    source_fragment_distribution: DistributionType,
1328    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1329    target_fragment_distribution: DistributionType,
1330    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1331) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1332    assert_eq!(source_fragment_distribution, target_fragment_distribution);
1333    assert_eq!(
1334        source_fragment_actors.len(),
1335        target_fragment_actors.len(),
1336        "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1337        source_fragment_actors,
1338        target_fragment_actors
1339    );
1340    match source_fragment_distribution {
1341        DistributionType::Single => {
1342            let assert_singleton = |bitmap: &Option<Bitmap>| {
1343                assert!(
1344                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1345                    "not singleton: {:?}",
1346                    bitmap
1347                );
1348            };
1349            assert_eq!(
1350                source_fragment_actors.len(),
1351                1,
1352                "singleton distribution actor count not 1: {:?}",
1353                source_fragment_distribution
1354            );
1355            assert_eq!(
1356                target_fragment_actors.len(),
1357                1,
1358                "singleton distribution actor count not 1: {:?}",
1359                target_fragment_distribution
1360            );
1361            let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1362            assert_singleton(bitmap);
1363            let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1364            assert_singleton(bitmap);
1365            vec![(*source_actor_id, *target_actor_id)]
1366        }
1367        DistributionType::Hash => {
1368            let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1369                .iter()
1370                .map(|(actor_id, bitmap)| {
1371                    let bitmap = bitmap
1372                        .as_ref()
1373                        .expect("hash distribution should have bitmap");
1374                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1375                    (first_vnode, (*actor_id, bitmap))
1376                })
1377                .collect();
1378            source_fragment_actors
1379                .iter()
1380                .map(|(source_actor_id, bitmap)| {
1381                    let bitmap = bitmap
1382                        .as_ref()
1383                        .expect("hash distribution should have bitmap");
1384                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1385                    let (target_actor_id, target_bitmap) =
1386                        target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1387                            panic!(
1388                                "cannot find matched target actor: {} {:?} {:?} {:?}",
1389                                source_actor_id,
1390                                first_vnode,
1391                                source_fragment_actors,
1392                                target_fragment_actors
1393                            );
1394                        });
1395                    assert_eq!(
1396                        bitmap,
1397                        target_bitmap,
1398                        "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1399                        source_actor_id,
1400                        first_vnode,
1401                        source_fragment_actors,
1402                        target_fragment_actors
1403                    );
1404                    (*source_actor_id, target_actor_id)
1405                }).collect()
1406        }
1407    }
1408}
1409
1410pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1411    let fragment_worker_slot_mapping = match fragment.distribution_type {
1412        DistributionType::Single => {
1413            let actor = fragment.actors.values().exactly_one().unwrap();
1414            WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1415        }
1416        DistributionType::Hash => {
1417            let actor_bitmaps: HashMap<_, _> = fragment
1418                .actors
1419                .iter()
1420                .map(|(actor_id, actor_info)| {
1421                    let vnode_bitmap = actor_info
1422                        .vnode_bitmap
1423                        .as_ref()
1424                        .cloned()
1425                        .expect("actor bitmap shouldn't be none in hash fragment");
1426
1427                    (*actor_id as hash::ActorId, vnode_bitmap)
1428                })
1429                .collect();
1430
1431            let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1432
1433            let actor_locations = fragment
1434                .actors
1435                .iter()
1436                .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
1437                .collect();
1438
1439            actor_mapping.to_worker_slot(&actor_locations)
1440        }
1441    };
1442
1443    PbFragmentWorkerSlotMapping {
1444        fragment_id: fragment.fragment_id,
1445        mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1446    }
1447}
1448
1449/// For the given streaming jobs, returns
1450/// - All source fragments
1451/// - All sink fragments
1452/// - All actors
1453/// - All fragments
1454pub async fn get_fragments_for_jobs<C>(
1455    db: &C,
1456    actor_info: &SharedActorInfos,
1457    streaming_jobs: Vec<JobId>,
1458) -> MetaResult<(
1459    HashMap<SourceId, BTreeSet<FragmentId>>,
1460    HashSet<FragmentId>,
1461    HashSet<ActorId>,
1462    HashSet<FragmentId>,
1463)>
1464where
1465    C: ConnectionTrait,
1466{
1467    if streaming_jobs.is_empty() {
1468        return Ok((
1469            HashMap::default(),
1470            HashSet::default(),
1471            HashSet::default(),
1472            HashSet::default(),
1473        ));
1474    }
1475
1476    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1477        .select_only()
1478        .columns([
1479            fragment::Column::FragmentId,
1480            fragment::Column::FragmentTypeMask,
1481            fragment::Column::StreamNode,
1482        ])
1483        .filter(fragment::Column::JobId.is_in(streaming_jobs))
1484        .into_tuple()
1485        .all(db)
1486        .await?;
1487
1488    let fragment_ids: HashSet<_> = fragments
1489        .iter()
1490        .map(|(fragment_id, _, _)| *fragment_id)
1491        .collect();
1492
1493    let actors = {
1494        let guard = actor_info.read_guard();
1495        fragment_ids
1496            .iter()
1497            .flat_map(|id| guard.get_fragment(*id as _))
1498            .flat_map(|f| f.actors.keys().cloned().map(|id| id as _))
1499            .collect::<HashSet<_>>()
1500    };
1501
1502    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1503    let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1504    for (fragment_id, mask, stream_node) in fragments {
1505        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1506            && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1507        {
1508            source_fragment_ids
1509                .entry(source_id)
1510                .or_default()
1511                .insert(fragment_id);
1512        }
1513        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1514            sink_fragment_ids.insert(fragment_id);
1515        }
1516    }
1517
1518    Ok((
1519        source_fragment_ids,
1520        sink_fragment_ids,
1521        actors.into_iter().collect(),
1522        fragment_ids,
1523    ))
1524}
1525
1526/// Build a object group for notifying the deletion of the given objects.
1527///
1528/// Note that only id fields are filled in the object info, as the arguments are partial objects.
1529/// As a result, the returned notification info should only be used for deletion.
1530pub(crate) fn build_object_group_for_delete(
1531    partial_objects: Vec<PartialObject>,
1532) -> NotificationInfo {
1533    let mut objects = vec![];
1534    for obj in partial_objects {
1535        match obj.obj_type {
1536            ObjectType::Database => objects.push(PbObject {
1537                object_info: Some(PbObjectInfo::Database(PbDatabase {
1538                    id: obj.oid.as_database_id(),
1539                    ..Default::default()
1540                })),
1541            }),
1542            ObjectType::Schema => objects.push(PbObject {
1543                object_info: Some(PbObjectInfo::Schema(PbSchema {
1544                    id: obj.oid.as_schema_id(),
1545                    database_id: obj.database_id.unwrap(),
1546                    ..Default::default()
1547                })),
1548            }),
1549            ObjectType::Table => objects.push(PbObject {
1550                object_info: Some(PbObjectInfo::Table(PbTable {
1551                    id: obj.oid.as_table_id(),
1552                    schema_id: obj.schema_id.unwrap(),
1553                    database_id: obj.database_id.unwrap(),
1554                    ..Default::default()
1555                })),
1556            }),
1557            ObjectType::Source => objects.push(PbObject {
1558                object_info: Some(PbObjectInfo::Source(PbSource {
1559                    id: obj.oid.as_source_id(),
1560                    schema_id: obj.schema_id.unwrap(),
1561                    database_id: obj.database_id.unwrap(),
1562                    ..Default::default()
1563                })),
1564            }),
1565            ObjectType::Sink => objects.push(PbObject {
1566                object_info: Some(PbObjectInfo::Sink(PbSink {
1567                    id: obj.oid.as_sink_id(),
1568                    schema_id: obj.schema_id.unwrap(),
1569                    database_id: obj.database_id.unwrap(),
1570                    ..Default::default()
1571                })),
1572            }),
1573            ObjectType::Subscription => objects.push(PbObject {
1574                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1575                    id: obj.oid.as_subscription_id(),
1576                    schema_id: obj.schema_id.unwrap(),
1577                    database_id: obj.database_id.unwrap(),
1578                    ..Default::default()
1579                })),
1580            }),
1581            ObjectType::View => objects.push(PbObject {
1582                object_info: Some(PbObjectInfo::View(PbView {
1583                    id: obj.oid.as_view_id(),
1584                    schema_id: obj.schema_id.unwrap(),
1585                    database_id: obj.database_id.unwrap(),
1586                    ..Default::default()
1587                })),
1588            }),
1589            ObjectType::Index => {
1590                objects.push(PbObject {
1591                    object_info: Some(PbObjectInfo::Index(PbIndex {
1592                        id: obj.oid.as_index_id(),
1593                        schema_id: obj.schema_id.unwrap(),
1594                        database_id: obj.database_id.unwrap(),
1595                        ..Default::default()
1596                    })),
1597                });
1598                objects.push(PbObject {
1599                    object_info: Some(PbObjectInfo::Table(PbTable {
1600                        id: obj.oid.as_table_id(),
1601                        schema_id: obj.schema_id.unwrap(),
1602                        database_id: obj.database_id.unwrap(),
1603                        ..Default::default()
1604                    })),
1605                });
1606            }
1607            ObjectType::Function => objects.push(PbObject {
1608                object_info: Some(PbObjectInfo::Function(PbFunction {
1609                    id: obj.oid.as_function_id(),
1610                    schema_id: obj.schema_id.unwrap(),
1611                    database_id: obj.database_id.unwrap(),
1612                    ..Default::default()
1613                })),
1614            }),
1615            ObjectType::Connection => objects.push(PbObject {
1616                object_info: Some(PbObjectInfo::Connection(PbConnection {
1617                    id: obj.oid.as_connection_id(),
1618                    schema_id: obj.schema_id.unwrap(),
1619                    database_id: obj.database_id.unwrap(),
1620                    ..Default::default()
1621                })),
1622            }),
1623            ObjectType::Secret => objects.push(PbObject {
1624                object_info: Some(PbObjectInfo::Secret(PbSecret {
1625                    id: obj.oid.as_secret_id(),
1626                    schema_id: obj.schema_id.unwrap(),
1627                    database_id: obj.database_id.unwrap(),
1628                    ..Default::default()
1629                })),
1630            }),
1631        }
1632    }
1633    NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1634}
1635
1636pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1637    let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1638        .context("unable to parse table definition")
1639        .inspect_err(|e| {
1640            tracing::error!(
1641                target: "auto_schema_change",
1642                error = %e.as_report(),
1643                "failed to parse table definition")
1644        })
1645        .unwrap()
1646        .try_into()
1647        .unwrap();
1648    if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1649        cdc_table_info
1650            .clone()
1651            .map(|cdc_table_info| cdc_table_info.external_table_name)
1652    } else {
1653        None
1654    }
1655}
1656
/// `rename_relation` renames the target relation and rewrites its stored definition.
///
/// All updates are issued through `txn`; this function does not commit the transaction itself.
///
/// Returns the protobuf objects of every catalog entry that was updated, together with the name
/// the relation had before the rename.
///
/// Per object type:
/// - `Table`: if a source whose `OptionalAssociatedTableId` equals `object_id` exists, it is
///   renamed first; then the table itself is renamed.
/// - `Source`, `Sink`, `Subscription`, `View`: the entry itself is renamed. The definition is
///   left untouched when the related object is a view.
/// - `Index`: the table referenced by `index_table_id` is renamed, then the index entry's name
///   is updated. The returned old name is the previous name of that table.
///
/// # Errors
///
/// Propagates any database error raised while reading or updating the catalog.
///
/// # Panics
///
/// Panics if the relation or its related object row cannot be found, or if `object_type` is
/// not one of the relation types listed above.
pub async fn rename_relation(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
) -> MetaResult<(Vec<PbObject>, String)> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename;

    let mut to_update_relations = vec![];
    // Renames one relation: loads it with its object row, updates name (and definition unless
    // the object is a view), persists the change, records the updated object in
    // `to_update_relations`, and evaluates to the previous name.
    macro_rules! rename_relation {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let obj = obj.unwrap();
            let old_name = relation.name.clone();
            relation.name = object_name.into();
            // The definition is only rewritten for non-view objects.
            if obj.obj_type != ObjectType::View {
                relation.definition = alter_relation_rename(&relation.definition, object_name);
            }
            // Only the identity, name and definition columns are set on the active model.
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                name: Set(object_name.into()),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
            });
            old_name
        }};
    }
    // TODO: check whether there is anything to change for shared source.
    let old_name = match object_type {
        ObjectType::Table => {
            // A table may have an associated source; it is renamed together with the table.
            let associated_source_id: Option<SourceId> = Source::find()
                .select_only()
                .column(source::Column::SourceId)
                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
                .into_tuple()
                .one(txn)
                .await?;
            if let Some(source_id) = associated_source_id {
                rename_relation!(Source, source, source_id, source_id);
            }
            rename_relation!(Table, table, table_id, object_id.as_table_id())
        }
        ObjectType::Source => {
            rename_relation!(Source, source, source_id, object_id.as_source_id())
        }
        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
        ObjectType::Subscription => {
            rename_relation!(
                Subscription,
                subscription,
                subscription_id,
                object_id.as_subscription_id()
            )
        }
        ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
        ObjectType::Index => {
            let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            index.name = object_name.into();
            let index_table_id = index.index_table_id;
            // Rename the index's table first; its previous name becomes the returned old name.
            let old_name = rename_relation!(Table, table, table_id, index_table_id);

            // the name of index and its associated table is the same.
            let active_model = index::ActiveModel {
                index_id: sea_orm::ActiveValue::Set(index.index_id),
                name: sea_orm::ActiveValue::Set(object_name.into()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
            });
            old_name
        }
        _ => unreachable!("only relation name can be altered."),
    };

    Ok((to_update_relations, old_name))
}
1752
1753pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
1754where
1755    C: ConnectionTrait,
1756{
1757    let database_resource_group: Option<String> = Database::find_by_id(database_id)
1758        .select_only()
1759        .column(database::Column::ResourceGroup)
1760        .into_tuple()
1761        .one(txn)
1762        .await?
1763        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1764
1765    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1766}
1767
1768pub async fn get_existing_job_resource_group<C>(
1769    txn: &C,
1770    streaming_job_id: JobId,
1771) -> MetaResult<String>
1772where
1773    C: ConnectionTrait,
1774{
1775    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1776        StreamingJob::find_by_id(streaming_job_id)
1777            .select_only()
1778            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1779            .join(JoinType::InnerJoin, object::Relation::Database2.def())
1780            .column(streaming_job::Column::SpecificResourceGroup)
1781            .column(database::Column::ResourceGroup)
1782            .into_tuple()
1783            .one(txn)
1784            .await?
1785            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1786
1787    Ok(job_specific_resource_group.unwrap_or_else(|| {
1788        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1789    }))
1790}
1791
1792pub fn filter_workers_by_resource_group(
1793    workers: &HashMap<WorkerId, WorkerNode>,
1794    resource_group: &str,
1795) -> BTreeSet<WorkerId> {
1796    workers
1797        .iter()
1798        .filter(|&(_, worker)| {
1799            worker
1800                .resource_group()
1801                .map(|node_label| node_label.as_str() == resource_group)
1802                .unwrap_or(false)
1803        })
1804        .map(|(id, _)| *id)
1805        .collect()
1806}
1807
/// `rename_relation_refer` rewrites the definitions of relations that refer to a renamed
/// relation, replacing references to `old_name` with `object_name`.
///
/// All updates are issued through `txn`; this function does not commit the transaction itself.
/// It returns the protobuf objects of every relation whose definition was rewritten.
///
/// The set of relations to rewrite is whatever `get_referring_objects` returns for
/// `object_id`; when `object_type` is `Table`, sinks whose `TargetTable` equals `object_id` are
/// added as well.
///
/// # Errors
///
/// Fails if a referring object is not a table, sink, subscription, view or index, and
/// propagates any database error.
///
/// # Panics
///
/// Panics if a referring relation, its related object row, or the table of a referring index
/// cannot be found.
pub async fn rename_relation_refer(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
    old_name: &str,
) -> MetaResult<Vec<PbObject>> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename_refs;

    let mut to_update_relations = vec![];
    // Rewrites the definition of one referring relation, persists it (only the identity and
    // definition columns are set) and records the updated object in `to_update_relations`.
    macro_rules! rename_relation_ref {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            relation.definition =
                alter_relation_rename_refs(&relation.definition, old_name, object_name);
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj.unwrap()).into(),
                )),
            });
        }};
    }
    let mut objs = get_referring_objects(object_id, txn).await?;
    if object_type == ObjectType::Table {
        // Sinks that write into this table are looked up via `TargetTable` and appended to the
        // list of objects to rewrite.
        let incoming_sinks: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.eq(object_id))
            .into_tuple()
            .all(txn)
            .await?;

        // Only `oid` and `obj_type` are read in the loop below, so schema/database ids are
        // left unset for these entries.
        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
            oid: id.as_object_id(),
            obj_type: ObjectType::Sink,
            schema_id: None,
            database_id: None,
        }));
    }

    for obj in objs {
        match obj.obj_type {
            ObjectType::Table => {
                rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
            }
            ObjectType::Sink => {
                rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
            }
            ObjectType::Subscription => {
                rename_relation_ref!(
                    Subscription,
                    subscription,
                    subscription_id,
                    obj.oid.as_subscription_id()
                )
            }
            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
            ObjectType::Index => {
                // For an index, the definition that gets rewritten is the one stored on the
                // table referenced by `index_table_id`.
                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
                    .select_only()
                    .column(index::Column::IndexTableId)
                    .into_tuple()
                    .one(txn)
                    .await?;
                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
            }
            _ => {
                bail!(
                    "only the table, sink, subscription, view and index will depend on other objects."
                )
            }
        }
    }

    Ok(to_update_relations)
}
1898
1899/// Validate that subscription can be safely deleted, meeting any of the following conditions:
1900/// 1. The upstream table is not referred to by any cross-db mv.
1901/// 2. After deleting the subscription, the upstream table still has at least one subscription.
1902pub async fn validate_subscription_deletion<C>(
1903    txn: &C,
1904    subscription_id: SubscriptionId,
1905) -> MetaResult<()>
1906where
1907    C: ConnectionTrait,
1908{
1909    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1910        .select_only()
1911        .column(subscription::Column::DependentTableId)
1912        .into_tuple()
1913        .one(txn)
1914        .await?
1915        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1916
1917    let cnt = Subscription::find()
1918        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1919        .count(txn)
1920        .await?;
1921    if cnt > 1 {
1922        // Ensure that at least one subscription is remained for the upstream table
1923        // once the subscription is dropped.
1924        return Ok(());
1925    }
1926
1927    // Ensure that the upstream table is not referred by any cross-db mv.
1928    let obj_alias = Alias::new("o1");
1929    let used_by_alias = Alias::new("o2");
1930    let count = ObjectDependency::find()
1931        .join_as(
1932            JoinType::InnerJoin,
1933            object_dependency::Relation::Object2.def(),
1934            obj_alias.clone(),
1935        )
1936        .join_as(
1937            JoinType::InnerJoin,
1938            object_dependency::Relation::Object1.def(),
1939            used_by_alias.clone(),
1940        )
1941        .filter(
1942            object_dependency::Column::Oid
1943                .eq(upstream_table_id)
1944                .and(object_dependency::Column::UsedBy.ne(subscription_id))
1945                .and(
1946                    Expr::col((obj_alias, object::Column::DatabaseId))
1947                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1948                ),
1949        )
1950        .count(txn)
1951        .await?;
1952
1953    if count != 0 {
1954        return Err(MetaError::permission_denied(format!(
1955            "Referenced by {} cross-db objects.",
1956            count
1957        )));
1958    }
1959
1960    Ok(())
1961}
1962
1963pub async fn fetch_target_fragments<C>(
1964    txn: &C,
1965    src_fragment_id: impl IntoIterator<Item = FragmentId>,
1966) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
1967where
1968    C: ConnectionTrait,
1969{
1970    let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
1971        .select_only()
1972        .columns([
1973            fragment_relation::Column::SourceFragmentId,
1974            fragment_relation::Column::TargetFragmentId,
1975        ])
1976        .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
1977        .into_tuple()
1978        .all(txn)
1979        .await?;
1980
1981    let source_target_fragments = source_target_fragments.into_iter().into_group_map();
1982
1983    Ok(source_target_fragments)
1984}
1985
1986pub async fn get_sink_fragment_by_ids<C>(
1987    txn: &C,
1988    sink_ids: Vec<SinkId>,
1989) -> MetaResult<HashMap<SinkId, FragmentId>>
1990where
1991    C: ConnectionTrait,
1992{
1993    let sink_num = sink_ids.len();
1994    let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
1995        .select_only()
1996        .columns([fragment::Column::JobId, fragment::Column::FragmentId])
1997        .filter(
1998            fragment::Column::JobId
1999                .is_in(sink_ids)
2000                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2001        )
2002        .into_tuple()
2003        .all(txn)
2004        .await?;
2005
2006    if sink_fragment_ids.len() != sink_num {
2007        return Err(anyhow::anyhow!(
2008            "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2009            sink_fragment_ids.len(),
2010            sink_num
2011        )
2012        .into());
2013    }
2014
2015    Ok(sink_fragment_ids.into_iter().collect())
2016}
2017
2018pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2019where
2020    C: ConnectionTrait,
2021{
2022    let mview_fragment: Vec<i32> = Fragment::find()
2023        .select_only()
2024        .column(fragment::Column::FragmentTypeMask)
2025        .filter(
2026            fragment::Column::JobId
2027                .eq(table_id)
2028                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2029        )
2030        .into_tuple()
2031        .all(txn)
2032        .await?;
2033
2034    let mview_fragment_len = mview_fragment.len();
2035    if mview_fragment_len != 1 {
2036        bail!(
2037            "expected exactly one mview fragment for table {}, found {}",
2038            table_id,
2039            mview_fragment_len
2040        );
2041    }
2042
2043    let mview_fragment = mview_fragment.into_iter().next().unwrap();
2044    let migrated =
2045        FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2046
2047    Ok(migrated)
2048}
2049
2050pub async fn try_get_iceberg_table_by_downstream_sink<C>(
2051    txn: &C,
2052    sink_id: SinkId,
2053) -> MetaResult<Option<TableId>>
2054where
2055    C: ConnectionTrait,
2056{
2057    let sink = Sink::find_by_id(sink_id).one(txn).await?;
2058    let Some(sink) = sink else {
2059        return Ok(None);
2060    };
2061
2062    if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
2063        let object_ids: Vec<ObjectId> = ObjectDependency::find()
2064            .select_only()
2065            .column(object_dependency::Column::Oid)
2066            .filter(object_dependency::Column::UsedBy.eq(sink_id))
2067            .into_tuple()
2068            .all(txn)
2069            .await?;
2070        let mut iceberg_table_ids = vec![];
2071        for object_id in object_ids {
2072            let table_id = object_id.as_table_id();
2073            if let Some(table_engine) = Table::find_by_id(table_id)
2074                .select_only()
2075                .column(table::Column::Engine)
2076                .into_tuple::<table::Engine>()
2077                .one(txn)
2078                .await?
2079                && table_engine == table::Engine::Iceberg
2080            {
2081                iceberg_table_ids.push(table_id);
2082            }
2083        }
2084        if iceberg_table_ids.len() == 1 {
2085            return Ok(Some(iceberg_table_ids[0]));
2086        }
2087    }
2088    Ok(None)
2089}
2090
2091pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
2092where
2093    C: ConnectionTrait,
2094{
2095    if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
2096        .select_only()
2097        .column(table::Column::Engine)
2098        .into_tuple::<table::Engine>()
2099        .one(txn)
2100        .await?
2101        && engine == table::Engine::Iceberg
2102    {
2103        return Ok(true);
2104    }
2105    if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
2106        .select_only()
2107        .column(sink::Column::Name)
2108        .into_tuple::<String>()
2109        .one(txn)
2110        .await?
2111        && sink_name.starts_with(ICEBERG_SINK_PREFIX)
2112    {
2113        return Ok(true);
2114    }
2115    Ok(false)
2116}
2117
2118pub async fn find_dirty_iceberg_table_jobs<C>(
2119    txn: &C,
2120    database_id: Option<DatabaseId>,
2121) -> MetaResult<Vec<PartialObject>>
2122where
2123    C: ConnectionTrait,
2124{
2125    let mut filter_condition = streaming_job::Column::JobStatus
2126        .ne(JobStatus::Created)
2127        .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
2128        .and(streaming_job::Column::CreateType.eq(CreateType::Background));
2129    if let Some(database_id) = database_id {
2130        filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
2131    }
2132    let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
2133        .select_only()
2134        .columns([
2135            object::Column::Oid,
2136            object::Column::ObjType,
2137            object::Column::SchemaId,
2138            object::Column::DatabaseId,
2139        ])
2140        .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2141        .filter(filter_condition)
2142        .into_partial_model()
2143        .all(txn)
2144        .await?;
2145
2146    let mut dirty_iceberg_table_jobs = vec![];
2147    for job in creating_table_sink_jobs {
2148        if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
2149            tracing::info!("Found dirty iceberg job with id: {}", job.oid);
2150            dirty_iceberg_table_jobs.push(job);
2151        }
2152    }
2153
2154    Ok(dirty_iceberg_table_jobs)
2155}
2156
2157pub fn build_select_node_list(
2158    from: &[ColumnCatalog],
2159    to: &[ColumnCatalog],
2160) -> MetaResult<Vec<PbExprNode>> {
2161    let mut exprs = Vec::with_capacity(to.len());
2162    let idx_by_col_id = from
2163        .iter()
2164        .enumerate()
2165        .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2166        .collect::<HashMap<_, _>>();
2167
2168    for to_col in to {
2169        let to_col = to_col.column_desc.as_ref().unwrap();
2170        let to_col_type_ref = to_col.column_type.as_ref().unwrap();
2171        let to_col_type = DataType::from(to_col_type_ref);
2172        if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2173            let from_col_type = DataType::from(
2174                from[*from_idx]
2175                    .column_desc
2176                    .as_ref()
2177                    .unwrap()
2178                    .column_type
2179                    .as_ref()
2180                    .unwrap(),
2181            );
2182            if !to_col_type.equals_datatype(&from_col_type) {
2183                return Err(anyhow!(
2184                    "Column type mismatch: {:?} != {:?}",
2185                    from_col_type,
2186                    to_col_type
2187                )
2188                .into());
2189            }
2190            exprs.push(PbExprNode {
2191                function_type: expr_node::Type::Unspecified.into(),
2192                return_type: Some(to_col_type_ref.clone()),
2193                rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2194            });
2195        } else {
2196            let to_default_node =
2197                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2198                    expr,
2199                    ..
2200                })) = &to_col.generated_or_default_column
2201                {
2202                    expr.clone().unwrap()
2203                } else {
2204                    let null = Datum::None.to_protobuf();
2205                    PbExprNode {
2206                        function_type: expr_node::Type::Unspecified.into(),
2207                        return_type: Some(to_col_type_ref.clone()),
2208                        rex_node: Some(expr_node::RexNode::Constant(null)),
2209                    }
2210                };
2211            exprs.push(to_default_node);
2212        }
2213    }
2214
2215    Ok(exprs)
2216}
2217
2218#[derive(Clone, Debug, Default)]
2219pub struct StreamingJobExtraInfo {
2220    pub timezone: Option<String>,
2221    pub config_override: Arc<str>,
2222    pub job_definition: String,
2223}
2224
2225impl StreamingJobExtraInfo {
2226    pub fn stream_context(&self) -> StreamContext {
2227        StreamContext {
2228            timezone: self.timezone.clone(),
2229            config_override: self.config_override.clone(),
2230        }
2231    }
2232}
2233
2234pub async fn get_streaming_job_extra_info<C>(
2235    txn: &C,
2236    job_ids: Vec<JobId>,
2237) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2238where
2239    C: ConnectionTrait,
2240{
2241    let pairs: Vec<(JobId, Option<String>, Option<String>)> = StreamingJob::find()
2242        .select_only()
2243        .columns([
2244            streaming_job::Column::JobId,
2245            streaming_job::Column::Timezone,
2246            streaming_job::Column::ConfigOverride,
2247        ])
2248        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2249        .into_tuple()
2250        .all(txn)
2251        .await?;
2252
2253    let job_ids = job_ids.into_iter().collect();
2254
2255    let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2256
2257    let result = pairs
2258        .into_iter()
2259        .map(|(job_id, timezone, config_override)| {
2260            let job_definition = definitions.remove(&job_id).unwrap_or_default();
2261            (
2262                job_id,
2263                StreamingJobExtraInfo {
2264                    timezone,
2265                    config_override: config_override.unwrap_or_default().into(),
2266                    job_definition,
2267                },
2268            )
2269        })
2270        .collect();
2271
2272    Ok(result)
2273}
2274
#[cfg(test)]
mod tests {
    use super::*;

    /// The external table name is taken from the `TABLE '...'` clause of a CDC table definition.
    #[test]
    fn test_extract_cdc_table_name() {
        let cases = [
            (
                "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'",
                "public.t1",
            ),
            (
                "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'",
                "mydb.t2",
            ),
        ];

        for (ddl, expected) in cases {
            assert_eq!(
                extract_external_table_name_from_definition(ddl).as_deref(),
                Some(expected)
            );
        }
    }
}