risingwave_meta/controller/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{
22    FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23};
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
25use risingwave_common::id::{JobId, SubscriptionId};
26use risingwave_common::types::{DataType, Datum};
27use risingwave_common::util::value_encoding::DatumToProtoExt;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_common::{bail, hash};
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_meta_model::object::ObjectType;
32use risingwave_meta_model::prelude::*;
33use risingwave_meta_model::table::TableType;
34use risingwave_meta_model::user_privilege::Action;
35use risingwave_meta_model::{
36    ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
37    JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
38    TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
39    function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
40    subscription, table, user, user_default_privilege, user_privilege, view,
41};
42use risingwave_meta_model_migration::WithQuery;
43use risingwave_pb::catalog::{
44    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
45    PbSubscription, PbTable, PbView,
46};
47use risingwave_pb::common::WorkerNode;
48use risingwave_pb::expr::{PbExprNode, expr_node};
49use risingwave_pb::meta::object::PbObjectInfo;
50use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
51use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
52use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
53use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
54use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
55use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
56use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
57use risingwave_sqlparser::ast::Statement as SqlStatement;
58use risingwave_sqlparser::parser::Parser;
59use sea_orm::sea_query::{
60    Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
61    WithClause,
62};
63use sea_orm::{
64    ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
65    FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
66    RelationTrait, Set, Statement,
67};
68use thiserror_ext::AsReport;
69use tracing::warn;
70
71use crate::barrier::{SharedActorInfos, SharedFragmentInfo};
72use crate::controller::ObjectModel;
73use crate::controller::fragment::FragmentTypeMaskExt;
74use crate::controller::scale::resolve_streaming_job_definition;
75use crate::model::{FragmentDownstreamRelation, StreamContext};
76use crate::{MetaError, MetaResult};
77
78/// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object.
79///
80/// # Examples
81///
82/// ```
83/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
84/// use sea_orm::sea_query::*;
85/// use sea_orm::*;
86///
87/// let query = construct_obj_dependency_query(1.into());
88///
89/// assert_eq!(
90///     query.to_string(MysqlQueryBuilder),
91///     r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `oid` FROM `object` WHERE `object`.`database_id` = 1 OR `object`.`schema_id` = 1) UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `oid`, `obj_type`, `schema_id`, `database_id` FROM `used_by_object_ids` INNER JOIN `object` ON `used_by_object_ids`.`used_by` = `oid` ORDER BY `oid` DESC"#
92/// );
93/// assert_eq!(
94///     query.to_string(PostgresQueryBuilder),
95///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1) UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
96/// );
97/// assert_eq!(
98///     query.to_string(SqliteQueryBuilder),
99///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
100/// );
101/// ```
102pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
103    let cte_alias = Alias::new("used_by_object_ids");
104    let cte_return_alias = Alias::new("used_by");
105
106    let mut base_query = SelectStatement::new()
107        .column(object_dependency::Column::UsedBy)
108        .from(ObjectDependency)
109        .and_where(object_dependency::Column::Oid.eq(obj_id))
110        .to_owned();
111
112    let belonged_obj_query = SelectStatement::new()
113        .column(object::Column::Oid)
114        .from(Object)
115        .and_where(
116            object::Column::DatabaseId
117                .eq(obj_id)
118                .or(object::Column::SchemaId.eq(obj_id)),
119        )
120        .to_owned();
121
122    let cte_referencing = Query::select()
123        .column((ObjectDependency, object_dependency::Column::UsedBy))
124        .from(ObjectDependency)
125        .inner_join(
126            cte_alias.clone(),
127            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
128                .equals(object_dependency::Column::Oid),
129        )
130        .to_owned();
131
132    let mut common_table_expr = CommonTableExpression::new();
133    common_table_expr
134        .query(
135            base_query
136                .union(UnionType::All, belonged_obj_query)
137                .union(UnionType::All, cte_referencing)
138                .to_owned(),
139        )
140        .column(cte_return_alias.clone())
141        .table_name(cte_alias.clone());
142
143    SelectStatement::new()
144        .distinct()
145        .columns([
146            object::Column::Oid,
147            object::Column::ObjType,
148            object::Column::SchemaId,
149            object::Column::DatabaseId,
150        ])
151        .from(cte_alias.clone())
152        .inner_join(
153            Object,
154            Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
155        )
156        .order_by(object::Column::Oid, Order::Desc)
157        .to_owned()
158        .with(
159            WithClause::new()
160                .recursive(true)
161                .cte(common_table_expr)
162                .to_owned(),
163        )
164}
165
166/// This function will construct a query using recursive cte to find if dependent objects are already relying on the target table.
167///
168/// # Examples
169///
170/// ```
171/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
172/// use sea_orm::sea_query::*;
173/// use sea_orm::*;
174///
175/// let query = construct_sink_cycle_check_query(1.into(), vec![2.into(), 3.into()]);
176///
177/// assert_eq!(
178///     query.to_string(MysqlQueryBuilder),
179///     r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
180/// );
181/// assert_eq!(
182///     query.to_string(PostgresQueryBuilder),
183///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
184/// );
185/// assert_eq!(
186///     query.to_string(SqliteQueryBuilder),
187///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
188/// );
189/// ```
190pub fn construct_sink_cycle_check_query(
191    target_table: ObjectId,
192    dependent_objects: Vec<ObjectId>,
193) -> WithQuery {
194    let cte_alias = Alias::new("used_by_object_ids_with_sink");
195    let depend_alias = Alias::new("obj_dependency_with_sink");
196
197    let mut base_query = SelectStatement::new()
198        .columns([
199            object_dependency::Column::Oid,
200            object_dependency::Column::UsedBy,
201        ])
202        .from(ObjectDependency)
203        .and_where(object_dependency::Column::Oid.eq(target_table))
204        .to_owned();
205
206    let query_sink_deps = SelectStatement::new()
207        .columns([sink::Column::SinkId, sink::Column::TargetTable])
208        .from(Sink)
209        .and_where(sink::Column::TargetTable.is_not_null())
210        .to_owned();
211
212    let cte_referencing = Query::select()
213        .column((depend_alias.clone(), object_dependency::Column::Oid))
214        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
215        .from_subquery(
216            SelectStatement::new()
217                .columns([
218                    object_dependency::Column::Oid,
219                    object_dependency::Column::UsedBy,
220                ])
221                .from(ObjectDependency)
222                .union(UnionType::All, query_sink_deps)
223                .to_owned(),
224            depend_alias.clone(),
225        )
226        .inner_join(
227            cte_alias.clone(),
228            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
229                .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
230        )
231        .and_where(
232            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
233                cte_alias.clone(),
234                object_dependency::Column::Oid,
235            ))),
236        )
237        .to_owned();
238
239    let mut common_table_expr = CommonTableExpression::new();
240    common_table_expr
241        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
242        .columns([
243            object_dependency::Column::Oid,
244            object_dependency::Column::UsedBy,
245        ])
246        .table_name(cte_alias.clone());
247
248    SelectStatement::new()
249        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
250        .from(cte_alias.clone())
251        .and_where(
252            Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
253        )
254        .to_owned()
255        .with(
256            WithClause::new()
257                .recursive(true)
258                .cte(common_table_expr)
259                .to_owned(),
260        )
261}
262
263#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
264#[sea_orm(entity = "Object")]
265pub struct PartialObject {
266    pub oid: ObjectId,
267    pub obj_type: ObjectType,
268    pub schema_id: Option<SchemaId>,
269    pub database_id: Option<DatabaseId>,
270}
271
272#[derive(Clone, DerivePartialModel, FromQueryResult)]
273#[sea_orm(entity = "Fragment")]
274pub struct PartialFragmentStateTables {
275    pub fragment_id: FragmentId,
276    pub job_id: ObjectId,
277    pub state_table_ids: TableIdArray,
278}
279
280#[derive(Clone, Eq, PartialEq, Debug)]
281pub struct PartialActorLocation {
282    pub actor_id: ActorId,
283    pub fragment_id: FragmentId,
284    pub worker_id: WorkerId,
285}
286
287#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
288pub struct FragmentDesc {
289    pub fragment_id: FragmentId,
290    pub job_id: JobId,
291    pub fragment_type_mask: i32,
292    pub distribution_type: DistributionType,
293    pub state_table_ids: TableIdArray,
294    pub parallelism: i64,
295    pub vnode_count: i32,
296    pub stream_node: StreamNode,
297    pub parallelism_policy: String,
298}
299
300/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
301pub async fn get_referring_objects_cascade<C>(
302    obj_id: ObjectId,
303    db: &C,
304) -> MetaResult<Vec<PartialObject>>
305where
306    C: ConnectionTrait,
307{
308    let query = construct_obj_dependency_query(obj_id);
309    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
310    let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
311        db.get_database_backend(),
312        sql,
313        values,
314    ))
315    .all(db)
316    .await?;
317    Ok(objects)
318}
319
320/// Check if create a sink with given dependent objects into the target table will cause a cycle, return true if it will.
321pub async fn check_sink_into_table_cycle<C>(
322    target_table: ObjectId,
323    dependent_objs: Vec<ObjectId>,
324    db: &C,
325) -> MetaResult<bool>
326where
327    C: ConnectionTrait,
328{
329    if dependent_objs.is_empty() {
330        return Ok(false);
331    }
332
333    // special check for self referencing
334    if dependent_objs.contains(&target_table) {
335        return Ok(true);
336    }
337
338    let query = construct_sink_cycle_check_query(target_table, dependent_objs);
339    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
340
341    let res = db
342        .query_one(Statement::from_sql_and_values(
343            db.get_database_backend(),
344            sql,
345            values,
346        ))
347        .await?
348        .unwrap();
349
350    let cnt: i64 = res.try_get_by(0)?;
351
352    Ok(cnt != 0)
353}
354
355/// `ensure_object_id` ensures the existence of target object in the cluster.
356pub async fn ensure_object_id<C>(
357    object_type: ObjectType,
358    obj_id: impl Into<ObjectId>,
359    db: &C,
360) -> MetaResult<()>
361where
362    C: ConnectionTrait,
363{
364    let obj_id = obj_id.into();
365    let count = Object::find_by_id(obj_id).count(db).await?;
366    if count == 0 {
367        return Err(MetaError::catalog_id_not_found(
368            object_type.as_str(),
369            obj_id,
370        ));
371    }
372    Ok(())
373}
374
375pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
376where
377    C: ConnectionTrait,
378{
379    let count = Object::find_by_id(job_id).count(db).await?;
380    if count == 0 {
381        return Err(MetaError::cancelled(format!(
382            "job {} might be cancelled manually or by recovery",
383            job_id
384        )));
385    }
386    Ok(())
387}
388
389/// `ensure_user_id` ensures the existence of target user in the cluster.
390pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
391where
392    C: ConnectionTrait,
393{
394    let count = User::find_by_id(user_id).count(db).await?;
395    if count == 0 {
396        return Err(anyhow!("user {} was concurrently dropped", user_id).into());
397    }
398    Ok(())
399}
400
401/// `check_database_name_duplicate` checks whether the database name is already used in the cluster.
402pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
403where
404    C: ConnectionTrait,
405{
406    let count = Database::find()
407        .filter(database::Column::Name.eq(name))
408        .count(db)
409        .await?;
410    if count > 0 {
411        assert_eq!(count, 1);
412        return Err(MetaError::catalog_duplicated("database", name));
413    }
414    Ok(())
415}
416
417/// `check_function_signature_duplicate` checks whether the function name and its signature is already used in the target namespace.
418pub async fn check_function_signature_duplicate<C>(
419    pb_function: &PbFunction,
420    db: &C,
421) -> MetaResult<()>
422where
423    C: ConnectionTrait,
424{
425    let count = Function::find()
426        .inner_join(Object)
427        .filter(
428            object::Column::DatabaseId
429                .eq(pb_function.database_id)
430                .and(object::Column::SchemaId.eq(pb_function.schema_id))
431                .and(function::Column::Name.eq(&pb_function.name))
432                .and(
433                    function::Column::ArgTypes
434                        .eq(DataTypeArray::from(pb_function.arg_types.clone())),
435                ),
436        )
437        .count(db)
438        .await?;
439    if count > 0 {
440        assert_eq!(count, 1);
441        return Err(MetaError::catalog_duplicated("function", &pb_function.name));
442    }
443    Ok(())
444}
445
446/// `check_connection_name_duplicate` checks whether the connection name is already used in the target namespace.
447pub async fn check_connection_name_duplicate<C>(
448    pb_connection: &PbConnection,
449    db: &C,
450) -> MetaResult<()>
451where
452    C: ConnectionTrait,
453{
454    let count = Connection::find()
455        .inner_join(Object)
456        .filter(
457            object::Column::DatabaseId
458                .eq(pb_connection.database_id)
459                .and(object::Column::SchemaId.eq(pb_connection.schema_id))
460                .and(connection::Column::Name.eq(&pb_connection.name)),
461        )
462        .count(db)
463        .await?;
464    if count > 0 {
465        assert_eq!(count, 1);
466        return Err(MetaError::catalog_duplicated(
467            "connection",
468            &pb_connection.name,
469        ));
470    }
471    Ok(())
472}
473
474pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
475where
476    C: ConnectionTrait,
477{
478    let count = Secret::find()
479        .inner_join(Object)
480        .filter(
481            object::Column::DatabaseId
482                .eq(pb_secret.database_id)
483                .and(object::Column::SchemaId.eq(pb_secret.schema_id))
484                .and(secret::Column::Name.eq(&pb_secret.name)),
485        )
486        .count(db)
487        .await?;
488    if count > 0 {
489        assert_eq!(count, 1);
490        return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
491    }
492    Ok(())
493}
494
495pub async fn check_subscription_name_duplicate<C>(
496    pb_subscription: &PbSubscription,
497    db: &C,
498) -> MetaResult<()>
499where
500    C: ConnectionTrait,
501{
502    let count = Subscription::find()
503        .inner_join(Object)
504        .filter(
505            object::Column::DatabaseId
506                .eq(pb_subscription.database_id)
507                .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
508                .and(subscription::Column::Name.eq(&pb_subscription.name)),
509        )
510        .count(db)
511        .await?;
512    if count > 0 {
513        assert_eq!(count, 1);
514        return Err(MetaError::catalog_duplicated(
515            "subscription",
516            &pb_subscription.name,
517        ));
518    }
519    Ok(())
520}
521
522/// `check_user_name_duplicate` checks whether the user is already existed in the cluster.
523pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
524where
525    C: ConnectionTrait,
526{
527    let count = User::find()
528        .filter(user::Column::Name.eq(name))
529        .count(db)
530        .await?;
531    if count > 0 {
532        assert_eq!(count, 1);
533        return Err(MetaError::catalog_duplicated("user", name));
534    }
535    Ok(())
536}
537
538/// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace.
539pub async fn check_relation_name_duplicate<C>(
540    name: &str,
541    database_id: DatabaseId,
542    schema_id: SchemaId,
543    db: &C,
544) -> MetaResult<()>
545where
546    C: ConnectionTrait,
547{
548    macro_rules! check_duplicated {
549        ($obj_type:expr, $entity:ident, $table:ident) => {
550            let object_id = Object::find()
551                .select_only()
552                .column(object::Column::Oid)
553                .inner_join($entity)
554                .filter(
555                    object::Column::DatabaseId
556                        .eq(Some(database_id))
557                        .and(object::Column::SchemaId.eq(Some(schema_id)))
558                        .and($table::Column::Name.eq(name)),
559                )
560                .into_tuple::<ObjectId>()
561                .one(db)
562                .await?;
563            if let Some(oid) = object_id {
564                let check_creation = if $obj_type == ObjectType::View {
565                    false
566                } else if $obj_type == ObjectType::Source {
567                    let source_info = Source::find_by_id(oid.as_source_id())
568                        .select_only()
569                        .column(source::Column::SourceInfo)
570                        .into_tuple::<Option<StreamSourceInfo>>()
571                        .one(db)
572                        .await?
573                        .unwrap();
574                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
575                } else {
576                    true
577                };
578                let job_id = oid.as_job_id();
579                return if check_creation
580                    && !matches!(
581                        StreamingJob::find_by_id(job_id)
582                            .select_only()
583                            .column(streaming_job::Column::JobStatus)
584                            .into_tuple::<JobStatus>()
585                            .one(db)
586                            .await?,
587                        Some(JobStatus::Created)
588                    ) {
589                    Err(MetaError::catalog_under_creation(
590                        $obj_type.as_str(),
591                        name,
592                        job_id,
593                    ))
594                } else {
595                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
596                };
597            }
598        };
599    }
600    check_duplicated!(ObjectType::Table, Table, table);
601    check_duplicated!(ObjectType::Source, Source, source);
602    check_duplicated!(ObjectType::Sink, Sink, sink);
603    check_duplicated!(ObjectType::Index, Index, index);
604    check_duplicated!(ObjectType::View, View, view);
605
606    Ok(())
607}
608
609/// `check_schema_name_duplicate` checks whether the schema name is already used in the target database.
610pub async fn check_schema_name_duplicate<C>(
611    name: &str,
612    database_id: DatabaseId,
613    db: &C,
614) -> MetaResult<()>
615where
616    C: ConnectionTrait,
617{
618    let count = Object::find()
619        .inner_join(Schema)
620        .filter(
621            object::Column::ObjType
622                .eq(ObjectType::Schema)
623                .and(object::Column::DatabaseId.eq(Some(database_id)))
624                .and(schema::Column::Name.eq(name)),
625        )
626        .count(db)
627        .await?;
628    if count != 0 {
629        return Err(MetaError::catalog_duplicated("schema", name));
630    }
631
632    Ok(())
633}
634
635/// `check_object_refer_for_drop` checks whether the object is used by other objects except indexes.
636/// It returns an error that contains the details of the referring objects if it is used by others.
637pub async fn check_object_refer_for_drop<C>(
638    object_type: ObjectType,
639    object_id: ObjectId,
640    db: &C,
641) -> MetaResult<()>
642where
643    C: ConnectionTrait,
644{
645    // Ignore indexes.
646    let count = if object_type == ObjectType::Table {
647        ObjectDependency::find()
648            .join(
649                JoinType::InnerJoin,
650                object_dependency::Relation::Object1.def(),
651            )
652            .filter(
653                object_dependency::Column::Oid
654                    .eq(object_id)
655                    .and(object::Column::ObjType.ne(ObjectType::Index)),
656            )
657            .count(db)
658            .await?
659    } else {
660        ObjectDependency::find()
661            .filter(object_dependency::Column::Oid.eq(object_id))
662            .count(db)
663            .await?
664    };
665    if count != 0 {
666        // find the name of all objects that are using the given one.
667        let referring_objects = get_referring_objects(object_id, db).await?;
668        let referring_objs_map = referring_objects
669            .into_iter()
670            .filter(|o| o.obj_type != ObjectType::Index)
671            .into_group_map_by(|o| o.obj_type);
672        let mut details = vec![];
673        for (obj_type, objs) in referring_objs_map {
674            match obj_type {
675                ObjectType::Table => {
676                    let tables: Vec<(String, String)> = Object::find()
677                        .join(JoinType::InnerJoin, object::Relation::Table.def())
678                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
679                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
680                        .select_only()
681                        .column(schema::Column::Name)
682                        .column(table::Column::Name)
683                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
684                        .into_tuple()
685                        .all(db)
686                        .await?;
687                    details.extend(tables.into_iter().map(|(schema_name, table_name)| {
688                        format!(
689                            "materialized view {}.{} depends on it",
690                            schema_name, table_name
691                        )
692                    }));
693                }
694                ObjectType::Sink => {
695                    let sinks: Vec<(String, String)> = Object::find()
696                        .join(JoinType::InnerJoin, object::Relation::Sink.def())
697                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
698                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
699                        .select_only()
700                        .column(schema::Column::Name)
701                        .column(sink::Column::Name)
702                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
703                        .into_tuple()
704                        .all(db)
705                        .await?;
706                    if object_type == ObjectType::Table {
707                        let engine = Table::find_by_id(object_id.as_table_id())
708                            .select_only()
709                            .column(table::Column::Engine)
710                            .into_tuple::<table::Engine>()
711                            .one(db)
712                            .await?;
713                        if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
714                            continue;
715                        }
716                    }
717                    details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
718                        format!("sink {}.{} depends on it", schema_name, sink_name)
719                    }));
720                }
721                ObjectType::View => {
722                    let views: Vec<(String, String)> = Object::find()
723                        .join(JoinType::InnerJoin, object::Relation::View.def())
724                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
725                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
726                        .select_only()
727                        .column(schema::Column::Name)
728                        .column(view::Column::Name)
729                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
730                        .into_tuple()
731                        .all(db)
732                        .await?;
733                    details.extend(views.into_iter().map(|(schema_name, view_name)| {
734                        format!("view {}.{} depends on it", schema_name, view_name)
735                    }));
736                }
737                ObjectType::Subscription => {
738                    let subscriptions: Vec<(String, String)> = Object::find()
739                        .join(JoinType::InnerJoin, object::Relation::Subscription.def())
740                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
741                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
742                        .select_only()
743                        .column(schema::Column::Name)
744                        .column(subscription::Column::Name)
745                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
746                        .into_tuple()
747                        .all(db)
748                        .await?;
749                    details.extend(subscriptions.into_iter().map(
750                        |(schema_name, subscription_name)| {
751                            format!(
752                                "subscription {}.{} depends on it",
753                                schema_name, subscription_name
754                            )
755                        },
756                    ));
757                }
758                ObjectType::Source => {
759                    let sources: Vec<(String, String)> = Object::find()
760                        .join(JoinType::InnerJoin, object::Relation::Source.def())
761                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
762                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
763                        .select_only()
764                        .column(schema::Column::Name)
765                        .column(source::Column::Name)
766                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
767                        .into_tuple()
768                        .all(db)
769                        .await?;
770                    details.extend(sources.into_iter().map(|(schema_name, view_name)| {
771                        format!("source {}.{} depends on it", schema_name, view_name)
772                    }));
773                }
774                ObjectType::Connection => {
775                    let connections: Vec<(String, String)> = Object::find()
776                        .join(JoinType::InnerJoin, object::Relation::Connection.def())
777                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
778                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
779                        .select_only()
780                        .column(schema::Column::Name)
781                        .column(connection::Column::Name)
782                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
783                        .into_tuple()
784                        .all(db)
785                        .await?;
786                    details.extend(connections.into_iter().map(|(schema_name, view_name)| {
787                        format!("connection {}.{} depends on it", schema_name, view_name)
788                    }));
789                }
790                // only the table, source, sink, subscription, view, connection and index will depend on other objects.
791                _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
792            }
793        }
794        if details.is_empty() {
795            return Ok(());
796        }
797
798        return Err(MetaError::permission_denied(format!(
799            "{} used by {} other objects. \nDETAIL: {}\n\
800            {}",
801            object_type.as_str(),
802            details.len(),
803            details.join("\n"),
804            match object_type {
805                ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
806                    "HINT: DROP the dependent objects first.",
807                ObjectType::Database | ObjectType::Schema => unreachable!(),
808                _ => "HINT:  Use DROP ... CASCADE to drop the dependent objects too.",
809            }
810        )));
811    }
812    Ok(())
813}
814
815/// List all objects that are using the given one.
816pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
817where
818    C: ConnectionTrait,
819{
820    let objs = ObjectDependency::find()
821        .filter(object_dependency::Column::Oid.eq(object_id))
822        .join(
823            JoinType::InnerJoin,
824            object_dependency::Relation::Object1.def(),
825        )
826        .into_partial_model()
827        .all(db)
828        .await?;
829
830    Ok(objs)
831}
832
833/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
834pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
835where
836    C: ConnectionTrait,
837{
838    let count = Object::find()
839        .filter(object::Column::SchemaId.eq(Some(schema_id)))
840        .count(db)
841        .await?;
842    if count != 0 {
843        return Err(MetaError::permission_denied("schema is not empty"));
844    }
845
846    Ok(())
847}
848
849/// `list_user_info_by_ids` lists all users' info by their ids.
850pub async fn list_user_info_by_ids<C>(
851    user_ids: impl IntoIterator<Item = UserId>,
852    db: &C,
853) -> MetaResult<Vec<PbUserInfo>>
854where
855    C: ConnectionTrait,
856{
857    let mut user_infos = vec![];
858    for user_id in user_ids {
859        let user = User::find_by_id(user_id)
860            .one(db)
861            .await?
862            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
863        let mut user_info: PbUserInfo = user.into();
864        user_info.grant_privileges = get_user_privilege(user_id, db).await?;
865        user_infos.push(user_info);
866    }
867    Ok(user_infos)
868}
869
870/// `get_object_owner` returns the owner of the given object.
871pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
872where
873    C: ConnectionTrait,
874{
875    let obj_owner: UserId = Object::find_by_id(object_id)
876        .select_only()
877        .column(object::Column::OwnerId)
878        .into_tuple()
879        .one(db)
880        .await?
881        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
882    Ok(obj_owner)
883}
884
885/// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given one.
886///
887/// # Examples
888///
889/// ```
890/// use risingwave_meta::controller::utils::construct_privilege_dependency_query;
891/// use sea_orm::sea_query::*;
892/// use sea_orm::*;
893///
894/// let query = construct_privilege_dependency_query(vec![1, 2, 3]);
895///
896/// assert_eq!(
897///    query.to_string(MysqlQueryBuilder),
898///   r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"#
899/// );
900/// assert_eq!(
901///   query.to_string(PostgresQueryBuilder),
902///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""#
903/// );
904/// assert_eq!(
905///  query.to_string(SqliteQueryBuilder),
906///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""#
907/// );
908/// ```
909pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
910    let cte_alias = Alias::new("granted_privilege_ids");
911    let cte_return_privilege_alias = Alias::new("id");
912    let cte_return_user_alias = Alias::new("user_id");
913
914    let mut base_query = SelectStatement::new()
915        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
916        .from(UserPrivilege)
917        .and_where(user_privilege::Column::Id.is_in(ids))
918        .to_owned();
919
920    let cte_referencing = Query::select()
921        .columns([
922            (UserPrivilege, user_privilege::Column::Id),
923            (UserPrivilege, user_privilege::Column::UserId),
924        ])
925        .from(UserPrivilege)
926        .inner_join(
927            cte_alias.clone(),
928            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
929                .equals(user_privilege::Column::DependentId),
930        )
931        .to_owned();
932
933    let mut common_table_expr = CommonTableExpression::new();
934    common_table_expr
935        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
936        .columns([
937            cte_return_privilege_alias.clone(),
938            cte_return_user_alias.clone(),
939        ])
940        .table_name(cte_alias.clone());
941
942    SelectStatement::new()
943        .columns([cte_return_privilege_alias, cte_return_user_alias])
944        .from(cte_alias)
945        .to_owned()
946        .with(
947            WithClause::new()
948                .recursive(true)
949                .cte(common_table_expr)
950                .to_owned(),
951        )
952}
953
954pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
955where
956    C: ConnectionTrait,
957{
958    let table_ids: Vec<TableId> = Table::find()
959        .select_only()
960        .column(table::Column::TableId)
961        .filter(
962            table::Column::TableType
963                .eq(TableType::Internal)
964                .and(table::Column::BelongsToJobId.eq(job_id)),
965        )
966        .into_tuple()
967        .all(db)
968        .await?;
969    Ok(table_ids)
970}
971
972pub async fn get_index_state_tables_by_table_id<C>(
973    table_id: TableId,
974    db: &C,
975) -> MetaResult<Vec<TableId>>
976where
977    C: ConnectionTrait,
978{
979    let mut index_table_ids: Vec<TableId> = Index::find()
980        .select_only()
981        .column(index::Column::IndexTableId)
982        .filter(index::Column::PrimaryTableId.eq(table_id))
983        .into_tuple()
984        .all(db)
985        .await?;
986
987    if !index_table_ids.is_empty() {
988        let internal_table_ids: Vec<TableId> = Table::find()
989            .select_only()
990            .column(table::Column::TableId)
991            .filter(
992                table::Column::TableType
993                    .eq(TableType::Internal)
994                    .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
995            )
996            .into_tuple()
997            .all(db)
998            .await?;
999
1000        index_table_ids.extend(internal_table_ids.into_iter());
1001    }
1002
1003    Ok(index_table_ids)
1004}
1005
1006/// `get_iceberg_related_object_ids` returns the related object ids of the iceberg source, sink and internal tables.
1007pub async fn get_iceberg_related_object_ids<C>(
1008    object_id: ObjectId,
1009    db: &C,
1010) -> MetaResult<Vec<ObjectId>>
1011where
1012    C: ConnectionTrait,
1013{
1014    let object = Object::find_by_id(object_id)
1015        .one(db)
1016        .await?
1017        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1018    if object.obj_type != ObjectType::Table {
1019        return Ok(vec![]);
1020    }
1021
1022    let table = Table::find_by_id(object_id.as_table_id())
1023        .one(db)
1024        .await?
1025        .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1026    if !matches!(table.engine, Some(table::Engine::Iceberg)) {
1027        return Ok(vec![]);
1028    }
1029
1030    let database_id = object.database_id.unwrap();
1031    let schema_id = object.schema_id.unwrap();
1032
1033    let mut related_objects = vec![];
1034
1035    let iceberg_sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name);
1036    let iceberg_sink_id = Sink::find()
1037        .inner_join(Object)
1038        .select_only()
1039        .column(sink::Column::SinkId)
1040        .filter(
1041            object::Column::DatabaseId
1042                .eq(database_id)
1043                .and(object::Column::SchemaId.eq(schema_id))
1044                .and(sink::Column::Name.eq(&iceberg_sink_name)),
1045        )
1046        .into_tuple::<SinkId>()
1047        .one(db)
1048        .await?;
1049    if let Some(sink_id) = iceberg_sink_id {
1050        related_objects.push(sink_id.as_object_id());
1051        let sink_internal_tables = get_internal_tables_by_id(sink_id.as_job_id(), db).await?;
1052        related_objects.extend(
1053            sink_internal_tables
1054                .into_iter()
1055                .map(|tid| tid.as_object_id()),
1056        );
1057    } else {
1058        warn!(
1059            "iceberg table {} missing sink {}",
1060            table.name, iceberg_sink_name
1061        );
1062    }
1063
1064    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
1065    let iceberg_source_id = Source::find()
1066        .inner_join(Object)
1067        .select_only()
1068        .column(source::Column::SourceId)
1069        .filter(
1070            object::Column::DatabaseId
1071                .eq(database_id)
1072                .and(object::Column::SchemaId.eq(schema_id))
1073                .and(source::Column::Name.eq(&iceberg_source_name)),
1074        )
1075        .into_tuple::<SourceId>()
1076        .one(db)
1077        .await?;
1078    if let Some(source_id) = iceberg_source_id {
1079        related_objects.push(source_id.as_object_id());
1080    } else {
1081        warn!(
1082            "iceberg table {} missing source {}",
1083            table.name, iceberg_source_name
1084        );
1085    }
1086
1087    Ok(related_objects)
1088}
1089
1090#[derive(Clone, DerivePartialModel, FromQueryResult)]
1091#[sea_orm(entity = "UserPrivilege")]
1092pub struct PartialUserPrivilege {
1093    pub id: PrivilegeId,
1094    pub user_id: UserId,
1095}
1096
1097pub async fn get_referring_privileges_cascade<C>(
1098    ids: Vec<PrivilegeId>,
1099    db: &C,
1100) -> MetaResult<Vec<PartialUserPrivilege>>
1101where
1102    C: ConnectionTrait,
1103{
1104    let query = construct_privilege_dependency_query(ids);
1105    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1106    let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1107        db.get_database_backend(),
1108        sql,
1109        values,
1110    ))
1111    .all(db)
1112    .await?;
1113
1114    Ok(privileges)
1115}
1116
1117/// `ensure_privileges_not_referred` ensures that the privileges are not granted to any other users.
1118pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1119where
1120    C: ConnectionTrait,
1121{
1122    let count = UserPrivilege::find()
1123        .filter(user_privilege::Column::DependentId.is_in(ids))
1124        .count(db)
1125        .await?;
1126    if count != 0 {
1127        return Err(MetaError::permission_denied(format!(
1128            "privileges granted to {} other ones.",
1129            count
1130        )));
1131    }
1132    Ok(())
1133}
1134
1135/// `get_user_privilege` returns the privileges of the given user.
1136pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1137where
1138    C: ConnectionTrait,
1139{
1140    let user_privileges = UserPrivilege::find()
1141        .find_also_related(Object)
1142        .filter(user_privilege::Column::UserId.eq(user_id))
1143        .all(db)
1144        .await?;
1145    Ok(user_privileges
1146        .into_iter()
1147        .map(|(privilege, object)| {
1148            let object = object.unwrap();
1149            let oid = object.oid.as_raw_id();
1150            let obj = match object.obj_type {
1151                ObjectType::Database => PbGrantObject::DatabaseId(oid),
1152                ObjectType::Schema => PbGrantObject::SchemaId(oid),
1153                ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
1154                ObjectType::Source => PbGrantObject::SourceId(oid),
1155                ObjectType::Sink => PbGrantObject::SinkId(oid),
1156                ObjectType::View => PbGrantObject::ViewId(oid),
1157                ObjectType::Function => PbGrantObject::FunctionId(oid),
1158                ObjectType::Connection => PbGrantObject::ConnectionId(oid),
1159                ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
1160                ObjectType::Secret => PbGrantObject::SecretId(oid),
1161            };
1162            PbGrantPrivilege {
1163                action_with_opts: vec![PbActionWithGrantOption {
1164                    action: PbAction::from(privilege.action) as _,
1165                    with_grant_option: privilege.with_grant_option,
1166                    granted_by: privilege.granted_by as _,
1167                }],
1168                object: Some(obj),
1169            }
1170        })
1171        .collect())
1172}
1173
1174pub async fn get_table_columns(
1175    txn: &impl ConnectionTrait,
1176    id: TableId,
1177) -> MetaResult<ColumnCatalogArray> {
1178    let columns = Table::find_by_id(id)
1179        .select_only()
1180        .columns([table::Column::Columns])
1181        .into_tuple::<ColumnCatalogArray>()
1182        .one(txn)
1183        .await?
1184        .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1185    Ok(columns)
1186}
1187
1188/// `grant_default_privileges_automatically` grants default privileges automatically
1189/// for the given new object. It returns the list of user infos whose privileges are updated.
1190pub async fn grant_default_privileges_automatically<C>(
1191    db: &C,
1192    object_id: impl Into<ObjectId>,
1193) -> MetaResult<Vec<PbUserInfo>>
1194where
1195    C: ConnectionTrait,
1196{
1197    let object_id = object_id.into();
1198    let object = Object::find_by_id(object_id)
1199        .one(db)
1200        .await?
1201        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1202    assert_ne!(object.obj_type, ObjectType::Database);
1203
1204    let for_mview_filter = if object.obj_type == ObjectType::Table {
1205        let table_type = Table::find_by_id(object_id.as_table_id())
1206            .select_only()
1207            .column(table::Column::TableType)
1208            .into_tuple::<TableType>()
1209            .one(db)
1210            .await?
1211            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1212        user_default_privilege::Column::ForMaterializedView
1213            .eq(table_type == TableType::MaterializedView)
1214    } else {
1215        user_default_privilege::Column::ForMaterializedView.eq(false)
1216    };
1217    let schema_filter = if let Some(schema_id) = &object.schema_id {
1218        user_default_privilege::Column::SchemaId.eq(*schema_id)
1219    } else {
1220        user_default_privilege::Column::SchemaId.is_null()
1221    };
1222
1223    let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1224        .select_only()
1225        .columns([
1226            user_default_privilege::Column::Grantee,
1227            user_default_privilege::Column::GrantedBy,
1228            user_default_privilege::Column::Action,
1229            user_default_privilege::Column::WithGrantOption,
1230        ])
1231        .filter(
1232            user_default_privilege::Column::DatabaseId
1233                .eq(object.database_id.unwrap())
1234                .and(schema_filter)
1235                .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1236                .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1237                .and(for_mview_filter),
1238        )
1239        .into_tuple()
1240        .all(db)
1241        .await?;
1242    if default_privileges.is_empty() {
1243        return Ok(vec![]);
1244    }
1245
1246    let updated_user_ids = default_privileges
1247        .iter()
1248        .map(|(grantee, _, _, _)| *grantee)
1249        .collect::<HashSet<_>>();
1250
1251    for (grantee, granted_by, action, with_grant_option) in default_privileges {
1252        UserPrivilege::insert(user_privilege::ActiveModel {
1253            user_id: Set(grantee),
1254            oid: Set(object_id),
1255            granted_by: Set(granted_by),
1256            action: Set(action),
1257            with_grant_option: Set(with_grant_option),
1258            ..Default::default()
1259        })
1260        .exec(db)
1261        .await?;
1262        if action == Action::Select {
1263            // Grant SELECT privilege for internal tables if the action is SELECT.
1264            let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1265            if !internal_table_ids.is_empty() {
1266                for internal_table_id in &internal_table_ids {
1267                    UserPrivilege::insert(user_privilege::ActiveModel {
1268                        user_id: Set(grantee),
1269                        oid: Set(internal_table_id.as_object_id()),
1270                        granted_by: Set(granted_by),
1271                        action: Set(Action::Select),
1272                        with_grant_option: Set(with_grant_option),
1273                        ..Default::default()
1274                    })
1275                    .exec(db)
1276                    .await?;
1277                }
1278            }
1279
1280            // Additionally, grant SELECT privilege for iceberg related objects if the action is SELECT.
1281            let iceberg_privilege_object_ids =
1282                get_iceberg_related_object_ids(object_id, db).await?;
1283            if !iceberg_privilege_object_ids.is_empty() {
1284                for iceberg_object_id in &iceberg_privilege_object_ids {
1285                    UserPrivilege::insert(user_privilege::ActiveModel {
1286                        user_id: Set(grantee),
1287                        oid: Set(*iceberg_object_id),
1288                        granted_by: Set(granted_by),
1289                        action: Set(action),
1290                        with_grant_option: Set(with_grant_option),
1291                        ..Default::default()
1292                    })
1293                    .exec(db)
1294                    .await?;
1295                }
1296            }
1297        }
1298    }
1299
1300    let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1301    Ok(updated_user_infos)
1302}
1303
1304// todo: remove it after migrated to sql backend.
1305pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1306    match object {
1307        PbGrantObject::DatabaseId(id)
1308        | PbGrantObject::SchemaId(id)
1309        | PbGrantObject::TableId(id)
1310        | PbGrantObject::SourceId(id)
1311        | PbGrantObject::SinkId(id)
1312        | PbGrantObject::ViewId(id)
1313        | PbGrantObject::FunctionId(id)
1314        | PbGrantObject::SubscriptionId(id)
1315        | PbGrantObject::ConnectionId(id)
1316        | PbGrantObject::SecretId(id) => (*id).into(),
1317    }
1318}
1319
1320pub async fn insert_fragment_relations(
1321    db: &impl ConnectionTrait,
1322    downstream_fragment_relations: &FragmentDownstreamRelation,
1323) -> MetaResult<()> {
1324    for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1325        for downstream in downstreams {
1326            let relation = fragment_relation::Model {
1327                source_fragment_id: *upstream_fragment_id as _,
1328                target_fragment_id: downstream.downstream_fragment_id as _,
1329                dispatcher_type: downstream.dispatcher_type,
1330                dist_key_indices: downstream
1331                    .dist_key_indices
1332                    .iter()
1333                    .map(|idx| *idx as i32)
1334                    .collect_vec()
1335                    .into(),
1336                output_indices: downstream
1337                    .output_mapping
1338                    .indices
1339                    .iter()
1340                    .map(|idx| *idx as i32)
1341                    .collect_vec()
1342                    .into(),
1343                output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1344            };
1345            FragmentRelation::insert(relation.into_active_model())
1346                .exec(db)
1347                .await?;
1348        }
1349    }
1350    Ok(())
1351}
1352
1353pub fn compose_dispatchers(
1354    source_fragment_distribution: DistributionType,
1355    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1356    target_fragment_id: crate::model::FragmentId,
1357    target_fragment_distribution: DistributionType,
1358    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1359    dispatcher_type: DispatcherType,
1360    dist_key_indices: Vec<u32>,
1361    output_mapping: PbDispatchOutputMapping,
1362) -> HashMap<crate::model::ActorId, PbDispatcher> {
1363    match dispatcher_type {
1364        DispatcherType::Hash => {
1365            let dispatcher = PbDispatcher {
1366                r#type: PbDispatcherType::from(dispatcher_type) as _,
1367                dist_key_indices,
1368                output_mapping: output_mapping.into(),
1369                hash_mapping: Some(
1370                    ActorMapping::from_bitmaps(
1371                        &target_fragment_actors
1372                            .iter()
1373                            .map(|(actor_id, bitmap)| {
1374                                (
1375                                    *actor_id as _,
1376                                    bitmap
1377                                        .clone()
1378                                        .expect("downstream hash dispatch must have distribution"),
1379                                )
1380                            })
1381                            .collect(),
1382                    )
1383                    .to_protobuf(),
1384                ),
1385                dispatcher_id: target_fragment_id.as_raw_id() as _,
1386                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1387            };
1388            source_fragment_actors
1389                .keys()
1390                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1391                .collect()
1392        }
1393        DispatcherType::Broadcast | DispatcherType::Simple => {
1394            let dispatcher = PbDispatcher {
1395                r#type: PbDispatcherType::from(dispatcher_type) as _,
1396                dist_key_indices,
1397                output_mapping: output_mapping.into(),
1398                hash_mapping: None,
1399                dispatcher_id: target_fragment_id.as_raw_id() as _,
1400                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1401            };
1402            source_fragment_actors
1403                .keys()
1404                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1405                .collect()
1406        }
1407        DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1408            source_fragment_distribution,
1409            source_fragment_actors,
1410            target_fragment_distribution,
1411            target_fragment_actors,
1412        )
1413        .into_iter()
1414        .map(|(upstream_actor_id, downstream_actor_id)| {
1415            (
1416                upstream_actor_id,
1417                PbDispatcher {
1418                    r#type: PbDispatcherType::NoShuffle as _,
1419                    dist_key_indices: dist_key_indices.clone(),
1420                    output_mapping: output_mapping.clone().into(),
1421                    hash_mapping: None,
1422                    dispatcher_id: target_fragment_id.as_raw_id() as _,
1423                    downstream_actor_id: vec![downstream_actor_id],
1424                },
1425            )
1426        })
1427        .collect(),
1428    }
1429}
1430
1431/// return (`upstream_actor_id` -> `downstream_actor_id`)
1432pub fn resolve_no_shuffle_actor_dispatcher(
1433    source_fragment_distribution: DistributionType,
1434    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1435    target_fragment_distribution: DistributionType,
1436    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1437) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1438    assert_eq!(source_fragment_distribution, target_fragment_distribution);
1439    assert_eq!(
1440        source_fragment_actors.len(),
1441        target_fragment_actors.len(),
1442        "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1443        source_fragment_actors,
1444        target_fragment_actors
1445    );
1446    match source_fragment_distribution {
1447        DistributionType::Single => {
1448            let assert_singleton = |bitmap: &Option<Bitmap>| {
1449                assert!(
1450                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1451                    "not singleton: {:?}",
1452                    bitmap
1453                );
1454            };
1455            assert_eq!(
1456                source_fragment_actors.len(),
1457                1,
1458                "singleton distribution actor count not 1: {:?}",
1459                source_fragment_distribution
1460            );
1461            assert_eq!(
1462                target_fragment_actors.len(),
1463                1,
1464                "singleton distribution actor count not 1: {:?}",
1465                target_fragment_distribution
1466            );
1467            let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1468            assert_singleton(bitmap);
1469            let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1470            assert_singleton(bitmap);
1471            vec![(*source_actor_id, *target_actor_id)]
1472        }
1473        DistributionType::Hash => {
1474            let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1475                .iter()
1476                .map(|(actor_id, bitmap)| {
1477                    let bitmap = bitmap
1478                        .as_ref()
1479                        .expect("hash distribution should have bitmap");
1480                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1481                    (first_vnode, (*actor_id, bitmap))
1482                })
1483                .collect();
1484            source_fragment_actors
1485                .iter()
1486                .map(|(source_actor_id, bitmap)| {
1487                    let bitmap = bitmap
1488                        .as_ref()
1489                        .expect("hash distribution should have bitmap");
1490                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1491                    let (target_actor_id, target_bitmap) =
1492                        target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1493                            panic!(
1494                                "cannot find matched target actor: {} {:?} {:?} {:?}",
1495                                source_actor_id,
1496                                first_vnode,
1497                                source_fragment_actors,
1498                                target_fragment_actors
1499                            );
1500                        });
1501                    assert_eq!(
1502                        bitmap,
1503                        target_bitmap,
1504                        "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1505                        source_actor_id,
1506                        first_vnode,
1507                        source_fragment_actors,
1508                        target_fragment_actors
1509                    );
1510                    (*source_actor_id, target_actor_id)
1511                }).collect()
1512        }
1513    }
1514}
1515
1516pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1517    let fragment_worker_slot_mapping = match fragment.distribution_type {
1518        DistributionType::Single => {
1519            let actor = fragment.actors.values().exactly_one().unwrap();
1520            WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1521        }
1522        DistributionType::Hash => {
1523            let actor_bitmaps: HashMap<_, _> = fragment
1524                .actors
1525                .iter()
1526                .map(|(actor_id, actor_info)| {
1527                    let vnode_bitmap = actor_info
1528                        .vnode_bitmap
1529                        .as_ref()
1530                        .cloned()
1531                        .expect("actor bitmap shouldn't be none in hash fragment");
1532
1533                    (*actor_id as hash::ActorId, vnode_bitmap)
1534                })
1535                .collect();
1536
1537            let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1538
1539            let actor_locations = fragment
1540                .actors
1541                .iter()
1542                .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
1543                .collect();
1544
1545            actor_mapping.to_worker_slot(&actor_locations)
1546        }
1547    };
1548
1549    PbFragmentWorkerSlotMapping {
1550        fragment_id: fragment.fragment_id,
1551        mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1552    }
1553}
1554
1555/// For the given streaming jobs, returns
1556/// - All source fragments
1557/// - All sink fragments
1558/// - All actors
1559/// - All fragments
1560pub async fn get_fragments_for_jobs<C>(
1561    db: &C,
1562    actor_info: &SharedActorInfos,
1563    streaming_jobs: Vec<JobId>,
1564) -> MetaResult<(
1565    HashMap<SourceId, BTreeSet<FragmentId>>,
1566    HashSet<FragmentId>,
1567    HashSet<ActorId>,
1568    HashSet<FragmentId>,
1569)>
1570where
1571    C: ConnectionTrait,
1572{
1573    if streaming_jobs.is_empty() {
1574        return Ok((
1575            HashMap::default(),
1576            HashSet::default(),
1577            HashSet::default(),
1578            HashSet::default(),
1579        ));
1580    }
1581
1582    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1583        .select_only()
1584        .columns([
1585            fragment::Column::FragmentId,
1586            fragment::Column::FragmentTypeMask,
1587            fragment::Column::StreamNode,
1588        ])
1589        .filter(fragment::Column::JobId.is_in(streaming_jobs))
1590        .into_tuple()
1591        .all(db)
1592        .await?;
1593
1594    let fragment_ids: HashSet<_> = fragments
1595        .iter()
1596        .map(|(fragment_id, _, _)| *fragment_id)
1597        .collect();
1598
1599    let actors = {
1600        let guard = actor_info.read_guard();
1601        fragment_ids
1602            .iter()
1603            .flat_map(|id| guard.get_fragment(*id as _))
1604            .flat_map(|f| f.actors.keys().cloned().map(|id| id as _))
1605            .collect::<HashSet<_>>()
1606    };
1607
1608    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1609    let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1610    for (fragment_id, mask, stream_node) in fragments {
1611        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1612            && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1613        {
1614            source_fragment_ids
1615                .entry(source_id)
1616                .or_default()
1617                .insert(fragment_id);
1618        }
1619        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1620            sink_fragment_ids.insert(fragment_id);
1621        }
1622    }
1623
1624    Ok((
1625        source_fragment_ids,
1626        sink_fragment_ids,
1627        actors.into_iter().collect(),
1628        fragment_ids,
1629    ))
1630}
1631
1632/// Build a object group for notifying the deletion of the given objects.
1633///
1634/// Note that only id fields are filled in the object info, as the arguments are partial objects.
1635/// As a result, the returned notification info should only be used for deletion.
1636pub(crate) fn build_object_group_for_delete(
1637    partial_objects: Vec<PartialObject>,
1638) -> NotificationInfo {
1639    let mut objects = vec![];
1640    for obj in partial_objects {
1641        match obj.obj_type {
1642            ObjectType::Database => objects.push(PbObject {
1643                object_info: Some(PbObjectInfo::Database(PbDatabase {
1644                    id: obj.oid.as_database_id(),
1645                    ..Default::default()
1646                })),
1647            }),
1648            ObjectType::Schema => objects.push(PbObject {
1649                object_info: Some(PbObjectInfo::Schema(PbSchema {
1650                    id: obj.oid.as_schema_id(),
1651                    database_id: obj.database_id.unwrap(),
1652                    ..Default::default()
1653                })),
1654            }),
1655            ObjectType::Table => objects.push(PbObject {
1656                object_info: Some(PbObjectInfo::Table(PbTable {
1657                    id: obj.oid.as_table_id(),
1658                    schema_id: obj.schema_id.unwrap(),
1659                    database_id: obj.database_id.unwrap(),
1660                    ..Default::default()
1661                })),
1662            }),
1663            ObjectType::Source => objects.push(PbObject {
1664                object_info: Some(PbObjectInfo::Source(PbSource {
1665                    id: obj.oid.as_source_id(),
1666                    schema_id: obj.schema_id.unwrap(),
1667                    database_id: obj.database_id.unwrap(),
1668                    ..Default::default()
1669                })),
1670            }),
1671            ObjectType::Sink => objects.push(PbObject {
1672                object_info: Some(PbObjectInfo::Sink(PbSink {
1673                    id: obj.oid.as_sink_id(),
1674                    schema_id: obj.schema_id.unwrap(),
1675                    database_id: obj.database_id.unwrap(),
1676                    ..Default::default()
1677                })),
1678            }),
1679            ObjectType::Subscription => objects.push(PbObject {
1680                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1681                    id: obj.oid.as_subscription_id(),
1682                    schema_id: obj.schema_id.unwrap(),
1683                    database_id: obj.database_id.unwrap(),
1684                    ..Default::default()
1685                })),
1686            }),
1687            ObjectType::View => objects.push(PbObject {
1688                object_info: Some(PbObjectInfo::View(PbView {
1689                    id: obj.oid.as_view_id(),
1690                    schema_id: obj.schema_id.unwrap(),
1691                    database_id: obj.database_id.unwrap(),
1692                    ..Default::default()
1693                })),
1694            }),
1695            ObjectType::Index => {
1696                objects.push(PbObject {
1697                    object_info: Some(PbObjectInfo::Index(PbIndex {
1698                        id: obj.oid.as_index_id(),
1699                        schema_id: obj.schema_id.unwrap(),
1700                        database_id: obj.database_id.unwrap(),
1701                        ..Default::default()
1702                    })),
1703                });
1704                objects.push(PbObject {
1705                    object_info: Some(PbObjectInfo::Table(PbTable {
1706                        id: obj.oid.as_table_id(),
1707                        schema_id: obj.schema_id.unwrap(),
1708                        database_id: obj.database_id.unwrap(),
1709                        ..Default::default()
1710                    })),
1711                });
1712            }
1713            ObjectType::Function => objects.push(PbObject {
1714                object_info: Some(PbObjectInfo::Function(PbFunction {
1715                    id: obj.oid.as_function_id(),
1716                    schema_id: obj.schema_id.unwrap(),
1717                    database_id: obj.database_id.unwrap(),
1718                    ..Default::default()
1719                })),
1720            }),
1721            ObjectType::Connection => objects.push(PbObject {
1722                object_info: Some(PbObjectInfo::Connection(PbConnection {
1723                    id: obj.oid.as_connection_id(),
1724                    schema_id: obj.schema_id.unwrap(),
1725                    database_id: obj.database_id.unwrap(),
1726                    ..Default::default()
1727                })),
1728            }),
1729            ObjectType::Secret => objects.push(PbObject {
1730                object_info: Some(PbObjectInfo::Secret(PbSecret {
1731                    id: obj.oid.as_secret_id(),
1732                    schema_id: obj.schema_id.unwrap(),
1733                    database_id: obj.database_id.unwrap(),
1734                    ..Default::default()
1735                })),
1736            }),
1737        }
1738    }
1739    NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1740}
1741
1742pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1743    let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1744        .context("unable to parse table definition")
1745        .inspect_err(|e| {
1746            tracing::error!(
1747                target: "auto_schema_change",
1748                error = %e.as_report(),
1749                "failed to parse table definition")
1750        })
1751        .unwrap()
1752        .try_into()
1753        .unwrap();
1754    if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1755        cdc_table_info
1756            .clone()
1757            .map(|cdc_table_info| cdc_table_info.external_table_name)
1758    } else {
1759        None
1760    }
1761}
1762
1763/// `rename_relation` renames the target relation and its definition,
1764/// it commits the changes to the transaction and returns the updated relations and the old name.
1765pub async fn rename_relation(
1766    txn: &DatabaseTransaction,
1767    object_type: ObjectType,
1768    object_id: ObjectId,
1769    object_name: &str,
1770) -> MetaResult<(Vec<PbObject>, String)> {
1771    use sea_orm::ActiveModelTrait;
1772
1773    use crate::controller::rename::alter_relation_rename;
1774
1775    let mut to_update_relations = vec![];
1776    // rename relation.
1777    macro_rules! rename_relation {
1778        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1779            let (mut relation, obj) = $entity::find_by_id($object_id)
1780                .find_also_related(Object)
1781                .one(txn)
1782                .await?
1783                .unwrap();
1784            let obj = obj.unwrap();
1785            let old_name = relation.name.clone();
1786            relation.name = object_name.into();
1787            if obj.obj_type != ObjectType::View {
1788                relation.definition = alter_relation_rename(&relation.definition, object_name);
1789            }
1790            let active_model = $table::ActiveModel {
1791                $identity: Set(relation.$identity),
1792                name: Set(object_name.into()),
1793                definition: Set(relation.definition.clone()),
1794                ..Default::default()
1795            };
1796            active_model.update(txn).await?;
1797            to_update_relations.push(PbObject {
1798                object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
1799            });
1800            old_name
1801        }};
1802    }
1803    // TODO: check is there any thing to change for shared source?
1804    let old_name = match object_type {
1805        ObjectType::Table => {
1806            let associated_source_id: Option<SourceId> = Source::find()
1807                .select_only()
1808                .column(source::Column::SourceId)
1809                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
1810                .into_tuple()
1811                .one(txn)
1812                .await?;
1813            if let Some(source_id) = associated_source_id {
1814                rename_relation!(Source, source, source_id, source_id);
1815            }
1816            rename_relation!(Table, table, table_id, object_id.as_table_id())
1817        }
1818        ObjectType::Source => {
1819            rename_relation!(Source, source, source_id, object_id.as_source_id())
1820        }
1821        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
1822        ObjectType::Subscription => {
1823            rename_relation!(
1824                Subscription,
1825                subscription,
1826                subscription_id,
1827                object_id.as_subscription_id()
1828            )
1829        }
1830        ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
1831        ObjectType::Index => {
1832            let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
1833                .find_also_related(Object)
1834                .one(txn)
1835                .await?
1836                .unwrap();
1837            index.name = object_name.into();
1838            let index_table_id = index.index_table_id;
1839            let old_name = rename_relation!(Table, table, table_id, index_table_id);
1840
1841            // the name of index and its associated table is the same.
1842            let active_model = index::ActiveModel {
1843                index_id: sea_orm::ActiveValue::Set(index.index_id),
1844                name: sea_orm::ActiveValue::Set(object_name.into()),
1845                ..Default::default()
1846            };
1847            active_model.update(txn).await?;
1848            to_update_relations.push(PbObject {
1849                object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1850            });
1851            old_name
1852        }
1853        _ => unreachable!("only relation name can be altered."),
1854    };
1855
1856    Ok((to_update_relations, old_name))
1857}
1858
1859pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
1860where
1861    C: ConnectionTrait,
1862{
1863    let database_resource_group: Option<String> = Database::find_by_id(database_id)
1864        .select_only()
1865        .column(database::Column::ResourceGroup)
1866        .into_tuple()
1867        .one(txn)
1868        .await?
1869        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1870
1871    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1872}
1873
1874pub async fn get_existing_job_resource_group<C>(
1875    txn: &C,
1876    streaming_job_id: JobId,
1877) -> MetaResult<String>
1878where
1879    C: ConnectionTrait,
1880{
1881    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1882        StreamingJob::find_by_id(streaming_job_id)
1883            .select_only()
1884            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1885            .join(JoinType::InnerJoin, object::Relation::Database2.def())
1886            .column(streaming_job::Column::SpecificResourceGroup)
1887            .column(database::Column::ResourceGroup)
1888            .into_tuple()
1889            .one(txn)
1890            .await?
1891            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1892
1893    Ok(job_specific_resource_group.unwrap_or_else(|| {
1894        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1895    }))
1896}
1897
1898pub fn filter_workers_by_resource_group(
1899    workers: &HashMap<WorkerId, WorkerNode>,
1900    resource_group: &str,
1901) -> BTreeSet<WorkerId> {
1902    workers
1903        .iter()
1904        .filter(|&(_, worker)| {
1905            worker
1906                .resource_group()
1907                .map(|node_label| node_label.as_str() == resource_group)
1908                .unwrap_or(false)
1909        })
1910        .map(|(id, _)| *id)
1911        .collect()
1912}
1913
1914/// `rename_relation_refer` updates the definition of relations that refer to the target one,
1915/// it commits the changes to the transaction and returns all the updated relations.
1916pub async fn rename_relation_refer(
1917    txn: &DatabaseTransaction,
1918    object_type: ObjectType,
1919    object_id: ObjectId,
1920    object_name: &str,
1921    old_name: &str,
1922) -> MetaResult<Vec<PbObject>> {
1923    use sea_orm::ActiveModelTrait;
1924
1925    use crate::controller::rename::alter_relation_rename_refs;
1926
1927    let mut to_update_relations = vec![];
1928    macro_rules! rename_relation_ref {
1929        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1930            let (mut relation, obj) = $entity::find_by_id($object_id)
1931                .find_also_related(Object)
1932                .one(txn)
1933                .await?
1934                .unwrap();
1935            relation.definition =
1936                alter_relation_rename_refs(&relation.definition, old_name, object_name);
1937            let active_model = $table::ActiveModel {
1938                $identity: Set(relation.$identity),
1939                definition: Set(relation.definition.clone()),
1940                ..Default::default()
1941            };
1942            active_model.update(txn).await?;
1943            to_update_relations.push(PbObject {
1944                object_info: Some(PbObjectInfo::$entity(
1945                    ObjectModel(relation, obj.unwrap()).into(),
1946                )),
1947            });
1948        }};
1949    }
1950    let mut objs = get_referring_objects(object_id, txn).await?;
1951    if object_type == ObjectType::Table {
1952        let incoming_sinks: Vec<SinkId> = Sink::find()
1953            .select_only()
1954            .column(sink::Column::SinkId)
1955            .filter(sink::Column::TargetTable.eq(object_id))
1956            .into_tuple()
1957            .all(txn)
1958            .await?;
1959
1960        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
1961            oid: id.as_object_id(),
1962            obj_type: ObjectType::Sink,
1963            schema_id: None,
1964            database_id: None,
1965        }));
1966    }
1967
1968    for obj in objs {
1969        match obj.obj_type {
1970            ObjectType::Table => {
1971                rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
1972            }
1973            ObjectType::Sink => {
1974                rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
1975            }
1976            ObjectType::Subscription => {
1977                rename_relation_ref!(
1978                    Subscription,
1979                    subscription,
1980                    subscription_id,
1981                    obj.oid.as_subscription_id()
1982                )
1983            }
1984            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
1985            ObjectType::Index => {
1986                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
1987                    .select_only()
1988                    .column(index::Column::IndexTableId)
1989                    .into_tuple()
1990                    .one(txn)
1991                    .await?;
1992                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
1993            }
1994            _ => {
1995                bail!(
1996                    "only the table, sink, subscription, view and index will depend on other objects."
1997                )
1998            }
1999        }
2000    }
2001
2002    Ok(to_update_relations)
2003}
2004
2005/// Validate that subscription can be safely deleted, meeting any of the following conditions:
2006/// 1. The upstream table is not referred to by any cross-db mv.
2007/// 2. After deleting the subscription, the upstream table still has at least one subscription.
2008pub async fn validate_subscription_deletion<C>(
2009    txn: &C,
2010    subscription_id: SubscriptionId,
2011) -> MetaResult<()>
2012where
2013    C: ConnectionTrait,
2014{
2015    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
2016        .select_only()
2017        .column(subscription::Column::DependentTableId)
2018        .into_tuple()
2019        .one(txn)
2020        .await?
2021        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
2022
2023    let cnt = Subscription::find()
2024        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
2025        .count(txn)
2026        .await?;
2027    if cnt > 1 {
2028        // Ensure that at least one subscription is remained for the upstream table
2029        // once the subscription is dropped.
2030        return Ok(());
2031    }
2032
2033    // Ensure that the upstream table is not referred by any cross-db mv.
2034    let obj_alias = Alias::new("o1");
2035    let used_by_alias = Alias::new("o2");
2036    let count = ObjectDependency::find()
2037        .join_as(
2038            JoinType::InnerJoin,
2039            object_dependency::Relation::Object2.def(),
2040            obj_alias.clone(),
2041        )
2042        .join_as(
2043            JoinType::InnerJoin,
2044            object_dependency::Relation::Object1.def(),
2045            used_by_alias.clone(),
2046        )
2047        .filter(
2048            object_dependency::Column::Oid
2049                .eq(upstream_table_id)
2050                .and(object_dependency::Column::UsedBy.ne(subscription_id))
2051                .and(
2052                    Expr::col((obj_alias, object::Column::DatabaseId))
2053                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
2054                ),
2055        )
2056        .count(txn)
2057        .await?;
2058
2059    if count != 0 {
2060        return Err(MetaError::permission_denied(format!(
2061            "Referenced by {} cross-db objects.",
2062            count
2063        )));
2064    }
2065
2066    Ok(())
2067}
2068
2069pub async fn fetch_target_fragments<C>(
2070    txn: &C,
2071    src_fragment_id: impl IntoIterator<Item = FragmentId>,
2072) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
2073where
2074    C: ConnectionTrait,
2075{
2076    let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
2077        .select_only()
2078        .columns([
2079            fragment_relation::Column::SourceFragmentId,
2080            fragment_relation::Column::TargetFragmentId,
2081        ])
2082        .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
2083        .into_tuple()
2084        .all(txn)
2085        .await?;
2086
2087    let source_target_fragments = source_target_fragments.into_iter().into_group_map();
2088
2089    Ok(source_target_fragments)
2090}
2091
2092pub async fn get_sink_fragment_by_ids<C>(
2093    txn: &C,
2094    sink_ids: Vec<SinkId>,
2095) -> MetaResult<HashMap<SinkId, FragmentId>>
2096where
2097    C: ConnectionTrait,
2098{
2099    let sink_num = sink_ids.len();
2100    let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
2101        .select_only()
2102        .columns([fragment::Column::JobId, fragment::Column::FragmentId])
2103        .filter(
2104            fragment::Column::JobId
2105                .is_in(sink_ids)
2106                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2107        )
2108        .into_tuple()
2109        .all(txn)
2110        .await?;
2111
2112    if sink_fragment_ids.len() != sink_num {
2113        return Err(anyhow::anyhow!(
2114            "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2115            sink_fragment_ids.len(),
2116            sink_num
2117        )
2118        .into());
2119    }
2120
2121    Ok(sink_fragment_ids.into_iter().collect())
2122}
2123
2124pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2125where
2126    C: ConnectionTrait,
2127{
2128    let mview_fragment: Vec<i32> = Fragment::find()
2129        .select_only()
2130        .column(fragment::Column::FragmentTypeMask)
2131        .filter(
2132            fragment::Column::JobId
2133                .eq(table_id)
2134                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2135        )
2136        .into_tuple()
2137        .all(txn)
2138        .await?;
2139
2140    let mview_fragment_len = mview_fragment.len();
2141    if mview_fragment_len != 1 {
2142        bail!(
2143            "expected exactly one mview fragment for table {}, found {}",
2144            table_id,
2145            mview_fragment_len
2146        );
2147    }
2148
2149    let mview_fragment = mview_fragment.into_iter().next().unwrap();
2150    let migrated =
2151        FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2152
2153    Ok(migrated)
2154}
2155
2156pub async fn try_get_iceberg_table_by_downstream_sink<C>(
2157    txn: &C,
2158    sink_id: SinkId,
2159) -> MetaResult<Option<TableId>>
2160where
2161    C: ConnectionTrait,
2162{
2163    let sink = Sink::find_by_id(sink_id).one(txn).await?;
2164    let Some(sink) = sink else {
2165        return Ok(None);
2166    };
2167
2168    if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
2169        let object_ids: Vec<ObjectId> = ObjectDependency::find()
2170            .select_only()
2171            .column(object_dependency::Column::Oid)
2172            .filter(object_dependency::Column::UsedBy.eq(sink_id))
2173            .into_tuple()
2174            .all(txn)
2175            .await?;
2176        let mut iceberg_table_ids = vec![];
2177        for object_id in object_ids {
2178            let table_id = object_id.as_table_id();
2179            if let Some(table_engine) = Table::find_by_id(table_id)
2180                .select_only()
2181                .column(table::Column::Engine)
2182                .into_tuple::<table::Engine>()
2183                .one(txn)
2184                .await?
2185                && table_engine == table::Engine::Iceberg
2186            {
2187                iceberg_table_ids.push(table_id);
2188            }
2189        }
2190        if iceberg_table_ids.len() == 1 {
2191            return Ok(Some(iceberg_table_ids[0]));
2192        }
2193    }
2194    Ok(None)
2195}
2196
2197pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
2198where
2199    C: ConnectionTrait,
2200{
2201    if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
2202        .select_only()
2203        .column(table::Column::Engine)
2204        .into_tuple::<table::Engine>()
2205        .one(txn)
2206        .await?
2207        && engine == table::Engine::Iceberg
2208    {
2209        return Ok(true);
2210    }
2211    if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
2212        .select_only()
2213        .column(sink::Column::Name)
2214        .into_tuple::<String>()
2215        .one(txn)
2216        .await?
2217        && sink_name.starts_with(ICEBERG_SINK_PREFIX)
2218    {
2219        return Ok(true);
2220    }
2221    Ok(false)
2222}
2223
2224pub async fn find_dirty_iceberg_table_jobs<C>(
2225    txn: &C,
2226    database_id: Option<DatabaseId>,
2227) -> MetaResult<Vec<PartialObject>>
2228where
2229    C: ConnectionTrait,
2230{
2231    let mut filter_condition = streaming_job::Column::JobStatus
2232        .ne(JobStatus::Created)
2233        .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
2234        .and(streaming_job::Column::CreateType.eq(CreateType::Background));
2235    if let Some(database_id) = database_id {
2236        filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
2237    }
2238    let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
2239        .select_only()
2240        .columns([
2241            object::Column::Oid,
2242            object::Column::ObjType,
2243            object::Column::SchemaId,
2244            object::Column::DatabaseId,
2245        ])
2246        .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2247        .filter(filter_condition)
2248        .into_partial_model()
2249        .all(txn)
2250        .await?;
2251
2252    let mut dirty_iceberg_table_jobs = vec![];
2253    for job in creating_table_sink_jobs {
2254        if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
2255            tracing::info!("Found dirty iceberg job with id: {}", job.oid);
2256            dirty_iceberg_table_jobs.push(job);
2257        }
2258    }
2259
2260    Ok(dirty_iceberg_table_jobs)
2261}
2262
2263pub fn build_select_node_list(
2264    from: &[ColumnCatalog],
2265    to: &[ColumnCatalog],
2266) -> MetaResult<Vec<PbExprNode>> {
2267    let mut exprs = Vec::with_capacity(to.len());
2268    let idx_by_col_id = from
2269        .iter()
2270        .enumerate()
2271        .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2272        .collect::<HashMap<_, _>>();
2273
2274    for to_col in to {
2275        let to_col = to_col.column_desc.as_ref().unwrap();
2276        let to_col_type_ref = to_col.column_type.as_ref().unwrap();
2277        let to_col_type = DataType::from(to_col_type_ref);
2278        if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2279            let from_col_type = DataType::from(
2280                from[*from_idx]
2281                    .column_desc
2282                    .as_ref()
2283                    .unwrap()
2284                    .column_type
2285                    .as_ref()
2286                    .unwrap(),
2287            );
2288            if !to_col_type.equals_datatype(&from_col_type) {
2289                return Err(anyhow!(
2290                    "Column type mismatch: {:?} != {:?}",
2291                    from_col_type,
2292                    to_col_type
2293                )
2294                .into());
2295            }
2296            exprs.push(PbExprNode {
2297                function_type: expr_node::Type::Unspecified.into(),
2298                return_type: Some(to_col_type_ref.clone()),
2299                rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2300            });
2301        } else {
2302            let to_default_node =
2303                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2304                    expr,
2305                    ..
2306                })) = &to_col.generated_or_default_column
2307                {
2308                    expr.clone().unwrap()
2309                } else {
2310                    let null = Datum::None.to_protobuf();
2311                    PbExprNode {
2312                        function_type: expr_node::Type::Unspecified.into(),
2313                        return_type: Some(to_col_type_ref.clone()),
2314                        rex_node: Some(expr_node::RexNode::Constant(null)),
2315                    }
2316                };
2317            exprs.push(to_default_node);
2318        }
2319    }
2320
2321    Ok(exprs)
2322}
2323
2324#[derive(Clone, Debug, Default)]
2325pub struct StreamingJobExtraInfo {
2326    pub timezone: Option<String>,
2327    pub config_override: Arc<str>,
2328    pub job_definition: String,
2329}
2330
2331impl StreamingJobExtraInfo {
2332    pub fn stream_context(&self) -> StreamContext {
2333        StreamContext {
2334            timezone: self.timezone.clone(),
2335            config_override: self.config_override.clone(),
2336        }
2337    }
2338}
2339
2340pub async fn get_streaming_job_extra_info<C>(
2341    txn: &C,
2342    job_ids: Vec<JobId>,
2343) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2344where
2345    C: ConnectionTrait,
2346{
2347    let pairs: Vec<(JobId, Option<String>, Option<String>)> = StreamingJob::find()
2348        .select_only()
2349        .columns([
2350            streaming_job::Column::JobId,
2351            streaming_job::Column::Timezone,
2352            streaming_job::Column::ConfigOverride,
2353        ])
2354        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2355        .into_tuple()
2356        .all(txn)
2357        .await?;
2358
2359    let job_ids = job_ids.into_iter().collect();
2360
2361    let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2362
2363    let result = pairs
2364        .into_iter()
2365        .map(|(job_id, timezone, config_override)| {
2366            let job_definition = definitions.remove(&job_id).unwrap_or_default();
2367            (
2368                job_id,
2369                StreamingJobExtraInfo {
2370                    timezone,
2371                    config_override: config_override.unwrap_or_default().into(),
2372                    job_definition,
2373                },
2374            )
2375        })
2376        .collect();
2377
2378    Ok(result)
2379}
2380
2381#[cfg(test)]
2382mod tests {
2383    use super::*;
2384
2385    #[test]
2386    fn test_extract_cdc_table_name() {
2387        let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
2388        let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
2389        assert_eq!(
2390            extract_external_table_name_from_definition(ddl1),
2391            Some("public.t1".into())
2392        );
2393        assert_eq!(
2394            extract_external_table_name_from_definition(ddl2),
2395            Some("mydb.t2".into())
2396        );
2397    }
2398}