risingwave_meta/controller/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16
17use anyhow::{Context, anyhow};
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
21use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
22use risingwave_common::id::{JobId, SubscriptionId};
23use risingwave_common::types::Datum;
24use risingwave_common::util::value_encoding::DatumToProtoExt;
25use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
26use risingwave_common::{bail, hash};
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::object::ObjectType;
29use risingwave_meta_model::prelude::*;
30use risingwave_meta_model::table::TableType;
31use risingwave_meta_model::user_privilege::Action;
32use risingwave_meta_model::{
33    ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
34    JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
35    TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
36    function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
37    subscription, table, user, user_default_privilege, user_privilege, view,
38};
39use risingwave_meta_model_migration::WithQuery;
40use risingwave_pb::catalog::{
41    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
42    PbSubscription, PbTable, PbView,
43};
44use risingwave_pb::common::WorkerNode;
45use risingwave_pb::expr::{PbExprNode, expr_node};
46use risingwave_pb::meta::object::PbObjectInfo;
47use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
48use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
51use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
52use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
53use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
54use risingwave_sqlparser::ast::Statement as SqlStatement;
55use risingwave_sqlparser::parser::Parser;
56use sea_orm::sea_query::{
57    Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
58    WithClause,
59};
60use sea_orm::{
61    ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
62    FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
63    RelationTrait, Set, Statement,
64};
65use thiserror_ext::AsReport;
66
67use crate::barrier::{SharedActorInfos, SharedFragmentInfo};
68use crate::controller::ObjectModel;
69use crate::controller::fragment::FragmentTypeMaskExt;
70use crate::controller::scale::resolve_streaming_job_definition;
71use crate::model::FragmentDownstreamRelation;
72use crate::{MetaError, MetaResult};
73
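/// This function will construct a query using a recursive CTE to find all objects (`oid`, `obj_type`, `schema_id`, `database_id`) that use the given object, directly or transitively, including the objects contained in it when it is a database or a schema.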
75///
76/// # Examples
77///
78/// ```
79/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
80/// use sea_orm::sea_query::*;
81/// use sea_orm::*;
82///
83/// let query = construct_obj_dependency_query(1.into());
84///
85/// assert_eq!(
86///     query.to_string(MysqlQueryBuilder),
87///     r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `oid` FROM `object` WHERE `object`.`database_id` = 1 OR `object`.`schema_id` = 1) UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `oid`, `obj_type`, `schema_id`, `database_id` FROM `used_by_object_ids` INNER JOIN `object` ON `used_by_object_ids`.`used_by` = `oid` ORDER BY `oid` DESC"#
88/// );
89/// assert_eq!(
90///     query.to_string(PostgresQueryBuilder),
91///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1) UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
92/// );
93/// assert_eq!(
94///     query.to_string(SqliteQueryBuilder),
95///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
96/// );
97/// ```
98pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
99    let cte_alias = Alias::new("used_by_object_ids");
100    let cte_return_alias = Alias::new("used_by");
101
102    let mut base_query = SelectStatement::new()
103        .column(object_dependency::Column::UsedBy)
104        .from(ObjectDependency)
105        .and_where(object_dependency::Column::Oid.eq(obj_id))
106        .to_owned();
107
108    let belonged_obj_query = SelectStatement::new()
109        .column(object::Column::Oid)
110        .from(Object)
111        .and_where(
112            object::Column::DatabaseId
113                .eq(obj_id)
114                .or(object::Column::SchemaId.eq(obj_id)),
115        )
116        .to_owned();
117
118    let cte_referencing = Query::select()
119        .column((ObjectDependency, object_dependency::Column::UsedBy))
120        .from(ObjectDependency)
121        .inner_join(
122            cte_alias.clone(),
123            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
124                .equals(object_dependency::Column::Oid),
125        )
126        .to_owned();
127
128    let mut common_table_expr = CommonTableExpression::new();
129    common_table_expr
130        .query(
131            base_query
132                .union(UnionType::All, belonged_obj_query)
133                .union(UnionType::All, cte_referencing)
134                .to_owned(),
135        )
136        .column(cte_return_alias.clone())
137        .table_name(cte_alias.clone());
138
139    SelectStatement::new()
140        .distinct()
141        .columns([
142            object::Column::Oid,
143            object::Column::ObjType,
144            object::Column::SchemaId,
145            object::Column::DatabaseId,
146        ])
147        .from(cte_alias.clone())
148        .inner_join(
149            Object,
150            Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
151        )
152        .order_by(object::Column::Oid, Order::Desc)
153        .to_owned()
154        .with(
155            WithClause::new()
156                .recursive(true)
157                .cte(common_table_expr)
158                .to_owned(),
159        )
160}
161
162/// This function will construct a query using recursive cte to find if dependent objects are already relying on the target table.
163///
164/// # Examples
165///
166/// ```
167/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
168/// use sea_orm::sea_query::*;
169/// use sea_orm::*;
170///
171/// let query = construct_sink_cycle_check_query(1.into(), vec![2.into(), 3.into()]);
172///
173/// assert_eq!(
174///     query.to_string(MysqlQueryBuilder),
175///     r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
176/// );
177/// assert_eq!(
178///     query.to_string(PostgresQueryBuilder),
179///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
180/// );
181/// assert_eq!(
182///     query.to_string(SqliteQueryBuilder),
183///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
184/// );
185/// ```
186pub fn construct_sink_cycle_check_query(
187    target_table: ObjectId,
188    dependent_objects: Vec<ObjectId>,
189) -> WithQuery {
190    let cte_alias = Alias::new("used_by_object_ids_with_sink");
191    let depend_alias = Alias::new("obj_dependency_with_sink");
192
193    let mut base_query = SelectStatement::new()
194        .columns([
195            object_dependency::Column::Oid,
196            object_dependency::Column::UsedBy,
197        ])
198        .from(ObjectDependency)
199        .and_where(object_dependency::Column::Oid.eq(target_table))
200        .to_owned();
201
202    let query_sink_deps = SelectStatement::new()
203        .columns([sink::Column::SinkId, sink::Column::TargetTable])
204        .from(Sink)
205        .and_where(sink::Column::TargetTable.is_not_null())
206        .to_owned();
207
208    let cte_referencing = Query::select()
209        .column((depend_alias.clone(), object_dependency::Column::Oid))
210        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
211        .from_subquery(
212            SelectStatement::new()
213                .columns([
214                    object_dependency::Column::Oid,
215                    object_dependency::Column::UsedBy,
216                ])
217                .from(ObjectDependency)
218                .union(UnionType::All, query_sink_deps)
219                .to_owned(),
220            depend_alias.clone(),
221        )
222        .inner_join(
223            cte_alias.clone(),
224            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
225                .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
226        )
227        .and_where(
228            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
229                cte_alias.clone(),
230                object_dependency::Column::Oid,
231            ))),
232        )
233        .to_owned();
234
235    let mut common_table_expr = CommonTableExpression::new();
236    common_table_expr
237        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
238        .columns([
239            object_dependency::Column::Oid,
240            object_dependency::Column::UsedBy,
241        ])
242        .table_name(cte_alias.clone());
243
244    SelectStatement::new()
245        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
246        .from(cte_alias.clone())
247        .and_where(
248            Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
249        )
250        .to_owned()
251        .with(
252            WithClause::new()
253                .recursive(true)
254                .cte(common_table_expr)
255                .to_owned(),
256        )
257}
258
/// Partial model of the `Object` entity carrying only an object's id, type and the
/// schema / database ids stored alongside it.
///
/// This is the row type returned by `get_referring_objects_cascade`.
#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
#[sea_orm(entity = "Object")]
pub struct PartialObject {
    /// Id of the object.
    pub oid: ObjectId,
    /// Kind of catalog object.
    pub obj_type: ObjectType,
    /// Schema id stored on the object row, if any.
    // NOTE(review): presumably `None` for objects that do not live in a schema — confirm.
    pub schema_id: Option<SchemaId>,
    /// Database id stored on the object row, if any.
    // NOTE(review): presumably `None` for objects that do not live in a database — confirm.
    pub database_id: Option<DatabaseId>,
}
267
/// Partial model of the `Fragment` entity carrying only the fragment id, its job id and
/// its state table ids.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Fragment")]
pub struct PartialFragmentStateTables {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Id of the job stored on the fragment row.
    pub job_id: ObjectId,
    /// Ids of the state tables stored on the fragment row.
    pub state_table_ids: TableIdArray,
}
275
/// An actor id together with a fragment id and a worker id.
// NOTE(review): presumably the fragment the actor belongs to and the worker it is placed
// on — confirm against the code that builds this value.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct PartialActorLocation {
    /// Id of the actor.
    pub actor_id: ActorId,
    /// Id of the associated fragment.
    pub fragment_id: FragmentId,
    /// Id of the associated worker.
    pub worker_id: WorkerId,
}
282
/// Description of a fragment, decodable directly from a query result row.
// NOTE(review): the query that produces these rows is outside this part of the file; the
// per-field notes below are inferred from names and types only — confirm.
#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
pub struct FragmentDesc {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Id of the job associated with the fragment.
    pub job_id: JobId,
    /// Fragment type mask as a raw `i32`.
    // NOTE(review): presumably a bit set of `FragmentTypeFlag` values — confirm.
    pub fragment_type_mask: i32,
    /// Distribution type of the fragment.
    pub distribution_type: DistributionType,
    /// Ids of the fragment's state tables.
    pub state_table_ids: TableIdArray,
    /// Parallelism of the fragment.
    // NOTE(review): presumably the number of actors — confirm.
    pub parallelism: i64,
    /// Vnode count of the fragment.
    pub vnode_count: i32,
    /// Stream node of the fragment.
    pub stream_node: StreamNode,
    /// Parallelism policy in textual form.
    // NOTE(review): the format of this string is not visible here — confirm.
    pub parallelism_policy: String,
}
295
296/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
297pub async fn get_referring_objects_cascade<C>(
298    obj_id: ObjectId,
299    db: &C,
300) -> MetaResult<Vec<PartialObject>>
301where
302    C: ConnectionTrait,
303{
304    let query = construct_obj_dependency_query(obj_id);
305    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
306    let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
307        db.get_database_backend(),
308        sql,
309        values,
310    ))
311    .all(db)
312    .await?;
313    Ok(objects)
314}
315
316/// Check if create a sink with given dependent objects into the target table will cause a cycle, return true if it will.
317pub async fn check_sink_into_table_cycle<C>(
318    target_table: ObjectId,
319    dependent_objs: Vec<ObjectId>,
320    db: &C,
321) -> MetaResult<bool>
322where
323    C: ConnectionTrait,
324{
325    if dependent_objs.is_empty() {
326        return Ok(false);
327    }
328
329    // special check for self referencing
330    if dependent_objs.contains(&target_table) {
331        return Ok(true);
332    }
333
334    let query = construct_sink_cycle_check_query(target_table, dependent_objs);
335    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
336
337    let res = db
338        .query_one(Statement::from_sql_and_values(
339            db.get_database_backend(),
340            sql,
341            values,
342        ))
343        .await?
344        .unwrap();
345
346    let cnt: i64 = res.try_get_by(0)?;
347
348    Ok(cnt != 0)
349}
350
351/// `ensure_object_id` ensures the existence of target object in the cluster.
352pub async fn ensure_object_id<C>(
353    object_type: ObjectType,
354    obj_id: impl Into<ObjectId>,
355    db: &C,
356) -> MetaResult<()>
357where
358    C: ConnectionTrait,
359{
360    let obj_id = obj_id.into();
361    let count = Object::find_by_id(obj_id).count(db).await?;
362    if count == 0 {
363        return Err(MetaError::catalog_id_not_found(
364            object_type.as_str(),
365            obj_id,
366        ));
367    }
368    Ok(())
369}
370
371pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
372where
373    C: ConnectionTrait,
374{
375    let count = Object::find_by_id(job_id).count(db).await?;
376    if count == 0 {
377        return Err(MetaError::cancelled(format!(
378            "job {} might be cancelled manually or by recovery",
379            job_id
380        )));
381    }
382    Ok(())
383}
384
385/// `ensure_user_id` ensures the existence of target user in the cluster.
386pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
387where
388    C: ConnectionTrait,
389{
390    let count = User::find_by_id(user_id).count(db).await?;
391    if count == 0 {
392        return Err(anyhow!("user {} was concurrently dropped", user_id).into());
393    }
394    Ok(())
395}
396
397/// `check_database_name_duplicate` checks whether the database name is already used in the cluster.
398pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
399where
400    C: ConnectionTrait,
401{
402    let count = Database::find()
403        .filter(database::Column::Name.eq(name))
404        .count(db)
405        .await?;
406    if count > 0 {
407        assert_eq!(count, 1);
408        return Err(MetaError::catalog_duplicated("database", name));
409    }
410    Ok(())
411}
412
413/// `check_function_signature_duplicate` checks whether the function name and its signature is already used in the target namespace.
414pub async fn check_function_signature_duplicate<C>(
415    pb_function: &PbFunction,
416    db: &C,
417) -> MetaResult<()>
418where
419    C: ConnectionTrait,
420{
421    let count = Function::find()
422        .inner_join(Object)
423        .filter(
424            object::Column::DatabaseId
425                .eq(pb_function.database_id)
426                .and(object::Column::SchemaId.eq(pb_function.schema_id))
427                .and(function::Column::Name.eq(&pb_function.name))
428                .and(
429                    function::Column::ArgTypes
430                        .eq(DataTypeArray::from(pb_function.arg_types.clone())),
431                ),
432        )
433        .count(db)
434        .await?;
435    if count > 0 {
436        assert_eq!(count, 1);
437        return Err(MetaError::catalog_duplicated("function", &pb_function.name));
438    }
439    Ok(())
440}
441
442/// `check_connection_name_duplicate` checks whether the connection name is already used in the target namespace.
443pub async fn check_connection_name_duplicate<C>(
444    pb_connection: &PbConnection,
445    db: &C,
446) -> MetaResult<()>
447where
448    C: ConnectionTrait,
449{
450    let count = Connection::find()
451        .inner_join(Object)
452        .filter(
453            object::Column::DatabaseId
454                .eq(pb_connection.database_id)
455                .and(object::Column::SchemaId.eq(pb_connection.schema_id))
456                .and(connection::Column::Name.eq(&pb_connection.name)),
457        )
458        .count(db)
459        .await?;
460    if count > 0 {
461        assert_eq!(count, 1);
462        return Err(MetaError::catalog_duplicated(
463            "connection",
464            &pb_connection.name,
465        ));
466    }
467    Ok(())
468}
469
470pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
471where
472    C: ConnectionTrait,
473{
474    let count = Secret::find()
475        .inner_join(Object)
476        .filter(
477            object::Column::DatabaseId
478                .eq(pb_secret.database_id)
479                .and(object::Column::SchemaId.eq(pb_secret.schema_id))
480                .and(secret::Column::Name.eq(&pb_secret.name)),
481        )
482        .count(db)
483        .await?;
484    if count > 0 {
485        assert_eq!(count, 1);
486        return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
487    }
488    Ok(())
489}
490
491pub async fn check_subscription_name_duplicate<C>(
492    pb_subscription: &PbSubscription,
493    db: &C,
494) -> MetaResult<()>
495where
496    C: ConnectionTrait,
497{
498    let count = Subscription::find()
499        .inner_join(Object)
500        .filter(
501            object::Column::DatabaseId
502                .eq(pb_subscription.database_id)
503                .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
504                .and(subscription::Column::Name.eq(&pb_subscription.name)),
505        )
506        .count(db)
507        .await?;
508    if count > 0 {
509        assert_eq!(count, 1);
510        return Err(MetaError::catalog_duplicated(
511            "subscription",
512            &pb_subscription.name,
513        ));
514    }
515    Ok(())
516}
517
518/// `check_user_name_duplicate` checks whether the user is already existed in the cluster.
519pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
520where
521    C: ConnectionTrait,
522{
523    let count = User::find()
524        .filter(user::Column::Name.eq(name))
525        .count(db)
526        .await?;
527    if count > 0 {
528        assert_eq!(count, 1);
529        return Err(MetaError::catalog_duplicated("user", name));
530    }
531    Ok(())
532}
533
534/// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace.
535pub async fn check_relation_name_duplicate<C>(
536    name: &str,
537    database_id: DatabaseId,
538    schema_id: SchemaId,
539    db: &C,
540) -> MetaResult<()>
541where
542    C: ConnectionTrait,
543{
544    macro_rules! check_duplicated {
545        ($obj_type:expr, $entity:ident, $table:ident) => {
546            let object_id = Object::find()
547                .select_only()
548                .column(object::Column::Oid)
549                .inner_join($entity)
550                .filter(
551                    object::Column::DatabaseId
552                        .eq(Some(database_id))
553                        .and(object::Column::SchemaId.eq(Some(schema_id)))
554                        .and($table::Column::Name.eq(name)),
555                )
556                .into_tuple::<ObjectId>()
557                .one(db)
558                .await?;
559            if let Some(oid) = object_id {
560                let check_creation = if $obj_type == ObjectType::View {
561                    false
562                } else if $obj_type == ObjectType::Source {
563                    let source_info = Source::find_by_id(oid.as_source_id())
564                        .select_only()
565                        .column(source::Column::SourceInfo)
566                        .into_tuple::<Option<StreamSourceInfo>>()
567                        .one(db)
568                        .await?
569                        .unwrap();
570                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
571                } else {
572                    true
573                };
574                let job_id = oid.as_job_id();
575                return if check_creation
576                    && !matches!(
577                        StreamingJob::find_by_id(job_id)
578                            .select_only()
579                            .column(streaming_job::Column::JobStatus)
580                            .into_tuple::<JobStatus>()
581                            .one(db)
582                            .await?,
583                        Some(JobStatus::Created)
584                    ) {
585                    Err(MetaError::catalog_under_creation(
586                        $obj_type.as_str(),
587                        name,
588                        job_id,
589                    ))
590                } else {
591                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
592                };
593            }
594        };
595    }
596    check_duplicated!(ObjectType::Table, Table, table);
597    check_duplicated!(ObjectType::Source, Source, source);
598    check_duplicated!(ObjectType::Sink, Sink, sink);
599    check_duplicated!(ObjectType::Index, Index, index);
600    check_duplicated!(ObjectType::View, View, view);
601
602    Ok(())
603}
604
605/// `check_schema_name_duplicate` checks whether the schema name is already used in the target database.
606pub async fn check_schema_name_duplicate<C>(
607    name: &str,
608    database_id: DatabaseId,
609    db: &C,
610) -> MetaResult<()>
611where
612    C: ConnectionTrait,
613{
614    let count = Object::find()
615        .inner_join(Schema)
616        .filter(
617            object::Column::ObjType
618                .eq(ObjectType::Schema)
619                .and(object::Column::DatabaseId.eq(Some(database_id)))
620                .and(schema::Column::Name.eq(name)),
621        )
622        .count(db)
623        .await?;
624    if count != 0 {
625        return Err(MetaError::catalog_duplicated("schema", name));
626    }
627
628    Ok(())
629}
630
631/// `check_object_refer_for_drop` checks whether the object is used by other objects except indexes.
632/// It returns an error that contains the details of the referring objects if it is used by others.
633pub async fn check_object_refer_for_drop<C>(
634    object_type: ObjectType,
635    object_id: ObjectId,
636    db: &C,
637) -> MetaResult<()>
638where
639    C: ConnectionTrait,
640{
641    // Ignore indexes.
642    let count = if object_type == ObjectType::Table {
643        ObjectDependency::find()
644            .join(
645                JoinType::InnerJoin,
646                object_dependency::Relation::Object1.def(),
647            )
648            .filter(
649                object_dependency::Column::Oid
650                    .eq(object_id)
651                    .and(object::Column::ObjType.ne(ObjectType::Index)),
652            )
653            .count(db)
654            .await?
655    } else {
656        ObjectDependency::find()
657            .filter(object_dependency::Column::Oid.eq(object_id))
658            .count(db)
659            .await?
660    };
661    if count != 0 {
662        // find the name of all objects that are using the given one.
663        let referring_objects = get_referring_objects(object_id, db).await?;
664        let referring_objs_map = referring_objects
665            .into_iter()
666            .filter(|o| o.obj_type != ObjectType::Index)
667            .into_group_map_by(|o| o.obj_type);
668        let mut details = vec![];
669        for (obj_type, objs) in referring_objs_map {
670            match obj_type {
671                ObjectType::Table => {
672                    let tables: Vec<(String, String)> = Object::find()
673                        .join(JoinType::InnerJoin, object::Relation::Table.def())
674                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
675                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
676                        .select_only()
677                        .column(schema::Column::Name)
678                        .column(table::Column::Name)
679                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
680                        .into_tuple()
681                        .all(db)
682                        .await?;
683                    details.extend(tables.into_iter().map(|(schema_name, table_name)| {
684                        format!(
685                            "materialized view {}.{} depends on it",
686                            schema_name, table_name
687                        )
688                    }));
689                }
690                ObjectType::Sink => {
691                    let sinks: Vec<(String, String)> = Object::find()
692                        .join(JoinType::InnerJoin, object::Relation::Sink.def())
693                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
694                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
695                        .select_only()
696                        .column(schema::Column::Name)
697                        .column(sink::Column::Name)
698                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
699                        .into_tuple()
700                        .all(db)
701                        .await?;
702                    if object_type == ObjectType::Table {
703                        let engine = Table::find_by_id(object_id.as_table_id())
704                            .select_only()
705                            .column(table::Column::Engine)
706                            .into_tuple::<table::Engine>()
707                            .one(db)
708                            .await?;
709                        if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
710                            continue;
711                        }
712                    }
713                    details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
714                        format!("sink {}.{} depends on it", schema_name, sink_name)
715                    }));
716                }
717                ObjectType::View => {
718                    let views: Vec<(String, String)> = Object::find()
719                        .join(JoinType::InnerJoin, object::Relation::View.def())
720                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
721                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
722                        .select_only()
723                        .column(schema::Column::Name)
724                        .column(view::Column::Name)
725                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
726                        .into_tuple()
727                        .all(db)
728                        .await?;
729                    details.extend(views.into_iter().map(|(schema_name, view_name)| {
730                        format!("view {}.{} depends on it", schema_name, view_name)
731                    }));
732                }
733                ObjectType::Subscription => {
734                    let subscriptions: Vec<(String, String)> = Object::find()
735                        .join(JoinType::InnerJoin, object::Relation::Subscription.def())
736                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
737                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
738                        .select_only()
739                        .column(schema::Column::Name)
740                        .column(subscription::Column::Name)
741                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
742                        .into_tuple()
743                        .all(db)
744                        .await?;
745                    details.extend(subscriptions.into_iter().map(
746                        |(schema_name, subscription_name)| {
747                            format!(
748                                "subscription {}.{} depends on it",
749                                schema_name, subscription_name
750                            )
751                        },
752                    ));
753                }
754                ObjectType::Source => {
755                    let sources: Vec<(String, String)> = Object::find()
756                        .join(JoinType::InnerJoin, object::Relation::Source.def())
757                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
758                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
759                        .select_only()
760                        .column(schema::Column::Name)
761                        .column(source::Column::Name)
762                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
763                        .into_tuple()
764                        .all(db)
765                        .await?;
766                    details.extend(sources.into_iter().map(|(schema_name, view_name)| {
767                        format!("source {}.{} depends on it", schema_name, view_name)
768                    }));
769                }
770                ObjectType::Connection => {
771                    let connections: Vec<(String, String)> = Object::find()
772                        .join(JoinType::InnerJoin, object::Relation::Connection.def())
773                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
774                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
775                        .select_only()
776                        .column(schema::Column::Name)
777                        .column(connection::Column::Name)
778                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
779                        .into_tuple()
780                        .all(db)
781                        .await?;
782                    details.extend(connections.into_iter().map(|(schema_name, view_name)| {
783                        format!("connection {}.{} depends on it", schema_name, view_name)
784                    }));
785                }
786                // only the table, source, sink, subscription, view, connection and index will depend on other objects.
787                _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
788            }
789        }
790        if details.is_empty() {
791            return Ok(());
792        }
793
794        return Err(MetaError::permission_denied(format!(
795            "{} used by {} other objects. \nDETAIL: {}\n\
796            {}",
797            object_type.as_str(),
798            details.len(),
799            details.join("\n"),
800            match object_type {
801                ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
802                    "HINT: DROP the dependent objects first.",
803                ObjectType::Database | ObjectType::Schema => unreachable!(),
804                _ => "HINT:  Use DROP ... CASCADE to drop the dependent objects too.",
805            }
806        )));
807    }
808    Ok(())
809}
810
811/// List all objects that are using the given one.
812pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
813where
814    C: ConnectionTrait,
815{
816    let objs = ObjectDependency::find()
817        .filter(object_dependency::Column::Oid.eq(object_id))
818        .join(
819            JoinType::InnerJoin,
820            object_dependency::Relation::Object1.def(),
821        )
822        .into_partial_model()
823        .all(db)
824        .await?;
825
826    Ok(objs)
827}
828
829/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
830pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
831where
832    C: ConnectionTrait,
833{
834    let count = Object::find()
835        .filter(object::Column::SchemaId.eq(Some(schema_id)))
836        .count(db)
837        .await?;
838    if count != 0 {
839        return Err(MetaError::permission_denied("schema is not empty"));
840    }
841
842    Ok(())
843}
844
845/// `list_user_info_by_ids` lists all users' info by their ids.
846pub async fn list_user_info_by_ids<C>(
847    user_ids: impl IntoIterator<Item = UserId>,
848    db: &C,
849) -> MetaResult<Vec<PbUserInfo>>
850where
851    C: ConnectionTrait,
852{
853    let mut user_infos = vec![];
854    for user_id in user_ids {
855        let user = User::find_by_id(user_id)
856            .one(db)
857            .await?
858            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
859        let mut user_info: PbUserInfo = user.into();
860        user_info.grant_privileges = get_user_privilege(user_id, db).await?;
861        user_infos.push(user_info);
862    }
863    Ok(user_infos)
864}
865
866/// `get_object_owner` returns the owner of the given object.
867pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
868where
869    C: ConnectionTrait,
870{
871    let obj_owner: UserId = Object::find_by_id(object_id)
872        .select_only()
873        .column(object::Column::OwnerId)
874        .into_tuple()
875        .one(db)
876        .await?
877        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
878    Ok(obj_owner)
879}
880
881/// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given one.
882///
883/// # Examples
884///
885/// ```
886/// use risingwave_meta::controller::utils::construct_privilege_dependency_query;
887/// use sea_orm::sea_query::*;
888/// use sea_orm::*;
889///
890/// let query = construct_privilege_dependency_query(vec![1, 2, 3]);
891///
892/// assert_eq!(
893///    query.to_string(MysqlQueryBuilder),
894///   r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"#
895/// );
896/// assert_eq!(
897///   query.to_string(PostgresQueryBuilder),
898///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""#
899/// );
900/// assert_eq!(
901///  query.to_string(SqliteQueryBuilder),
902///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""#
903/// );
904/// ```
905pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
906    let cte_alias = Alias::new("granted_privilege_ids");
907    let cte_return_privilege_alias = Alias::new("id");
908    let cte_return_user_alias = Alias::new("user_id");
909
910    let mut base_query = SelectStatement::new()
911        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
912        .from(UserPrivilege)
913        .and_where(user_privilege::Column::Id.is_in(ids))
914        .to_owned();
915
916    let cte_referencing = Query::select()
917        .columns([
918            (UserPrivilege, user_privilege::Column::Id),
919            (UserPrivilege, user_privilege::Column::UserId),
920        ])
921        .from(UserPrivilege)
922        .inner_join(
923            cte_alias.clone(),
924            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
925                .equals(user_privilege::Column::DependentId),
926        )
927        .to_owned();
928
929    let mut common_table_expr = CommonTableExpression::new();
930    common_table_expr
931        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
932        .columns([
933            cte_return_privilege_alias.clone(),
934            cte_return_user_alias.clone(),
935        ])
936        .table_name(cte_alias.clone());
937
938    SelectStatement::new()
939        .columns([cte_return_privilege_alias, cte_return_user_alias])
940        .from(cte_alias)
941        .to_owned()
942        .with(
943            WithClause::new()
944                .recursive(true)
945                .cte(common_table_expr)
946                .to_owned(),
947        )
948}
949
950pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
951where
952    C: ConnectionTrait,
953{
954    let table_ids: Vec<TableId> = Table::find()
955        .select_only()
956        .column(table::Column::TableId)
957        .filter(
958            table::Column::TableType
959                .eq(TableType::Internal)
960                .and(table::Column::BelongsToJobId.eq(job_id)),
961        )
962        .into_tuple()
963        .all(db)
964        .await?;
965    Ok(table_ids)
966}
967
968pub async fn get_index_state_tables_by_table_id<C>(
969    table_id: TableId,
970    db: &C,
971) -> MetaResult<Vec<TableId>>
972where
973    C: ConnectionTrait,
974{
975    let mut index_table_ids: Vec<TableId> = Index::find()
976        .select_only()
977        .column(index::Column::IndexTableId)
978        .filter(index::Column::PrimaryTableId.eq(table_id))
979        .into_tuple()
980        .all(db)
981        .await?;
982
983    if !index_table_ids.is_empty() {
984        let internal_table_ids: Vec<TableId> = Table::find()
985            .select_only()
986            .column(table::Column::TableId)
987            .filter(
988                table::Column::TableType
989                    .eq(TableType::Internal)
990                    .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
991            )
992            .into_tuple()
993            .all(db)
994            .await?;
995
996        index_table_ids.extend(internal_table_ids.into_iter());
997    }
998
999    Ok(index_table_ids)
1000}
1001
1002#[derive(Clone, DerivePartialModel, FromQueryResult)]
1003#[sea_orm(entity = "UserPrivilege")]
1004pub struct PartialUserPrivilege {
1005    pub id: PrivilegeId,
1006    pub user_id: UserId,
1007}
1008
1009pub async fn get_referring_privileges_cascade<C>(
1010    ids: Vec<PrivilegeId>,
1011    db: &C,
1012) -> MetaResult<Vec<PartialUserPrivilege>>
1013where
1014    C: ConnectionTrait,
1015{
1016    let query = construct_privilege_dependency_query(ids);
1017    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1018    let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1019        db.get_database_backend(),
1020        sql,
1021        values,
1022    ))
1023    .all(db)
1024    .await?;
1025
1026    Ok(privileges)
1027}
1028
1029/// `ensure_privileges_not_referred` ensures that the privileges are not granted to any other users.
1030pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1031where
1032    C: ConnectionTrait,
1033{
1034    let count = UserPrivilege::find()
1035        .filter(user_privilege::Column::DependentId.is_in(ids))
1036        .count(db)
1037        .await?;
1038    if count != 0 {
1039        return Err(MetaError::permission_denied(format!(
1040            "privileges granted to {} other ones.",
1041            count
1042        )));
1043    }
1044    Ok(())
1045}
1046
1047/// `get_user_privilege` returns the privileges of the given user.
1048pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1049where
1050    C: ConnectionTrait,
1051{
1052    let user_privileges = UserPrivilege::find()
1053        .find_also_related(Object)
1054        .filter(user_privilege::Column::UserId.eq(user_id))
1055        .all(db)
1056        .await?;
1057    Ok(user_privileges
1058        .into_iter()
1059        .map(|(privilege, object)| {
1060            let object = object.unwrap();
1061            let oid = object.oid.as_raw_id();
1062            let obj = match object.obj_type {
1063                ObjectType::Database => PbGrantObject::DatabaseId(oid),
1064                ObjectType::Schema => PbGrantObject::SchemaId(oid),
1065                ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
1066                ObjectType::Source => PbGrantObject::SourceId(oid),
1067                ObjectType::Sink => PbGrantObject::SinkId(oid),
1068                ObjectType::View => PbGrantObject::ViewId(oid),
1069                ObjectType::Function => PbGrantObject::FunctionId(oid),
1070                ObjectType::Connection => PbGrantObject::ConnectionId(oid),
1071                ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
1072                ObjectType::Secret => PbGrantObject::SecretId(oid),
1073            };
1074            PbGrantPrivilege {
1075                action_with_opts: vec![PbActionWithGrantOption {
1076                    action: PbAction::from(privilege.action) as _,
1077                    with_grant_option: privilege.with_grant_option,
1078                    granted_by: privilege.granted_by as _,
1079                }],
1080                object: Some(obj),
1081            }
1082        })
1083        .collect())
1084}
1085
1086pub async fn get_table_columns(
1087    txn: &impl ConnectionTrait,
1088    id: TableId,
1089) -> MetaResult<ColumnCatalogArray> {
1090    let columns = Table::find_by_id(id)
1091        .select_only()
1092        .columns([table::Column::Columns])
1093        .into_tuple::<ColumnCatalogArray>()
1094        .one(txn)
1095        .await?
1096        .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1097    Ok(columns)
1098}
1099
1100/// `grant_default_privileges_automatically` grants default privileges automatically
1101/// for the given new object. It returns the list of user infos whose privileges are updated.
1102pub async fn grant_default_privileges_automatically<C>(
1103    db: &C,
1104    object_id: impl Into<ObjectId>,
1105) -> MetaResult<Vec<PbUserInfo>>
1106where
1107    C: ConnectionTrait,
1108{
1109    let object_id = object_id.into();
1110    let object = Object::find_by_id(object_id)
1111        .one(db)
1112        .await?
1113        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1114    assert_ne!(object.obj_type, ObjectType::Database);
1115
1116    let for_mview_filter = if object.obj_type == ObjectType::Table {
1117        let table_type = Table::find_by_id(object_id.as_table_id())
1118            .select_only()
1119            .column(table::Column::TableType)
1120            .into_tuple::<TableType>()
1121            .one(db)
1122            .await?
1123            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1124        user_default_privilege::Column::ForMaterializedView
1125            .eq(table_type == TableType::MaterializedView)
1126    } else {
1127        user_default_privilege::Column::ForMaterializedView.eq(false)
1128    };
1129    let schema_filter = if let Some(schema_id) = &object.schema_id {
1130        user_default_privilege::Column::SchemaId.eq(*schema_id)
1131    } else {
1132        user_default_privilege::Column::SchemaId.is_null()
1133    };
1134
1135    let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1136        .select_only()
1137        .columns([
1138            user_default_privilege::Column::Grantee,
1139            user_default_privilege::Column::GrantedBy,
1140            user_default_privilege::Column::Action,
1141            user_default_privilege::Column::WithGrantOption,
1142        ])
1143        .filter(
1144            user_default_privilege::Column::DatabaseId
1145                .eq(object.database_id.unwrap())
1146                .and(schema_filter)
1147                .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1148                .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1149                .and(for_mview_filter),
1150        )
1151        .into_tuple()
1152        .all(db)
1153        .await?;
1154    if default_privileges.is_empty() {
1155        return Ok(vec![]);
1156    }
1157
1158    let updated_user_ids = default_privileges
1159        .iter()
1160        .map(|(grantee, _, _, _)| *grantee)
1161        .collect::<HashSet<_>>();
1162
1163    let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1164
1165    for (grantee, granted_by, action, with_grant_option) in default_privileges {
1166        UserPrivilege::insert(user_privilege::ActiveModel {
1167            user_id: Set(grantee),
1168            oid: Set(object_id),
1169            granted_by: Set(granted_by),
1170            action: Set(action),
1171            with_grant_option: Set(with_grant_option),
1172            ..Default::default()
1173        })
1174        .exec(db)
1175        .await?;
1176        if action == Action::Select && !internal_table_ids.is_empty() {
1177            // Grant SELECT privilege for internal tables if the action is SELECT.
1178            for internal_table_id in &internal_table_ids {
1179                UserPrivilege::insert(user_privilege::ActiveModel {
1180                    user_id: Set(grantee),
1181                    oid: Set(internal_table_id.as_object_id()),
1182                    granted_by: Set(granted_by),
1183                    action: Set(Action::Select),
1184                    with_grant_option: Set(with_grant_option),
1185                    ..Default::default()
1186                })
1187                .exec(db)
1188                .await?;
1189            }
1190        }
1191    }
1192
1193    let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1194    Ok(updated_user_infos)
1195}
1196
1197// todo: remove it after migrated to sql backend.
1198pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1199    match object {
1200        PbGrantObject::DatabaseId(id)
1201        | PbGrantObject::SchemaId(id)
1202        | PbGrantObject::TableId(id)
1203        | PbGrantObject::SourceId(id)
1204        | PbGrantObject::SinkId(id)
1205        | PbGrantObject::ViewId(id)
1206        | PbGrantObject::FunctionId(id)
1207        | PbGrantObject::SubscriptionId(id)
1208        | PbGrantObject::ConnectionId(id)
1209        | PbGrantObject::SecretId(id) => (*id).into(),
1210    }
1211}
1212
1213pub async fn insert_fragment_relations(
1214    db: &impl ConnectionTrait,
1215    downstream_fragment_relations: &FragmentDownstreamRelation,
1216) -> MetaResult<()> {
1217    for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1218        for downstream in downstreams {
1219            let relation = fragment_relation::Model {
1220                source_fragment_id: *upstream_fragment_id as _,
1221                target_fragment_id: downstream.downstream_fragment_id as _,
1222                dispatcher_type: downstream.dispatcher_type,
1223                dist_key_indices: downstream
1224                    .dist_key_indices
1225                    .iter()
1226                    .map(|idx| *idx as i32)
1227                    .collect_vec()
1228                    .into(),
1229                output_indices: downstream
1230                    .output_mapping
1231                    .indices
1232                    .iter()
1233                    .map(|idx| *idx as i32)
1234                    .collect_vec()
1235                    .into(),
1236                output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1237            };
1238            FragmentRelation::insert(relation.into_active_model())
1239                .exec(db)
1240                .await?;
1241        }
1242    }
1243    Ok(())
1244}
1245
1246pub fn compose_dispatchers(
1247    source_fragment_distribution: DistributionType,
1248    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1249    target_fragment_id: crate::model::FragmentId,
1250    target_fragment_distribution: DistributionType,
1251    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1252    dispatcher_type: DispatcherType,
1253    dist_key_indices: Vec<u32>,
1254    output_mapping: PbDispatchOutputMapping,
1255) -> HashMap<crate::model::ActorId, PbDispatcher> {
1256    match dispatcher_type {
1257        DispatcherType::Hash => {
1258            let dispatcher = PbDispatcher {
1259                r#type: PbDispatcherType::from(dispatcher_type) as _,
1260                dist_key_indices,
1261                output_mapping: output_mapping.into(),
1262                hash_mapping: Some(
1263                    ActorMapping::from_bitmaps(
1264                        &target_fragment_actors
1265                            .iter()
1266                            .map(|(actor_id, bitmap)| {
1267                                (
1268                                    *actor_id as _,
1269                                    bitmap
1270                                        .clone()
1271                                        .expect("downstream hash dispatch must have distribution"),
1272                                )
1273                            })
1274                            .collect(),
1275                    )
1276                    .to_protobuf(),
1277                ),
1278                dispatcher_id: target_fragment_id.as_raw_id() as _,
1279                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1280            };
1281            source_fragment_actors
1282                .keys()
1283                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1284                .collect()
1285        }
1286        DispatcherType::Broadcast | DispatcherType::Simple => {
1287            let dispatcher = PbDispatcher {
1288                r#type: PbDispatcherType::from(dispatcher_type) as _,
1289                dist_key_indices,
1290                output_mapping: output_mapping.into(),
1291                hash_mapping: None,
1292                dispatcher_id: target_fragment_id.as_raw_id() as _,
1293                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1294            };
1295            source_fragment_actors
1296                .keys()
1297                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1298                .collect()
1299        }
1300        DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1301            source_fragment_distribution,
1302            source_fragment_actors,
1303            target_fragment_distribution,
1304            target_fragment_actors,
1305        )
1306        .into_iter()
1307        .map(|(upstream_actor_id, downstream_actor_id)| {
1308            (
1309                upstream_actor_id,
1310                PbDispatcher {
1311                    r#type: PbDispatcherType::NoShuffle as _,
1312                    dist_key_indices: dist_key_indices.clone(),
1313                    output_mapping: output_mapping.clone().into(),
1314                    hash_mapping: None,
1315                    dispatcher_id: target_fragment_id.as_raw_id() as _,
1316                    downstream_actor_id: vec![downstream_actor_id],
1317                },
1318            )
1319        })
1320        .collect(),
1321    }
1322}
1323
1324/// return (`upstream_actor_id` -> `downstream_actor_id`)
1325pub fn resolve_no_shuffle_actor_dispatcher(
1326    source_fragment_distribution: DistributionType,
1327    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1328    target_fragment_distribution: DistributionType,
1329    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1330) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1331    assert_eq!(source_fragment_distribution, target_fragment_distribution);
1332    assert_eq!(
1333        source_fragment_actors.len(),
1334        target_fragment_actors.len(),
1335        "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1336        source_fragment_actors,
1337        target_fragment_actors
1338    );
1339    match source_fragment_distribution {
1340        DistributionType::Single => {
1341            let assert_singleton = |bitmap: &Option<Bitmap>| {
1342                assert!(
1343                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1344                    "not singleton: {:?}",
1345                    bitmap
1346                );
1347            };
1348            assert_eq!(
1349                source_fragment_actors.len(),
1350                1,
1351                "singleton distribution actor count not 1: {:?}",
1352                source_fragment_distribution
1353            );
1354            assert_eq!(
1355                target_fragment_actors.len(),
1356                1,
1357                "singleton distribution actor count not 1: {:?}",
1358                target_fragment_distribution
1359            );
1360            let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1361            assert_singleton(bitmap);
1362            let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1363            assert_singleton(bitmap);
1364            vec![(*source_actor_id, *target_actor_id)]
1365        }
1366        DistributionType::Hash => {
1367            let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1368                .iter()
1369                .map(|(actor_id, bitmap)| {
1370                    let bitmap = bitmap
1371                        .as_ref()
1372                        .expect("hash distribution should have bitmap");
1373                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1374                    (first_vnode, (*actor_id, bitmap))
1375                })
1376                .collect();
1377            source_fragment_actors
1378                .iter()
1379                .map(|(source_actor_id, bitmap)| {
1380                    let bitmap = bitmap
1381                        .as_ref()
1382                        .expect("hash distribution should have bitmap");
1383                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1384                    let (target_actor_id, target_bitmap) =
1385                        target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1386                            panic!(
1387                                "cannot find matched target actor: {} {:?} {:?} {:?}",
1388                                source_actor_id,
1389                                first_vnode,
1390                                source_fragment_actors,
1391                                target_fragment_actors
1392                            );
1393                        });
1394                    assert_eq!(
1395                        bitmap,
1396                        target_bitmap,
1397                        "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1398                        source_actor_id,
1399                        first_vnode,
1400                        source_fragment_actors,
1401                        target_fragment_actors
1402                    );
1403                    (*source_actor_id, target_actor_id)
1404                }).collect()
1405        }
1406    }
1407}
1408
1409pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1410    let fragment_worker_slot_mapping = match fragment.distribution_type {
1411        DistributionType::Single => {
1412            let actor = fragment.actors.values().exactly_one().unwrap();
1413            WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1414        }
1415        DistributionType::Hash => {
1416            let actor_bitmaps: HashMap<_, _> = fragment
1417                .actors
1418                .iter()
1419                .map(|(actor_id, actor_info)| {
1420                    let vnode_bitmap = actor_info
1421                        .vnode_bitmap
1422                        .as_ref()
1423                        .cloned()
1424                        .expect("actor bitmap shouldn't be none in hash fragment");
1425
1426                    (*actor_id as hash::ActorId, vnode_bitmap)
1427                })
1428                .collect();
1429
1430            let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1431
1432            let actor_locations = fragment
1433                .actors
1434                .iter()
1435                .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
1436                .collect();
1437
1438            actor_mapping.to_worker_slot(&actor_locations)
1439        }
1440    };
1441
1442    PbFragmentWorkerSlotMapping {
1443        fragment_id: fragment.fragment_id,
1444        mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1445    }
1446}
1447
1448/// For the given streaming jobs, returns
1449/// - All source fragments
1450/// - All sink fragments
1451/// - All actors
1452/// - All fragments
1453pub async fn get_fragments_for_jobs<C>(
1454    db: &C,
1455    actor_info: &SharedActorInfos,
1456    streaming_jobs: Vec<JobId>,
1457) -> MetaResult<(
1458    HashMap<SourceId, BTreeSet<FragmentId>>,
1459    HashSet<FragmentId>,
1460    HashSet<ActorId>,
1461    HashSet<FragmentId>,
1462)>
1463where
1464    C: ConnectionTrait,
1465{
1466    if streaming_jobs.is_empty() {
1467        return Ok((
1468            HashMap::default(),
1469            HashSet::default(),
1470            HashSet::default(),
1471            HashSet::default(),
1472        ));
1473    }
1474
1475    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1476        .select_only()
1477        .columns([
1478            fragment::Column::FragmentId,
1479            fragment::Column::FragmentTypeMask,
1480            fragment::Column::StreamNode,
1481        ])
1482        .filter(fragment::Column::JobId.is_in(streaming_jobs))
1483        .into_tuple()
1484        .all(db)
1485        .await?;
1486
1487    let fragment_ids: HashSet<_> = fragments
1488        .iter()
1489        .map(|(fragment_id, _, _)| *fragment_id)
1490        .collect();
1491
1492    let actors = {
1493        let guard = actor_info.read_guard();
1494        fragment_ids
1495            .iter()
1496            .flat_map(|id| guard.get_fragment(*id as _))
1497            .flat_map(|f| f.actors.keys().cloned().map(|id| id as _))
1498            .collect::<HashSet<_>>()
1499    };
1500
1501    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1502    let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1503    for (fragment_id, mask, stream_node) in fragments {
1504        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1505            && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1506        {
1507            source_fragment_ids
1508                .entry(source_id)
1509                .or_default()
1510                .insert(fragment_id);
1511        }
1512        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1513            sink_fragment_ids.insert(fragment_id);
1514        }
1515    }
1516
1517    Ok((
1518        source_fragment_ids,
1519        sink_fragment_ids,
1520        actors.into_iter().collect(),
1521        fragment_ids,
1522    ))
1523}
1524
1525/// Build a object group for notifying the deletion of the given objects.
1526///
1527/// Note that only id fields are filled in the object info, as the arguments are partial objects.
1528/// As a result, the returned notification info should only be used for deletion.
1529pub(crate) fn build_object_group_for_delete(
1530    partial_objects: Vec<PartialObject>,
1531) -> NotificationInfo {
1532    let mut objects = vec![];
1533    for obj in partial_objects {
1534        match obj.obj_type {
1535            ObjectType::Database => objects.push(PbObject {
1536                object_info: Some(PbObjectInfo::Database(PbDatabase {
1537                    id: obj.oid.as_database_id(),
1538                    ..Default::default()
1539                })),
1540            }),
1541            ObjectType::Schema => objects.push(PbObject {
1542                object_info: Some(PbObjectInfo::Schema(PbSchema {
1543                    id: obj.oid.as_schema_id(),
1544                    database_id: obj.database_id.unwrap(),
1545                    ..Default::default()
1546                })),
1547            }),
1548            ObjectType::Table => objects.push(PbObject {
1549                object_info: Some(PbObjectInfo::Table(PbTable {
1550                    id: obj.oid.as_table_id(),
1551                    schema_id: obj.schema_id.unwrap(),
1552                    database_id: obj.database_id.unwrap(),
1553                    ..Default::default()
1554                })),
1555            }),
1556            ObjectType::Source => objects.push(PbObject {
1557                object_info: Some(PbObjectInfo::Source(PbSource {
1558                    id: obj.oid.as_source_id(),
1559                    schema_id: obj.schema_id.unwrap(),
1560                    database_id: obj.database_id.unwrap(),
1561                    ..Default::default()
1562                })),
1563            }),
1564            ObjectType::Sink => objects.push(PbObject {
1565                object_info: Some(PbObjectInfo::Sink(PbSink {
1566                    id: obj.oid.as_sink_id(),
1567                    schema_id: obj.schema_id.unwrap(),
1568                    database_id: obj.database_id.unwrap(),
1569                    ..Default::default()
1570                })),
1571            }),
1572            ObjectType::Subscription => objects.push(PbObject {
1573                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1574                    id: obj.oid.as_subscription_id(),
1575                    schema_id: obj.schema_id.unwrap(),
1576                    database_id: obj.database_id.unwrap(),
1577                    ..Default::default()
1578                })),
1579            }),
1580            ObjectType::View => objects.push(PbObject {
1581                object_info: Some(PbObjectInfo::View(PbView {
1582                    id: obj.oid.as_view_id(),
1583                    schema_id: obj.schema_id.unwrap(),
1584                    database_id: obj.database_id.unwrap(),
1585                    ..Default::default()
1586                })),
1587            }),
1588            ObjectType::Index => {
1589                objects.push(PbObject {
1590                    object_info: Some(PbObjectInfo::Index(PbIndex {
1591                        id: obj.oid.as_index_id(),
1592                        schema_id: obj.schema_id.unwrap(),
1593                        database_id: obj.database_id.unwrap(),
1594                        ..Default::default()
1595                    })),
1596                });
1597                objects.push(PbObject {
1598                    object_info: Some(PbObjectInfo::Table(PbTable {
1599                        id: obj.oid.as_table_id(),
1600                        schema_id: obj.schema_id.unwrap(),
1601                        database_id: obj.database_id.unwrap(),
1602                        ..Default::default()
1603                    })),
1604                });
1605            }
1606            ObjectType::Function => objects.push(PbObject {
1607                object_info: Some(PbObjectInfo::Function(PbFunction {
1608                    id: obj.oid.as_function_id(),
1609                    schema_id: obj.schema_id.unwrap(),
1610                    database_id: obj.database_id.unwrap(),
1611                    ..Default::default()
1612                })),
1613            }),
1614            ObjectType::Connection => objects.push(PbObject {
1615                object_info: Some(PbObjectInfo::Connection(PbConnection {
1616                    id: obj.oid.as_connection_id(),
1617                    schema_id: obj.schema_id.unwrap(),
1618                    database_id: obj.database_id.unwrap(),
1619                    ..Default::default()
1620                })),
1621            }),
1622            ObjectType::Secret => objects.push(PbObject {
1623                object_info: Some(PbObjectInfo::Secret(PbSecret {
1624                    id: obj.oid.as_secret_id(),
1625                    schema_id: obj.schema_id.unwrap(),
1626                    database_id: obj.database_id.unwrap(),
1627                    ..Default::default()
1628                })),
1629            }),
1630        }
1631    }
1632    NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1633}
1634
1635pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1636    let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1637        .context("unable to parse table definition")
1638        .inspect_err(|e| {
1639            tracing::error!(
1640                target: "auto_schema_change",
1641                error = %e.as_report(),
1642                "failed to parse table definition")
1643        })
1644        .unwrap()
1645        .try_into()
1646        .unwrap();
1647    if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1648        cdc_table_info
1649            .clone()
1650            .map(|cdc_table_info| cdc_table_info.external_table_name)
1651    } else {
1652        None
1653    }
1654}
1655
1656/// `rename_relation` renames the target relation and its definition,
1657/// it commits the changes to the transaction and returns the updated relations and the old name.
1658pub async fn rename_relation(
1659    txn: &DatabaseTransaction,
1660    object_type: ObjectType,
1661    object_id: ObjectId,
1662    object_name: &str,
1663) -> MetaResult<(Vec<PbObject>, String)> {
1664    use sea_orm::ActiveModelTrait;
1665
1666    use crate::controller::rename::alter_relation_rename;
1667
1668    let mut to_update_relations = vec![];
1669    // rename relation.
1670    macro_rules! rename_relation {
1671        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1672            let (mut relation, obj) = $entity::find_by_id($object_id)
1673                .find_also_related(Object)
1674                .one(txn)
1675                .await?
1676                .unwrap();
1677            let obj = obj.unwrap();
1678            let old_name = relation.name.clone();
1679            relation.name = object_name.into();
1680            if obj.obj_type != ObjectType::View {
1681                relation.definition = alter_relation_rename(&relation.definition, object_name);
1682            }
1683            let active_model = $table::ActiveModel {
1684                $identity: Set(relation.$identity),
1685                name: Set(object_name.into()),
1686                definition: Set(relation.definition.clone()),
1687                ..Default::default()
1688            };
1689            active_model.update(txn).await?;
1690            to_update_relations.push(PbObject {
1691                object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
1692            });
1693            old_name
1694        }};
1695    }
1696    // TODO: check is there any thing to change for shared source?
1697    let old_name = match object_type {
1698        ObjectType::Table => {
1699            let associated_source_id: Option<SourceId> = Source::find()
1700                .select_only()
1701                .column(source::Column::SourceId)
1702                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
1703                .into_tuple()
1704                .one(txn)
1705                .await?;
1706            if let Some(source_id) = associated_source_id {
1707                rename_relation!(Source, source, source_id, source_id);
1708            }
1709            rename_relation!(Table, table, table_id, object_id.as_table_id())
1710        }
1711        ObjectType::Source => {
1712            rename_relation!(Source, source, source_id, object_id.as_source_id())
1713        }
1714        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
1715        ObjectType::Subscription => {
1716            rename_relation!(
1717                Subscription,
1718                subscription,
1719                subscription_id,
1720                object_id.as_subscription_id()
1721            )
1722        }
1723        ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
1724        ObjectType::Index => {
1725            let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
1726                .find_also_related(Object)
1727                .one(txn)
1728                .await?
1729                .unwrap();
1730            index.name = object_name.into();
1731            let index_table_id = index.index_table_id;
1732            let old_name = rename_relation!(Table, table, table_id, index_table_id);
1733
1734            // the name of index and its associated table is the same.
1735            let active_model = index::ActiveModel {
1736                index_id: sea_orm::ActiveValue::Set(index.index_id),
1737                name: sea_orm::ActiveValue::Set(object_name.into()),
1738                ..Default::default()
1739            };
1740            active_model.update(txn).await?;
1741            to_update_relations.push(PbObject {
1742                object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1743            });
1744            old_name
1745        }
1746        _ => unreachable!("only relation name can be altered."),
1747    };
1748
1749    Ok((to_update_relations, old_name))
1750}
1751
1752pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
1753where
1754    C: ConnectionTrait,
1755{
1756    let database_resource_group: Option<String> = Database::find_by_id(database_id)
1757        .select_only()
1758        .column(database::Column::ResourceGroup)
1759        .into_tuple()
1760        .one(txn)
1761        .await?
1762        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1763
1764    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1765}
1766
1767pub async fn get_existing_job_resource_group<C>(
1768    txn: &C,
1769    streaming_job_id: JobId,
1770) -> MetaResult<String>
1771where
1772    C: ConnectionTrait,
1773{
1774    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1775        StreamingJob::find_by_id(streaming_job_id)
1776            .select_only()
1777            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1778            .join(JoinType::InnerJoin, object::Relation::Database2.def())
1779            .column(streaming_job::Column::SpecificResourceGroup)
1780            .column(database::Column::ResourceGroup)
1781            .into_tuple()
1782            .one(txn)
1783            .await?
1784            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1785
1786    Ok(job_specific_resource_group.unwrap_or_else(|| {
1787        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1788    }))
1789}
1790
1791pub fn filter_workers_by_resource_group(
1792    workers: &HashMap<WorkerId, WorkerNode>,
1793    resource_group: &str,
1794) -> BTreeSet<WorkerId> {
1795    workers
1796        .iter()
1797        .filter(|&(_, worker)| {
1798            worker
1799                .resource_group()
1800                .map(|node_label| node_label.as_str() == resource_group)
1801                .unwrap_or(false)
1802        })
1803        .map(|(id, _)| *id)
1804        .collect()
1805}
1806
1807/// `rename_relation_refer` updates the definition of relations that refer to the target one,
1808/// it commits the changes to the transaction and returns all the updated relations.
1809pub async fn rename_relation_refer(
1810    txn: &DatabaseTransaction,
1811    object_type: ObjectType,
1812    object_id: ObjectId,
1813    object_name: &str,
1814    old_name: &str,
1815) -> MetaResult<Vec<PbObject>> {
1816    use sea_orm::ActiveModelTrait;
1817
1818    use crate::controller::rename::alter_relation_rename_refs;
1819
1820    let mut to_update_relations = vec![];
1821    macro_rules! rename_relation_ref {
1822        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1823            let (mut relation, obj) = $entity::find_by_id($object_id)
1824                .find_also_related(Object)
1825                .one(txn)
1826                .await?
1827                .unwrap();
1828            relation.definition =
1829                alter_relation_rename_refs(&relation.definition, old_name, object_name);
1830            let active_model = $table::ActiveModel {
1831                $identity: Set(relation.$identity),
1832                definition: Set(relation.definition.clone()),
1833                ..Default::default()
1834            };
1835            active_model.update(txn).await?;
1836            to_update_relations.push(PbObject {
1837                object_info: Some(PbObjectInfo::$entity(
1838                    ObjectModel(relation, obj.unwrap()).into(),
1839                )),
1840            });
1841        }};
1842    }
1843    let mut objs = get_referring_objects(object_id, txn).await?;
1844    if object_type == ObjectType::Table {
1845        let incoming_sinks: Vec<SinkId> = Sink::find()
1846            .select_only()
1847            .column(sink::Column::SinkId)
1848            .filter(sink::Column::TargetTable.eq(object_id))
1849            .into_tuple()
1850            .all(txn)
1851            .await?;
1852
1853        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
1854            oid: id.as_object_id(),
1855            obj_type: ObjectType::Sink,
1856            schema_id: None,
1857            database_id: None,
1858        }));
1859    }
1860
1861    for obj in objs {
1862        match obj.obj_type {
1863            ObjectType::Table => {
1864                rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
1865            }
1866            ObjectType::Sink => {
1867                rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
1868            }
1869            ObjectType::Subscription => {
1870                rename_relation_ref!(
1871                    Subscription,
1872                    subscription,
1873                    subscription_id,
1874                    obj.oid.as_subscription_id()
1875                )
1876            }
1877            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
1878            ObjectType::Index => {
1879                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
1880                    .select_only()
1881                    .column(index::Column::IndexTableId)
1882                    .into_tuple()
1883                    .one(txn)
1884                    .await?;
1885                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
1886            }
1887            _ => {
1888                bail!(
1889                    "only the table, sink, subscription, view and index will depend on other objects."
1890                )
1891            }
1892        }
1893    }
1894
1895    Ok(to_update_relations)
1896}
1897
1898/// Validate that subscription can be safely deleted, meeting any of the following conditions:
1899/// 1. The upstream table is not referred to by any cross-db mv.
1900/// 2. After deleting the subscription, the upstream table still has at least one subscription.
1901pub async fn validate_subscription_deletion<C>(
1902    txn: &C,
1903    subscription_id: SubscriptionId,
1904) -> MetaResult<()>
1905where
1906    C: ConnectionTrait,
1907{
1908    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1909        .select_only()
1910        .column(subscription::Column::DependentTableId)
1911        .into_tuple()
1912        .one(txn)
1913        .await?
1914        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1915
1916    let cnt = Subscription::find()
1917        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1918        .count(txn)
1919        .await?;
1920    if cnt > 1 {
1921        // Ensure that at least one subscription is remained for the upstream table
1922        // once the subscription is dropped.
1923        return Ok(());
1924    }
1925
1926    // Ensure that the upstream table is not referred by any cross-db mv.
1927    let obj_alias = Alias::new("o1");
1928    let used_by_alias = Alias::new("o2");
1929    let count = ObjectDependency::find()
1930        .join_as(
1931            JoinType::InnerJoin,
1932            object_dependency::Relation::Object2.def(),
1933            obj_alias.clone(),
1934        )
1935        .join_as(
1936            JoinType::InnerJoin,
1937            object_dependency::Relation::Object1.def(),
1938            used_by_alias.clone(),
1939        )
1940        .filter(
1941            object_dependency::Column::Oid
1942                .eq(upstream_table_id)
1943                .and(object_dependency::Column::UsedBy.ne(subscription_id))
1944                .and(
1945                    Expr::col((obj_alias, object::Column::DatabaseId))
1946                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1947                ),
1948        )
1949        .count(txn)
1950        .await?;
1951
1952    if count != 0 {
1953        return Err(MetaError::permission_denied(format!(
1954            "Referenced by {} cross-db objects.",
1955            count
1956        )));
1957    }
1958
1959    Ok(())
1960}
1961
1962pub async fn fetch_target_fragments<C>(
1963    txn: &C,
1964    src_fragment_id: impl IntoIterator<Item = FragmentId>,
1965) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
1966where
1967    C: ConnectionTrait,
1968{
1969    let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
1970        .select_only()
1971        .columns([
1972            fragment_relation::Column::SourceFragmentId,
1973            fragment_relation::Column::TargetFragmentId,
1974        ])
1975        .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
1976        .into_tuple()
1977        .all(txn)
1978        .await?;
1979
1980    let source_target_fragments = source_target_fragments.into_iter().into_group_map();
1981
1982    Ok(source_target_fragments)
1983}
1984
1985pub async fn get_sink_fragment_by_ids<C>(
1986    txn: &C,
1987    sink_ids: Vec<SinkId>,
1988) -> MetaResult<HashMap<SinkId, FragmentId>>
1989where
1990    C: ConnectionTrait,
1991{
1992    let sink_num = sink_ids.len();
1993    let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
1994        .select_only()
1995        .columns([fragment::Column::JobId, fragment::Column::FragmentId])
1996        .filter(
1997            fragment::Column::JobId
1998                .is_in(sink_ids)
1999                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2000        )
2001        .into_tuple()
2002        .all(txn)
2003        .await?;
2004
2005    if sink_fragment_ids.len() != sink_num {
2006        return Err(anyhow::anyhow!(
2007            "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2008            sink_fragment_ids.len(),
2009            sink_num
2010        )
2011        .into());
2012    }
2013
2014    Ok(sink_fragment_ids.into_iter().collect())
2015}
2016
2017pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2018where
2019    C: ConnectionTrait,
2020{
2021    let mview_fragment: Vec<i32> = Fragment::find()
2022        .select_only()
2023        .column(fragment::Column::FragmentTypeMask)
2024        .filter(
2025            fragment::Column::JobId
2026                .eq(table_id)
2027                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2028        )
2029        .into_tuple()
2030        .all(txn)
2031        .await?;
2032
2033    let mview_fragment_len = mview_fragment.len();
2034    if mview_fragment_len != 1 {
2035        bail!(
2036            "expected exactly one mview fragment for table {}, found {}",
2037            table_id,
2038            mview_fragment_len
2039        );
2040    }
2041
2042    let mview_fragment = mview_fragment.into_iter().next().unwrap();
2043    let migrated =
2044        FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2045
2046    Ok(migrated)
2047}
2048
2049pub async fn try_get_iceberg_table_by_downstream_sink<C>(
2050    txn: &C,
2051    sink_id: SinkId,
2052) -> MetaResult<Option<TableId>>
2053where
2054    C: ConnectionTrait,
2055{
2056    let sink = Sink::find_by_id(sink_id).one(txn).await?;
2057    let Some(sink) = sink else {
2058        return Ok(None);
2059    };
2060
2061    if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
2062        let object_ids: Vec<ObjectId> = ObjectDependency::find()
2063            .select_only()
2064            .column(object_dependency::Column::Oid)
2065            .filter(object_dependency::Column::UsedBy.eq(sink_id))
2066            .into_tuple()
2067            .all(txn)
2068            .await?;
2069        let mut iceberg_table_ids = vec![];
2070        for object_id in object_ids {
2071            let table_id = object_id.as_table_id();
2072            if let Some(table_engine) = Table::find_by_id(table_id)
2073                .select_only()
2074                .column(table::Column::Engine)
2075                .into_tuple::<table::Engine>()
2076                .one(txn)
2077                .await?
2078                && table_engine == table::Engine::Iceberg
2079            {
2080                iceberg_table_ids.push(table_id);
2081            }
2082        }
2083        if iceberg_table_ids.len() == 1 {
2084            return Ok(Some(iceberg_table_ids[0]));
2085        }
2086    }
2087    Ok(None)
2088}
2089
2090pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
2091where
2092    C: ConnectionTrait,
2093{
2094    if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
2095        .select_only()
2096        .column(table::Column::Engine)
2097        .into_tuple::<table::Engine>()
2098        .one(txn)
2099        .await?
2100        && engine == table::Engine::Iceberg
2101    {
2102        return Ok(true);
2103    }
2104    if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
2105        .select_only()
2106        .column(sink::Column::Name)
2107        .into_tuple::<String>()
2108        .one(txn)
2109        .await?
2110        && sink_name.starts_with(ICEBERG_SINK_PREFIX)
2111    {
2112        return Ok(true);
2113    }
2114    Ok(false)
2115}
2116
2117pub async fn find_dirty_iceberg_table_jobs<C>(
2118    txn: &C,
2119    database_id: Option<DatabaseId>,
2120) -> MetaResult<Vec<PartialObject>>
2121where
2122    C: ConnectionTrait,
2123{
2124    let mut filter_condition = streaming_job::Column::JobStatus
2125        .ne(JobStatus::Created)
2126        .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
2127        .and(streaming_job::Column::CreateType.eq(CreateType::Background));
2128    if let Some(database_id) = database_id {
2129        filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
2130    }
2131    let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
2132        .select_only()
2133        .columns([
2134            object::Column::Oid,
2135            object::Column::ObjType,
2136            object::Column::SchemaId,
2137            object::Column::DatabaseId,
2138        ])
2139        .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2140        .filter(filter_condition)
2141        .into_partial_model()
2142        .all(txn)
2143        .await?;
2144
2145    let mut dirty_iceberg_table_jobs = vec![];
2146    for job in creating_table_sink_jobs {
2147        if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
2148            tracing::info!("Found dirty iceberg job with id: {}", job.oid);
2149            dirty_iceberg_table_jobs.push(job);
2150        }
2151    }
2152
2153    Ok(dirty_iceberg_table_jobs)
2154}
2155
2156pub fn build_select_node_list(
2157    from: &[ColumnCatalog],
2158    to: &[ColumnCatalog],
2159) -> MetaResult<Vec<PbExprNode>> {
2160    let mut exprs = Vec::with_capacity(to.len());
2161    let idx_by_col_id = from
2162        .iter()
2163        .enumerate()
2164        .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2165        .collect::<HashMap<_, _>>();
2166
2167    for to_col in to {
2168        let to_col = to_col.column_desc.as_ref().unwrap();
2169        let to_col_type = to_col.column_type.clone();
2170        if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2171            let from_col_type = from[*from_idx]
2172                .column_desc
2173                .as_ref()
2174                .unwrap()
2175                .column_type
2176                .clone();
2177            if to_col_type != from_col_type {
2178                return Err(anyhow!(
2179                    "Column type mismatch: {:?} != {:?}",
2180                    from_col_type,
2181                    to_col_type
2182                )
2183                .into());
2184            }
2185            exprs.push(PbExprNode {
2186                function_type: expr_node::Type::Unspecified.into(),
2187                return_type: to_col_type,
2188                rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2189            });
2190        } else {
2191            let to_default_node =
2192                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2193                    expr,
2194                    ..
2195                })) = &to_col.generated_or_default_column
2196                {
2197                    expr.clone().unwrap()
2198                } else {
2199                    let null = Datum::None.to_protobuf();
2200                    PbExprNode {
2201                        function_type: expr_node::Type::Unspecified.into(),
2202                        return_type: to_col_type,
2203                        rex_node: Some(expr_node::RexNode::Constant(null)),
2204                    }
2205                };
2206            exprs.push(to_default_node);
2207        }
2208    }
2209
2210    Ok(exprs)
2211}
2212
/// Extra per-job information resolved by `get_streaming_job_extra_info`.
#[derive(Clone, Debug, Default)]
pub struct StreamingJobExtraInfo {
    /// Value of the `timezone` column of the streaming job, if set.
    pub timezone: Option<String>,
    /// Resolved definition of the streaming job; empty when none could be resolved.
    pub job_definition: String,
}
2218
2219pub async fn get_streaming_job_extra_info<C>(
2220    txn: &C,
2221    job_ids: Vec<JobId>,
2222) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2223where
2224    C: ConnectionTrait,
2225{
2226    let timezone_pairs: Vec<(JobId, Option<String>)> = StreamingJob::find()
2227        .select_only()
2228        .columns([
2229            streaming_job::Column::JobId,
2230            streaming_job::Column::Timezone,
2231        ])
2232        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2233        .into_tuple()
2234        .all(txn)
2235        .await?;
2236
2237    let job_ids = job_ids.into_iter().collect();
2238
2239    let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2240
2241    let result = timezone_pairs
2242        .into_iter()
2243        .map(|(job_id, timezone)| {
2244            let job_definition = definitions.remove(&job_id).unwrap_or_default();
2245            (
2246                job_id,
2247                StreamingJobExtraInfo {
2248                    timezone,
2249                    job_definition,
2250                },
2251            )
2252        })
2253        .collect();
2254
2255    Ok(result)
2256}
2257
#[cfg(test)]
mod tests {
    use super::*;

    /// The external table name is taken from the `TABLE '...'` clause of a CDC table definition.
    #[test]
    fn test_extract_cdc_table_name() {
        let cases = [
            (
                "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'",
                "public.t1",
            ),
            (
                "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'",
                "mydb.t2",
            ),
        ];

        for (ddl, expected) in cases {
            assert_eq!(
                extract_external_table_name_from_definition(ddl),
                Some(expected.to_owned())
            );
        }
    }
}