risingwave_meta/controller/utils.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{
22    FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23};
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
25use risingwave_common::id::{JobId, SubscriptionId};
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
28use risingwave_common::types::{DataType, Datum};
29use risingwave_common::util::value_encoding::DatumToProtoExt;
30use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
31use risingwave_common::{bail, hash};
32use risingwave_meta_model::fragment::DistributionType;
33use risingwave_meta_model::object::ObjectType;
34use risingwave_meta_model::prelude::*;
35use risingwave_meta_model::streaming_job::BackfillOrders;
36use risingwave_meta_model::table::TableType;
37use risingwave_meta_model::user_privilege::Action;
38use risingwave_meta_model::{
39    ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
40    JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
41    TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
42    function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
43    subscription, table, user, user_default_privilege, user_privilege, view,
44};
45use risingwave_meta_model_migration::WithQuery;
46use risingwave_pb::catalog::{
47    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
48    PbSubscription, PbTable, PbView,
49};
50use risingwave_pb::common::WorkerNode;
51use risingwave_pb::expr::{PbExprNode, expr_node};
52use risingwave_pb::meta::object::PbObjectInfo;
53use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
54use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
55use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
56use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
57use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
58use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
59use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
60use risingwave_sqlparser::ast::Statement as SqlStatement;
61use risingwave_sqlparser::parser::Parser;
62use sea_orm::sea_query::{
63    Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
64    WithClause,
65};
66use sea_orm::{
67    ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
68    FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
69    RelationTrait, Set, Statement,
70};
71use thiserror_ext::AsReport;
72use tracing::warn;
73
74use crate::barrier::{SharedActorInfos, SharedFragmentInfo};
75use crate::controller::ObjectModel;
76use crate::controller::fragment::FragmentTypeMaskExt;
77use crate::controller::scale::resolve_streaming_job_definition;
78use crate::model::{FragmentDownstreamRelation, StreamContext};
79use crate::{MetaError, MetaResult};
80
81/// This function constructs a query using a recursive CTE to find all objects `(oid, obj_type)` that depend on (i.e. are using) the given object.
82///
83/// # Examples
84///
85/// ```
86/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
87/// use sea_orm::sea_query::*;
88/// use sea_orm::*;
89///
90/// let query = construct_obj_dependency_query(1.into());
91///
92/// assert_eq!(
93///     query.to_string(MysqlQueryBuilder),
94///     r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `oid` FROM `object` WHERE `object`.`database_id` = 1 OR `object`.`schema_id` = 1) UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `oid`, `obj_type`, `schema_id`, `database_id` FROM `used_by_object_ids` INNER JOIN `object` ON `used_by_object_ids`.`used_by` = `oid` ORDER BY `oid` DESC"#
95/// );
96/// assert_eq!(
97///     query.to_string(PostgresQueryBuilder),
98///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1) UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
99/// );
100/// assert_eq!(
101///     query.to_string(SqliteQueryBuilder),
102///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
103/// );
104/// ```
105pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
106    let cte_alias = Alias::new("used_by_object_ids");
107    let cte_return_alias = Alias::new("used_by");
108
109    let mut base_query = SelectStatement::new()
110        .column(object_dependency::Column::UsedBy)
111        .from(ObjectDependency)
112        .and_where(object_dependency::Column::Oid.eq(obj_id))
113        .to_owned();
114
115    let belonged_obj_query = SelectStatement::new()
116        .column(object::Column::Oid)
117        .from(Object)
118        .and_where(
119            object::Column::DatabaseId
120                .eq(obj_id)
121                .or(object::Column::SchemaId.eq(obj_id)),
122        )
123        .to_owned();
124
125    let cte_referencing = Query::select()
126        .column((ObjectDependency, object_dependency::Column::UsedBy))
127        .from(ObjectDependency)
128        .inner_join(
129            cte_alias.clone(),
130            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
131                .equals(object_dependency::Column::Oid),
132        )
133        .to_owned();
134
135    let mut common_table_expr = CommonTableExpression::new();
136    common_table_expr
137        .query(
138            base_query
139                .union(UnionType::All, belonged_obj_query)
140                .union(UnionType::All, cte_referencing)
141                .to_owned(),
142        )
143        .column(cte_return_alias.clone())
144        .table_name(cte_alias.clone());
145
146    SelectStatement::new()
147        .distinct()
148        .columns([
149            object::Column::Oid,
150            object::Column::ObjType,
151            object::Column::SchemaId,
152            object::Column::DatabaseId,
153        ])
154        .from(cte_alias.clone())
155        .inner_join(
156            Object,
157            Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
158        )
159        .order_by(object::Column::Oid, Order::Desc)
160        .to_owned()
161        .with(
162            WithClause::new()
163                .recursive(true)
164                .cte(common_table_expr)
165                .to_owned(),
166        )
167}
168
169/// This function constructs a query using a recursive CTE to check whether any of the given dependent objects already rely (transitively) on the target table.
170///
171/// # Examples
172///
173/// ```
174/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
175/// use sea_orm::sea_query::*;
176/// use sea_orm::*;
177///
178/// let query = construct_sink_cycle_check_query(1.into(), vec![2.into(), 3.into()]);
179///
180/// assert_eq!(
181///     query.to_string(MysqlQueryBuilder),
182///     r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
183/// );
184/// assert_eq!(
185///     query.to_string(PostgresQueryBuilder),
186///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
187/// );
188/// assert_eq!(
189///     query.to_string(SqliteQueryBuilder),
190///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
191/// );
192/// ```
193pub fn construct_sink_cycle_check_query(
194    target_table: ObjectId,
195    dependent_objects: Vec<ObjectId>,
196) -> WithQuery {
197    let cte_alias = Alias::new("used_by_object_ids_with_sink");
198    let depend_alias = Alias::new("obj_dependency_with_sink");
199
200    let mut base_query = SelectStatement::new()
201        .columns([
202            object_dependency::Column::Oid,
203            object_dependency::Column::UsedBy,
204        ])
205        .from(ObjectDependency)
206        .and_where(object_dependency::Column::Oid.eq(target_table))
207        .to_owned();
208
209    let query_sink_deps = SelectStatement::new()
210        .columns([sink::Column::SinkId, sink::Column::TargetTable])
211        .from(Sink)
212        .and_where(sink::Column::TargetTable.is_not_null())
213        .to_owned();
214
215    let cte_referencing = Query::select()
216        .column((depend_alias.clone(), object_dependency::Column::Oid))
217        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
218        .from_subquery(
219            SelectStatement::new()
220                .columns([
221                    object_dependency::Column::Oid,
222                    object_dependency::Column::UsedBy,
223                ])
224                .from(ObjectDependency)
225                .union(UnionType::All, query_sink_deps)
226                .to_owned(),
227            depend_alias.clone(),
228        )
229        .inner_join(
230            cte_alias.clone(),
231            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
232                .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
233        )
234        .and_where(
235            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
236                cte_alias.clone(),
237                object_dependency::Column::Oid,
238            ))),
239        )
240        .to_owned();
241
242    let mut common_table_expr = CommonTableExpression::new();
243    common_table_expr
244        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
245        .columns([
246            object_dependency::Column::Oid,
247            object_dependency::Column::UsedBy,
248        ])
249        .table_name(cte_alias.clone());
250
251    SelectStatement::new()
252        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
253        .from(cte_alias.clone())
254        .and_where(
255            Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
256        )
257        .to_owned()
258        .with(
259            WithClause::new()
260                .recursive(true)
261                .cte(common_table_expr)
262                .to_owned(),
263        )
264}
265
266#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
267#[sea_orm(entity = "Object")]
268pub struct PartialObject {
269    pub oid: ObjectId,
270    pub obj_type: ObjectType,
271    pub schema_id: Option<SchemaId>,
272    pub database_id: Option<DatabaseId>,
273}
274
275#[derive(Clone, DerivePartialModel, FromQueryResult)]
276#[sea_orm(entity = "Fragment")]
277pub struct PartialFragmentStateTables {
278    pub fragment_id: FragmentId,
279    pub job_id: ObjectId,
280    pub state_table_ids: TableIdArray,
281}
282
283#[derive(Clone, Eq, PartialEq, Debug)]
284pub struct PartialActorLocation {
285    pub actor_id: ActorId,
286    pub fragment_id: FragmentId,
287    pub worker_id: WorkerId,
288}
289
290#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
291pub struct FragmentDesc {
292    pub fragment_id: FragmentId,
293    pub job_id: JobId,
294    pub fragment_type_mask: i32,
295    pub distribution_type: DistributionType,
296    pub state_table_ids: TableIdArray,
297    pub parallelism: i64,
298    pub vnode_count: i32,
299    pub stream_node: StreamNode,
300    pub parallelism_policy: String,
301}
302
303/// List all objects that are using the given one, transitively (in a cascading way). It runs a recursive CTE to find all the dependents.
304pub async fn get_referring_objects_cascade<C>(
305    obj_id: ObjectId,
306    db: &C,
307) -> MetaResult<Vec<PartialObject>>
308where
309    C: ConnectionTrait,
310{
311    let query = construct_obj_dependency_query(obj_id);
312    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
313    let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
314        db.get_database_backend(),
315        sql,
316        values,
317    ))
318    .all(db)
319    .await?;
320    Ok(objects)
321}
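
// Usage sketch for `get_referring_objects_cascade`, e.g. when validating a cascading drop.
// `txn` and `obj_id` are assumptions of the sketch (any `ConnectionTrait` connection and an
// existing object id), not names from this module:
//
// ```ignore
// let dependents = get_referring_objects_cascade(obj_id, &txn).await?;
// for obj in &dependents {
//     tracing::debug!(oid = ?obj.oid, obj_type = ?obj.obj_type, "transitively depends on the target");
// }
// ```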
322
323/// Check whether creating a sink into the target table with the given dependent objects would cause a cycle; returns `true` if it would.
324pub async fn check_sink_into_table_cycle<C>(
325    target_table: ObjectId,
326    dependent_objs: Vec<ObjectId>,
327    db: &C,
328) -> MetaResult<bool>
329where
330    C: ConnectionTrait,
331{
332    if dependent_objs.is_empty() {
333        return Ok(false);
334    }
335
336    // special check for self referencing
337    if dependent_objs.contains(&target_table) {
338        return Ok(true);
339    }
340
341    let query = construct_sink_cycle_check_query(target_table, dependent_objs);
342    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
343
344    let res = db
345        .query_one(Statement::from_sql_and_values(
346            db.get_database_backend(),
347            sql,
348            values,
349        ))
350        .await?
351        .unwrap();
352
353    let cnt: i64 = res.try_get_by(0)?;
354
355    Ok(cnt != 0)
356}
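
// Usage sketch for `check_sink_into_table_cycle`, guarding `CREATE SINK ... INTO table`.
// `target_table_id`, `upstream_object_ids` and `txn` are assumptions of the sketch:
//
// ```ignore
// if check_sink_into_table_cycle(target_table_id, upstream_object_ids, &txn).await? {
//     bail!("creating this sink into the table would form a dependency cycle");
// }
// ```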
357
358/// `ensure_object_id` ensures the existence of the target object in the cluster.
359pub async fn ensure_object_id<C>(
360    object_type: ObjectType,
361    obj_id: impl Into<ObjectId>,
362    db: &C,
363) -> MetaResult<()>
364where
365    C: ConnectionTrait,
366{
367    let obj_id = obj_id.into();
368    let count = Object::find_by_id(obj_id).count(db).await?;
369    if count == 0 {
370        return Err(MetaError::catalog_id_not_found(
371            object_type.as_str(),
372            obj_id,
373        ));
374    }
375    Ok(())
376}
377
378pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
379where
380    C: ConnectionTrait,
381{
382    let count = Object::find_by_id(job_id).count(db).await?;
383    if count == 0 {
384        return Err(MetaError::cancelled(format!(
385            "job {} might be cancelled manually or by recovery",
386            job_id
387        )));
388    }
389    Ok(())
390}
391
392/// `ensure_user_id` ensures the existence of the target user in the cluster.
393pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
394where
395    C: ConnectionTrait,
396{
397    let count = User::find_by_id(user_id).count(db).await?;
398    if count == 0 {
399        return Err(anyhow!("user {} was concurrently dropped", user_id).into());
400    }
401    Ok(())
402}
403
404/// `check_database_name_duplicate` checks whether the database name is already used in the cluster.
405pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
406where
407    C: ConnectionTrait,
408{
409    let count = Database::find()
410        .filter(database::Column::Name.eq(name))
411        .count(db)
412        .await?;
413    if count > 0 {
414        assert_eq!(count, 1);
415        return Err(MetaError::catalog_duplicated("database", name));
416    }
417    Ok(())
418}
419
420/// `check_function_signature_duplicate` checks whether the function name and its signature are already used in the target namespace.
421pub async fn check_function_signature_duplicate<C>(
422    pb_function: &PbFunction,
423    db: &C,
424) -> MetaResult<()>
425where
426    C: ConnectionTrait,
427{
428    let count = Function::find()
429        .inner_join(Object)
430        .filter(
431            object::Column::DatabaseId
432                .eq(pb_function.database_id)
433                .and(object::Column::SchemaId.eq(pb_function.schema_id))
434                .and(function::Column::Name.eq(&pb_function.name))
435                .and(
436                    function::Column::ArgTypes
437                        .eq(DataTypeArray::from(pb_function.arg_types.clone())),
438                ),
439        )
440        .count(db)
441        .await?;
442    if count > 0 {
443        assert_eq!(count, 1);
444        return Err(MetaError::catalog_duplicated("function", &pb_function.name));
445    }
446    Ok(())
447}
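
// Note that the uniqueness key is `(database_id, schema_id, name, arg_types)`, so overloads
// with different argument types can coexist. A sketch with hypothetical functions:
//
// ```ignore
// // `foo(INT)` already exists; registering `foo(VARCHAR)` passes the check,
// // while registering another `foo(INT)` returns `catalog_duplicated("function", "foo")`.
// check_function_signature_duplicate(&pb_foo_varchar, &txn).await?;
// ```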
448
449/// `check_connection_name_duplicate` checks whether the connection name is already used in the target namespace.
450pub async fn check_connection_name_duplicate<C>(
451    pb_connection: &PbConnection,
452    db: &C,
453) -> MetaResult<()>
454where
455    C: ConnectionTrait,
456{
457    let count = Connection::find()
458        .inner_join(Object)
459        .filter(
460            object::Column::DatabaseId
461                .eq(pb_connection.database_id)
462                .and(object::Column::SchemaId.eq(pb_connection.schema_id))
463                .and(connection::Column::Name.eq(&pb_connection.name)),
464        )
465        .count(db)
466        .await?;
467    if count > 0 {
468        assert_eq!(count, 1);
469        return Err(MetaError::catalog_duplicated(
470            "connection",
471            &pb_connection.name,
472        ));
473    }
474    Ok(())
475}
476
477pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
478where
479    C: ConnectionTrait,
480{
481    let count = Secret::find()
482        .inner_join(Object)
483        .filter(
484            object::Column::DatabaseId
485                .eq(pb_secret.database_id)
486                .and(object::Column::SchemaId.eq(pb_secret.schema_id))
487                .and(secret::Column::Name.eq(&pb_secret.name)),
488        )
489        .count(db)
490        .await?;
491    if count > 0 {
492        assert_eq!(count, 1);
493        return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
494    }
495    Ok(())
496}
497
498pub async fn check_subscription_name_duplicate<C>(
499    pb_subscription: &PbSubscription,
500    db: &C,
501) -> MetaResult<()>
502where
503    C: ConnectionTrait,
504{
505    let count = Subscription::find()
506        .inner_join(Object)
507        .filter(
508            object::Column::DatabaseId
509                .eq(pb_subscription.database_id)
510                .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
511                .and(subscription::Column::Name.eq(&pb_subscription.name)),
512        )
513        .count(db)
514        .await?;
515    if count > 0 {
516        assert_eq!(count, 1);
517        return Err(MetaError::catalog_duplicated(
518            "subscription",
519            &pb_subscription.name,
520        ));
521    }
522    Ok(())
523}
524
525/// `check_user_name_duplicate` checks whether a user with the same name already exists in the cluster.
526pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
527where
528    C: ConnectionTrait,
529{
530    let count = User::find()
531        .filter(user::Column::Name.eq(name))
532        .count(db)
533        .await?;
534    if count > 0 {
535        assert_eq!(count, 1);
536        return Err(MetaError::catalog_duplicated("user", name));
537    }
538    Ok(())
539}
540
541/// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace.
542pub async fn check_relation_name_duplicate<C>(
543    name: &str,
544    database_id: DatabaseId,
545    schema_id: SchemaId,
546    db: &C,
547) -> MetaResult<()>
548where
549    C: ConnectionTrait,
550{
551    macro_rules! check_duplicated {
552        ($obj_type:expr, $entity:ident, $table:ident) => {
553            let object_id = Object::find()
554                .select_only()
555                .column(object::Column::Oid)
556                .inner_join($entity)
557                .filter(
558                    object::Column::DatabaseId
559                        .eq(Some(database_id))
560                        .and(object::Column::SchemaId.eq(Some(schema_id)))
561                        .and($table::Column::Name.eq(name)),
562                )
563                .into_tuple::<ObjectId>()
564                .one(db)
565                .await?;
566            if let Some(oid) = object_id {
567                let check_creation = if $obj_type == ObjectType::View {
568                    false
569                } else if $obj_type == ObjectType::Source {
570                    let source_info = Source::find_by_id(oid.as_source_id())
571                        .select_only()
572                        .column(source::Column::SourceInfo)
573                        .into_tuple::<Option<StreamSourceInfo>>()
574                        .one(db)
575                        .await?
576                        .unwrap();
577                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
578                } else {
579                    true
580                };
581                let job_id = oid.as_job_id();
582                return if check_creation
583                    && !matches!(
584                        StreamingJob::find_by_id(job_id)
585                            .select_only()
586                            .column(streaming_job::Column::JobStatus)
587                            .into_tuple::<JobStatus>()
588                            .one(db)
589                            .await?,
590                        Some(JobStatus::Created)
591                    ) {
592                    Err(MetaError::catalog_under_creation(
593                        $obj_type.as_str(),
594                        name,
595                        job_id,
596                    ))
597                } else {
598                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
599                };
600            }
601        };
602    }
603    check_duplicated!(ObjectType::Table, Table, table);
604    check_duplicated!(ObjectType::Source, Source, source);
605    check_duplicated!(ObjectType::Sink, Sink, sink);
606    check_duplicated!(ObjectType::Index, Index, index);
607    check_duplicated!(ObjectType::View, View, view);
608
609    Ok(())
610}
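
// For reference, each `check_duplicated!` invocation above expands to roughly the following
// (a sketch of the `Table` case, not the literal macro expansion):
//
// ```ignore
// let object_id = Object::find()
//     .select_only()
//     .column(object::Column::Oid)
//     .inner_join(Table)
//     .filter(
//         object::Column::DatabaseId
//             .eq(Some(database_id))
//             .and(object::Column::SchemaId.eq(Some(schema_id)))
//             .and(table::Column::Name.eq(name)),
//     )
//     .into_tuple::<ObjectId>()
//     .one(db)
//     .await?;
// // If a matching object exists, return `catalog_under_creation` when its streaming job is
// // not yet in `JobStatus::Created`, and `catalog_duplicated` otherwise.
// ```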
611
612/// `check_schema_name_duplicate` checks whether the schema name is already used in the target database.
613pub async fn check_schema_name_duplicate<C>(
614    name: &str,
615    database_id: DatabaseId,
616    db: &C,
617) -> MetaResult<()>
618where
619    C: ConnectionTrait,
620{
621    let count = Object::find()
622        .inner_join(Schema)
623        .filter(
624            object::Column::ObjType
625                .eq(ObjectType::Schema)
626                .and(object::Column::DatabaseId.eq(Some(database_id)))
627                .and(schema::Column::Name.eq(name)),
628        )
629        .count(db)
630        .await?;
631    if count != 0 {
632        return Err(MetaError::catalog_duplicated("schema", name));
633    }
634
635    Ok(())
636}
637
638/// `check_object_refer_for_drop` checks whether the object is used by other objects (excluding indexes).
639/// It returns an error containing the details of the referring objects if it is used by others.
640pub async fn check_object_refer_for_drop<C>(
641    object_type: ObjectType,
642    object_id: ObjectId,
643    db: &C,
644) -> MetaResult<()>
645where
646    C: ConnectionTrait,
647{
648    // Ignore indexes.
649    let count = if object_type == ObjectType::Table {
650        ObjectDependency::find()
651            .join(
652                JoinType::InnerJoin,
653                object_dependency::Relation::Object1.def(),
654            )
655            .filter(
656                object_dependency::Column::Oid
657                    .eq(object_id)
658                    .and(object::Column::ObjType.ne(ObjectType::Index)),
659            )
660            .count(db)
661            .await?
662    } else {
663        ObjectDependency::find()
664            .filter(object_dependency::Column::Oid.eq(object_id))
665            .count(db)
666            .await?
667    };
668    if count != 0 {
669        // find the name of all objects that are using the given one.
670        let referring_objects = get_referring_objects(object_id, db).await?;
671        let referring_objs_map = referring_objects
672            .into_iter()
673            .filter(|o| o.obj_type != ObjectType::Index)
674            .into_group_map_by(|o| o.obj_type);
675        let mut details = vec![];
676        for (obj_type, objs) in referring_objs_map {
677            match obj_type {
678                ObjectType::Table => {
679                    let tables: Vec<(String, String)> = Object::find()
680                        .join(JoinType::InnerJoin, object::Relation::Table.def())
681                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
682                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
683                        .select_only()
684                        .column(schema::Column::Name)
685                        .column(table::Column::Name)
686                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
687                        .into_tuple()
688                        .all(db)
689                        .await?;
690                    details.extend(tables.into_iter().map(|(schema_name, table_name)| {
691                        format!(
692                            "materialized view {}.{} depends on it",
693                            schema_name, table_name
694                        )
695                    }));
696                }
697                ObjectType::Sink => {
698                    let sinks: Vec<(String, String)> = Object::find()
699                        .join(JoinType::InnerJoin, object::Relation::Sink.def())
700                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
701                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
702                        .select_only()
703                        .column(schema::Column::Name)
704                        .column(sink::Column::Name)
705                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
706                        .into_tuple()
707                        .all(db)
708                        .await?;
709                    if object_type == ObjectType::Table {
710                        let engine = Table::find_by_id(object_id.as_table_id())
711                            .select_only()
712                            .column(table::Column::Engine)
713                            .into_tuple::<table::Engine>()
714                            .one(db)
715                            .await?;
716                        if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
717                            continue;
718                        }
719                    }
720                    details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
721                        format!("sink {}.{} depends on it", schema_name, sink_name)
722                    }));
723                }
724                ObjectType::View => {
725                    let views: Vec<(String, String)> = Object::find()
726                        .join(JoinType::InnerJoin, object::Relation::View.def())
727                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
728                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
729                        .select_only()
730                        .column(schema::Column::Name)
731                        .column(view::Column::Name)
732                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
733                        .into_tuple()
734                        .all(db)
735                        .await?;
736                    details.extend(views.into_iter().map(|(schema_name, view_name)| {
737                        format!("view {}.{} depends on it", schema_name, view_name)
738                    }));
739                }
740                ObjectType::Subscription => {
741                    let subscriptions: Vec<(String, String)> = Object::find()
742                        .join(JoinType::InnerJoin, object::Relation::Subscription.def())
743                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
744                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
745                        .select_only()
746                        .column(schema::Column::Name)
747                        .column(subscription::Column::Name)
748                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
749                        .into_tuple()
750                        .all(db)
751                        .await?;
752                    details.extend(subscriptions.into_iter().map(
753                        |(schema_name, subscription_name)| {
754                            format!(
755                                "subscription {}.{} depends on it",
756                                schema_name, subscription_name
757                            )
758                        },
759                    ));
760                }
761                ObjectType::Source => {
762                    let sources: Vec<(String, String)> = Object::find()
763                        .join(JoinType::InnerJoin, object::Relation::Source.def())
764                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
765                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
766                        .select_only()
767                        .column(schema::Column::Name)
768                        .column(source::Column::Name)
769                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
770                        .into_tuple()
771                        .all(db)
772                        .await?;
773                    details.extend(sources.into_iter().map(|(schema_name, view_name)| {
774                        format!("source {}.{} depends on it", schema_name, view_name)
775                    }));
776                }
777                ObjectType::Connection => {
778                    let connections: Vec<(String, String)> = Object::find()
779                        .join(JoinType::InnerJoin, object::Relation::Connection.def())
780                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
781                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
782                        .select_only()
783                        .column(schema::Column::Name)
784                        .column(connection::Column::Name)
785                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
786                        .into_tuple()
787                        .all(db)
788                        .await?;
789                    details.extend(connections.into_iter().map(|(schema_name, view_name)| {
790                        format!("connection {}.{} depends on it", schema_name, view_name)
791                    }));
792                }
793                // Only tables, sources, sinks, subscriptions, views, connections and indexes can depend on other objects.
794                _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
795            }
796        }
797        if details.is_empty() {
798            return Ok(());
799        }
800
801        return Err(MetaError::permission_denied(format!(
802            "{} used by {} other objects. \nDETAIL: {}\n\
803            {}",
804            object_type.as_str(),
805            details.len(),
806            details.join("\n"),
807            match object_type {
808                ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
809                    "HINT: DROP the dependent objects first.",
810                ObjectType::Database | ObjectType::Schema => unreachable!(),
811                _ => "HINT:  Use DROP ... CASCADE to drop the dependent objects too.",
812            }
813        )));
814    }
815    Ok(())
816}
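
// For illustration, dropping a table referred to by one materialized view and one sink
// produces an error shaped like the following (object names are hypothetical):
//
// ```text
// table used by 2 other objects.
// DETAIL: materialized view public.mv1 depends on it
// sink public.sink1 depends on it
// HINT:  Use DROP ... CASCADE to drop the dependent objects too.
// ```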
817
818/// List all objects that are using the given one.
819pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
820where
821    C: ConnectionTrait,
822{
823    let objs = ObjectDependency::find()
824        .filter(object_dependency::Column::Oid.eq(object_id))
825        .join(
826            JoinType::InnerJoin,
827            object_dependency::Relation::Object1.def(),
828        )
829        .into_partial_model()
830        .all(db)
831        .await?;
832
833    Ok(objs)
834}
835
836/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
837pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
838where
839    C: ConnectionTrait,
840{
841    let count = Object::find()
842        .filter(object::Column::SchemaId.eq(Some(schema_id)))
843        .count(db)
844        .await?;
845    if count != 0 {
846        return Err(MetaError::permission_denied("schema is not empty"));
847    }
848
849    Ok(())
850}
851
852/// `list_user_info_by_ids` lists all users' info by their ids.
853pub async fn list_user_info_by_ids<C>(
854    user_ids: impl IntoIterator<Item = UserId>,
855    db: &C,
856) -> MetaResult<Vec<PbUserInfo>>
857where
858    C: ConnectionTrait,
859{
860    let mut user_infos = vec![];
861    for user_id in user_ids {
862        let user = User::find_by_id(user_id)
863            .one(db)
864            .await?
865            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
866        let mut user_info: PbUserInfo = user.into();
867        user_info.grant_privileges = get_user_privilege(user_id, db).await?;
868        user_infos.push(user_info);
869    }
870    Ok(user_infos)
871}
872
873/// `get_object_owner` returns the owner of the given object.
874pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
875where
876    C: ConnectionTrait,
877{
878    let obj_owner: UserId = Object::find_by_id(object_id)
879        .select_only()
880        .column(object::Column::OwnerId)
881        .into_tuple()
882        .one(db)
883        .await?
884        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
885    Ok(obj_owner)
886}
887
888/// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given ones.
889///
890/// # Examples
891///
892/// ```
893/// use risingwave_meta::controller::utils::construct_privilege_dependency_query;
894/// use sea_orm::sea_query::*;
895/// use sea_orm::*;
896///
897/// let query = construct_privilege_dependency_query(vec![1, 2, 3]);
898///
899/// assert_eq!(
900///    query.to_string(MysqlQueryBuilder),
901///   r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"#
902/// );
903/// assert_eq!(
904///   query.to_string(PostgresQueryBuilder),
905///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""#
906/// );
907/// assert_eq!(
908///  query.to_string(SqliteQueryBuilder),
909///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""#
910/// );
911/// ```
912pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
913    let cte_alias = Alias::new("granted_privilege_ids");
914    let cte_return_privilege_alias = Alias::new("id");
915    let cte_return_user_alias = Alias::new("user_id");
916
917    let mut base_query = SelectStatement::new()
918        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
919        .from(UserPrivilege)
920        .and_where(user_privilege::Column::Id.is_in(ids))
921        .to_owned();
922
923    let cte_referencing = Query::select()
924        .columns([
925            (UserPrivilege, user_privilege::Column::Id),
926            (UserPrivilege, user_privilege::Column::UserId),
927        ])
928        .from(UserPrivilege)
929        .inner_join(
930            cte_alias.clone(),
931            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
932                .equals(user_privilege::Column::DependentId),
933        )
934        .to_owned();
935
936    let mut common_table_expr = CommonTableExpression::new();
937    common_table_expr
938        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
939        .columns([
940            cte_return_privilege_alias.clone(),
941            cte_return_user_alias.clone(),
942        ])
943        .table_name(cte_alias.clone());
944
945    SelectStatement::new()
946        .columns([cte_return_privilege_alias, cte_return_user_alias])
947        .from(cte_alias)
948        .to_owned()
949        .with(
950            WithClause::new()
951                .recursive(true)
952                .cte(common_table_expr)
953                .to_owned(),
954        )
955}
956
957pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
958where
959    C: ConnectionTrait,
960{
961    let table_ids: Vec<TableId> = Table::find()
962        .select_only()
963        .column(table::Column::TableId)
964        .filter(
965            table::Column::TableType
966                .eq(TableType::Internal)
967                .and(table::Column::BelongsToJobId.eq(job_id)),
968        )
969        .into_tuple()
970        .all(db)
971        .await?;
972    Ok(table_ids)
973}
974
975pub async fn get_index_state_tables_by_table_id<C>(
976    table_id: TableId,
977    db: &C,
978) -> MetaResult<Vec<TableId>>
979where
980    C: ConnectionTrait,
981{
982    let mut index_table_ids: Vec<TableId> = Index::find()
983        .select_only()
984        .column(index::Column::IndexTableId)
985        .filter(index::Column::PrimaryTableId.eq(table_id))
986        .into_tuple()
987        .all(db)
988        .await?;
989
990    if !index_table_ids.is_empty() {
991        let internal_table_ids: Vec<TableId> = Table::find()
992            .select_only()
993            .column(table::Column::TableId)
994            .filter(
995                table::Column::TableType
996                    .eq(TableType::Internal)
997                    .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
998            )
999            .into_tuple()
1000            .all(db)
1001            .await?;
1002
1003        index_table_ids.extend(internal_table_ids.into_iter());
1004    }
1005
1006    Ok(index_table_ids)
1007}
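
// Sketch of what `get_index_state_tables_by_table_id` returns: if table `t` has an index
// whose index table id is 10 and whose internal state tables are 11 and 12 (hypothetical
// ids), the result is `[10, 11, 12]`:
//
// ```ignore
// let index_state_tables = get_index_state_tables_by_table_id(t_table_id, &txn).await?;
// ```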
1008
1009/// `get_iceberg_related_object_ids` returns the object ids related to an iceberg table: its hidden iceberg sink, iceberg source, and the sink's internal state tables.
1010pub async fn get_iceberg_related_object_ids<C>(
1011    object_id: ObjectId,
1012    db: &C,
1013) -> MetaResult<Vec<ObjectId>>
1014where
1015    C: ConnectionTrait,
1016{
1017    let object = Object::find_by_id(object_id)
1018        .one(db)
1019        .await?
1020        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1021    if object.obj_type != ObjectType::Table {
1022        return Ok(vec![]);
1023    }
1024
1025    let table = Table::find_by_id(object_id.as_table_id())
1026        .one(db)
1027        .await?
1028        .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1029    if !matches!(table.engine, Some(table::Engine::Iceberg)) {
1030        return Ok(vec![]);
1031    }
1032
1033    let database_id = object.database_id.unwrap();
1034    let schema_id = object.schema_id.unwrap();
1035
1036    let mut related_objects = vec![];
1037
1038    let iceberg_sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name);
1039    let iceberg_sink_id = Sink::find()
1040        .inner_join(Object)
1041        .select_only()
1042        .column(sink::Column::SinkId)
1043        .filter(
1044            object::Column::DatabaseId
1045                .eq(database_id)
1046                .and(object::Column::SchemaId.eq(schema_id))
1047                .and(sink::Column::Name.eq(&iceberg_sink_name)),
1048        )
1049        .into_tuple::<SinkId>()
1050        .one(db)
1051        .await?;
1052    if let Some(sink_id) = iceberg_sink_id {
1053        related_objects.push(sink_id.as_object_id());
1054        let sink_internal_tables = get_internal_tables_by_id(sink_id.as_job_id(), db).await?;
1055        related_objects.extend(
1056            sink_internal_tables
1057                .into_iter()
1058                .map(|tid| tid.as_object_id()),
1059        );
1060    } else {
1061        warn!(
1062            "iceberg table {} missing sink {}",
1063            table.name, iceberg_sink_name
1064        );
1065    }
1066
1067    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
1068    let iceberg_source_id = Source::find()
1069        .inner_join(Object)
1070        .select_only()
1071        .column(source::Column::SourceId)
1072        .filter(
1073            object::Column::DatabaseId
1074                .eq(database_id)
1075                .and(object::Column::SchemaId.eq(schema_id))
1076                .and(source::Column::Name.eq(&iceberg_source_name)),
1077        )
1078        .into_tuple::<SourceId>()
1079        .one(db)
1080        .await?;
1081    if let Some(source_id) = iceberg_source_id {
1082        related_objects.push(source_id.as_object_id());
1083    } else {
1084        warn!(
1085            "iceberg table {} missing source {}",
1086            table.name, iceberg_source_name
1087        );
1088    }
1089
1090    Ok(related_objects)
1091}
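
// The related objects are located purely by naming convention within the same schema.
// A sketch for a hypothetical iceberg-engine table `t`:
//
// ```ignore
// // Looks up the sink named `{ICEBERG_SINK_PREFIX}t` and the source named
// // `{ICEBERG_SOURCE_PREFIX}t`, plus the sink's internal state tables.
// let related = get_iceberg_related_object_ids(t_object_id, &txn).await?;
// ```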
1092
1093/// Load streaming jobs by job ids.
1094pub(crate) async fn load_streaming_jobs_by_ids<C>(
1095    txn: &C,
1096    job_ids: impl IntoIterator<Item = JobId>,
1097) -> MetaResult<HashMap<JobId, streaming_job::Model>>
1098where
1099    C: ConnectionTrait,
1100{
1101    let job_ids: HashSet<JobId> = job_ids.into_iter().collect();
1102    if job_ids.is_empty() {
1103        return Ok(HashMap::new());
1104    }
1105    let jobs = streaming_job::Entity::find()
1106        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
1107        .all(txn)
1108        .await?;
1109    Ok(jobs.into_iter().map(|job| (job.job_id, job)).collect())
1110}
1111
1112#[derive(Clone, DerivePartialModel, FromQueryResult)]
1113#[sea_orm(entity = "UserPrivilege")]
1114pub struct PartialUserPrivilege {
1115    pub id: PrivilegeId,
1116    pub user_id: UserId,
1117}
1118
1119pub async fn get_referring_privileges_cascade<C>(
1120    ids: Vec<PrivilegeId>,
1121    db: &C,
1122) -> MetaResult<Vec<PartialUserPrivilege>>
1123where
1124    C: ConnectionTrait,
1125{
1126    let query = construct_privilege_dependency_query(ids);
1127    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1128    let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1129        db.get_database_backend(),
1130        sql,
1131        values,
1132    ))
1133    .all(db)
1134    .await?;
1135
1136    Ok(privileges)
1137}
1138
1139/// `ensure_privileges_not_referred` ensures that the privileges are not granted to any other users.
1140pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1141where
1142    C: ConnectionTrait,
1143{
1144    let count = UserPrivilege::find()
1145        .filter(user_privilege::Column::DependentId.is_in(ids))
1146        .count(db)
1147        .await?;
1148    if count != 0 {
1149        return Err(MetaError::permission_denied(format!(
1150            "privileges granted to {} other ones.",
1151            count
1152        )));
1153    }
1154    Ok(())
1155}
1156
1157/// `get_user_privilege` returns the privileges of the given user.
1158pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1159where
1160    C: ConnectionTrait,
1161{
1162    let user_privileges = UserPrivilege::find()
1163        .find_also_related(Object)
1164        .filter(user_privilege::Column::UserId.eq(user_id))
1165        .all(db)
1166        .await?;
1167    Ok(user_privileges
1168        .into_iter()
1169        .map(|(privilege, object)| {
1170            let object = object.unwrap();
1171            let oid = object.oid.as_raw_id();
1172            let obj = match object.obj_type {
1173                ObjectType::Database => PbGrantObject::DatabaseId(oid),
1174                ObjectType::Schema => PbGrantObject::SchemaId(oid),
1175                ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
1176                ObjectType::Source => PbGrantObject::SourceId(oid),
1177                ObjectType::Sink => PbGrantObject::SinkId(oid),
1178                ObjectType::View => PbGrantObject::ViewId(oid),
1179                ObjectType::Function => PbGrantObject::FunctionId(oid),
1180                ObjectType::Connection => PbGrantObject::ConnectionId(oid),
1181                ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
1182                ObjectType::Secret => PbGrantObject::SecretId(oid),
1183            };
1184            PbGrantPrivilege {
1185                action_with_opts: vec![PbActionWithGrantOption {
1186                    action: PbAction::from(privilege.action) as _,
1187                    with_grant_option: privilege.with_grant_option,
1188                    granted_by: privilege.granted_by as _,
1189                }],
1190                object: Some(obj),
1191            }
1192        })
1193        .collect())
1194}
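
// Note that each privilege row is returned as its own `PbGrantPrivilege` with exactly one
// entry in `action_with_opts`; actions on the same object are not merged here. Usage sketch
// (`user_id` and `txn` are assumptions of the sketch):
//
// ```ignore
// let grant_privileges = get_user_privilege(user_id, &txn).await?;
// ```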
1195
1196pub async fn get_table_columns(
1197    txn: &impl ConnectionTrait,
1198    id: TableId,
1199) -> MetaResult<ColumnCatalogArray> {
1200    let columns = Table::find_by_id(id)
1201        .select_only()
1202        .columns([table::Column::Columns])
1203        .into_tuple::<ColumnCatalogArray>()
1204        .one(txn)
1205        .await?
1206        .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1207    Ok(columns)
1208}
1209
1210/// `grant_default_privileges_automatically` grants default privileges automatically
1211/// for the given new object. It returns the list of user infos whose privileges are updated.
1212pub async fn grant_default_privileges_automatically<C>(
1213    db: &C,
1214    object_id: impl Into<ObjectId>,
1215) -> MetaResult<Vec<PbUserInfo>>
1216where
1217    C: ConnectionTrait,
1218{
1219    let object_id = object_id.into();
1220    let object = Object::find_by_id(object_id)
1221        .one(db)
1222        .await?
1223        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1224    assert_ne!(object.obj_type, ObjectType::Database);
1225
1226    let for_mview_filter = if object.obj_type == ObjectType::Table {
1227        let table_type = Table::find_by_id(object_id.as_table_id())
1228            .select_only()
1229            .column(table::Column::TableType)
1230            .into_tuple::<TableType>()
1231            .one(db)
1232            .await?
1233            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1234        user_default_privilege::Column::ForMaterializedView
1235            .eq(table_type == TableType::MaterializedView)
1236    } else {
1237        user_default_privilege::Column::ForMaterializedView.eq(false)
1238    };
1239    let schema_filter = if let Some(schema_id) = &object.schema_id {
1240        user_default_privilege::Column::SchemaId.eq(*schema_id)
1241    } else {
1242        user_default_privilege::Column::SchemaId.is_null()
1243    };
1244
1245    let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1246        .select_only()
1247        .columns([
1248            user_default_privilege::Column::Grantee,
1249            user_default_privilege::Column::GrantedBy,
1250            user_default_privilege::Column::Action,
1251            user_default_privilege::Column::WithGrantOption,
1252        ])
1253        .filter(
1254            user_default_privilege::Column::DatabaseId
1255                .eq(object.database_id.unwrap())
1256                .and(schema_filter)
1257                .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1258                .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1259                .and(for_mview_filter),
1260        )
1261        .into_tuple()
1262        .all(db)
1263        .await?;
1264    if default_privileges.is_empty() {
1265        return Ok(vec![]);
1266    }
1267
1268    let updated_user_ids = default_privileges
1269        .iter()
1270        .map(|(grantee, _, _, _)| *grantee)
1271        .collect::<HashSet<_>>();
1272
1273    for (grantee, granted_by, action, with_grant_option) in default_privileges {
1274        UserPrivilege::insert(user_privilege::ActiveModel {
1275            user_id: Set(grantee),
1276            oid: Set(object_id),
1277            granted_by: Set(granted_by),
1278            action: Set(action),
1279            with_grant_option: Set(with_grant_option),
1280            ..Default::default()
1281        })
1282        .exec(db)
1283        .await?;
1284        if action == Action::Select {
1285            // Grant SELECT privilege for internal tables if the action is SELECT.
1286            let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1287            if !internal_table_ids.is_empty() {
1288                for internal_table_id in &internal_table_ids {
1289                    UserPrivilege::insert(user_privilege::ActiveModel {
1290                        user_id: Set(grantee),
1291                        oid: Set(internal_table_id.as_object_id()),
1292                        granted_by: Set(granted_by),
1293                        action: Set(Action::Select),
1294                        with_grant_option: Set(with_grant_option),
1295                        ..Default::default()
1296                    })
1297                    .exec(db)
1298                    .await?;
1299                }
1300            }
1301
1302            // Additionally, grant SELECT privilege for iceberg related objects if the action is SELECT.
1303            let iceberg_privilege_object_ids =
1304                get_iceberg_related_object_ids(object_id, db).await?;
1305            if !iceberg_privilege_object_ids.is_empty() {
1306                for iceberg_object_id in &iceberg_privilege_object_ids {
1307                    UserPrivilege::insert(user_privilege::ActiveModel {
1308                        user_id: Set(grantee),
1309                        oid: Set(*iceberg_object_id),
1310                        granted_by: Set(granted_by),
1311                        action: Set(action),
1312                        with_grant_option: Set(with_grant_option),
1313                        ..Default::default()
1314                    })
1315                    .exec(db)
1316                    .await?;
1317                }
1318            }
1319        }
1320    }
1321
1322    let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1323    Ok(updated_user_infos)
1324}
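
// Usage sketch for `grant_default_privileges_automatically`, e.g. right after committing the
// catalog objects of a newly created table (`txn` and `new_table_id` are assumptions):
//
// ```ignore
// let updated_users = grant_default_privileges_automatically(&txn, new_table_id).await?;
// // The caller is expected to notify frontends about the updated user infos.
// ```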
1325
1326// TODO: remove this after migrating to the SQL backend.
1327pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1328    match object {
1329        PbGrantObject::DatabaseId(id)
1330        | PbGrantObject::SchemaId(id)
1331        | PbGrantObject::TableId(id)
1332        | PbGrantObject::SourceId(id)
1333        | PbGrantObject::SinkId(id)
1334        | PbGrantObject::ViewId(id)
1335        | PbGrantObject::FunctionId(id)
1336        | PbGrantObject::SubscriptionId(id)
1337        | PbGrantObject::ConnectionId(id)
1338        | PbGrantObject::SecretId(id) => (*id).into(),
1339    }
1340}
1341
1342pub async fn insert_fragment_relations(
1343    db: &impl ConnectionTrait,
1344    downstream_fragment_relations: &FragmentDownstreamRelation,
1345) -> MetaResult<()> {
1346    let mut relations = vec![];
1347    for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1348        for downstream in downstreams {
1349            relations.push(
1350                fragment_relation::Model {
1351                    source_fragment_id: *upstream_fragment_id as _,
1352                    target_fragment_id: downstream.downstream_fragment_id as _,
1353                    dispatcher_type: downstream.dispatcher_type,
1354                    dist_key_indices: downstream
1355                        .dist_key_indices
1356                        .iter()
1357                        .map(|idx| *idx as i32)
1358                        .collect_vec()
1359                        .into(),
1360                    output_indices: downstream
1361                        .output_mapping
1362                        .indices
1363                        .iter()
1364                        .map(|idx| *idx as i32)
1365                        .collect_vec()
1366                        .into(),
1367                    output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1368                }
1369                .into_active_model(),
1370            );
1371        }
1372    }
1373    if !relations.is_empty() {
1374        FragmentRelation::insert_many(relations).exec(db).await?;
1375    }
1376    Ok(())
1377}
1378
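/// Compose the dispatcher of every source (upstream) actor towards the target (downstream)
/// fragment, keyed by source actor ID:
/// - `Hash`: all source actors share one dispatcher whose hash mapping is derived from the
///   target actors' vnode bitmaps.
/// - `Broadcast` / `Simple`: all source actors dispatch to all target actors without a hash
///   mapping.
/// - `NoShuffle`: each source actor dispatches to exactly one target actor, as resolved by
///   [`resolve_no_shuffle_actor_dispatcher`].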
1379pub fn compose_dispatchers(
1380    source_fragment_distribution: DistributionType,
1381    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1382    target_fragment_id: crate::model::FragmentId,
1383    target_fragment_distribution: DistributionType,
1384    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1385    dispatcher_type: DispatcherType,
1386    dist_key_indices: Vec<u32>,
1387    output_mapping: PbDispatchOutputMapping,
1388) -> HashMap<crate::model::ActorId, PbDispatcher> {
1389    match dispatcher_type {
1390        DispatcherType::Hash => {
1391            let dispatcher = PbDispatcher {
1392                r#type: PbDispatcherType::from(dispatcher_type) as _,
1393                dist_key_indices,
1394                output_mapping: output_mapping.into(),
1395                hash_mapping: Some(
1396                    ActorMapping::from_bitmaps(
1397                        &target_fragment_actors
1398                            .iter()
1399                            .map(|(actor_id, bitmap)| {
1400                                (
1401                                    *actor_id as _,
1402                                    bitmap
1403                                        .clone()
1404                                        .expect("downstream hash dispatch must have distribution"),
1405                                )
1406                            })
1407                            .collect(),
1408                    )
1409                    .to_protobuf(),
1410                ),
1411                dispatcher_id: target_fragment_id,
1412                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1413            };
1414            source_fragment_actors
1415                .keys()
1416                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1417                .collect()
1418        }
1419        DispatcherType::Broadcast | DispatcherType::Simple => {
1420            let dispatcher = PbDispatcher {
1421                r#type: PbDispatcherType::from(dispatcher_type) as _,
1422                dist_key_indices,
1423                output_mapping: output_mapping.into(),
1424                hash_mapping: None,
1425                dispatcher_id: target_fragment_id,
1426                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1427            };
1428            source_fragment_actors
1429                .keys()
1430                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1431                .collect()
1432        }
1433        DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1434            source_fragment_distribution,
1435            source_fragment_actors,
1436            target_fragment_distribution,
1437            target_fragment_actors,
1438        )
1439        .into_iter()
1440        .map(|(upstream_actor_id, downstream_actor_id)| {
1441            (
1442                upstream_actor_id,
1443                PbDispatcher {
1444                    r#type: PbDispatcherType::NoShuffle as _,
1445                    dist_key_indices: dist_key_indices.clone(),
1446                    output_mapping: output_mapping.clone().into(),
1447                    hash_mapping: None,
1448                    dispatcher_id: target_fragment_id,
1449                    downstream_actor_id: vec![downstream_actor_id],
1450                },
1451            )
1452        })
1453        .collect(),
1454    }
1455}
1456
1457/// Returns the one-to-one pairing of (`upstream_actor_id` -> `downstream_actor_id`).
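/// For example, with upstream actors `{1, 2}` and downstream actors `{11, 12}` whose vnode
/// bitmaps match pairwise, the result pairs `1` with `11` and `2` with `12`.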
1458pub fn resolve_no_shuffle_actor_dispatcher(
1459    source_fragment_distribution: DistributionType,
1460    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1461    target_fragment_distribution: DistributionType,
1462    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1463) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1464    assert_eq!(source_fragment_distribution, target_fragment_distribution);
1465    assert_eq!(
1466        source_fragment_actors.len(),
1467        target_fragment_actors.len(),
1468        "no-shuffle should have equal upstream and downstream actor counts: {:?} {:?}",
1469        source_fragment_actors,
1470        target_fragment_actors
1471    );
1472    match source_fragment_distribution {
1473        DistributionType::Single => {
1474            let assert_singleton = |bitmap: &Option<Bitmap>| {
1475                assert!(
1476                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1477                    "not singleton: {:?}",
1478                    bitmap
1479                );
1480            };
1481            assert_eq!(
1482                source_fragment_actors.len(),
1483                1,
1484                "singleton distribution actor count not 1: {:?}",
1485                source_fragment_distribution
1486            );
1487            assert_eq!(
1488                target_fragment_actors.len(),
1489                1,
1490                "singleton distribution actor count not 1: {:?}",
1491                target_fragment_distribution
1492            );
1493            let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1494            assert_singleton(bitmap);
1495            let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1496            assert_singleton(bitmap);
1497            vec![(*source_actor_id, *target_actor_id)]
1498        }
1499        DistributionType::Hash => {
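            // No-shuffle requires identical vnode distributions on both sides. Index the
            // downstream actors by the first vnode they own, look up each upstream actor by
            // its own first vnode, and assert that the full bitmaps match.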
1500            let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1501                .iter()
1502                .map(|(actor_id, bitmap)| {
1503                    let bitmap = bitmap
1504                        .as_ref()
1505                        .expect("hash distribution should have bitmap");
1506                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1507                    (first_vnode, (*actor_id, bitmap))
1508                })
1509                .collect();
1510            source_fragment_actors
1511                .iter()
1512                .map(|(source_actor_id, bitmap)| {
1513                    let bitmap = bitmap
1514                        .as_ref()
1515                        .expect("hash distribution should have bitmap");
1516                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1517                    let (target_actor_id, target_bitmap) =
1518                        target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1519                            panic!(
1520                                "cannot find matched target actor: {} {:?} {:?} {:?}",
1521                                source_actor_id,
1522                                first_vnode,
1523                                source_fragment_actors,
1524                                target_fragment_actors
1525                            );
1526                        });
1527                    assert_eq!(
1528                        bitmap,
1529                        target_bitmap,
1530                        "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1531                        source_actor_id,
1532                        first_vnode,
1533                        source_fragment_actors,
1534                        target_fragment_actors
1535                    );
1536                    (*source_actor_id, target_actor_id)
1537                }).collect()
1538        }
1539    }
1540}
1541
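/// Rebuild the worker-slot mapping of a fragment from its current actors: a singleton
/// fragment maps to its only actor's worker slot, while a hash-distributed fragment derives
/// the mapping from the actors' vnode bitmaps and worker locations.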
1542pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1543    let fragment_worker_slot_mapping = match fragment.distribution_type {
1544        DistributionType::Single => {
1545            let actor = fragment.actors.values().exactly_one().unwrap();
1546            WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1547        }
1548        DistributionType::Hash => {
1549            let actor_bitmaps: HashMap<_, _> = fragment
1550                .actors
1551                .iter()
1552                .map(|(actor_id, actor_info)| {
1553                    let vnode_bitmap = actor_info
1554                        .vnode_bitmap
1555                        .as_ref()
1556                        .cloned()
1557                        .expect("actor bitmap shouldn't be none in hash fragment");
1558
1559                    (*actor_id as hash::ActorId, vnode_bitmap)
1560                })
1561                .collect();
1562
1563            let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1564
1565            let actor_locations = fragment
1566                .actors
1567                .iter()
1568                .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
1569                .collect();
1570
1571            actor_mapping.to_worker_slot(&actor_locations)
1572        }
1573    };
1574
1575    PbFragmentWorkerSlotMapping {
1576        fragment_id: fragment.fragment_id,
1577        mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1578    }
1579}
1580
1581/// For the given streaming jobs, returns
1582/// - all source fragment IDs, grouped by source ID
1583/// - all sink fragment IDs
1584/// - all actor IDs
1585/// - all fragment IDs
1586pub async fn get_fragments_for_jobs<C>(
1587    db: &C,
1588    actor_info: &SharedActorInfos,
1589    streaming_jobs: Vec<JobId>,
1590) -> MetaResult<(
1591    HashMap<SourceId, BTreeSet<FragmentId>>,
1592    HashSet<FragmentId>,
1593    HashSet<ActorId>,
1594    HashSet<FragmentId>,
1595)>
1596where
1597    C: ConnectionTrait,
1598{
1599    if streaming_jobs.is_empty() {
1600        return Ok((
1601            HashMap::default(),
1602            HashSet::default(),
1603            HashSet::default(),
1604            HashSet::default(),
1605        ));
1606    }
1607
1608    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1609        .select_only()
1610        .columns([
1611            fragment::Column::FragmentId,
1612            fragment::Column::FragmentTypeMask,
1613            fragment::Column::StreamNode,
1614        ])
1615        .filter(fragment::Column::JobId.is_in(streaming_jobs))
1616        .into_tuple()
1617        .all(db)
1618        .await?;
1619
1620    let fragment_ids: HashSet<_> = fragments
1621        .iter()
1622        .map(|(fragment_id, _, _)| *fragment_id)
1623        .collect();
1624
1625    let actors = {
1626        let guard = actor_info.read_guard();
1627        fragment_ids
1628            .iter()
1629            .flat_map(|id| guard.get_fragment(*id as _))
1630            .flat_map(|f| f.actors.keys().cloned().map(|id| id as _))
1631            .collect::<HashSet<_>>()
1632    };
1633
1634    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1635    let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1636    for (fragment_id, mask, stream_node) in fragments {
1637        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1638            && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1639        {
1640            source_fragment_ids
1641                .entry(source_id)
1642                .or_default()
1643                .insert(fragment_id);
1644        }
1645        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1646            sink_fragment_ids.insert(fragment_id);
1647        }
1648    }
1649
1650    Ok((
1651        source_fragment_ids,
1652        sink_fragment_ids,
1653        actors.into_iter().collect(),
1654        fragment_ids,
1655    ))
1656}
1657
1658/// Build an object group for notifying the deletion of the given objects.
1659///
1660/// Note that only id fields are filled in the object info, as the arguments are partial objects.
1661/// As a result, the returned notification info should only be used for deletion.
1662pub(crate) fn build_object_group_for_delete(
1663    partial_objects: Vec<PartialObject>,
1664) -> NotificationInfo {
1665    let mut objects = vec![];
1666    for obj in partial_objects {
1667        match obj.obj_type {
1668            ObjectType::Database => objects.push(PbObject {
1669                object_info: Some(PbObjectInfo::Database(PbDatabase {
1670                    id: obj.oid.as_database_id(),
1671                    ..Default::default()
1672                })),
1673            }),
1674            ObjectType::Schema => objects.push(PbObject {
1675                object_info: Some(PbObjectInfo::Schema(PbSchema {
1676                    id: obj.oid.as_schema_id(),
1677                    database_id: obj.database_id.unwrap(),
1678                    ..Default::default()
1679                })),
1680            }),
1681            ObjectType::Table => objects.push(PbObject {
1682                object_info: Some(PbObjectInfo::Table(PbTable {
1683                    id: obj.oid.as_table_id(),
1684                    schema_id: obj.schema_id.unwrap(),
1685                    database_id: obj.database_id.unwrap(),
1686                    ..Default::default()
1687                })),
1688            }),
1689            ObjectType::Source => objects.push(PbObject {
1690                object_info: Some(PbObjectInfo::Source(PbSource {
1691                    id: obj.oid.as_source_id(),
1692                    schema_id: obj.schema_id.unwrap(),
1693                    database_id: obj.database_id.unwrap(),
1694                    ..Default::default()
1695                })),
1696            }),
1697            ObjectType::Sink => objects.push(PbObject {
1698                object_info: Some(PbObjectInfo::Sink(PbSink {
1699                    id: obj.oid.as_sink_id(),
1700                    schema_id: obj.schema_id.unwrap(),
1701                    database_id: obj.database_id.unwrap(),
1702                    ..Default::default()
1703                })),
1704            }),
1705            ObjectType::Subscription => objects.push(PbObject {
1706                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1707                    id: obj.oid.as_subscription_id(),
1708                    schema_id: obj.schema_id.unwrap(),
1709                    database_id: obj.database_id.unwrap(),
1710                    ..Default::default()
1711                })),
1712            }),
1713            ObjectType::View => objects.push(PbObject {
1714                object_info: Some(PbObjectInfo::View(PbView {
1715                    id: obj.oid.as_view_id(),
1716                    schema_id: obj.schema_id.unwrap(),
1717                    database_id: obj.database_id.unwrap(),
1718                    ..Default::default()
1719                })),
1720            }),
1721            ObjectType::Index => {
1722                objects.push(PbObject {
1723                    object_info: Some(PbObjectInfo::Index(PbIndex {
1724                        id: obj.oid.as_index_id(),
1725                        schema_id: obj.schema_id.unwrap(),
1726                        database_id: obj.database_id.unwrap(),
1727                        ..Default::default()
1728                    })),
1729                });
1730                objects.push(PbObject {
1731                    object_info: Some(PbObjectInfo::Table(PbTable {
1732                        id: obj.oid.as_table_id(),
1733                        schema_id: obj.schema_id.unwrap(),
1734                        database_id: obj.database_id.unwrap(),
1735                        ..Default::default()
1736                    })),
1737                });
1738            }
1739            ObjectType::Function => objects.push(PbObject {
1740                object_info: Some(PbObjectInfo::Function(PbFunction {
1741                    id: obj.oid.as_function_id(),
1742                    schema_id: obj.schema_id.unwrap(),
1743                    database_id: obj.database_id.unwrap(),
1744                    ..Default::default()
1745                })),
1746            }),
1747            ObjectType::Connection => objects.push(PbObject {
1748                object_info: Some(PbObjectInfo::Connection(PbConnection {
1749                    id: obj.oid.as_connection_id(),
1750                    schema_id: obj.schema_id.unwrap(),
1751                    database_id: obj.database_id.unwrap(),
1752                    ..Default::default()
1753                })),
1754            }),
1755            ObjectType::Secret => objects.push(PbObject {
1756                object_info: Some(PbObjectInfo::Secret(PbSecret {
1757                    id: obj.oid.as_secret_id(),
1758                    schema_id: obj.schema_id.unwrap(),
1759                    database_id: obj.database_id.unwrap(),
1760                    ..Default::default()
1761                })),
1762            }),
1763        }
1764    }
1765    NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1766}
1767
1768pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1769    let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1770        .context("unable to parse table definition")
1771        .inspect_err(|e| {
1772            tracing::error!(
1773                target: "auto_schema_change",
1774                error = %e.as_report(),
1775                "failed to parse table definition")
1776        })
1777        .unwrap()
1778        .try_into()
1779        .unwrap();
1780    if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1781        cdc_table_info
1782            .clone()
1783            .map(|cdc_table_info| cdc_table_info.external_table_name)
1784    } else {
1785        None
1786    }
1787}
1788
1789/// `rename_relation` renames the target relation and rewrites its definition.
1790/// It commits the changes to the transaction and returns the updated relations along with the old name.
1791pub async fn rename_relation(
1792    txn: &DatabaseTransaction,
1793    object_type: ObjectType,
1794    object_id: ObjectId,
1795    object_name: &str,
1796) -> MetaResult<(Vec<PbObject>, String)> {
1797    use sea_orm::ActiveModelTrait;
1798
1799    use crate::controller::rename::alter_relation_rename;
1800
1801    let mut to_update_relations = vec![];
1802    // rename relation.
1803    macro_rules! rename_relation {
1804        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1805            let (mut relation, obj) = $entity::find_by_id($object_id)
1806                .find_also_related(Object)
1807                .one(txn)
1808                .await?
1809                .unwrap();
1810            let obj = obj.unwrap();
1811            let old_name = relation.name.clone();
1812            relation.name = object_name.into();
1813            if obj.obj_type != ObjectType::View {
1814                relation.definition = alter_relation_rename(&relation.definition, object_name);
1815            }
1816            let active_model = $table::ActiveModel {
1817                $identity: Set(relation.$identity),
1818                name: Set(object_name.into()),
1819                definition: Set(relation.definition.clone()),
1820                ..Default::default()
1821            };
1822            active_model.update(txn).await?;
1823            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
1824                .one(txn)
1825                .await?;
1826            to_update_relations.push(PbObject {
1827                object_info: Some(PbObjectInfo::$entity(
1828                    ObjectModel(relation, obj, streaming_job).into(),
1829                )),
1830            });
1831            old_name
1832        }};
1833    }
1834    // TODO: check whether there is anything else to update for shared sources.
1835    let old_name = match object_type {
1836        ObjectType::Table => {
1837            let associated_source_id: Option<SourceId> = Source::find()
1838                .select_only()
1839                .column(source::Column::SourceId)
1840                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
1841                .into_tuple()
1842                .one(txn)
1843                .await?;
1844            if let Some(source_id) = associated_source_id {
1845                rename_relation!(Source, source, source_id, source_id);
1846            }
1847            rename_relation!(Table, table, table_id, object_id.as_table_id())
1848        }
1849        ObjectType::Source => {
1850            rename_relation!(Source, source, source_id, object_id.as_source_id())
1851        }
1852        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
1853        ObjectType::Subscription => {
1854            rename_relation!(
1855                Subscription,
1856                subscription,
1857                subscription_id,
1858                object_id.as_subscription_id()
1859            )
1860        }
1861        ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
1862        ObjectType::Index => {
1863            let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
1864                .find_also_related(Object)
1865                .one(txn)
1866                .await?
1867                .unwrap();
1868            let streaming_job = streaming_job::Entity::find_by_id(index.index_id.as_job_id())
1869                .one(txn)
1870                .await?;
1871            index.name = object_name.into();
1872            let index_table_id = index.index_table_id;
1873            let old_name = rename_relation!(Table, table, table_id, index_table_id);
1874
1875            // An index and its associated table share the same name, so update the index name as well.
1876            let active_model = index::ActiveModel {
1877                index_id: sea_orm::ActiveValue::Set(index.index_id),
1878                name: sea_orm::ActiveValue::Set(object_name.into()),
1879                ..Default::default()
1880            };
1881            active_model.update(txn).await?;
1882            to_update_relations.push(PbObject {
1883                object_info: Some(PbObjectInfo::Index(
1884                    ObjectModel(index, obj.unwrap(), streaming_job).into(),
1885                )),
1886            });
1887            old_name
1888        }
1889        _ => unreachable!("only relation names can be altered."),
1890    };
1891
1892    Ok((to_update_relations, old_name))
1893}
1894
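/// Resolve the resource group of the given database, falling back to the default resource
/// group if none is configured.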
1895pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
1896where
1897    C: ConnectionTrait,
1898{
1899    let database_resource_group: Option<String> = Database::find_by_id(database_id)
1900        .select_only()
1901        .column(database::Column::ResourceGroup)
1902        .into_tuple()
1903        .one(txn)
1904        .await?
1905        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1906
1907    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1908}
1909
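/// Resolve the effective resource group of an existing streaming job: the job-specific
/// resource group if set, otherwise the database's resource group, otherwise the default.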
1910pub async fn get_existing_job_resource_group<C>(
1911    txn: &C,
1912    streaming_job_id: JobId,
1913) -> MetaResult<String>
1914where
1915    C: ConnectionTrait,
1916{
1917    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1918        StreamingJob::find_by_id(streaming_job_id)
1919            .select_only()
1920            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1921            .join(JoinType::InnerJoin, object::Relation::Database2.def())
1922            .column(streaming_job::Column::SpecificResourceGroup)
1923            .column(database::Column::ResourceGroup)
1924            .into_tuple()
1925            .one(txn)
1926            .await?
1927            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1928
1929    Ok(job_specific_resource_group.unwrap_or_else(|| {
1930        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1931    }))
1932}
1933
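/// Return the IDs of the workers whose resource group matches `resource_group`.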
1934pub fn filter_workers_by_resource_group(
1935    workers: &HashMap<WorkerId, WorkerNode>,
1936    resource_group: &str,
1937) -> BTreeSet<WorkerId> {
1938    workers
1939        .iter()
1940        .filter(|&(_, worker)| {
1941            worker
1942                .resource_group()
1943                .map(|node_label| node_label.as_str() == resource_group)
1944                .unwrap_or(false)
1945        })
1946        .map(|(id, _)| *id)
1947        .collect()
1948}
1949
1950/// `rename_relation_refer` updates the definitions of the relations that refer to the target one.
1951/// It commits the changes to the transaction and returns all the updated relations.
1952pub async fn rename_relation_refer(
1953    txn: &DatabaseTransaction,
1954    object_type: ObjectType,
1955    object_id: ObjectId,
1956    object_name: &str,
1957    old_name: &str,
1958) -> MetaResult<Vec<PbObject>> {
1959    use sea_orm::ActiveModelTrait;
1960
1961    use crate::controller::rename::alter_relation_rename_refs;
1962
1963    let mut to_update_relations = vec![];
1964    macro_rules! rename_relation_ref {
1965        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1966            let (mut relation, obj) = $entity::find_by_id($object_id)
1967                .find_also_related(Object)
1968                .one(txn)
1969                .await?
1970                .unwrap();
1971            relation.definition =
1972                alter_relation_rename_refs(&relation.definition, old_name, object_name);
1973            let active_model = $table::ActiveModel {
1974                $identity: Set(relation.$identity),
1975                definition: Set(relation.definition.clone()),
1976                ..Default::default()
1977            };
1978            active_model.update(txn).await?;
1979            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
1980                .one(txn)
1981                .await?;
1982            to_update_relations.push(PbObject {
1983                object_info: Some(PbObjectInfo::$entity(
1984                    ObjectModel(relation, obj.unwrap(), streaming_job).into(),
1985                )),
1986            });
1987        }};
1988    }
1989    let mut objs = get_referring_objects(object_id, txn).await?;
1990    if object_type == ObjectType::Table {
1991        let incoming_sinks: Vec<SinkId> = Sink::find()
1992            .select_only()
1993            .column(sink::Column::SinkId)
1994            .filter(sink::Column::TargetTable.eq(object_id))
1995            .into_tuple()
1996            .all(txn)
1997            .await?;
1998
1999        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
2000            oid: id.as_object_id(),
2001            obj_type: ObjectType::Sink,
2002            schema_id: None,
2003            database_id: None,
2004        }));
2005    }
2006
2007    for obj in objs {
2008        match obj.obj_type {
2009            ObjectType::Table => {
2010                rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
2011            }
2012            ObjectType::Sink => {
2013                rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
2014            }
2015            ObjectType::Subscription => {
2016                rename_relation_ref!(
2017                    Subscription,
2018                    subscription,
2019                    subscription_id,
2020                    obj.oid.as_subscription_id()
2021                )
2022            }
2023            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
2024            ObjectType::Index => {
2025                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
2026                    .select_only()
2027                    .column(index::Column::IndexTableId)
2028                    .into_tuple()
2029                    .one(txn)
2030                    .await?;
2031                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
2032            }
2033            _ => {
2034                bail!(
2035                    "only tables, sinks, subscriptions, views and indexes can depend on other objects."
2036                )
2037            }
2038        }
2039    }
2040
2041    Ok(to_update_relations)
2042}
2043
2044/// Validate that the subscription can be safely deleted, i.e., at least one of the following conditions holds:
2045/// 1. The upstream table is not referred to by any cross-db mv.
2046/// 2. After deleting the subscription, the upstream table still has at least one subscription.
2047pub async fn validate_subscription_deletion<C>(
2048    txn: &C,
2049    subscription_id: SubscriptionId,
2050) -> MetaResult<()>
2051where
2052    C: ConnectionTrait,
2053{
2054    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
2055        .select_only()
2056        .column(subscription::Column::DependentTableId)
2057        .into_tuple()
2058        .one(txn)
2059        .await?
2060        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
2061
2062    let cnt = Subscription::find()
2063        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
2064        .count(txn)
2065        .await?;
2066    if cnt > 1 {
2067        // At least one other subscription remains on the upstream table
2068        // once this subscription is dropped, so it is safe to delete.
2069        return Ok(());
2070    }
2071
2072    // Ensure that the upstream table is not referenced by any cross-db mv.
2073    let obj_alias = Alias::new("o1");
2074    let used_by_alias = Alias::new("o2");
2075    let count = ObjectDependency::find()
2076        .join_as(
2077            JoinType::InnerJoin,
2078            object_dependency::Relation::Object2.def(),
2079            obj_alias.clone(),
2080        )
2081        .join_as(
2082            JoinType::InnerJoin,
2083            object_dependency::Relation::Object1.def(),
2084            used_by_alias.clone(),
2085        )
2086        .filter(
2087            object_dependency::Column::Oid
2088                .eq(upstream_table_id)
2089                .and(object_dependency::Column::UsedBy.ne(subscription_id))
2090                .and(
2091                    Expr::col((obj_alias, object::Column::DatabaseId))
2092                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
2093                ),
2094        )
2095        .count(txn)
2096        .await?;
2097
2098    if count != 0 {
2099        return Err(MetaError::permission_denied(format!(
2100            "Referenced by {} cross-db objects.",
2101            count
2102        )));
2103    }
2104
2105    Ok(())
2106}
2107
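/// Fetch the downstream (target) fragment IDs of the given source fragments, grouped by
/// source fragment ID.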
2108pub async fn fetch_target_fragments<C>(
2109    txn: &C,
2110    src_fragment_id: impl IntoIterator<Item = FragmentId>,
2111) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
2112where
2113    C: ConnectionTrait,
2114{
2115    let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
2116        .select_only()
2117        .columns([
2118            fragment_relation::Column::SourceFragmentId,
2119            fragment_relation::Column::TargetFragmentId,
2120        ])
2121        .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
2122        .into_tuple()
2123        .all(txn)
2124        .await?;
2125
2126    let source_target_fragments = source_target_fragments.into_iter().into_group_map();
2127
2128    Ok(source_target_fragments)
2129}
2130
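/// Look up the sink fragment of each given sink job, returning an error if the number of
/// sink fragments found does not match the number of sinks.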
2131pub async fn get_sink_fragment_by_ids<C>(
2132    txn: &C,
2133    sink_ids: Vec<SinkId>,
2134) -> MetaResult<HashMap<SinkId, FragmentId>>
2135where
2136    C: ConnectionTrait,
2137{
2138    let sink_num = sink_ids.len();
2139    let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
2140        .select_only()
2141        .columns([fragment::Column::JobId, fragment::Column::FragmentId])
2142        .filter(
2143            fragment::Column::JobId
2144                .is_in(sink_ids)
2145                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2146        )
2147        .into_tuple()
2148        .all(txn)
2149        .await?;
2150
2151    if sink_fragment_ids.len() != sink_num {
2152        return Err(anyhow::anyhow!(
2153            "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2154            sink_fragment_ids.len(),
2155            sink_num
2156        )
2157        .into());
2158    }
2159
2160    Ok(sink_fragment_ids.into_iter().collect())
2161}
2162
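/// Check whether the mview fragment of the given table already carries the
/// `UpstreamSinkUnion` flag, which indicates that the table has been migrated.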
2163pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2164where
2165    C: ConnectionTrait,
2166{
2167    let mview_fragment: Vec<i32> = Fragment::find()
2168        .select_only()
2169        .column(fragment::Column::FragmentTypeMask)
2170        .filter(
2171            fragment::Column::JobId
2172                .eq(table_id)
2173                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2174        )
2175        .into_tuple()
2176        .all(txn)
2177        .await?;
2178
2179    let mview_fragment_len = mview_fragment.len();
2180    if mview_fragment_len != 1 {
2181        bail!(
2182            "expected exactly one mview fragment for table {}, found {}",
2183            table_id,
2184            mview_fragment_len
2185        );
2186    }
2187
2188    let mview_fragment = mview_fragment.into_iter().next().unwrap();
2189    let migrated =
2190        FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2191
2192    Ok(migrated)
2193}
2194
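/// If the given sink is the hidden sink of an iceberg-engine table (i.e. its name starts
/// with the iceberg sink prefix and it depends on exactly one iceberg-engine table), return
/// that table's ID; otherwise return `None`.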
2195pub async fn try_get_iceberg_table_by_downstream_sink<C>(
2196    txn: &C,
2197    sink_id: SinkId,
2198) -> MetaResult<Option<TableId>>
2199where
2200    C: ConnectionTrait,
2201{
2202    let sink = Sink::find_by_id(sink_id).one(txn).await?;
2203    let Some(sink) = sink else {
2204        return Ok(None);
2205    };
2206
2207    if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
2208        let object_ids: Vec<ObjectId> = ObjectDependency::find()
2209            .select_only()
2210            .column(object_dependency::Column::Oid)
2211            .filter(object_dependency::Column::UsedBy.eq(sink_id))
2212            .into_tuple()
2213            .all(txn)
2214            .await?;
2215        let mut iceberg_table_ids = vec![];
2216        for object_id in object_ids {
2217            let table_id = object_id.as_table_id();
2218            if let Some(table_engine) = Table::find_by_id(table_id)
2219                .select_only()
2220                .column(table::Column::Engine)
2221                .into_tuple::<table::Engine>()
2222                .one(txn)
2223                .await?
2224                && table_engine == table::Engine::Iceberg
2225            {
2226                iceberg_table_ids.push(table_id);
2227            }
2228        }
2229        if iceberg_table_ids.len() == 1 {
2230            return Ok(Some(iceberg_table_ids[0]));
2231        }
2232    }
2233    Ok(None)
2234}
2235
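/// A job belongs to an iceberg table if it is the table job of an iceberg-engine table, or a
/// sink job whose name starts with the iceberg sink prefix.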
2236pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
2237where
2238    C: ConnectionTrait,
2239{
2240    if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
2241        .select_only()
2242        .column(table::Column::Engine)
2243        .into_tuple::<table::Engine>()
2244        .one(txn)
2245        .await?
2246        && engine == table::Engine::Iceberg
2247    {
2248        return Ok(true);
2249    }
2250    if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
2251        .select_only()
2252        .column(sink::Column::Name)
2253        .into_tuple::<String>()
2254        .one(txn)
2255        .await?
2256        && sink_name.starts_with(ICEBERG_SINK_PREFIX)
2257    {
2258        return Ok(true);
2259    }
2260    Ok(false)
2261}
2262
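/// Find background table/sink streaming jobs that are not yet in `Created` status and belong
/// to an iceberg table, optionally restricted to a single database.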
2263pub async fn find_dirty_iceberg_table_jobs<C>(
2264    txn: &C,
2265    database_id: Option<DatabaseId>,
2266) -> MetaResult<Vec<PartialObject>>
2267where
2268    C: ConnectionTrait,
2269{
2270    let mut filter_condition = streaming_job::Column::JobStatus
2271        .ne(JobStatus::Created)
2272        .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
2273        .and(streaming_job::Column::CreateType.eq(CreateType::Background));
2274    if let Some(database_id) = database_id {
2275        filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
2276    }
2277    let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
2278        .select_only()
2279        .columns([
2280            object::Column::Oid,
2281            object::Column::ObjType,
2282            object::Column::SchemaId,
2283            object::Column::DatabaseId,
2284        ])
2285        .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2286        .filter(filter_condition)
2287        .into_partial_model()
2288        .all(txn)
2289        .await?;
2290
2291    let mut dirty_iceberg_table_jobs = vec![];
2292    for job in creating_table_sink_jobs {
2293        if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
2294            tracing::info!("Found dirty iceberg job with id: {}", job.oid);
2295            dirty_iceberg_table_jobs.push(job);
2296        }
2297    }
2298
2299    Ok(dirty_iceberg_table_jobs)
2300}
2301
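/// Build the projection expressions that map rows from the `from` column layout to the `to`
/// layout: columns present in both (matched by column id and required to have the same type)
/// become `InputRef`s, while newly added columns fall back to their default expression, or a
/// NULL constant if they have none.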
2302pub fn build_select_node_list(
2303    from: &[ColumnCatalog],
2304    to: &[ColumnCatalog],
2305) -> MetaResult<Vec<PbExprNode>> {
2306    let mut exprs = Vec::with_capacity(to.len());
2307    let idx_by_col_id = from
2308        .iter()
2309        .enumerate()
2310        .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2311        .collect::<HashMap<_, _>>();
2312
2313    for to_col in to {
2314        let to_col = to_col.column_desc.as_ref().unwrap();
2315        let to_col_type_ref = to_col.column_type.as_ref().unwrap();
2316        let to_col_type = DataType::from(to_col_type_ref);
2317        if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2318            let from_col_type = DataType::from(
2319                from[*from_idx]
2320                    .column_desc
2321                    .as_ref()
2322                    .unwrap()
2323                    .column_type
2324                    .as_ref()
2325                    .unwrap(),
2326            );
2327            if !to_col_type.equals_datatype(&from_col_type) {
2328                return Err(anyhow!(
2329                    "Column type mismatch: {:?} != {:?}",
2330                    from_col_type,
2331                    to_col_type
2332                )
2333                .into());
2334            }
2335            exprs.push(PbExprNode {
2336                function_type: expr_node::Type::Unspecified.into(),
2337                return_type: Some(to_col_type_ref.clone()),
2338                rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2339            });
2340        } else {
2341            let to_default_node =
2342                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2343                    expr,
2344                    ..
2345                })) = &to_col.generated_or_default_column
2346                {
2347                    expr.clone().unwrap()
2348                } else {
2349                    let null = Datum::None.to_protobuf();
2350                    PbExprNode {
2351                        function_type: expr_node::Type::Unspecified.into(),
2352                        return_type: Some(to_col_type_ref.clone()),
2353                        rex_node: Some(expr_node::RexNode::Constant(null)),
2354                    }
2355                };
2356            exprs.push(to_default_node);
2357        }
2358    }
2359
2360    Ok(exprs)
2361}
2362
2363#[derive(Clone, Debug, Default)]
2364pub struct StreamingJobExtraInfo {
2365    pub timezone: Option<String>,
2366    pub config_override: Arc<str>,
2367    pub adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
2368    pub job_definition: String,
2369    pub backfill_orders: Option<BackfillOrders>,
2370}
2371
2372impl StreamingJobExtraInfo {
2373    pub fn stream_context(&self) -> StreamContext {
2374        StreamContext {
2375            timezone: self.timezone.clone(),
2376            config_override: self.config_override.clone(),
2377            adaptive_parallelism_strategy: self.adaptive_parallelism_strategy,
2378        }
2379    }
2380}
2381
2382/// Tuple of (`job_id`, `timezone`, `config_override`, `adaptive_parallelism_strategy`, `backfill_orders`)
2383type StreamingJobExtraInfoRow = (
2384    JobId,
2385    Option<String>,
2386    Option<String>,
2387    Option<String>,
2388    Option<BackfillOrders>,
2389);
2390
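/// Fetch the extra info (timezone, config override, adaptive parallelism strategy, resolved
/// job definition and backfill orders) of the given streaming jobs.
///
/// Illustrative usage sketch (not compiled; assumes a transaction `txn` and a `job_id` are
/// in scope):
/// ```ignore
/// let infos = get_streaming_job_extra_info(&txn, vec![job_id]).await?;
/// let stream_ctx = infos[&job_id].stream_context();
/// ```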
2391pub async fn get_streaming_job_extra_info<C>(
2392    txn: &C,
2393    job_ids: Vec<JobId>,
2394) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2395where
2396    C: ConnectionTrait,
2397{
2398    let pairs: Vec<StreamingJobExtraInfoRow> = StreamingJob::find()
2399        .select_only()
2400        .columns([
2401            streaming_job::Column::JobId,
2402            streaming_job::Column::Timezone,
2403            streaming_job::Column::ConfigOverride,
2404            streaming_job::Column::AdaptiveParallelismStrategy,
2405            streaming_job::Column::BackfillOrders,
2406        ])
2407        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2408        .into_tuple()
2409        .all(txn)
2410        .await?;
2411
2412    let job_ids = job_ids.into_iter().collect();
2413
2414    let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2415
2416    let result = pairs
2417        .into_iter()
2418        .map(
2419            |(job_id, timezone, config_override, strategy, backfill_orders)| {
2420                let job_definition = definitions.remove(&job_id).unwrap_or_default();
2421                let adaptive_parallelism_strategy = strategy.as_deref().map(|s| {
2422                    parse_strategy(s).expect("strategy should be validated before storing")
2423                });
2424                (
2425                    job_id,
2426                    StreamingJobExtraInfo {
2427                        timezone,
2428                        config_override: config_override.unwrap_or_default().into(),
2429                        adaptive_parallelism_strategy,
2430                        job_definition,
2431                        backfill_orders,
2432                    },
2433                )
2434            },
2435        )
2436        .collect();
2437
2438    Ok(result)
2439}
2440
2441#[cfg(test)]
2442mod tests {
2443    use super::*;
2444
2445    #[test]
2446    fn test_extract_cdc_table_name() {
2447        let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
2448        let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
2449        assert_eq!(
2450            extract_external_table_name_from_definition(ddl1),
2451            Some("public.t1".into())
2452        );
2453        assert_eq!(
2454            extract_external_table_name_from_definition(ddl2),
2455            Some("mydb.t2".into())
2456        );
2457    }
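
    /// Additional sanity check: a plain `CREATE TABLE` without a CDC `FROM ... TABLE ...`
    /// clause carries no external table name.
    #[test]
    fn test_extract_non_cdc_table_name() {
        let ddl = "CREATE TABLE t3 (v1 int)";
        assert_eq!(extract_external_table_name_from_definition(ddl), None);
    }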
2458}