risingwave_meta/controller/
utils.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{
22    FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23};
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
25use risingwave_common::id::{JobId, SubscriptionId};
26use risingwave_common::types::{DataType, Datum};
27use risingwave_common::util::value_encoding::DatumToProtoExt;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_common::{bail, hash};
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_meta_model::object::ObjectType;
32use risingwave_meta_model::prelude::*;
33use risingwave_meta_model::streaming_job::BackfillOrders;
34use risingwave_meta_model::table::TableType;
35use risingwave_meta_model::user_privilege::Action;
36use risingwave_meta_model::{
37    ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
38    JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
39    TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
40    function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
41    subscription, table, user, user_default_privilege, user_privilege, view,
42};
43use risingwave_meta_model_migration::WithQuery;
44use risingwave_pb::catalog::{
45    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
46    PbSubscription, PbTable, PbView,
47};
48use risingwave_pb::common::{PbObjectType, WorkerNode};
49use risingwave_pb::expr::{PbExprNode, expr_node};
50use risingwave_pb::meta::object::PbObjectInfo;
51use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
52use risingwave_pb::meta::{
53    ObjectDependency as PbObjectDependency, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
54};
55use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
56use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
57use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
58use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
59use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
60use risingwave_sqlparser::ast::Statement as SqlStatement;
61use risingwave_sqlparser::parser::Parser;
62use sea_orm::sea_query::{
63    Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
64    WithClause,
65};
66use sea_orm::{
67    ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
68    FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
69    RelationTrait, Set, Statement,
70};
71use thiserror_ext::AsReport;
72use tracing::warn;
73
74use crate::barrier::SharedFragmentInfo;
75use crate::controller::ObjectModel;
76use crate::controller::fragment::FragmentTypeMaskExt;
77use crate::controller::scale::resolve_streaming_job_definition;
78use crate::model::{FragmentDownstreamRelation, StreamContext};
79use crate::{MetaError, MetaResult};
80
/// This function will construct a query using a recursive CTE to find all objects [(id, `obj_type`)] that use the given object, directly or transitively, together with the objects whose database or schema is the given object.
82///
83/// # Examples
84///
85/// ```
86/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
87/// use sea_orm::sea_query::*;
88/// use sea_orm::*;
89///
90/// let query = construct_obj_dependency_query(1.into());
91///
92/// assert_eq!(
93///     query.to_string(MysqlQueryBuilder),
94///     r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `oid` FROM `object` WHERE `object`.`database_id` = 1 OR `object`.`schema_id` = 1) UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `oid`, `obj_type`, `schema_id`, `database_id` FROM `used_by_object_ids` INNER JOIN `object` ON `used_by_object_ids`.`used_by` = `oid` ORDER BY `oid` DESC"#
95/// );
96/// assert_eq!(
97///     query.to_string(PostgresQueryBuilder),
98///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1) UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
99/// );
100/// assert_eq!(
101///     query.to_string(SqliteQueryBuilder),
102///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
103/// );
104/// ```
105pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
106    let cte_alias = Alias::new("used_by_object_ids");
107    let cte_return_alias = Alias::new("used_by");
108
109    let mut base_query = SelectStatement::new()
110        .column(object_dependency::Column::UsedBy)
111        .from(ObjectDependency)
112        .and_where(object_dependency::Column::Oid.eq(obj_id))
113        .to_owned();
114
115    let belonged_obj_query = SelectStatement::new()
116        .column(object::Column::Oid)
117        .from(Object)
118        .and_where(
119            object::Column::DatabaseId
120                .eq(obj_id)
121                .or(object::Column::SchemaId.eq(obj_id)),
122        )
123        .to_owned();
124
125    let cte_referencing = Query::select()
126        .column((ObjectDependency, object_dependency::Column::UsedBy))
127        .from(ObjectDependency)
128        .inner_join(
129            cte_alias.clone(),
130            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
131                .equals(object_dependency::Column::Oid),
132        )
133        .to_owned();
134
135    let mut common_table_expr = CommonTableExpression::new();
136    common_table_expr
137        .query(
138            base_query
139                .union(UnionType::All, belonged_obj_query)
140                .union(UnionType::All, cte_referencing)
141                .to_owned(),
142        )
143        .column(cte_return_alias.clone())
144        .table_name(cte_alias.clone());
145
146    SelectStatement::new()
147        .distinct()
148        .columns([
149            object::Column::Oid,
150            object::Column::ObjType,
151            object::Column::SchemaId,
152            object::Column::DatabaseId,
153        ])
154        .from(cte_alias.clone())
155        .inner_join(
156            Object,
157            Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
158        )
159        .order_by(object::Column::Oid, Order::Desc)
160        .to_owned()
161        .with(
162            WithClause::new()
163                .recursive(true)
164                .cte(common_table_expr)
165                .to_owned(),
166        )
167}
168
169fn to_pb_object_type(obj_type: ObjectType) -> PbObjectType {
170    match obj_type {
171        ObjectType::Database => PbObjectType::Database,
172        ObjectType::Schema => PbObjectType::Schema,
173        ObjectType::Table => PbObjectType::Table,
174        ObjectType::Source => PbObjectType::Source,
175        ObjectType::Sink => PbObjectType::Sink,
176        ObjectType::View => PbObjectType::View,
177        ObjectType::Index => PbObjectType::Index,
178        ObjectType::Function => PbObjectType::Function,
179        ObjectType::Connection => PbObjectType::Connection,
180        ObjectType::Subscription => PbObjectType::Subscription,
181        ObjectType::Secret => PbObjectType::Secret,
182    }
183}
184
185/// Collect object dependencies with optional object id filtering, and optionally exclude non-created streaming jobs.
186async fn list_object_dependencies_impl(
187    txn: &DatabaseTransaction,
188    object_id: Option<ObjectId>,
189    include_creating: bool,
190) -> MetaResult<Vec<PbObjectDependency>> {
191    let referenced_alias = Alias::new("referenced_obj");
192    let mut query = ObjectDependency::find()
193        .select_only()
194        .columns([
195            object_dependency::Column::Oid,
196            object_dependency::Column::UsedBy,
197        ])
198        .column_as(
199            Expr::col((referenced_alias.clone(), object::Column::ObjType)),
200            "referenced_obj_type",
201        )
202        .join(
203            JoinType::InnerJoin,
204            object_dependency::Relation::Object1.def(),
205        )
206        .join_as(
207            JoinType::InnerJoin,
208            object_dependency::Relation::Object2.def(),
209            referenced_alias.clone(),
210        );
211    if let Some(object_id) = object_id {
212        query = query.filter(object_dependency::Column::UsedBy.eq(object_id));
213    }
214    let mut obj_dependencies: Vec<PbObjectDependency> = query
215        .into_tuple()
216        .all(txn)
217        .await?
218        .into_iter()
219        .map(|(oid, used_by, referenced_type)| PbObjectDependency {
220            object_id: used_by,
221            referenced_object_id: oid,
222            referenced_object_type: to_pb_object_type(referenced_type) as i32,
223        })
224        .collect();
225
226    let mut sink_query = Sink::find()
227        .select_only()
228        .columns([sink::Column::SinkId, sink::Column::TargetTable])
229        .filter(sink::Column::TargetTable.is_not_null());
230    if let Some(object_id) = object_id {
231        sink_query = sink_query.filter(
232            sink::Column::SinkId
233                .eq(object_id)
234                .or(sink::Column::TargetTable.eq(object_id)),
235        );
236    }
237    let sink_dependencies: Vec<(SinkId, TableId)> = sink_query.into_tuple().all(txn).await?;
238    obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
239        PbObjectDependency {
240            object_id: table_id.into(),
241            referenced_object_id: sink_id.into(),
242            referenced_object_type: PbObjectType::Sink as i32,
243        }
244    }));
245
246    if !include_creating {
247        let mut streaming_job_ids = obj_dependencies
248            .iter()
249            .map(|dependency| dependency.object_id)
250            .collect_vec();
251        streaming_job_ids.sort_unstable();
252        streaming_job_ids.dedup();
253
254        if !streaming_job_ids.is_empty() {
255            let non_created_jobs: HashSet<JobId> = StreamingJob::find()
256                .select_only()
257                .columns([streaming_job::Column::JobId])
258                .filter(
259                    streaming_job::Column::JobId
260                        .is_in(streaming_job_ids)
261                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
262                )
263                .into_tuple()
264                .all(txn)
265                .await?
266                .into_iter()
267                .collect();
268
269            if !non_created_jobs.is_empty() {
270                obj_dependencies.retain(|dependency| {
271                    !non_created_jobs.contains(&dependency.object_id.as_job_id())
272                });
273            }
274        }
275    }
276
277    Ok(obj_dependencies)
278}
279
280/// List object dependencies with optional filtering of non-created streaming jobs.
281pub async fn list_object_dependencies(
282    txn: &DatabaseTransaction,
283    include_creating: bool,
284) -> MetaResult<Vec<PbObjectDependency>> {
285    list_object_dependencies_impl(txn, None, include_creating).await
286}
287
288/// List object dependencies for the specified object id without filtering by job status.
289pub async fn list_object_dependencies_by_object_id(
290    txn: &DatabaseTransaction,
291    object_id: ObjectId,
292) -> MetaResult<Vec<PbObjectDependency>> {
293    list_object_dependencies_impl(txn, Some(object_id), true).await
294}
295
296/// This function will construct a query using recursive cte to find if dependent objects are already relying on the target table.
297///
298/// # Examples
299///
300/// ```
301/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
302/// use sea_orm::sea_query::*;
303/// use sea_orm::*;
304///
305/// let query = construct_sink_cycle_check_query(1.into(), vec![2.into(), 3.into()]);
306///
307/// assert_eq!(
308///     query.to_string(MysqlQueryBuilder),
309///     r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
310/// );
311/// assert_eq!(
312///     query.to_string(PostgresQueryBuilder),
313///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
314/// );
315/// assert_eq!(
316///     query.to_string(SqliteQueryBuilder),
317///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
318/// );
319/// ```
320pub fn construct_sink_cycle_check_query(
321    target_table: ObjectId,
322    dependent_objects: Vec<ObjectId>,
323) -> WithQuery {
324    let cte_alias = Alias::new("used_by_object_ids_with_sink");
325    let depend_alias = Alias::new("obj_dependency_with_sink");
326
327    let mut base_query = SelectStatement::new()
328        .columns([
329            object_dependency::Column::Oid,
330            object_dependency::Column::UsedBy,
331        ])
332        .from(ObjectDependency)
333        .and_where(object_dependency::Column::Oid.eq(target_table))
334        .to_owned();
335
336    let query_sink_deps = SelectStatement::new()
337        .columns([sink::Column::SinkId, sink::Column::TargetTable])
338        .from(Sink)
339        .and_where(sink::Column::TargetTable.is_not_null())
340        .to_owned();
341
342    let cte_referencing = Query::select()
343        .column((depend_alias.clone(), object_dependency::Column::Oid))
344        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
345        .from_subquery(
346            SelectStatement::new()
347                .columns([
348                    object_dependency::Column::Oid,
349                    object_dependency::Column::UsedBy,
350                ])
351                .from(ObjectDependency)
352                .union(UnionType::All, query_sink_deps)
353                .to_owned(),
354            depend_alias.clone(),
355        )
356        .inner_join(
357            cte_alias.clone(),
358            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
359                .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
360        )
361        .and_where(
362            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
363                cte_alias.clone(),
364                object_dependency::Column::Oid,
365            ))),
366        )
367        .to_owned();
368
369    let mut common_table_expr = CommonTableExpression::new();
370    common_table_expr
371        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
372        .columns([
373            object_dependency::Column::Oid,
374            object_dependency::Column::UsedBy,
375        ])
376        .table_name(cte_alias.clone());
377
378    SelectStatement::new()
379        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
380        .from(cte_alias.clone())
381        .and_where(
382            Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
383        )
384        .to_owned()
385        .with(
386            WithClause::new()
387                .recursive(true)
388                .cte(common_table_expr)
389                .to_owned(),
390        )
391}
392
393#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
394#[sea_orm(entity = "Object")]
395pub struct PartialObject {
396    pub oid: ObjectId,
397    pub obj_type: ObjectType,
398    pub schema_id: Option<SchemaId>,
399    pub database_id: Option<DatabaseId>,
400}
401
402#[derive(Clone, DerivePartialModel, FromQueryResult)]
403#[sea_orm(entity = "Fragment")]
404pub struct PartialFragmentStateTables {
405    pub fragment_id: FragmentId,
406    pub job_id: ObjectId,
407    pub state_table_ids: TableIdArray,
408}
409
410#[derive(Clone, Eq, PartialEq, Debug)]
411pub struct PartialActorLocation {
412    pub actor_id: ActorId,
413    pub fragment_id: FragmentId,
414    pub worker_id: WorkerId,
415}
416
417#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
418pub struct FragmentDesc {
419    pub fragment_id: FragmentId,
420    pub job_id: JobId,
421    pub fragment_type_mask: i32,
422    pub distribution_type: DistributionType,
423    pub state_table_ids: TableIdArray,
424    pub parallelism: i64,
425    pub vnode_count: i32,
426    pub stream_node: StreamNode,
427    pub parallelism_policy: String,
428}
429
430/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
431pub async fn get_referring_objects_cascade<C>(
432    obj_id: ObjectId,
433    db: &C,
434) -> MetaResult<Vec<PartialObject>>
435where
436    C: ConnectionTrait,
437{
438    let query = construct_obj_dependency_query(obj_id);
439    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
440    let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
441        db.get_database_backend(),
442        sql,
443        values,
444    ))
445    .all(db)
446    .await?;
447    Ok(objects)
448}
449
450/// Check if create a sink with given dependent objects into the target table will cause a cycle, return true if it will.
451pub async fn check_sink_into_table_cycle<C>(
452    target_table: ObjectId,
453    dependent_objs: Vec<ObjectId>,
454    db: &C,
455) -> MetaResult<bool>
456where
457    C: ConnectionTrait,
458{
459    if dependent_objs.is_empty() {
460        return Ok(false);
461    }
462
463    // special check for self referencing
464    if dependent_objs.contains(&target_table) {
465        return Ok(true);
466    }
467
468    let query = construct_sink_cycle_check_query(target_table, dependent_objs);
469    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
470
471    let res = db
472        .query_one(Statement::from_sql_and_values(
473            db.get_database_backend(),
474            sql,
475            values,
476        ))
477        .await?
478        .unwrap();
479
480    let cnt: i64 = res.try_get_by(0)?;
481
482    Ok(cnt != 0)
483}
484
485/// `ensure_object_id` ensures the existence of target object in the cluster.
486pub async fn ensure_object_id<C>(
487    object_type: ObjectType,
488    obj_id: impl Into<ObjectId>,
489    db: &C,
490) -> MetaResult<()>
491where
492    C: ConnectionTrait,
493{
494    let obj_id = obj_id.into();
495    let count = Object::find_by_id(obj_id).count(db).await?;
496    if count == 0 {
497        return Err(MetaError::catalog_id_not_found(
498            object_type.as_str(),
499            obj_id,
500        ));
501    }
502    Ok(())
503}
504
505pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
506where
507    C: ConnectionTrait,
508{
509    let count = Object::find_by_id(job_id).count(db).await?;
510    if count == 0 {
511        return Err(MetaError::cancelled(format!(
512            "job {} might be cancelled manually or by recovery",
513            job_id
514        )));
515    }
516    Ok(())
517}
518
519/// `ensure_user_id` ensures the existence of target user in the cluster.
520pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
521where
522    C: ConnectionTrait,
523{
524    let count = User::find_by_id(user_id).count(db).await?;
525    if count == 0 {
526        return Err(anyhow!("user {} was concurrently dropped", user_id).into());
527    }
528    Ok(())
529}
530
531/// `check_database_name_duplicate` checks whether the database name is already used in the cluster.
532pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
533where
534    C: ConnectionTrait,
535{
536    let count = Database::find()
537        .filter(database::Column::Name.eq(name))
538        .count(db)
539        .await?;
540    if count > 0 {
541        assert_eq!(count, 1);
542        return Err(MetaError::catalog_duplicated("database", name));
543    }
544    Ok(())
545}
546
547/// `check_function_signature_duplicate` checks whether the function name and its signature is already used in the target namespace.
548pub async fn check_function_signature_duplicate<C>(
549    pb_function: &PbFunction,
550    db: &C,
551) -> MetaResult<()>
552where
553    C: ConnectionTrait,
554{
555    let count = Function::find()
556        .inner_join(Object)
557        .filter(
558            object::Column::DatabaseId
559                .eq(pb_function.database_id)
560                .and(object::Column::SchemaId.eq(pb_function.schema_id))
561                .and(function::Column::Name.eq(&pb_function.name))
562                .and(
563                    function::Column::ArgTypes
564                        .eq(DataTypeArray::from(pb_function.arg_types.clone())),
565                ),
566        )
567        .count(db)
568        .await?;
569    if count > 0 {
570        assert_eq!(count, 1);
571        return Err(MetaError::catalog_duplicated("function", &pb_function.name));
572    }
573    Ok(())
574}
575
576/// `check_connection_name_duplicate` checks whether the connection name is already used in the target namespace.
577pub async fn check_connection_name_duplicate<C>(
578    pb_connection: &PbConnection,
579    db: &C,
580) -> MetaResult<()>
581where
582    C: ConnectionTrait,
583{
584    let count = Connection::find()
585        .inner_join(Object)
586        .filter(
587            object::Column::DatabaseId
588                .eq(pb_connection.database_id)
589                .and(object::Column::SchemaId.eq(pb_connection.schema_id))
590                .and(connection::Column::Name.eq(&pb_connection.name)),
591        )
592        .count(db)
593        .await?;
594    if count > 0 {
595        assert_eq!(count, 1);
596        return Err(MetaError::catalog_duplicated(
597            "connection",
598            &pb_connection.name,
599        ));
600    }
601    Ok(())
602}
603
604pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
605where
606    C: ConnectionTrait,
607{
608    let count = Secret::find()
609        .inner_join(Object)
610        .filter(
611            object::Column::DatabaseId
612                .eq(pb_secret.database_id)
613                .and(object::Column::SchemaId.eq(pb_secret.schema_id))
614                .and(secret::Column::Name.eq(&pb_secret.name)),
615        )
616        .count(db)
617        .await?;
618    if count > 0 {
619        assert_eq!(count, 1);
620        return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
621    }
622    Ok(())
623}
624
625pub async fn check_subscription_name_duplicate<C>(
626    pb_subscription: &PbSubscription,
627    db: &C,
628) -> MetaResult<()>
629where
630    C: ConnectionTrait,
631{
632    let count = Subscription::find()
633        .inner_join(Object)
634        .filter(
635            object::Column::DatabaseId
636                .eq(pb_subscription.database_id)
637                .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
638                .and(subscription::Column::Name.eq(&pb_subscription.name)),
639        )
640        .count(db)
641        .await?;
642    if count > 0 {
643        assert_eq!(count, 1);
644        return Err(MetaError::catalog_duplicated(
645            "subscription",
646            &pb_subscription.name,
647        ));
648    }
649    Ok(())
650}
651
652/// `check_user_name_duplicate` checks whether the user is already existed in the cluster.
653pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
654where
655    C: ConnectionTrait,
656{
657    let count = User::find()
658        .filter(user::Column::Name.eq(name))
659        .count(db)
660        .await?;
661    if count > 0 {
662        assert_eq!(count, 1);
663        return Err(MetaError::catalog_duplicated("user", name));
664    }
665    Ok(())
666}
667
668/// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace.
669pub async fn check_relation_name_duplicate<C>(
670    name: &str,
671    database_id: DatabaseId,
672    schema_id: SchemaId,
673    db: &C,
674) -> MetaResult<()>
675where
676    C: ConnectionTrait,
677{
678    macro_rules! check_duplicated {
679        ($obj_type:expr, $entity:ident, $table:ident) => {
680            let object_id = Object::find()
681                .select_only()
682                .column(object::Column::Oid)
683                .inner_join($entity)
684                .filter(
685                    object::Column::DatabaseId
686                        .eq(Some(database_id))
687                        .and(object::Column::SchemaId.eq(Some(schema_id)))
688                        .and($table::Column::Name.eq(name)),
689                )
690                .into_tuple::<ObjectId>()
691                .one(db)
692                .await?;
693            if let Some(oid) = object_id {
694                let check_creation = if $obj_type == ObjectType::View {
695                    false
696                } else if $obj_type == ObjectType::Source {
697                    let source_info = Source::find_by_id(oid.as_source_id())
698                        .select_only()
699                        .column(source::Column::SourceInfo)
700                        .into_tuple::<Option<StreamSourceInfo>>()
701                        .one(db)
702                        .await?
703                        .unwrap();
704                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
705                } else {
706                    true
707                };
708                let job_id = oid.as_job_id();
709                return if check_creation
710                    && !matches!(
711                        StreamingJob::find_by_id(job_id)
712                            .select_only()
713                            .column(streaming_job::Column::JobStatus)
714                            .into_tuple::<JobStatus>()
715                            .one(db)
716                            .await?,
717                        Some(JobStatus::Created)
718                    ) {
719                    Err(MetaError::catalog_under_creation(
720                        $obj_type.as_str(),
721                        name,
722                        job_id,
723                    ))
724                } else {
725                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
726                };
727            }
728        };
729    }
730    check_duplicated!(ObjectType::Table, Table, table);
731    check_duplicated!(ObjectType::Source, Source, source);
732    check_duplicated!(ObjectType::Sink, Sink, sink);
733    check_duplicated!(ObjectType::Index, Index, index);
734    check_duplicated!(ObjectType::View, View, view);
735
736    Ok(())
737}
738
739/// `check_schema_name_duplicate` checks whether the schema name is already used in the target database.
740pub async fn check_schema_name_duplicate<C>(
741    name: &str,
742    database_id: DatabaseId,
743    db: &C,
744) -> MetaResult<()>
745where
746    C: ConnectionTrait,
747{
748    let count = Object::find()
749        .inner_join(Schema)
750        .filter(
751            object::Column::ObjType
752                .eq(ObjectType::Schema)
753                .and(object::Column::DatabaseId.eq(Some(database_id)))
754                .and(schema::Column::Name.eq(name)),
755        )
756        .count(db)
757        .await?;
758    if count != 0 {
759        return Err(MetaError::catalog_duplicated("schema", name));
760    }
761
762    Ok(())
763}
764
765/// `check_object_refer_for_drop` checks whether the object is used by other objects except indexes.
766/// It returns an error that contains the details of the referring objects if it is used by others.
767pub async fn check_object_refer_for_drop<C>(
768    object_type: ObjectType,
769    object_id: ObjectId,
770    db: &C,
771) -> MetaResult<()>
772where
773    C: ConnectionTrait,
774{
775    // Ignore indexes.
776    let count = if object_type == ObjectType::Table {
777        ObjectDependency::find()
778            .join(
779                JoinType::InnerJoin,
780                object_dependency::Relation::Object1.def(),
781            )
782            .filter(
783                object_dependency::Column::Oid
784                    .eq(object_id)
785                    .and(object::Column::ObjType.ne(ObjectType::Index)),
786            )
787            .count(db)
788            .await?
789    } else {
790        ObjectDependency::find()
791            .filter(object_dependency::Column::Oid.eq(object_id))
792            .count(db)
793            .await?
794    };
795    if count != 0 {
796        // find the name of all objects that are using the given one.
797        let referring_objects = get_referring_objects(object_id, db).await?;
798        let referring_objs_map = referring_objects
799            .into_iter()
800            .filter(|o| o.obj_type != ObjectType::Index)
801            .into_group_map_by(|o| o.obj_type);
802        let mut details = vec![];
803        for (obj_type, objs) in referring_objs_map {
804            match obj_type {
805                ObjectType::Table => {
806                    let tables: Vec<(String, String)> = Object::find()
807                        .join(JoinType::InnerJoin, object::Relation::Table.def())
808                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
809                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
810                        .select_only()
811                        .column(schema::Column::Name)
812                        .column(table::Column::Name)
813                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
814                        .into_tuple()
815                        .all(db)
816                        .await?;
817                    details.extend(tables.into_iter().map(|(schema_name, table_name)| {
818                        format!(
819                            "materialized view {}.{} depends on it",
820                            schema_name, table_name
821                        )
822                    }));
823                }
824                ObjectType::Sink => {
825                    let sinks: Vec<(String, String)> = Object::find()
826                        .join(JoinType::InnerJoin, object::Relation::Sink.def())
827                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
828                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
829                        .select_only()
830                        .column(schema::Column::Name)
831                        .column(sink::Column::Name)
832                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
833                        .into_tuple()
834                        .all(db)
835                        .await?;
836                    if object_type == ObjectType::Table {
837                        let engine = Table::find_by_id(object_id.as_table_id())
838                            .select_only()
839                            .column(table::Column::Engine)
840                            .into_tuple::<table::Engine>()
841                            .one(db)
842                            .await?;
843                        if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
844                            continue;
845                        }
846                    }
847                    details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
848                        format!("sink {}.{} depends on it", schema_name, sink_name)
849                    }));
850                }
851                ObjectType::View => {
852                    let views: Vec<(String, String)> = Object::find()
853                        .join(JoinType::InnerJoin, object::Relation::View.def())
854                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
855                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
856                        .select_only()
857                        .column(schema::Column::Name)
858                        .column(view::Column::Name)
859                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
860                        .into_tuple()
861                        .all(db)
862                        .await?;
863                    details.extend(views.into_iter().map(|(schema_name, view_name)| {
864                        format!("view {}.{} depends on it", schema_name, view_name)
865                    }));
866                }
867                ObjectType::Subscription => {
868                    let subscriptions: Vec<(String, String)> = Object::find()
869                        .join(JoinType::InnerJoin, object::Relation::Subscription.def())
870                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
871                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
872                        .select_only()
873                        .column(schema::Column::Name)
874                        .column(subscription::Column::Name)
875                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
876                        .into_tuple()
877                        .all(db)
878                        .await?;
879                    details.extend(subscriptions.into_iter().map(
880                        |(schema_name, subscription_name)| {
881                            format!(
882                                "subscription {}.{} depends on it",
883                                schema_name, subscription_name
884                            )
885                        },
886                    ));
887                }
888                ObjectType::Source => {
889                    let sources: Vec<(String, String)> = Object::find()
890                        .join(JoinType::InnerJoin, object::Relation::Source.def())
891                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
892                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
893                        .select_only()
894                        .column(schema::Column::Name)
895                        .column(source::Column::Name)
896                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
897                        .into_tuple()
898                        .all(db)
899                        .await?;
900                    details.extend(sources.into_iter().map(|(schema_name, view_name)| {
901                        format!("source {}.{} depends on it", schema_name, view_name)
902                    }));
903                }
904                ObjectType::Connection => {
905                    let connections: Vec<(String, String)> = Object::find()
906                        .join(JoinType::InnerJoin, object::Relation::Connection.def())
907                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
908                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
909                        .select_only()
910                        .column(schema::Column::Name)
911                        .column(connection::Column::Name)
912                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
913                        .into_tuple()
914                        .all(db)
915                        .await?;
916                    details.extend(connections.into_iter().map(|(schema_name, view_name)| {
917                        format!("connection {}.{} depends on it", schema_name, view_name)
918                    }));
919                }
920                // only the table, source, sink, subscription, view, connection and index will depend on other objects.
921                _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
922            }
923        }
924        if details.is_empty() {
925            return Ok(());
926        }
927
928        return Err(MetaError::permission_denied(format!(
929            "{} used by {} other objects. \nDETAIL: {}\n\
930            {}",
931            object_type.as_str(),
932            details.len(),
933            details.join("\n"),
934            match object_type {
935                ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
936                    "HINT: DROP the dependent objects first.",
937                ObjectType::Database | ObjectType::Schema => unreachable!(),
938                _ => "HINT:  Use DROP ... CASCADE to drop the dependent objects too.",
939            }
940        )));
941    }
942    Ok(())
943}
944
945/// List all objects that are using the given one.
946pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
947where
948    C: ConnectionTrait,
949{
950    let objs = ObjectDependency::find()
951        .filter(object_dependency::Column::Oid.eq(object_id))
952        .join(
953            JoinType::InnerJoin,
954            object_dependency::Relation::Object1.def(),
955        )
956        .into_partial_model()
957        .all(db)
958        .await?;
959
960    Ok(objs)
961}
962
963/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
964pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
965where
966    C: ConnectionTrait,
967{
968    let count = Object::find()
969        .filter(object::Column::SchemaId.eq(Some(schema_id)))
970        .count(db)
971        .await?;
972    if count != 0 {
973        return Err(MetaError::permission_denied("schema is not empty"));
974    }
975
976    Ok(())
977}
978
979/// `list_user_info_by_ids` lists all users' info by their ids.
980pub async fn list_user_info_by_ids<C>(
981    user_ids: impl IntoIterator<Item = UserId>,
982    db: &C,
983) -> MetaResult<Vec<PbUserInfo>>
984where
985    C: ConnectionTrait,
986{
987    let mut user_infos = vec![];
988    for user_id in user_ids {
989        let user = User::find_by_id(user_id)
990            .one(db)
991            .await?
992            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
993        let mut user_info: PbUserInfo = user.into();
994        user_info.grant_privileges = get_user_privilege(user_id, db).await?;
995        user_infos.push(user_info);
996    }
997    Ok(user_infos)
998}
999
1000/// `get_object_owner` returns the owner of the given object.
1001pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
1002where
1003    C: ConnectionTrait,
1004{
1005    let obj_owner: UserId = Object::find_by_id(object_id)
1006        .select_only()
1007        .column(object::Column::OwnerId)
1008        .into_tuple()
1009        .one(db)
1010        .await?
1011        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1012    Ok(obj_owner)
1013}
1014
1015/// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given one.
1016///
1017/// # Examples
1018///
1019/// ```
1020/// use risingwave_meta::controller::utils::construct_privilege_dependency_query;
1021/// use sea_orm::sea_query::*;
1022/// use sea_orm::*;
1023///
1024/// let query = construct_privilege_dependency_query(vec![1, 2, 3]);
1025///
1026/// assert_eq!(
1027///    query.to_string(MysqlQueryBuilder),
1028///   r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"#
1029/// );
1030/// assert_eq!(
1031///   query.to_string(PostgresQueryBuilder),
1032///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""#
1033/// );
1034/// assert_eq!(
1035///  query.to_string(SqliteQueryBuilder),
1036///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""#
1037/// );
1038/// ```
1039pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
1040    let cte_alias = Alias::new("granted_privilege_ids");
1041    let cte_return_privilege_alias = Alias::new("id");
1042    let cte_return_user_alias = Alias::new("user_id");
1043
1044    let mut base_query = SelectStatement::new()
1045        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
1046        .from(UserPrivilege)
1047        .and_where(user_privilege::Column::Id.is_in(ids))
1048        .to_owned();
1049
1050    let cte_referencing = Query::select()
1051        .columns([
1052            (UserPrivilege, user_privilege::Column::Id),
1053            (UserPrivilege, user_privilege::Column::UserId),
1054        ])
1055        .from(UserPrivilege)
1056        .inner_join(
1057            cte_alias.clone(),
1058            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
1059                .equals(user_privilege::Column::DependentId),
1060        )
1061        .to_owned();
1062
1063    let mut common_table_expr = CommonTableExpression::new();
1064    common_table_expr
1065        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
1066        .columns([
1067            cte_return_privilege_alias.clone(),
1068            cte_return_user_alias.clone(),
1069        ])
1070        .table_name(cte_alias.clone());
1071
1072    SelectStatement::new()
1073        .columns([cte_return_privilege_alias, cte_return_user_alias])
1074        .from(cte_alias)
1075        .to_owned()
1076        .with(
1077            WithClause::new()
1078                .recursive(true)
1079                .cte(common_table_expr)
1080                .to_owned(),
1081        )
1082}
1083
1084pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
1085where
1086    C: ConnectionTrait,
1087{
1088    let table_ids: Vec<TableId> = Table::find()
1089        .select_only()
1090        .column(table::Column::TableId)
1091        .filter(
1092            table::Column::TableType
1093                .eq(TableType::Internal)
1094                .and(table::Column::BelongsToJobId.eq(job_id)),
1095        )
1096        .into_tuple()
1097        .all(db)
1098        .await?;
1099    Ok(table_ids)
1100}
1101
1102pub async fn get_index_state_tables_by_table_id<C>(
1103    table_id: TableId,
1104    db: &C,
1105) -> MetaResult<Vec<TableId>>
1106where
1107    C: ConnectionTrait,
1108{
1109    let mut index_table_ids: Vec<TableId> = Index::find()
1110        .select_only()
1111        .column(index::Column::IndexTableId)
1112        .filter(index::Column::PrimaryTableId.eq(table_id))
1113        .into_tuple()
1114        .all(db)
1115        .await?;
1116
1117    if !index_table_ids.is_empty() {
1118        let internal_table_ids: Vec<TableId> = Table::find()
1119            .select_only()
1120            .column(table::Column::TableId)
1121            .filter(
1122                table::Column::TableType
1123                    .eq(TableType::Internal)
1124                    .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
1125            )
1126            .into_tuple()
1127            .all(db)
1128            .await?;
1129
1130        index_table_ids.extend(internal_table_ids.into_iter());
1131    }
1132
1133    Ok(index_table_ids)
1134}
1135
1136/// `get_iceberg_related_object_ids` returns the related object ids of the iceberg source, sink and internal tables.
1137pub async fn get_iceberg_related_object_ids<C>(
1138    object_id: ObjectId,
1139    db: &C,
1140) -> MetaResult<Vec<ObjectId>>
1141where
1142    C: ConnectionTrait,
1143{
1144    let object = Object::find_by_id(object_id)
1145        .one(db)
1146        .await?
1147        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1148    if object.obj_type != ObjectType::Table {
1149        return Ok(vec![]);
1150    }
1151
1152    let table = Table::find_by_id(object_id.as_table_id())
1153        .one(db)
1154        .await?
1155        .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1156    if !matches!(table.engine, Some(table::Engine::Iceberg)) {
1157        return Ok(vec![]);
1158    }
1159
1160    let database_id = object.database_id.unwrap();
1161    let schema_id = object.schema_id.unwrap();
1162
1163    let mut related_objects = vec![];
1164
1165    let iceberg_sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name);
1166    let iceberg_sink_id = Sink::find()
1167        .inner_join(Object)
1168        .select_only()
1169        .column(sink::Column::SinkId)
1170        .filter(
1171            object::Column::DatabaseId
1172                .eq(database_id)
1173                .and(object::Column::SchemaId.eq(schema_id))
1174                .and(sink::Column::Name.eq(&iceberg_sink_name)),
1175        )
1176        .into_tuple::<SinkId>()
1177        .one(db)
1178        .await?;
1179    if let Some(sink_id) = iceberg_sink_id {
1180        related_objects.push(sink_id.as_object_id());
1181        let sink_internal_tables = get_internal_tables_by_id(sink_id.as_job_id(), db).await?;
1182        related_objects.extend(
1183            sink_internal_tables
1184                .into_iter()
1185                .map(|tid| tid.as_object_id()),
1186        );
1187    } else {
1188        warn!(
1189            "iceberg table {} missing sink {}",
1190            table.name, iceberg_sink_name
1191        );
1192    }
1193
1194    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
1195    let iceberg_source_id = Source::find()
1196        .inner_join(Object)
1197        .select_only()
1198        .column(source::Column::SourceId)
1199        .filter(
1200            object::Column::DatabaseId
1201                .eq(database_id)
1202                .and(object::Column::SchemaId.eq(schema_id))
1203                .and(source::Column::Name.eq(&iceberg_source_name)),
1204        )
1205        .into_tuple::<SourceId>()
1206        .one(db)
1207        .await?;
1208    if let Some(source_id) = iceberg_source_id {
1209        related_objects.push(source_id.as_object_id());
1210    } else {
1211        warn!(
1212            "iceberg table {} missing source {}",
1213            table.name, iceberg_source_name
1214        );
1215    }
1216
1217    Ok(related_objects)
1218}
1219
1220/// Load streaming jobs by job ids.
1221pub(crate) async fn load_streaming_jobs_by_ids<C>(
1222    txn: &C,
1223    job_ids: impl IntoIterator<Item = JobId>,
1224) -> MetaResult<HashMap<JobId, streaming_job::Model>>
1225where
1226    C: ConnectionTrait,
1227{
1228    let job_ids: HashSet<JobId> = job_ids.into_iter().collect();
1229    if job_ids.is_empty() {
1230        return Ok(HashMap::new());
1231    }
1232    let jobs = streaming_job::Entity::find()
1233        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
1234        .all(txn)
1235        .await?;
1236    Ok(jobs.into_iter().map(|job| (job.job_id, job)).collect())
1237}
1238
/// Partial projection of the `UserPrivilege` entity that carries only the privilege id and
/// the id of the user holding it. `get_referring_privileges_cascade` returns rows of this
/// shape.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "UserPrivilege")]
pub struct PartialUserPrivilege {
    /// Id of the privilege row.
    pub id: PrivilegeId,
    /// Id of the user the privilege belongs to.
    pub user_id: UserId,
}
1245
1246pub async fn get_referring_privileges_cascade<C>(
1247    ids: Vec<PrivilegeId>,
1248    db: &C,
1249) -> MetaResult<Vec<PartialUserPrivilege>>
1250where
1251    C: ConnectionTrait,
1252{
1253    let query = construct_privilege_dependency_query(ids);
1254    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1255    let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1256        db.get_database_backend(),
1257        sql,
1258        values,
1259    ))
1260    .all(db)
1261    .await?;
1262
1263    Ok(privileges)
1264}
1265
1266/// `ensure_privileges_not_referred` ensures that the privileges are not granted to any other users.
1267pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1268where
1269    C: ConnectionTrait,
1270{
1271    let count = UserPrivilege::find()
1272        .filter(user_privilege::Column::DependentId.is_in(ids))
1273        .count(db)
1274        .await?;
1275    if count != 0 {
1276        return Err(MetaError::permission_denied(format!(
1277            "privileges granted to {} other ones.",
1278            count
1279        )));
1280    }
1281    Ok(())
1282}
1283
1284/// `get_user_privilege` returns the privileges of the given user.
1285pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1286where
1287    C: ConnectionTrait,
1288{
1289    let user_privileges = UserPrivilege::find()
1290        .find_also_related(Object)
1291        .filter(user_privilege::Column::UserId.eq(user_id))
1292        .all(db)
1293        .await?;
1294    Ok(user_privileges
1295        .into_iter()
1296        .map(|(privilege, object)| {
1297            let object = object.unwrap();
1298            let obj = match object.obj_type {
1299                ObjectType::Database => PbGrantObject::DatabaseId(object.oid.as_database_id()),
1300                ObjectType::Schema => PbGrantObject::SchemaId(object.oid.as_schema_id()),
1301                ObjectType::Table | ObjectType::Index => {
1302                    PbGrantObject::TableId(object.oid.as_table_id())
1303                }
1304                ObjectType::Source => PbGrantObject::SourceId(object.oid.as_source_id()),
1305                ObjectType::Sink => PbGrantObject::SinkId(object.oid.as_sink_id()),
1306                ObjectType::View => PbGrantObject::ViewId(object.oid.as_view_id()),
1307                ObjectType::Function => PbGrantObject::FunctionId(object.oid.as_function_id()),
1308                ObjectType::Connection => {
1309                    PbGrantObject::ConnectionId(object.oid.as_connection_id())
1310                }
1311                ObjectType::Subscription => {
1312                    PbGrantObject::SubscriptionId(object.oid.as_subscription_id())
1313                }
1314                ObjectType::Secret => PbGrantObject::SecretId(object.oid.as_secret_id()),
1315            };
1316            PbGrantPrivilege {
1317                action_with_opts: vec![PbActionWithGrantOption {
1318                    action: PbAction::from(privilege.action) as _,
1319                    with_grant_option: privilege.with_grant_option,
1320                    granted_by: privilege.granted_by as _,
1321                }],
1322                object: Some(obj),
1323            }
1324        })
1325        .collect())
1326}
1327
1328pub async fn get_table_columns(
1329    txn: &impl ConnectionTrait,
1330    id: TableId,
1331) -> MetaResult<ColumnCatalogArray> {
1332    let columns = Table::find_by_id(id)
1333        .select_only()
1334        .columns([table::Column::Columns])
1335        .into_tuple::<ColumnCatalogArray>()
1336        .one(txn)
1337        .await?
1338        .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1339    Ok(columns)
1340}
1341
1342/// `grant_default_privileges_automatically` grants default privileges automatically
1343/// for the given new object. It returns the list of user infos whose privileges are updated.
1344pub async fn grant_default_privileges_automatically<C>(
1345    db: &C,
1346    object_id: impl Into<ObjectId>,
1347) -> MetaResult<Vec<PbUserInfo>>
1348where
1349    C: ConnectionTrait,
1350{
1351    let object_id = object_id.into();
1352    let object = Object::find_by_id(object_id)
1353        .one(db)
1354        .await?
1355        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1356    assert_ne!(object.obj_type, ObjectType::Database);
1357
1358    let for_mview_filter = if object.obj_type == ObjectType::Table {
1359        let table_type = Table::find_by_id(object_id.as_table_id())
1360            .select_only()
1361            .column(table::Column::TableType)
1362            .into_tuple::<TableType>()
1363            .one(db)
1364            .await?
1365            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1366        user_default_privilege::Column::ForMaterializedView
1367            .eq(table_type == TableType::MaterializedView)
1368    } else {
1369        user_default_privilege::Column::ForMaterializedView.eq(false)
1370    };
1371    let schema_filter = if let Some(schema_id) = &object.schema_id {
1372        user_default_privilege::Column::SchemaId.eq(*schema_id)
1373    } else {
1374        user_default_privilege::Column::SchemaId.is_null()
1375    };
1376
1377    let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1378        .select_only()
1379        .columns([
1380            user_default_privilege::Column::Grantee,
1381            user_default_privilege::Column::GrantedBy,
1382            user_default_privilege::Column::Action,
1383            user_default_privilege::Column::WithGrantOption,
1384        ])
1385        .filter(
1386            user_default_privilege::Column::DatabaseId
1387                .eq(object.database_id.unwrap())
1388                .and(schema_filter)
1389                .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1390                .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1391                .and(for_mview_filter),
1392        )
1393        .into_tuple()
1394        .all(db)
1395        .await?;
1396    if default_privileges.is_empty() {
1397        return Ok(vec![]);
1398    }
1399
1400    let updated_user_ids = default_privileges
1401        .iter()
1402        .map(|(grantee, _, _, _)| *grantee)
1403        .collect::<HashSet<_>>();
1404
1405    for (grantee, granted_by, action, with_grant_option) in default_privileges {
1406        UserPrivilege::insert(user_privilege::ActiveModel {
1407            user_id: Set(grantee),
1408            oid: Set(object_id),
1409            granted_by: Set(granted_by),
1410            action: Set(action),
1411            with_grant_option: Set(with_grant_option),
1412            ..Default::default()
1413        })
1414        .exec(db)
1415        .await?;
1416        if action == Action::Select {
1417            // Grant SELECT privilege for internal tables if the action is SELECT.
1418            let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1419            if !internal_table_ids.is_empty() {
1420                for internal_table_id in &internal_table_ids {
1421                    UserPrivilege::insert(user_privilege::ActiveModel {
1422                        user_id: Set(grantee),
1423                        oid: Set(internal_table_id.as_object_id()),
1424                        granted_by: Set(granted_by),
1425                        action: Set(Action::Select),
1426                        with_grant_option: Set(with_grant_option),
1427                        ..Default::default()
1428                    })
1429                    .exec(db)
1430                    .await?;
1431                }
1432            }
1433
1434            // Additionally, grant SELECT privilege for iceberg related objects if the action is SELECT.
1435            let iceberg_privilege_object_ids =
1436                get_iceberg_related_object_ids(object_id, db).await?;
1437            if !iceberg_privilege_object_ids.is_empty() {
1438                for iceberg_object_id in &iceberg_privilege_object_ids {
1439                    UserPrivilege::insert(user_privilege::ActiveModel {
1440                        user_id: Set(grantee),
1441                        oid: Set(*iceberg_object_id),
1442                        granted_by: Set(granted_by),
1443                        action: Set(action),
1444                        with_grant_option: Set(with_grant_option),
1445                        ..Default::default()
1446                    })
1447                    .exec(db)
1448                    .await?;
1449                }
1450            }
1451        }
1452    }
1453
1454    let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1455    Ok(updated_user_infos)
1456}
1457
1458// todo: remove it after migrated to sql backend.
1459pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1460    match object {
1461        PbGrantObject::DatabaseId(id) => (*id).into(),
1462        PbGrantObject::SchemaId(id) => (*id).into(),
1463        PbGrantObject::TableId(id) => (*id).into(),
1464        PbGrantObject::SourceId(id) => (*id).into(),
1465        PbGrantObject::SinkId(id) => (*id).into(),
1466        PbGrantObject::ViewId(id) => (*id).into(),
1467        PbGrantObject::FunctionId(id) => (*id).into(),
1468        PbGrantObject::SubscriptionId(id) => (*id).into(),
1469        PbGrantObject::ConnectionId(id) => (*id).into(),
1470        PbGrantObject::SecretId(id) => (*id).into(),
1471    }
1472}
1473
1474pub async fn insert_fragment_relations(
1475    db: &impl ConnectionTrait,
1476    downstream_fragment_relations: &FragmentDownstreamRelation,
1477) -> MetaResult<()> {
1478    let mut relations = vec![];
1479    for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1480        for downstream in downstreams {
1481            relations.push(
1482                fragment_relation::Model {
1483                    source_fragment_id: *upstream_fragment_id as _,
1484                    target_fragment_id: downstream.downstream_fragment_id as _,
1485                    dispatcher_type: downstream.dispatcher_type,
1486                    dist_key_indices: downstream
1487                        .dist_key_indices
1488                        .iter()
1489                        .map(|idx| *idx as i32)
1490                        .collect_vec()
1491                        .into(),
1492                    output_indices: downstream
1493                        .output_mapping
1494                        .indices
1495                        .iter()
1496                        .map(|idx| *idx as i32)
1497                        .collect_vec()
1498                        .into(),
1499                    output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1500                }
1501                .into_active_model(),
1502            );
1503        }
1504    }
1505    if !relations.is_empty() {
1506        FragmentRelation::insert_many(relations).exec(db).await?;
1507    }
1508    Ok(())
1509}
1510
1511pub fn compose_dispatchers(
1512    source_fragment_distribution: DistributionType,
1513    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1514    target_fragment_id: crate::model::FragmentId,
1515    target_fragment_distribution: DistributionType,
1516    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1517    dispatcher_type: DispatcherType,
1518    dist_key_indices: Vec<u32>,
1519    output_mapping: PbDispatchOutputMapping,
1520) -> (
1521    HashMap<crate::model::ActorId, PbDispatcher>,
1522    Option<HashMap<crate::model::ActorId, crate::model::ActorId>>,
1523) {
1524    match dispatcher_type {
1525        DispatcherType::Hash => {
1526            let dispatcher = PbDispatcher {
1527                r#type: PbDispatcherType::from(dispatcher_type) as _,
1528                dist_key_indices,
1529                output_mapping: output_mapping.into(),
1530                hash_mapping: Some(
1531                    ActorMapping::from_bitmaps(
1532                        &target_fragment_actors
1533                            .iter()
1534                            .map(|(actor_id, bitmap)| {
1535                                (
1536                                    *actor_id as _,
1537                                    bitmap
1538                                        .clone()
1539                                        .expect("downstream hash dispatch must have distribution"),
1540                                )
1541                            })
1542                            .collect(),
1543                    )
1544                    .to_protobuf(),
1545                ),
1546                dispatcher_id: target_fragment_id,
1547                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1548            };
1549            (
1550                source_fragment_actors
1551                    .keys()
1552                    .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1553                    .collect(),
1554                None,
1555            )
1556        }
1557        DispatcherType::Broadcast | DispatcherType::Simple => {
1558            let dispatcher = PbDispatcher {
1559                r#type: PbDispatcherType::from(dispatcher_type) as _,
1560                dist_key_indices,
1561                output_mapping: output_mapping.into(),
1562                hash_mapping: None,
1563                dispatcher_id: target_fragment_id,
1564                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1565            };
1566            (
1567                source_fragment_actors
1568                    .keys()
1569                    .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1570                    .collect(),
1571                None,
1572            )
1573        }
1574        DispatcherType::NoShuffle => {
1575            let no_shuffle_map = resolve_no_shuffle_actor_mapping(
1576                source_fragment_distribution,
1577                source_fragment_actors
1578                    .iter()
1579                    .map(|(&id, bitmap)| (id, bitmap)),
1580                target_fragment_distribution,
1581                target_fragment_actors
1582                    .iter()
1583                    .map(|(&id, bitmap)| (id, bitmap)),
1584            );
1585            let dispatchers = no_shuffle_map
1586                .iter()
1587                .map(|(&upstream_actor_id, &downstream_actor_id)| {
1588                    (
1589                        upstream_actor_id,
1590                        PbDispatcher {
1591                            r#type: PbDispatcherType::NoShuffle as _,
1592                            dist_key_indices: dist_key_indices.clone(),
1593                            output_mapping: output_mapping.clone().into(),
1594                            hash_mapping: None,
1595                            dispatcher_id: target_fragment_id,
1596                            downstream_actor_id: vec![downstream_actor_id],
1597                        },
1598                    )
1599                })
1600                .collect();
1601            (dispatchers, Some(no_shuffle_map))
1602        }
1603    }
1604}
1605
1606/// Resolve 1:1 actor mapping between two no-shuffle-connected fragments.
1607///
1608/// Returns `Vec<(upstream_actor_id, downstream_actor_id)>` pairs. The actor id
1609/// type is generic so the same logic can be reused both for real `ActorId` mapping
1610/// and for alignment checks with synthetic ids (e.g. `(WorkerId, actor_idx)`).
1611///
1612/// For `Single` distribution the single actors are paired directly.
1613/// For `Hash` distribution actors are matched by their bitmap's first vnode and
1614/// full bitmap equality is asserted.
1615pub fn resolve_no_shuffle_actor_mapping<
1616    'a,
1617    ActorId: Copy + Eq + std::hash::Hash + std::fmt::Debug,
1618>(
1619    source_fragment_distribution: DistributionType,
1620    source_fragment_actors: impl IntoIterator<Item = (ActorId, &'a Option<Bitmap>)>,
1621    target_fragment_distribution: DistributionType,
1622    target_fragment_actors: impl IntoIterator<Item = (ActorId, &'a Option<Bitmap>)>,
1623) -> HashMap<ActorId, ActorId> {
1624    assert_eq!(source_fragment_distribution, target_fragment_distribution);
1625
1626    match source_fragment_distribution {
1627        DistributionType::Single => {
1628            let assert_singleton = |bitmap: &Option<Bitmap>| {
1629                assert!(
1630                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1631                    "not singleton: {:?}",
1632                    bitmap
1633                );
1634            };
1635            let (source_actor_id, bitmap) = source_fragment_actors
1636                .into_iter()
1637                .exactly_one()
1638                .ok()
1639                .expect("Single distribution should have exactly one source actor");
1640            assert_singleton(bitmap);
1641            let (target_actor_id, bitmap) = target_fragment_actors
1642                .into_iter()
1643                .exactly_one()
1644                .ok()
1645                .expect("Single distribution should have exactly one target actor");
1646            assert_singleton(bitmap);
1647            HashMap::from([(source_actor_id, target_actor_id)])
1648        }
1649        DistributionType::Hash => {
1650            // Build target index keyed by first vnode (one pass over target).
1651            let mut target_by_vnode: HashMap<_, _> = target_fragment_actors
1652                .into_iter()
1653                .map(|(actor_id, bitmap)| {
1654                    let bitmap = bitmap
1655                        .as_ref()
1656                        .expect("hash distribution should have bitmap");
1657                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1658                    (first_vnode, (actor_id, bitmap))
1659                })
1660                .collect();
1661
1662            let target_count = target_by_vnode.len();
1663
1664            // One pass over source, matching against the target index.
1665            let mapping: HashMap<_, _> = source_fragment_actors
1666                .into_iter()
1667                .map(|(source_actor_id, source_bitmap)| {
1668                    let source_bitmap = source_bitmap
1669                        .as_ref()
1670                        .expect("hash distribution should have bitmap");
1671                    let first_vnode = source_bitmap
1672                        .iter_vnodes()
1673                        .next()
1674                        .expect("non-empty bitmap");
1675                    let (target_actor_id, target_bitmap) =
1676                        target_by_vnode.remove(&first_vnode).unwrap_or_else(|| {
1677                            panic!(
1678                                "cannot find matched target actor: {:?} first_vnode {:?}",
1679                                source_actor_id, first_vnode,
1680                            )
1681                        });
1682                    assert_eq!(
1683                        source_bitmap, target_bitmap,
1684                        "bitmap mismatch for source {:?} target {:?} at first_vnode {:?}",
1685                        source_actor_id, target_actor_id, first_vnode,
1686                    );
1687                    (source_actor_id, target_actor_id)
1688                })
1689                .collect();
1690
1691            assert_eq!(
1692                mapping.len(),
1693                target_count,
1694                "no-shuffle should have equal upstream downstream actor count: {} vs {}",
1695                mapping.len(),
1696                target_count,
1697            );
1698
1699            mapping
1700        }
1701    }
1702}
1703
1704pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1705    let fragment_worker_slot_mapping = match fragment.distribution_type {
1706        DistributionType::Single => {
1707            let actor = fragment.actors.values().exactly_one().unwrap();
1708            WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1709        }
1710        DistributionType::Hash => {
1711            let actor_bitmaps: HashMap<_, _> = fragment
1712                .actors
1713                .iter()
1714                .map(|(actor_id, actor_info)| {
1715                    let vnode_bitmap = actor_info
1716                        .vnode_bitmap
1717                        .as_ref()
1718                        .cloned()
1719                        .expect("actor bitmap shouldn't be none in hash fragment");
1720
1721                    (*actor_id as hash::ActorId, vnode_bitmap)
1722                })
1723                .collect();
1724
1725            let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1726
1727            let actor_locations = fragment
1728                .actors
1729                .iter()
1730                .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
1731                .collect();
1732
1733            actor_mapping.to_worker_slot(&actor_locations)
1734        }
1735    };
1736
1737    PbFragmentWorkerSlotMapping {
1738        fragment_id: fragment.fragment_id,
1739        mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1740    }
1741}
1742
1743/// For the given streaming jobs, returns
1744/// - All source fragments
1745/// - All sink fragments
1746/// - All actors
1747/// - All fragments
1748pub async fn get_fragments_for_jobs<C>(
1749    db: &C,
1750    streaming_jobs: Vec<JobId>,
1751) -> MetaResult<(
1752    HashMap<SourceId, BTreeSet<FragmentId>>,
1753    HashSet<FragmentId>,
1754    HashSet<FragmentId>,
1755)>
1756where
1757    C: ConnectionTrait,
1758{
1759    if streaming_jobs.is_empty() {
1760        return Ok((HashMap::default(), HashSet::default(), HashSet::default()));
1761    }
1762
1763    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1764        .select_only()
1765        .columns([
1766            fragment::Column::FragmentId,
1767            fragment::Column::FragmentTypeMask,
1768            fragment::Column::StreamNode,
1769        ])
1770        .filter(fragment::Column::JobId.is_in(streaming_jobs))
1771        .into_tuple()
1772        .all(db)
1773        .await?;
1774
1775    let fragment_ids: HashSet<_> = fragments
1776        .iter()
1777        .map(|(fragment_id, _, _)| *fragment_id)
1778        .collect();
1779
1780    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1781    let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1782    for (fragment_id, mask, stream_node) in fragments {
1783        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1784            && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1785        {
1786            source_fragment_ids
1787                .entry(source_id)
1788                .or_default()
1789                .insert(fragment_id);
1790        }
1791        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1792            sink_fragment_ids.insert(fragment_id);
1793        }
1794    }
1795
1796    Ok((source_fragment_ids, sink_fragment_ids, fragment_ids))
1797}
1798
1799/// Build a object group for notifying the deletion of the given objects.
1800///
1801/// Note that only id fields are filled in the object info, as the arguments are partial objects.
1802/// As a result, the returned notification info should only be used for deletion.
1803pub(crate) fn build_object_group_for_delete(
1804    partial_objects: Vec<PartialObject>,
1805) -> NotificationInfo {
1806    let mut objects = vec![];
1807    for obj in partial_objects {
1808        match obj.obj_type {
1809            ObjectType::Database => objects.push(PbObject {
1810                object_info: Some(PbObjectInfo::Database(PbDatabase {
1811                    id: obj.oid.as_database_id(),
1812                    ..Default::default()
1813                })),
1814            }),
1815            ObjectType::Schema => objects.push(PbObject {
1816                object_info: Some(PbObjectInfo::Schema(PbSchema {
1817                    id: obj.oid.as_schema_id(),
1818                    database_id: obj.database_id.unwrap(),
1819                    ..Default::default()
1820                })),
1821            }),
1822            ObjectType::Table => objects.push(PbObject {
1823                object_info: Some(PbObjectInfo::Table(PbTable {
1824                    id: obj.oid.as_table_id(),
1825                    schema_id: obj.schema_id.unwrap(),
1826                    database_id: obj.database_id.unwrap(),
1827                    ..Default::default()
1828                })),
1829            }),
1830            ObjectType::Source => objects.push(PbObject {
1831                object_info: Some(PbObjectInfo::Source(PbSource {
1832                    id: obj.oid.as_source_id(),
1833                    schema_id: obj.schema_id.unwrap(),
1834                    database_id: obj.database_id.unwrap(),
1835                    ..Default::default()
1836                })),
1837            }),
1838            ObjectType::Sink => objects.push(PbObject {
1839                object_info: Some(PbObjectInfo::Sink(PbSink {
1840                    id: obj.oid.as_sink_id(),
1841                    schema_id: obj.schema_id.unwrap(),
1842                    database_id: obj.database_id.unwrap(),
1843                    ..Default::default()
1844                })),
1845            }),
1846            ObjectType::Subscription => objects.push(PbObject {
1847                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1848                    id: obj.oid.as_subscription_id(),
1849                    schema_id: obj.schema_id.unwrap(),
1850                    database_id: obj.database_id.unwrap(),
1851                    ..Default::default()
1852                })),
1853            }),
1854            ObjectType::View => objects.push(PbObject {
1855                object_info: Some(PbObjectInfo::View(PbView {
1856                    id: obj.oid.as_view_id(),
1857                    schema_id: obj.schema_id.unwrap(),
1858                    database_id: obj.database_id.unwrap(),
1859                    ..Default::default()
1860                })),
1861            }),
1862            ObjectType::Index => {
1863                objects.push(PbObject {
1864                    object_info: Some(PbObjectInfo::Index(PbIndex {
1865                        id: obj.oid.as_index_id(),
1866                        schema_id: obj.schema_id.unwrap(),
1867                        database_id: obj.database_id.unwrap(),
1868                        ..Default::default()
1869                    })),
1870                });
1871                objects.push(PbObject {
1872                    object_info: Some(PbObjectInfo::Table(PbTable {
1873                        id: obj.oid.as_table_id(),
1874                        schema_id: obj.schema_id.unwrap(),
1875                        database_id: obj.database_id.unwrap(),
1876                        ..Default::default()
1877                    })),
1878                });
1879            }
1880            ObjectType::Function => objects.push(PbObject {
1881                object_info: Some(PbObjectInfo::Function(PbFunction {
1882                    id: obj.oid.as_function_id(),
1883                    schema_id: obj.schema_id.unwrap(),
1884                    database_id: obj.database_id.unwrap(),
1885                    ..Default::default()
1886                })),
1887            }),
1888            ObjectType::Connection => objects.push(PbObject {
1889                object_info: Some(PbObjectInfo::Connection(PbConnection {
1890                    id: obj.oid.as_connection_id(),
1891                    schema_id: obj.schema_id.unwrap(),
1892                    database_id: obj.database_id.unwrap(),
1893                    ..Default::default()
1894                })),
1895            }),
1896            ObjectType::Secret => objects.push(PbObject {
1897                object_info: Some(PbObjectInfo::Secret(PbSecret {
1898                    id: obj.oid.as_secret_id(),
1899                    schema_id: obj.schema_id.unwrap(),
1900                    database_id: obj.database_id.unwrap(),
1901                    ..Default::default()
1902                })),
1903            }),
1904        }
1905    }
1906    NotificationInfo::ObjectGroup(PbObjectGroup {
1907        objects,
1908        dependencies: vec![],
1909    })
1910}
1911
1912pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1913    let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1914        .context("unable to parse table definition")
1915        .inspect_err(|e| {
1916            tracing::error!(
1917                target: "auto_schema_change",
1918                error = %e.as_report(),
1919                "failed to parse table definition")
1920        })
1921        .unwrap()
1922        .try_into()
1923        .unwrap();
1924    if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1925        cdc_table_info
1926            .clone()
1927            .map(|cdc_table_info| cdc_table_info.external_table_name)
1928    } else {
1929        None
1930    }
1931}
1932
/// `rename_relation` renames the target relation and rewrites its definition.
///
/// The updates are written through `txn` (this function does not commit the
/// transaction itself). Returns the updated relations as catalog objects together
/// with the old name of the renamed relation.
///
/// Per object type:
/// - `Table`: if a source row has this table as its associated table, that source is
///   renamed too; then the table itself is renamed.
/// - `Source`, `Sink`, `Subscription`, `View`: the relation itself is renamed.
/// - `Index`: the index table is renamed and the index row gets the same new name.
///
/// # Panics
///
/// Panics if a looked-up relation or its related object row does not exist, or if
/// `object_type` is not one of the types listed above.
pub async fn rename_relation(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
) -> MetaResult<(Vec<PbObject>, String)> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename;

    let mut to_update_relations = vec![];
    // Renames one relation of entity `$entity` (model module `$table`, primary key
    // field `$identity`) identified by `$object_id`: updates the name and definition
    // columns, records the updated object in `to_update_relations`, and evaluates to
    // the relation's old name.
    macro_rules! rename_relation {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            // Load the relation together with its row in the object table.
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let obj = obj.unwrap();
            let old_name = relation.name.clone();
            relation.name = object_name.into();
            // The definition of a view is left untouched; only its name changes.
            if obj.obj_type != ObjectType::View {
                relation.definition = alter_relation_rename(&relation.definition, object_name);
            }
            // Only the name and definition columns are set for the update.
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                name: Set(object_name.into()),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            // The streaming job row is optional; it is looked up by the same raw id.
            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
                .one(txn)
                .await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj, streaming_job).into(),
                )),
            });
            old_name
        }};
    }
    // TODO: check whether anything needs to change for shared sources.
    let old_name = match object_type {
        ObjectType::Table => {
            // Look for a source whose associated table is the table being renamed.
            let associated_source_id: Option<SourceId> = Source::find()
                .select_only()
                .column(source::Column::SourceId)
                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
                .into_tuple()
                .one(txn)
                .await?;
            if let Some(source_id) = associated_source_id {
                // The old name of the associated source is discarded.
                rename_relation!(Source, source, source_id, source_id);
            }
            rename_relation!(Table, table, table_id, object_id.as_table_id())
        }
        ObjectType::Source => {
            rename_relation!(Source, source, source_id, object_id.as_source_id())
        }
        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
        ObjectType::Subscription => {
            rename_relation!(
                Subscription,
                subscription,
                subscription_id,
                object_id.as_subscription_id()
            )
        }
        ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
        ObjectType::Index => {
            let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let streaming_job = streaming_job::Entity::find_by_id(index.index_id.as_job_id())
                .one(txn)
                .await?;
            // Update the in-memory model so the object pushed below carries the new name.
            index.name = object_name.into();
            let index_table_id = index.index_table_id;
            // Rename the index table first; its old name is what gets returned.
            let old_name = rename_relation!(Table, table, table_id, index_table_id);

            // The index and its associated table share the same name, so only the
            // name column of the index row is updated here.
            let active_model = index::ActiveModel {
                index_id: sea_orm::ActiveValue::Set(index.index_id),
                name: sea_orm::ActiveValue::Set(object_name.into()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::Index(
                    ObjectModel(index, obj.unwrap(), streaming_job).into(),
                )),
            });
            old_name
        }
        _ => unreachable!("only relation name can be altered."),
    };

    Ok((to_update_relations, old_name))
}
2038
2039pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
2040where
2041    C: ConnectionTrait,
2042{
2043    let database_resource_group: Option<String> = Database::find_by_id(database_id)
2044        .select_only()
2045        .column(database::Column::ResourceGroup)
2046        .into_tuple()
2047        .one(txn)
2048        .await?
2049        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
2050
2051    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
2052}
2053
2054pub async fn get_existing_job_resource_group<C>(
2055    txn: &C,
2056    streaming_job_id: JobId,
2057) -> MetaResult<String>
2058where
2059    C: ConnectionTrait,
2060{
2061    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
2062        StreamingJob::find_by_id(streaming_job_id)
2063            .select_only()
2064            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2065            .join(JoinType::InnerJoin, object::Relation::Database2.def())
2066            .column(streaming_job::Column::SpecificResourceGroup)
2067            .column(database::Column::ResourceGroup)
2068            .into_tuple()
2069            .one(txn)
2070            .await?
2071            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
2072
2073    Ok(job_specific_resource_group.unwrap_or_else(|| {
2074        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
2075    }))
2076}
2077
2078pub fn filter_workers_by_resource_group(
2079    workers: &HashMap<WorkerId, WorkerNode>,
2080    resource_group: &str,
2081) -> BTreeSet<WorkerId> {
2082    workers
2083        .iter()
2084        .filter(|&(_, worker)| {
2085            worker
2086                .resource_group()
2087                .map(|node_label| node_label.as_str() == resource_group)
2088                .unwrap_or(false)
2089        })
2090        .map(|(id, _)| *id)
2091        .collect()
2092}
2093
/// `rename_relation_refer` updates the definition of relations that refer to the target one.
///
/// The updates are written through `txn` (this function does not commit the
/// transaction itself). Returns all the updated relations as catalog objects.
///
/// `object_name` is the new name of the target and `old_name` the name being
/// replaced inside the referring definitions.
///
/// # Errors
///
/// Returns an error if a referring object is not a table, sink, subscription, view
/// or index, and propagates any error of the underlying queries.
///
/// # Panics
///
/// Panics if a referring relation, its related object row, or a referring index row
/// cannot be found.
pub async fn rename_relation_refer(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
    old_name: &str,
) -> MetaResult<Vec<PbObject>> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename_refs;

    let mut to_update_relations = vec![];
    // Rewrites the definition of one referring relation of entity `$entity` (model
    // module `$table`, primary key field `$identity`) identified by `$object_id`,
    // and records the updated object in `to_update_relations`.
    macro_rules! rename_relation_ref {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            // Load the relation together with its row in the object table.
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            relation.definition =
                alter_relation_rename_refs(&relation.definition, old_name, object_name);
            // Only the definition column is set for the update; the name is unchanged.
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            // The streaming job row is optional; it is looked up by the same raw id.
            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
                .one(txn)
                .await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj.unwrap(), streaming_job).into(),
                )),
            });
        }};
    }
    // NOTE(review): presumably the objects that depend on `object_id`; confirm
    // against `get_referring_objects`.
    let mut objs = get_referring_objects(object_id, txn).await?;
    if object_type == ObjectType::Table {
        // Sinks whose target table is the renamed table are also treated as referring
        // objects.
        let incoming_sinks: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.eq(object_id))
            .into_tuple()
            .all(txn)
            .await?;

        // Only `oid` and `obj_type` are read by the loop below, so the schema and
        // database ids are left empty.
        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
            oid: id.as_object_id(),
            obj_type: ObjectType::Sink,
            schema_id: None,
            database_id: None,
        }));
    }

    for obj in objs {
        match obj.obj_type {
            ObjectType::Table => {
                rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
            }
            ObjectType::Sink => {
                rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
            }
            ObjectType::Subscription => {
                rename_relation_ref!(
                    Subscription,
                    subscription,
                    subscription_id,
                    obj.oid.as_subscription_id()
                )
            }
            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
            ObjectType::Index => {
                // For an index, the definition that gets rewritten is the one of its
                // index table.
                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
                    .select_only()
                    .column(index::Column::IndexTableId)
                    .into_tuple()
                    .one(txn)
                    .await?;
                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
            }
            _ => {
                bail!(
                    "only the table, sink, subscription, view and index will depend on other objects."
                )
            }
        }
    }

    Ok(to_update_relations)
}
2187
2188/// Validate that subscription can be safely deleted, meeting any of the following conditions:
2189/// 1. The upstream table is not referred to by any cross-db mv.
2190/// 2. After deleting the subscription, the upstream table still has at least one subscription.
2191pub async fn validate_subscription_deletion<C>(
2192    txn: &C,
2193    subscription_id: SubscriptionId,
2194) -> MetaResult<()>
2195where
2196    C: ConnectionTrait,
2197{
2198    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
2199        .select_only()
2200        .column(subscription::Column::DependentTableId)
2201        .into_tuple()
2202        .one(txn)
2203        .await?
2204        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
2205
2206    let cnt = Subscription::find()
2207        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
2208        .count(txn)
2209        .await?;
2210    if cnt > 1 {
2211        // Ensure that at least one subscription is remained for the upstream table
2212        // once the subscription is dropped.
2213        return Ok(());
2214    }
2215
2216    // Ensure that the upstream table is not referred by any cross-db mv.
2217    let obj_alias = Alias::new("o1");
2218    let used_by_alias = Alias::new("o2");
2219    let count = ObjectDependency::find()
2220        .join_as(
2221            JoinType::InnerJoin,
2222            object_dependency::Relation::Object2.def(),
2223            obj_alias.clone(),
2224        )
2225        .join_as(
2226            JoinType::InnerJoin,
2227            object_dependency::Relation::Object1.def(),
2228            used_by_alias.clone(),
2229        )
2230        .filter(
2231            object_dependency::Column::Oid
2232                .eq(upstream_table_id)
2233                .and(object_dependency::Column::UsedBy.ne(subscription_id))
2234                .and(
2235                    Expr::col((obj_alias, object::Column::DatabaseId))
2236                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
2237                ),
2238        )
2239        .count(txn)
2240        .await?;
2241
2242    if count != 0 {
2243        return Err(MetaError::permission_denied(format!(
2244            "Referenced by {} cross-db objects.",
2245            count
2246        )));
2247    }
2248
2249    Ok(())
2250}
2251
2252pub async fn fetch_target_fragments<C>(
2253    txn: &C,
2254    src_fragment_id: impl IntoIterator<Item = FragmentId>,
2255) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
2256where
2257    C: ConnectionTrait,
2258{
2259    let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
2260        .select_only()
2261        .columns([
2262            fragment_relation::Column::SourceFragmentId,
2263            fragment_relation::Column::TargetFragmentId,
2264        ])
2265        .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
2266        .into_tuple()
2267        .all(txn)
2268        .await?;
2269
2270    let source_target_fragments = source_target_fragments.into_iter().into_group_map();
2271
2272    Ok(source_target_fragments)
2273}
2274
2275pub async fn get_sink_fragment_by_ids<C>(
2276    txn: &C,
2277    sink_ids: Vec<SinkId>,
2278) -> MetaResult<HashMap<SinkId, FragmentId>>
2279where
2280    C: ConnectionTrait,
2281{
2282    let sink_num = sink_ids.len();
2283    let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
2284        .select_only()
2285        .columns([fragment::Column::JobId, fragment::Column::FragmentId])
2286        .filter(
2287            fragment::Column::JobId
2288                .is_in(sink_ids)
2289                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2290        )
2291        .into_tuple()
2292        .all(txn)
2293        .await?;
2294
2295    if sink_fragment_ids.len() != sink_num {
2296        return Err(anyhow::anyhow!(
2297            "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2298            sink_fragment_ids.len(),
2299            sink_num
2300        )
2301        .into());
2302    }
2303
2304    Ok(sink_fragment_ids.into_iter().collect())
2305}
2306
2307pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2308where
2309    C: ConnectionTrait,
2310{
2311    let mview_fragment: Vec<i32> = Fragment::find()
2312        .select_only()
2313        .column(fragment::Column::FragmentTypeMask)
2314        .filter(
2315            fragment::Column::JobId
2316                .eq(table_id)
2317                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2318        )
2319        .into_tuple()
2320        .all(txn)
2321        .await?;
2322
2323    let mview_fragment_len = mview_fragment.len();
2324    if mview_fragment_len != 1 {
2325        bail!(
2326            "expected exactly one mview fragment for table {}, found {}",
2327            table_id,
2328            mview_fragment_len
2329        );
2330    }
2331
2332    let mview_fragment = mview_fragment.into_iter().next().unwrap();
2333    let migrated =
2334        FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2335
2336    Ok(migrated)
2337}
2338
2339pub async fn try_get_iceberg_table_by_downstream_sink<C>(
2340    txn: &C,
2341    sink_id: SinkId,
2342) -> MetaResult<Option<TableId>>
2343where
2344    C: ConnectionTrait,
2345{
2346    let sink = Sink::find_by_id(sink_id).one(txn).await?;
2347    let Some(sink) = sink else {
2348        return Ok(None);
2349    };
2350
2351    if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
2352        let object_ids: Vec<ObjectId> = ObjectDependency::find()
2353            .select_only()
2354            .column(object_dependency::Column::Oid)
2355            .filter(object_dependency::Column::UsedBy.eq(sink_id))
2356            .into_tuple()
2357            .all(txn)
2358            .await?;
2359        let mut iceberg_table_ids = vec![];
2360        for object_id in object_ids {
2361            let table_id = object_id.as_table_id();
2362            if let Some(table_engine) = Table::find_by_id(table_id)
2363                .select_only()
2364                .column(table::Column::Engine)
2365                .into_tuple::<table::Engine>()
2366                .one(txn)
2367                .await?
2368                && table_engine == table::Engine::Iceberg
2369            {
2370                iceberg_table_ids.push(table_id);
2371            }
2372        }
2373        if iceberg_table_ids.len() == 1 {
2374            return Ok(Some(iceberg_table_ids[0]));
2375        }
2376    }
2377    Ok(None)
2378}
2379
2380pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
2381where
2382    C: ConnectionTrait,
2383{
2384    if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
2385        .select_only()
2386        .column(table::Column::Engine)
2387        .into_tuple::<table::Engine>()
2388        .one(txn)
2389        .await?
2390        && engine == table::Engine::Iceberg
2391    {
2392        return Ok(true);
2393    }
2394    if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
2395        .select_only()
2396        .column(sink::Column::Name)
2397        .into_tuple::<String>()
2398        .one(txn)
2399        .await?
2400        && sink_name.starts_with(ICEBERG_SINK_PREFIX)
2401    {
2402        return Ok(true);
2403    }
2404    Ok(false)
2405}
2406
2407pub async fn find_dirty_iceberg_table_jobs<C>(
2408    txn: &C,
2409    database_id: Option<DatabaseId>,
2410) -> MetaResult<Vec<PartialObject>>
2411where
2412    C: ConnectionTrait,
2413{
2414    let mut filter_condition = streaming_job::Column::JobStatus
2415        .ne(JobStatus::Created)
2416        .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
2417        .and(streaming_job::Column::CreateType.eq(CreateType::Background));
2418    if let Some(database_id) = database_id {
2419        filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
2420    }
2421    let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
2422        .select_only()
2423        .columns([
2424            object::Column::Oid,
2425            object::Column::ObjType,
2426            object::Column::SchemaId,
2427            object::Column::DatabaseId,
2428        ])
2429        .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2430        .filter(filter_condition)
2431        .into_partial_model()
2432        .all(txn)
2433        .await?;
2434
2435    let mut dirty_iceberg_table_jobs = vec![];
2436    for job in creating_table_sink_jobs {
2437        if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
2438            tracing::info!("Found dirty iceberg job with id: {}", job.oid);
2439            dirty_iceberg_table_jobs.push(job);
2440        }
2441    }
2442
2443    Ok(dirty_iceberg_table_jobs)
2444}
2445
2446pub fn build_select_node_list(
2447    from: &[ColumnCatalog],
2448    to: &[ColumnCatalog],
2449) -> MetaResult<Vec<PbExprNode>> {
2450    let mut exprs = Vec::with_capacity(to.len());
2451    let idx_by_col_id = from
2452        .iter()
2453        .enumerate()
2454        .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2455        .collect::<HashMap<_, _>>();
2456
2457    for to_col in to {
2458        let to_col = to_col.column_desc.as_ref().unwrap();
2459        let to_col_type_ref = to_col.column_type.as_ref().unwrap();
2460        let to_col_type = DataType::from(to_col_type_ref);
2461        if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2462            let from_col_type = DataType::from(
2463                from[*from_idx]
2464                    .column_desc
2465                    .as_ref()
2466                    .unwrap()
2467                    .column_type
2468                    .as_ref()
2469                    .unwrap(),
2470            );
2471            if !to_col_type.equals_datatype(&from_col_type) {
2472                return Err(anyhow!(
2473                    "Column type mismatch: {:?} != {:?}",
2474                    from_col_type,
2475                    to_col_type
2476                )
2477                .into());
2478            }
2479            exprs.push(PbExprNode {
2480                function_type: expr_node::Type::Unspecified.into(),
2481                return_type: Some(to_col_type_ref.clone()),
2482                rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2483            });
2484        } else {
2485            let to_default_node =
2486                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2487                    expr,
2488                    ..
2489                })) = &to_col.generated_or_default_column
2490                {
2491                    expr.clone().unwrap()
2492                } else {
2493                    let null = Datum::None.to_protobuf();
2494                    PbExprNode {
2495                        function_type: expr_node::Type::Unspecified.into(),
2496                        return_type: Some(to_col_type_ref.clone()),
2497                        rex_node: Some(expr_node::RexNode::Constant(null)),
2498                    }
2499                };
2500            exprs.push(to_default_node);
2501        }
2502    }
2503
2504    Ok(exprs)
2505}
2506
2507#[derive(Clone, Debug, Default)]
2508pub struct StreamingJobExtraInfo {
2509    pub timezone: Option<String>,
2510    pub config_override: Arc<str>,
2511    pub job_definition: String,
2512    pub backfill_orders: Option<BackfillOrders>,
2513}
2514
2515impl StreamingJobExtraInfo {
2516    pub fn stream_context(&self) -> StreamContext {
2517        StreamContext {
2518            timezone: self.timezone.clone(),
2519            config_override: self.config_override.clone(),
2520        }
2521    }
2522}
2523
2524/// Tuple of (`job_id`, `timezone`, `config_override`, `backfill_orders`)
2525type StreamingJobExtraInfoRow = (
2526    JobId,
2527    Option<String>,
2528    Option<String>,
2529    Option<BackfillOrders>,
2530);
2531
2532pub async fn get_streaming_job_extra_info<C>(
2533    txn: &C,
2534    job_ids: Vec<JobId>,
2535) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2536where
2537    C: ConnectionTrait,
2538{
2539    let pairs: Vec<StreamingJobExtraInfoRow> = StreamingJob::find()
2540        .select_only()
2541        .columns([
2542            streaming_job::Column::JobId,
2543            streaming_job::Column::Timezone,
2544            streaming_job::Column::ConfigOverride,
2545            streaming_job::Column::BackfillOrders,
2546        ])
2547        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2548        .into_tuple()
2549        .all(txn)
2550        .await?;
2551
2552    let job_ids = job_ids.into_iter().collect();
2553
2554    let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2555
2556    let result = pairs
2557        .into_iter()
2558        .map(|(job_id, timezone, config_override, backfill_orders)| {
2559            let job_definition = definitions.remove(&job_id).unwrap_or_default();
2560            (
2561                job_id,
2562                StreamingJobExtraInfo {
2563                    timezone,
2564                    config_override: config_override.unwrap_or_default().into(),
2565                    job_definition,
2566                    backfill_orders,
2567                },
2568            )
2569        })
2570        .collect();
2571
2572    Ok(result)
2573}
2574
#[cfg(test)]
mod tests {
    use super::*;

    /// The quoted external table name after `TABLE` is extracted from a CDC table definition.
    #[test]
    fn test_extract_cdc_table_name() {
        let cases = [
            (
                "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'",
                "public.t1",
            ),
            (
                "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'",
                "mydb.t2",
            ),
        ];

        for (ddl, expected) in cases {
            assert_eq!(
                extract_external_table_name_from_definition(ddl),
                Some(expected.into())
            );
        }
    }
}