risingwave_meta/controller/
utils.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{
22    FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23};
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
25use risingwave_common::id::{JobId, SubscriptionId};
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
28use risingwave_common::types::{DataType, Datum};
29use risingwave_common::util::value_encoding::DatumToProtoExt;
30use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
31use risingwave_common::{bail, hash};
32use risingwave_meta_model::fragment::DistributionType;
33use risingwave_meta_model::object::ObjectType;
34use risingwave_meta_model::prelude::*;
35use risingwave_meta_model::streaming_job::BackfillOrders;
36use risingwave_meta_model::table::TableType;
37use risingwave_meta_model::user_privilege::Action;
38use risingwave_meta_model::{
39    ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
40    JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
41    TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
42    function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
43    subscription, table, user, user_default_privilege, user_privilege, view,
44};
45use risingwave_meta_model_migration::WithQuery;
46use risingwave_pb::catalog::{
47    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
48    PbSubscription, PbTable, PbView,
49};
50use risingwave_pb::common::{PbObjectType, WorkerNode};
51use risingwave_pb::expr::{PbExprNode, expr_node};
52use risingwave_pb::meta::object::PbObjectInfo;
53use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
54use risingwave_pb::meta::{
55    ObjectDependency as PbObjectDependency, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
56};
57use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
58use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
59use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
60use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
61use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
62use risingwave_sqlparser::ast::Statement as SqlStatement;
63use risingwave_sqlparser::parser::Parser;
64use sea_orm::sea_query::{
65    Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
66    WithClause,
67};
68use sea_orm::{
69    ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
70    FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
71    RelationTrait, Set, Statement,
72};
73use thiserror_ext::AsReport;
74use tracing::warn;
75
76use crate::barrier::SharedFragmentInfo;
77use crate::controller::ObjectModel;
78use crate::controller::fragment::FragmentTypeMaskExt;
79use crate::controller::scale::resolve_streaming_job_definition;
80use crate::model::{FragmentDownstreamRelation, StreamContext};
81use crate::{MetaError, MetaResult};
82
/// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object.
///
/// # Examples
///
/// ```
/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
/// use sea_orm::sea_query::*;
/// use sea_orm::*;
///
/// let query = construct_obj_dependency_query(1.into());
///
/// assert_eq!(
///     query.to_string(MysqlQueryBuilder),
///     r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `oid` FROM `object` WHERE `object`.`database_id` = 1 OR `object`.`schema_id` = 1) UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `oid`, `obj_type`, `schema_id`, `database_id` FROM `used_by_object_ids` INNER JOIN `object` ON `used_by_object_ids`.`used_by` = `oid` ORDER BY `oid` DESC"#
/// );
/// assert_eq!(
///     query.to_string(PostgresQueryBuilder),
///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1) UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
/// );
/// assert_eq!(
///     query.to_string(SqliteQueryBuilder),
///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
/// );
/// ```
pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
    // Name of the recursive CTE and of its single projected column.
    let cte_alias = Alias::new("used_by_object_ids");
    let cte_return_alias = Alias::new("used_by");

    // Anchor term: objects that directly depend on `obj_id` according to the
    // `object_dependency` table.
    let mut base_query = SelectStatement::new()
        .column(object_dependency::Column::UsedBy)
        .from(ObjectDependency)
        .and_where(object_dependency::Column::Oid.eq(obj_id))
        .to_owned();

    // Second anchor term: if `obj_id` is a database or schema, every object that
    // belongs to it is also treated as "using" it.
    let belonged_obj_query = SelectStatement::new()
        .column(object::Column::Oid)
        .from(Object)
        .and_where(
            object::Column::DatabaseId
                .eq(obj_id)
                .or(object::Column::SchemaId.eq(obj_id)),
        )
        .to_owned();

    // Recursive term: follow dependency edges from ids already collected in the
    // CTE to reach transitive dependents.
    let cte_referencing = Query::select()
        .column((ObjectDependency, object_dependency::Column::UsedBy))
        .from(ObjectDependency)
        .inner_join(
            cte_alias.clone(),
            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
                .equals(object_dependency::Column::Oid),
        )
        .to_owned();

    // Assemble the CTE body: anchor UNION ALL anchor UNION ALL recursive term.
    let mut common_table_expr = CommonTableExpression::new();
    common_table_expr
        .query(
            base_query
                .union(UnionType::All, belonged_obj_query)
                .union(UnionType::All, cte_referencing)
                .to_owned(),
        )
        .column(cte_return_alias.clone())
        .table_name(cte_alias.clone());

    // Outer query: join the collected ids back to `object` to fetch each
    // dependent's type/schema/database, deduplicated and ordered by id descending.
    SelectStatement::new()
        .distinct()
        .columns([
            object::Column::Oid,
            object::Column::ObjType,
            object::Column::SchemaId,
            object::Column::DatabaseId,
        ])
        .from(cte_alias.clone())
        .inner_join(
            Object,
            Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
        )
        .order_by(object::Column::Oid, Order::Desc)
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
}
170
/// Converts a meta-model [`ObjectType`] into its protobuf counterpart.
/// The mapping is a one-to-one rename; the exhaustive match guarantees a compile
/// error if a new variant is added to either side.
fn to_pb_object_type(obj_type: ObjectType) -> PbObjectType {
    match obj_type {
        ObjectType::Database => PbObjectType::Database,
        ObjectType::Schema => PbObjectType::Schema,
        ObjectType::Table => PbObjectType::Table,
        ObjectType::Source => PbObjectType::Source,
        ObjectType::Sink => PbObjectType::Sink,
        ObjectType::View => PbObjectType::View,
        ObjectType::Index => PbObjectType::Index,
        ObjectType::Function => PbObjectType::Function,
        ObjectType::Connection => PbObjectType::Connection,
        ObjectType::Subscription => PbObjectType::Subscription,
        ObjectType::Secret => PbObjectType::Secret,
    }
}
186
/// Collect object dependencies with optional object id filtering, and optionally exclude non-created streaming jobs.
///
/// - `object_id`: when `Some`, only edges whose *depending* side (`used_by`) equals
///   the given id are returned (for sinks: either side may match, see below).
/// - `include_creating`: when `false`, edges whose depending object is a streaming
///   job that has not reached `Created` status are filtered out.
async fn list_object_dependencies_impl(
    txn: &DatabaseTransaction,
    object_id: Option<ObjectId>,
    include_creating: bool,
) -> MetaResult<Vec<PbObjectDependency>> {
    // Alias for a second join on `object`, used to resolve the type of the
    // referenced (`oid`) side of each dependency edge.
    let referenced_alias = Alias::new("referenced_obj");
    let mut query = ObjectDependency::find()
        .select_only()
        .columns([
            object_dependency::Column::Oid,
            object_dependency::Column::UsedBy,
        ])
        .column_as(
            Expr::col((referenced_alias.clone(), object::Column::ObjType)),
            "referenced_obj_type",
        )
        .join(
            JoinType::InnerJoin,
            object_dependency::Relation::Object1.def(),
        )
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object2.def(),
            referenced_alias.clone(),
        );
    if let Some(object_id) = object_id {
        query = query.filter(object_dependency::Column::UsedBy.eq(object_id));
    }
    // Note the direction flip: in the proto, `object_id` is the depending object
    // and `referenced_object_id` is the one it depends on.
    let mut obj_dependencies: Vec<PbObjectDependency> = query
        .into_tuple()
        .all(txn)
        .await?
        .into_iter()
        .map(|(oid, used_by, referenced_type)| PbObjectDependency {
            object_id: used_by,
            referenced_object_id: oid,
            referenced_object_type: to_pb_object_type(referenced_type) as i32,
        })
        .collect();

    // Sink-into-table edges are not stored in `object_dependency`; synthesize a
    // (target_table -> sink) edge for every sink with a target table.
    let mut sink_query = Sink::find()
        .select_only()
        .columns([sink::Column::SinkId, sink::Column::TargetTable])
        .filter(sink::Column::TargetTable.is_not_null());
    if let Some(object_id) = object_id {
        // For sinks, match the filter id on either end of the edge.
        sink_query = sink_query.filter(
            sink::Column::SinkId
                .eq(object_id)
                .or(sink::Column::TargetTable.eq(object_id)),
        );
    }
    let sink_dependencies: Vec<(SinkId, TableId)> = sink_query.into_tuple().all(txn).await?;
    obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
        PbObjectDependency {
            object_id: table_id.into(),
            referenced_object_id: sink_id.into(),
            referenced_object_type: PbObjectType::Sink as i32,
        }
    }));

    if !include_creating {
        // Collect the distinct ids of the depending objects to probe their job status.
        let mut streaming_job_ids = obj_dependencies
            .iter()
            .map(|dependency| dependency.object_id)
            .collect_vec();
        streaming_job_ids.sort_unstable();
        streaming_job_ids.dedup();

        if !streaming_job_ids.is_empty() {
            // Ids that correspond to streaming jobs whose status is not yet `Created`.
            let non_created_jobs: HashSet<JobId> = StreamingJob::find()
                .select_only()
                .columns([streaming_job::Column::JobId])
                .filter(
                    streaming_job::Column::JobId
                        .is_in(streaming_job_ids)
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .all(txn)
                .await?
                .into_iter()
                .collect();

            if !non_created_jobs.is_empty() {
                // Drop edges whose depending object is still being created.
                obj_dependencies.retain(|dependency| {
                    !non_created_jobs.contains(&dependency.object_id.as_job_id())
                });
            }
        }
    }

    Ok(obj_dependencies)
}
281
282/// List object dependencies with optional filtering of non-created streaming jobs.
283pub async fn list_object_dependencies(
284    txn: &DatabaseTransaction,
285    include_creating: bool,
286) -> MetaResult<Vec<PbObjectDependency>> {
287    list_object_dependencies_impl(txn, None, include_creating).await
288}
289
290/// List object dependencies for the specified object id without filtering by job status.
291pub async fn list_object_dependencies_by_object_id(
292    txn: &DatabaseTransaction,
293    object_id: ObjectId,
294) -> MetaResult<Vec<PbObjectDependency>> {
295    list_object_dependencies_impl(txn, Some(object_id), true).await
296}
297
/// This function will construct a query using recursive cte to find if dependent objects are already relying on the target table.
///
/// # Examples
///
/// ```
/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
/// use sea_orm::sea_query::*;
/// use sea_orm::*;
///
/// let query = construct_sink_cycle_check_query(1.into(), vec![2.into(), 3.into()]);
///
/// assert_eq!(
///     query.to_string(MysqlQueryBuilder),
///     r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
/// );
/// assert_eq!(
///     query.to_string(PostgresQueryBuilder),
///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
/// );
/// assert_eq!(
///     query.to_string(SqliteQueryBuilder),
///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
/// );
/// ```
pub fn construct_sink_cycle_check_query(
    target_table: ObjectId,
    dependent_objects: Vec<ObjectId>,
) -> WithQuery {
    // Name of the recursive CTE and of the derived table that unions plain
    // dependency edges with sink-into-table edges.
    let cte_alias = Alias::new("used_by_object_ids_with_sink");
    let depend_alias = Alias::new("obj_dependency_with_sink");

    // Anchor term: edges starting from the target table.
    let mut base_query = SelectStatement::new()
        .columns([
            object_dependency::Column::Oid,
            object_dependency::Column::UsedBy,
        ])
        .from(ObjectDependency)
        .and_where(object_dependency::Column::Oid.eq(target_table))
        .to_owned();

    // Sink-into-table edges are not stored in `object_dependency`; model each
    // sink with a target table as an extra (sink_id, target_table) edge.
    let query_sink_deps = SelectStatement::new()
        .columns([sink::Column::SinkId, sink::Column::TargetTable])
        .from(Sink)
        .and_where(sink::Column::TargetTable.is_not_null())
        .to_owned();

    // Recursive term: extend already-reached paths by one edge over the unioned
    // edge set, skipping degenerate self-loop rows.
    let cte_referencing = Query::select()
        .column((depend_alias.clone(), object_dependency::Column::Oid))
        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
        .from_subquery(
            SelectStatement::new()
                .columns([
                    object_dependency::Column::Oid,
                    object_dependency::Column::UsedBy,
                ])
                .from(ObjectDependency)
                .union(UnionType::All, query_sink_deps)
                .to_owned(),
            depend_alias.clone(),
        )
        .inner_join(
            cte_alias.clone(),
            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
                .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
        )
        .and_where(
            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
                cte_alias.clone(),
                object_dependency::Column::Oid,
            ))),
        )
        .to_owned();

    // Assemble the CTE body: anchor UNION ALL recursive term.
    let mut common_table_expr = CommonTableExpression::new();
    common_table_expr
        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
        .columns([
            object_dependency::Column::Oid,
            object_dependency::Column::UsedBy,
        ])
        .table_name(cte_alias.clone());

    // Outer query: count how many reached objects are among the new sink's
    // dependencies; a non-zero count means adding the sink would close a cycle.
    SelectStatement::new()
        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
        .from(cte_alias.clone())
        .and_where(
            Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
        )
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
}
394
/// Lightweight projection of the `object` table (id, type, and namespace ids only).
#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
#[sea_orm(entity = "Object")]
pub struct PartialObject {
    pub oid: ObjectId,
    pub obj_type: ObjectType,
    /// `None` when the object does not belong to a schema.
    pub schema_id: Option<SchemaId>,
    /// `None` when the object does not belong to a database.
    pub database_id: Option<DatabaseId>,
}
403
/// Projection of the `fragment` table carrying a fragment's owning job and its state-table ids.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Fragment")]
pub struct PartialFragmentStateTables {
    pub fragment_id: FragmentId,
    /// Id of the streaming job this fragment belongs to.
    pub job_id: ObjectId,
    /// Ids of the state tables owned by this fragment.
    pub state_table_ids: TableIdArray,
}
411
/// Location of a single actor: the fragment it belongs to and the worker hosting it.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct PartialActorLocation {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub worker_id: WorkerId,
}
418
/// Row shape for fragment-descriptor queries built elsewhere in this module.
#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
pub struct FragmentDesc {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    /// Bitmask of fragment type flags; raw `i32` as stored in the catalog.
    pub fragment_type_mask: i32,
    pub distribution_type: DistributionType,
    pub state_table_ids: TableIdArray,
    // NOTE(review): presumably the actor count of the fragment — confirm against
    // the query that populates this struct.
    pub parallelism: i64,
    pub vnode_count: i32,
    pub stream_node: StreamNode,
    pub parallelism_policy: String,
}
431
432/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
433pub async fn get_referring_objects_cascade<C>(
434    obj_id: ObjectId,
435    db: &C,
436) -> MetaResult<Vec<PartialObject>>
437where
438    C: ConnectionTrait,
439{
440    let query = construct_obj_dependency_query(obj_id);
441    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
442    let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
443        db.get_database_backend(),
444        sql,
445        values,
446    ))
447    .all(db)
448    .await?;
449    Ok(objects)
450}
451
452/// Check if create a sink with given dependent objects into the target table will cause a cycle, return true if it will.
453pub async fn check_sink_into_table_cycle<C>(
454    target_table: ObjectId,
455    dependent_objs: Vec<ObjectId>,
456    db: &C,
457) -> MetaResult<bool>
458where
459    C: ConnectionTrait,
460{
461    if dependent_objs.is_empty() {
462        return Ok(false);
463    }
464
465    // special check for self referencing
466    if dependent_objs.contains(&target_table) {
467        return Ok(true);
468    }
469
470    let query = construct_sink_cycle_check_query(target_table, dependent_objs);
471    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
472
473    let res = db
474        .query_one(Statement::from_sql_and_values(
475            db.get_database_backend(),
476            sql,
477            values,
478        ))
479        .await?
480        .unwrap();
481
482    let cnt: i64 = res.try_get_by(0)?;
483
484    Ok(cnt != 0)
485}
486
487/// `ensure_object_id` ensures the existence of target object in the cluster.
488pub async fn ensure_object_id<C>(
489    object_type: ObjectType,
490    obj_id: impl Into<ObjectId>,
491    db: &C,
492) -> MetaResult<()>
493where
494    C: ConnectionTrait,
495{
496    let obj_id = obj_id.into();
497    let count = Object::find_by_id(obj_id).count(db).await?;
498    if count == 0 {
499        return Err(MetaError::catalog_id_not_found(
500            object_type.as_str(),
501            obj_id,
502        ));
503    }
504    Ok(())
505}
506
507pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
508where
509    C: ConnectionTrait,
510{
511    let count = Object::find_by_id(job_id).count(db).await?;
512    if count == 0 {
513        return Err(MetaError::cancelled(format!(
514            "job {} might be cancelled manually or by recovery",
515            job_id
516        )));
517    }
518    Ok(())
519}
520
521/// `ensure_user_id` ensures the existence of target user in the cluster.
522pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
523where
524    C: ConnectionTrait,
525{
526    let count = User::find_by_id(user_id).count(db).await?;
527    if count == 0 {
528        return Err(anyhow!("user {} was concurrently dropped", user_id).into());
529    }
530    Ok(())
531}
532
533/// `check_database_name_duplicate` checks whether the database name is already used in the cluster.
534pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
535where
536    C: ConnectionTrait,
537{
538    let count = Database::find()
539        .filter(database::Column::Name.eq(name))
540        .count(db)
541        .await?;
542    if count > 0 {
543        assert_eq!(count, 1);
544        return Err(MetaError::catalog_duplicated("database", name));
545    }
546    Ok(())
547}
548
549/// `check_function_signature_duplicate` checks whether the function name and its signature is already used in the target namespace.
550pub async fn check_function_signature_duplicate<C>(
551    pb_function: &PbFunction,
552    db: &C,
553) -> MetaResult<()>
554where
555    C: ConnectionTrait,
556{
557    let count = Function::find()
558        .inner_join(Object)
559        .filter(
560            object::Column::DatabaseId
561                .eq(pb_function.database_id)
562                .and(object::Column::SchemaId.eq(pb_function.schema_id))
563                .and(function::Column::Name.eq(&pb_function.name))
564                .and(
565                    function::Column::ArgTypes
566                        .eq(DataTypeArray::from(pb_function.arg_types.clone())),
567                ),
568        )
569        .count(db)
570        .await?;
571    if count > 0 {
572        assert_eq!(count, 1);
573        return Err(MetaError::catalog_duplicated("function", &pb_function.name));
574    }
575    Ok(())
576}
577
578/// `check_connection_name_duplicate` checks whether the connection name is already used in the target namespace.
579pub async fn check_connection_name_duplicate<C>(
580    pb_connection: &PbConnection,
581    db: &C,
582) -> MetaResult<()>
583where
584    C: ConnectionTrait,
585{
586    let count = Connection::find()
587        .inner_join(Object)
588        .filter(
589            object::Column::DatabaseId
590                .eq(pb_connection.database_id)
591                .and(object::Column::SchemaId.eq(pb_connection.schema_id))
592                .and(connection::Column::Name.eq(&pb_connection.name)),
593        )
594        .count(db)
595        .await?;
596    if count > 0 {
597        assert_eq!(count, 1);
598        return Err(MetaError::catalog_duplicated(
599            "connection",
600            &pb_connection.name,
601        ));
602    }
603    Ok(())
604}
605
606pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
607where
608    C: ConnectionTrait,
609{
610    let count = Secret::find()
611        .inner_join(Object)
612        .filter(
613            object::Column::DatabaseId
614                .eq(pb_secret.database_id)
615                .and(object::Column::SchemaId.eq(pb_secret.schema_id))
616                .and(secret::Column::Name.eq(&pb_secret.name)),
617        )
618        .count(db)
619        .await?;
620    if count > 0 {
621        assert_eq!(count, 1);
622        return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
623    }
624    Ok(())
625}
626
627pub async fn check_subscription_name_duplicate<C>(
628    pb_subscription: &PbSubscription,
629    db: &C,
630) -> MetaResult<()>
631where
632    C: ConnectionTrait,
633{
634    let count = Subscription::find()
635        .inner_join(Object)
636        .filter(
637            object::Column::DatabaseId
638                .eq(pb_subscription.database_id)
639                .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
640                .and(subscription::Column::Name.eq(&pb_subscription.name)),
641        )
642        .count(db)
643        .await?;
644    if count > 0 {
645        assert_eq!(count, 1);
646        return Err(MetaError::catalog_duplicated(
647            "subscription",
648            &pb_subscription.name,
649        ));
650    }
651    Ok(())
652}
653
654/// `check_user_name_duplicate` checks whether the user is already existed in the cluster.
655pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
656where
657    C: ConnectionTrait,
658{
659    let count = User::find()
660        .filter(user::Column::Name.eq(name))
661        .count(db)
662        .await?;
663    if count > 0 {
664        assert_eq!(count, 1);
665        return Err(MetaError::catalog_duplicated("user", name));
666    }
667    Ok(())
668}
669
/// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace.
///
/// A conflicting relation that is still being created yields a "under creation"
/// error (with its job id); an already-created one yields a "duplicated" error.
pub async fn check_relation_name_duplicate<C>(
    name: &str,
    database_id: DatabaseId,
    schema_id: SchemaId,
    db: &C,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    // For one relation kind, look up an object with the same name in the target
    // (database, schema) namespace and return the appropriate error if found.
    macro_rules! check_duplicated {
        ($obj_type:expr, $entity:ident, $table:ident) => {
            let object_id = Object::find()
                .select_only()
                .column(object::Column::Oid)
                .inner_join($entity)
                .filter(
                    object::Column::DatabaseId
                        .eq(Some(database_id))
                        .and(object::Column::SchemaId.eq(Some(schema_id)))
                        .and($table::Column::Name.eq(name)),
                )
                .into_tuple::<ObjectId>()
                .one(db)
                .await?;
            if let Some(oid) = object_id {
                // Decide whether the conflicting object's creation status matters:
                // views never have a streaming job, and a source only has one when
                // it is a shared source.
                let check_creation = if $obj_type == ObjectType::View {
                    false
                } else if $obj_type == ObjectType::Source {
                    let source_info = Source::find_by_id(oid.as_source_id())
                        .select_only()
                        .column(source::Column::SourceInfo)
                        .into_tuple::<Option<StreamSourceInfo>>()
                        .one(db)
                        .await?
                        .unwrap();
                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
                } else {
                    true
                };
                let job_id = oid.as_job_id();
                // Report "under creation" only when the status check applies and
                // the conflicting job has not reached `Created` yet.
                return if check_creation
                    && !matches!(
                        StreamingJob::find_by_id(job_id)
                            .select_only()
                            .column(streaming_job::Column::JobStatus)
                            .into_tuple::<JobStatus>()
                            .one(db)
                            .await?,
                        Some(JobStatus::Created)
                    ) {
                    Err(MetaError::catalog_under_creation(
                        $obj_type.as_str(),
                        name,
                        job_id,
                    ))
                } else {
                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
                };
            }
        };
    }
    // All relation kinds share the same namespace, so each must be probed.
    check_duplicated!(ObjectType::Table, Table, table);
    check_duplicated!(ObjectType::Source, Source, source);
    check_duplicated!(ObjectType::Sink, Sink, sink);
    check_duplicated!(ObjectType::Index, Index, index);
    check_duplicated!(ObjectType::View, View, view);

    Ok(())
}
740
741/// `check_schema_name_duplicate` checks whether the schema name is already used in the target database.
742pub async fn check_schema_name_duplicate<C>(
743    name: &str,
744    database_id: DatabaseId,
745    db: &C,
746) -> MetaResult<()>
747where
748    C: ConnectionTrait,
749{
750    let count = Object::find()
751        .inner_join(Schema)
752        .filter(
753            object::Column::ObjType
754                .eq(ObjectType::Schema)
755                .and(object::Column::DatabaseId.eq(Some(database_id)))
756                .and(schema::Column::Name.eq(name)),
757        )
758        .count(db)
759        .await?;
760    if count != 0 {
761        return Err(MetaError::catalog_duplicated("schema", name));
762    }
763
764    Ok(())
765}
766
/// `check_object_refer_for_drop` checks whether the object is used by other objects except indexes.
/// It returns an error that contains the details of the referring objects if it is used by others.
pub async fn check_object_refer_for_drop<C>(
    object_type: ObjectType,
    object_id: ObjectId,
    db: &C,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    // Ignore indexes.
    let count = if object_type == ObjectType::Table {
        // For tables, join each dependency edge to the referring object's row so
        // that referring indexes can be excluded from the count.
        ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .filter(
                object_dependency::Column::Oid
                    .eq(object_id)
                    .and(object::Column::ObjType.ne(ObjectType::Index)),
            )
            .count(db)
            .await?
    } else {
        // For non-table objects, every dependency edge counts.
        ObjectDependency::find()
            .filter(object_dependency::Column::Oid.eq(object_id))
            .count(db)
            .await?
    };
    if count != 0 {
        // find the name of all objects that are using the given one.
        let referring_objects = get_referring_objects(object_id, db).await?;
        // Group the (non-index) referring objects by type so a single query per
        // type can resolve their schema-qualified names below.
        let referring_objs_map = referring_objects
            .into_iter()
            .filter(|o| o.obj_type != ObjectType::Index)
            .into_group_map_by(|o| o.obj_type);
        let mut details = vec![];
        for (obj_type, objs) in referring_objs_map {
            match obj_type {
                ObjectType::Table => {
                    // Resolve (schema_name, table_name) for each referring table.
                    let tables: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Table.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(table::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(tables.into_iter().map(|(schema_name, table_name)| {
                        format!(
                            "materialized view {}.{} depends on it",
                            schema_name, table_name
                        )
                    }));
                }
                ObjectType::Sink => {
                    // Resolve (schema_name, sink_name) for each referring sink.
                    let sinks: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Sink.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(sink::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    // Special case: an iceberg-engine table is created together with
                    // a hidden sink. If that single sink is the only referring sink,
                    // it must not block dropping the table.
                    if object_type == ObjectType::Table {
                        let engine = Table::find_by_id(object_id.as_table_id())
                            .select_only()
                            .column(table::Column::Engine)
                            .into_tuple::<table::Engine>()
                            .one(db)
                            .await?;
                        if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
                            continue;
                        }
                    }
                    details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
                        format!("sink {}.{} depends on it", schema_name, sink_name)
                    }));
                }
                ObjectType::View => {
                    // Resolve (schema_name, view_name) for each referring view.
                    let views: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::View.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(view::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(views.into_iter().map(|(schema_name, view_name)| {
                        format!("view {}.{} depends on it", schema_name, view_name)
                    }));
                }
                ObjectType::Subscription => {
                    // Resolve (schema_name, subscription_name) for each referring subscription.
                    let subscriptions: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Subscription.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(subscription::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(subscriptions.into_iter().map(
                        |(schema_name, subscription_name)| {
                            format!(
                                "subscription {}.{} depends on it",
                                schema_name, subscription_name
                            )
                        },
                    ));
                }
                ObjectType::Source => {
                    // Resolve (schema_name, source_name) for each referring source.
                    let sources: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Source.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(source::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(sources.into_iter().map(|(schema_name, view_name)| {
                        format!("source {}.{} depends on it", schema_name, view_name)
                    }));
                }
                ObjectType::Connection => {
                    // Resolve (schema_name, connection_name) for each referring connection.
                    let connections: Vec<(String, String)> = Object::find()
                        .join(JoinType::InnerJoin, object::Relation::Connection.def())
                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
                        .select_only()
                        .column(schema::Column::Name)
                        .column(connection::Column::Name)
                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
                        .into_tuple()
                        .all(db)
                        .await?;
                    details.extend(connections.into_iter().map(|(schema_name, view_name)| {
                        format!("connection {}.{} depends on it", schema_name, view_name)
                    }));
                }
                // only the table, source, sink, subscription, view, connection and index will depend on other objects.
                _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
            }
        }
        // Every referring object was exempted (e.g. only the hidden iceberg sink),
        // so the drop may proceed.
        if details.is_empty() {
            return Ok(());
        }

        return Err(MetaError::permission_denied(format!(
            "{} used by {} other objects. \nDETAIL: {}\n\
            {}",
            object_type.as_str(),
            details.len(),
            details.join("\n"),
            // Objects that CASCADE cannot drop through get a different hint.
            match object_type {
                ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
                    "HINT: DROP the dependent objects first.",
                ObjectType::Database | ObjectType::Schema => unreachable!(),
                _ => "HINT:  Use DROP ... CASCADE to drop the dependent objects too.",
            }
        )));
    }
    Ok(())
}
946
947/// List all objects that are using the given one.
948pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
949where
950    C: ConnectionTrait,
951{
952    let objs = ObjectDependency::find()
953        .filter(object_dependency::Column::Oid.eq(object_id))
954        .join(
955            JoinType::InnerJoin,
956            object_dependency::Relation::Object1.def(),
957        )
958        .into_partial_model()
959        .all(db)
960        .await?;
961
962    Ok(objs)
963}
964
965/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
966pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
967where
968    C: ConnectionTrait,
969{
970    let count = Object::find()
971        .filter(object::Column::SchemaId.eq(Some(schema_id)))
972        .count(db)
973        .await?;
974    if count != 0 {
975        return Err(MetaError::permission_denied("schema is not empty"));
976    }
977
978    Ok(())
979}
980
981/// `list_user_info_by_ids` lists all users' info by their ids.
982pub async fn list_user_info_by_ids<C>(
983    user_ids: impl IntoIterator<Item = UserId>,
984    db: &C,
985) -> MetaResult<Vec<PbUserInfo>>
986where
987    C: ConnectionTrait,
988{
989    let mut user_infos = vec![];
990    for user_id in user_ids {
991        let user = User::find_by_id(user_id)
992            .one(db)
993            .await?
994            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
995        let mut user_info: PbUserInfo = user.into();
996        user_info.grant_privileges = get_user_privilege(user_id, db).await?;
997        user_infos.push(user_info);
998    }
999    Ok(user_infos)
1000}
1001
1002/// `get_object_owner` returns the owner of the given object.
1003pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
1004where
1005    C: ConnectionTrait,
1006{
1007    let obj_owner: UserId = Object::find_by_id(object_id)
1008        .select_only()
1009        .column(object::Column::OwnerId)
1010        .into_tuple()
1011        .one(db)
1012        .await?
1013        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1014    Ok(obj_owner)
1015}
1016
/// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given one.
///
/// # Examples
///
/// ```
/// use risingwave_meta::controller::utils::construct_privilege_dependency_query;
/// use sea_orm::sea_query::*;
/// use sea_orm::*;
///
/// let query = construct_privilege_dependency_query(vec![1, 2, 3]);
///
/// assert_eq!(
///    query.to_string(MysqlQueryBuilder),
///   r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"#
/// );
/// assert_eq!(
///   query.to_string(PostgresQueryBuilder),
///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""#
/// );
/// assert_eq!(
///  query.to_string(SqliteQueryBuilder),
///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""#
/// );
/// ```
pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
    // Aliases for the recursive CTE and its two output columns.
    let cte_alias = Alias::new("granted_privilege_ids");
    let cte_return_privilege_alias = Alias::new("id");
    let cte_return_user_alias = Alias::new("user_id");

    // Anchor member: the privilege rows named directly by `ids`.
    let mut base_query = SelectStatement::new()
        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
        .from(UserPrivilege)
        .and_where(user_privilege::Column::Id.is_in(ids))
        .to_owned();

    // Recursive member: privileges whose `dependent_id` points at a privilege
    // already collected by the CTE.
    let cte_referencing = Query::select()
        .columns([
            (UserPrivilege, user_privilege::Column::Id),
            (UserPrivilege, user_privilege::Column::UserId),
        ])
        .from(UserPrivilege)
        .inner_join(
            cte_alias.clone(),
            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
                .equals(user_privilege::Column::DependentId),
        )
        .to_owned();

    // Combine anchor and recursive members into the CTE definition.
    let mut common_table_expr = CommonTableExpression::new();
    common_table_expr
        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
        .columns([
            cte_return_privilege_alias.clone(),
            cte_return_user_alias.clone(),
        ])
        .table_name(cte_alias.clone());

    // Outer query: select both columns out of the finished recursive CTE.
    SelectStatement::new()
        .columns([cte_return_privilege_alias, cte_return_user_alias])
        .from(cte_alias)
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
}
1085
1086pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
1087where
1088    C: ConnectionTrait,
1089{
1090    let table_ids: Vec<TableId> = Table::find()
1091        .select_only()
1092        .column(table::Column::TableId)
1093        .filter(
1094            table::Column::TableType
1095                .eq(TableType::Internal)
1096                .and(table::Column::BelongsToJobId.eq(job_id)),
1097        )
1098        .into_tuple()
1099        .all(db)
1100        .await?;
1101    Ok(table_ids)
1102}
1103
1104pub async fn get_index_state_tables_by_table_id<C>(
1105    table_id: TableId,
1106    db: &C,
1107) -> MetaResult<Vec<TableId>>
1108where
1109    C: ConnectionTrait,
1110{
1111    let mut index_table_ids: Vec<TableId> = Index::find()
1112        .select_only()
1113        .column(index::Column::IndexTableId)
1114        .filter(index::Column::PrimaryTableId.eq(table_id))
1115        .into_tuple()
1116        .all(db)
1117        .await?;
1118
1119    if !index_table_ids.is_empty() {
1120        let internal_table_ids: Vec<TableId> = Table::find()
1121            .select_only()
1122            .column(table::Column::TableId)
1123            .filter(
1124                table::Column::TableType
1125                    .eq(TableType::Internal)
1126                    .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
1127            )
1128            .into_tuple()
1129            .all(db)
1130            .await?;
1131
1132        index_table_ids.extend(internal_table_ids.into_iter());
1133    }
1134
1135    Ok(index_table_ids)
1136}
1137
/// `get_iceberg_related_object_ids` returns the related object ids of the iceberg source, sink and internal tables.
pub async fn get_iceberg_related_object_ids<C>(
    object_id: ObjectId,
    db: &C,
) -> MetaResult<Vec<ObjectId>>
where
    C: ConnectionTrait,
{
    let object = Object::find_by_id(object_id)
        .one(db)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
    // Only tables can be iceberg-engine tables; other object types have no companions.
    if object.obj_type != ObjectType::Table {
        return Ok(vec![]);
    }

    let table = Table::find_by_id(object_id.as_table_id())
        .one(db)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
    // Non-iceberg tables have no hidden sink/source companions either.
    if !matches!(table.engine, Some(table::Engine::Iceberg)) {
        return Ok(vec![]);
    }

    // A table object always carries database and schema ids, so unwrap is safe here.
    let database_id = object.database_id.unwrap();
    let schema_id = object.schema_id.unwrap();

    let mut related_objects = vec![];

    // The hidden sink is created next to the table under a fixed name prefix;
    // look it up in the same database and schema.
    let iceberg_sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name);
    let iceberg_sink_id = Sink::find()
        .inner_join(Object)
        .select_only()
        .column(sink::Column::SinkId)
        .filter(
            object::Column::DatabaseId
                .eq(database_id)
                .and(object::Column::SchemaId.eq(schema_id))
                .and(sink::Column::Name.eq(&iceberg_sink_name)),
        )
        .into_tuple::<SinkId>()
        .one(db)
        .await?;
    if let Some(sink_id) = iceberg_sink_id {
        // Include the sink itself and its internal state tables.
        related_objects.push(sink_id.as_object_id());
        let sink_internal_tables = get_internal_tables_by_id(sink_id.as_job_id(), db).await?;
        related_objects.extend(
            sink_internal_tables
                .into_iter()
                .map(|tid| tid.as_object_id()),
        );
    } else {
        // Tolerate a missing companion: log and continue rather than fail.
        warn!(
            "iceberg table {} missing sink {}",
            table.name, iceberg_sink_name
        );
    }

    // The hidden source follows the same naming convention.
    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
    let iceberg_source_id = Source::find()
        .inner_join(Object)
        .select_only()
        .column(source::Column::SourceId)
        .filter(
            object::Column::DatabaseId
                .eq(database_id)
                .and(object::Column::SchemaId.eq(schema_id))
                .and(source::Column::Name.eq(&iceberg_source_name)),
        )
        .into_tuple::<SourceId>()
        .one(db)
        .await?;
    if let Some(source_id) = iceberg_source_id {
        related_objects.push(source_id.as_object_id());
    } else {
        // Same best-effort policy as for the sink lookup above.
        warn!(
            "iceberg table {} missing source {}",
            table.name, iceberg_source_name
        );
    }

    Ok(related_objects)
}
1221
1222/// Load streaming jobs by job ids.
1223pub(crate) async fn load_streaming_jobs_by_ids<C>(
1224    txn: &C,
1225    job_ids: impl IntoIterator<Item = JobId>,
1226) -> MetaResult<HashMap<JobId, streaming_job::Model>>
1227where
1228    C: ConnectionTrait,
1229{
1230    let job_ids: HashSet<JobId> = job_ids.into_iter().collect();
1231    if job_ids.is_empty() {
1232        return Ok(HashMap::new());
1233    }
1234    let jobs = streaming_job::Entity::find()
1235        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
1236        .all(txn)
1237        .await?;
1238    Ok(jobs.into_iter().map(|job| (job.job_id, job)).collect())
1239}
1240
/// Partial projection of a `user_privilege` row: just the privilege id and the
/// id of the user it is granted to. Used when cascading through dependent grants.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "UserPrivilege")]
pub struct PartialUserPrivilege {
    // Primary key of the privilege row.
    pub id: PrivilegeId,
    // The grantee of the privilege.
    pub user_id: UserId,
}
1247
1248pub async fn get_referring_privileges_cascade<C>(
1249    ids: Vec<PrivilegeId>,
1250    db: &C,
1251) -> MetaResult<Vec<PartialUserPrivilege>>
1252where
1253    C: ConnectionTrait,
1254{
1255    let query = construct_privilege_dependency_query(ids);
1256    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1257    let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1258        db.get_database_backend(),
1259        sql,
1260        values,
1261    ))
1262    .all(db)
1263    .await?;
1264
1265    Ok(privileges)
1266}
1267
1268/// `ensure_privileges_not_referred` ensures that the privileges are not granted to any other users.
1269pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1270where
1271    C: ConnectionTrait,
1272{
1273    let count = UserPrivilege::find()
1274        .filter(user_privilege::Column::DependentId.is_in(ids))
1275        .count(db)
1276        .await?;
1277    if count != 0 {
1278        return Err(MetaError::permission_denied(format!(
1279            "privileges granted to {} other ones.",
1280            count
1281        )));
1282    }
1283    Ok(())
1284}
1285
1286/// `get_user_privilege` returns the privileges of the given user.
1287pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1288where
1289    C: ConnectionTrait,
1290{
1291    let user_privileges = UserPrivilege::find()
1292        .find_also_related(Object)
1293        .filter(user_privilege::Column::UserId.eq(user_id))
1294        .all(db)
1295        .await?;
1296    Ok(user_privileges
1297        .into_iter()
1298        .map(|(privilege, object)| {
1299            let object = object.unwrap();
1300            let obj = match object.obj_type {
1301                ObjectType::Database => PbGrantObject::DatabaseId(object.oid.as_database_id()),
1302                ObjectType::Schema => PbGrantObject::SchemaId(object.oid.as_schema_id()),
1303                ObjectType::Table | ObjectType::Index => {
1304                    PbGrantObject::TableId(object.oid.as_table_id())
1305                }
1306                ObjectType::Source => PbGrantObject::SourceId(object.oid.as_source_id()),
1307                ObjectType::Sink => PbGrantObject::SinkId(object.oid.as_sink_id()),
1308                ObjectType::View => PbGrantObject::ViewId(object.oid.as_view_id()),
1309                ObjectType::Function => PbGrantObject::FunctionId(object.oid.as_function_id()),
1310                ObjectType::Connection => {
1311                    PbGrantObject::ConnectionId(object.oid.as_connection_id())
1312                }
1313                ObjectType::Subscription => {
1314                    PbGrantObject::SubscriptionId(object.oid.as_subscription_id())
1315                }
1316                ObjectType::Secret => PbGrantObject::SecretId(object.oid.as_secret_id()),
1317            };
1318            PbGrantPrivilege {
1319                action_with_opts: vec![PbActionWithGrantOption {
1320                    action: PbAction::from(privilege.action) as _,
1321                    with_grant_option: privilege.with_grant_option,
1322                    granted_by: privilege.granted_by as _,
1323                }],
1324                object: Some(obj),
1325            }
1326        })
1327        .collect())
1328}
1329
1330pub async fn get_table_columns(
1331    txn: &impl ConnectionTrait,
1332    id: TableId,
1333) -> MetaResult<ColumnCatalogArray> {
1334    let columns = Table::find_by_id(id)
1335        .select_only()
1336        .columns([table::Column::Columns])
1337        .into_tuple::<ColumnCatalogArray>()
1338        .one(txn)
1339        .await?
1340        .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1341    Ok(columns)
1342}
1343
1344/// `grant_default_privileges_automatically` grants default privileges automatically
1345/// for the given new object. It returns the list of user infos whose privileges are updated.
1346pub async fn grant_default_privileges_automatically<C>(
1347    db: &C,
1348    object_id: impl Into<ObjectId>,
1349) -> MetaResult<Vec<PbUserInfo>>
1350where
1351    C: ConnectionTrait,
1352{
1353    let object_id = object_id.into();
1354    let object = Object::find_by_id(object_id)
1355        .one(db)
1356        .await?
1357        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1358    assert_ne!(object.obj_type, ObjectType::Database);
1359
1360    let for_mview_filter = if object.obj_type == ObjectType::Table {
1361        let table_type = Table::find_by_id(object_id.as_table_id())
1362            .select_only()
1363            .column(table::Column::TableType)
1364            .into_tuple::<TableType>()
1365            .one(db)
1366            .await?
1367            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1368        user_default_privilege::Column::ForMaterializedView
1369            .eq(table_type == TableType::MaterializedView)
1370    } else {
1371        user_default_privilege::Column::ForMaterializedView.eq(false)
1372    };
1373    let schema_filter = if let Some(schema_id) = &object.schema_id {
1374        user_default_privilege::Column::SchemaId.eq(*schema_id)
1375    } else {
1376        user_default_privilege::Column::SchemaId.is_null()
1377    };
1378
1379    let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1380        .select_only()
1381        .columns([
1382            user_default_privilege::Column::Grantee,
1383            user_default_privilege::Column::GrantedBy,
1384            user_default_privilege::Column::Action,
1385            user_default_privilege::Column::WithGrantOption,
1386        ])
1387        .filter(
1388            user_default_privilege::Column::DatabaseId
1389                .eq(object.database_id.unwrap())
1390                .and(schema_filter)
1391                .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1392                .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1393                .and(for_mview_filter),
1394        )
1395        .into_tuple()
1396        .all(db)
1397        .await?;
1398    if default_privileges.is_empty() {
1399        return Ok(vec![]);
1400    }
1401
1402    let updated_user_ids = default_privileges
1403        .iter()
1404        .map(|(grantee, _, _, _)| *grantee)
1405        .collect::<HashSet<_>>();
1406
1407    for (grantee, granted_by, action, with_grant_option) in default_privileges {
1408        UserPrivilege::insert(user_privilege::ActiveModel {
1409            user_id: Set(grantee),
1410            oid: Set(object_id),
1411            granted_by: Set(granted_by),
1412            action: Set(action),
1413            with_grant_option: Set(with_grant_option),
1414            ..Default::default()
1415        })
1416        .exec(db)
1417        .await?;
1418        if action == Action::Select {
1419            // Grant SELECT privilege for internal tables if the action is SELECT.
1420            let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1421            if !internal_table_ids.is_empty() {
1422                for internal_table_id in &internal_table_ids {
1423                    UserPrivilege::insert(user_privilege::ActiveModel {
1424                        user_id: Set(grantee),
1425                        oid: Set(internal_table_id.as_object_id()),
1426                        granted_by: Set(granted_by),
1427                        action: Set(Action::Select),
1428                        with_grant_option: Set(with_grant_option),
1429                        ..Default::default()
1430                    })
1431                    .exec(db)
1432                    .await?;
1433                }
1434            }
1435
1436            // Additionally, grant SELECT privilege for iceberg related objects if the action is SELECT.
1437            let iceberg_privilege_object_ids =
1438                get_iceberg_related_object_ids(object_id, db).await?;
1439            if !iceberg_privilege_object_ids.is_empty() {
1440                for iceberg_object_id in &iceberg_privilege_object_ids {
1441                    UserPrivilege::insert(user_privilege::ActiveModel {
1442                        user_id: Set(grantee),
1443                        oid: Set(*iceberg_object_id),
1444                        granted_by: Set(granted_by),
1445                        action: Set(action),
1446                        with_grant_option: Set(with_grant_option),
1447                        ..Default::default()
1448                    })
1449                    .exec(db)
1450                    .await?;
1451                }
1452            }
1453        }
1454    }
1455
1456    let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1457    Ok(updated_user_infos)
1458}
1459
1460// todo: remove it after migrated to sql backend.
1461pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1462    match object {
1463        PbGrantObject::DatabaseId(id) => (*id).into(),
1464        PbGrantObject::SchemaId(id) => (*id).into(),
1465        PbGrantObject::TableId(id) => (*id).into(),
1466        PbGrantObject::SourceId(id) => (*id).into(),
1467        PbGrantObject::SinkId(id) => (*id).into(),
1468        PbGrantObject::ViewId(id) => (*id).into(),
1469        PbGrantObject::FunctionId(id) => (*id).into(),
1470        PbGrantObject::SubscriptionId(id) => (*id).into(),
1471        PbGrantObject::ConnectionId(id) => (*id).into(),
1472        PbGrantObject::SecretId(id) => (*id).into(),
1473    }
1474}
1475
1476pub async fn insert_fragment_relations(
1477    db: &impl ConnectionTrait,
1478    downstream_fragment_relations: &FragmentDownstreamRelation,
1479) -> MetaResult<()> {
1480    let mut relations = vec![];
1481    for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1482        for downstream in downstreams {
1483            relations.push(
1484                fragment_relation::Model {
1485                    source_fragment_id: *upstream_fragment_id as _,
1486                    target_fragment_id: downstream.downstream_fragment_id as _,
1487                    dispatcher_type: downstream.dispatcher_type,
1488                    dist_key_indices: downstream
1489                        .dist_key_indices
1490                        .iter()
1491                        .map(|idx| *idx as i32)
1492                        .collect_vec()
1493                        .into(),
1494                    output_indices: downstream
1495                        .output_mapping
1496                        .indices
1497                        .iter()
1498                        .map(|idx| *idx as i32)
1499                        .collect_vec()
1500                        .into(),
1501                    output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1502                }
1503                .into_active_model(),
1504            );
1505        }
1506    }
1507    if !relations.is_empty() {
1508        FragmentRelation::insert_many(relations).exec(db).await?;
1509    }
1510    Ok(())
1511}
1512
1513pub fn compose_dispatchers(
1514    source_fragment_distribution: DistributionType,
1515    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1516    target_fragment_id: crate::model::FragmentId,
1517    target_fragment_distribution: DistributionType,
1518    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1519    dispatcher_type: DispatcherType,
1520    dist_key_indices: Vec<u32>,
1521    output_mapping: PbDispatchOutputMapping,
1522) -> (
1523    HashMap<crate::model::ActorId, PbDispatcher>,
1524    Option<HashMap<crate::model::ActorId, crate::model::ActorId>>,
1525) {
1526    match dispatcher_type {
1527        DispatcherType::Hash => {
1528            let dispatcher = PbDispatcher {
1529                r#type: PbDispatcherType::from(dispatcher_type) as _,
1530                dist_key_indices,
1531                output_mapping: output_mapping.into(),
1532                hash_mapping: Some(
1533                    ActorMapping::from_bitmaps(
1534                        &target_fragment_actors
1535                            .iter()
1536                            .map(|(actor_id, bitmap)| {
1537                                (
1538                                    *actor_id as _,
1539                                    bitmap
1540                                        .clone()
1541                                        .expect("downstream hash dispatch must have distribution"),
1542                                )
1543                            })
1544                            .collect(),
1545                    )
1546                    .to_protobuf(),
1547                ),
1548                dispatcher_id: target_fragment_id,
1549                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1550            };
1551            (
1552                source_fragment_actors
1553                    .keys()
1554                    .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1555                    .collect(),
1556                None,
1557            )
1558        }
1559        DispatcherType::Broadcast | DispatcherType::Simple => {
1560            let dispatcher = PbDispatcher {
1561                r#type: PbDispatcherType::from(dispatcher_type) as _,
1562                dist_key_indices,
1563                output_mapping: output_mapping.into(),
1564                hash_mapping: None,
1565                dispatcher_id: target_fragment_id,
1566                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1567            };
1568            (
1569                source_fragment_actors
1570                    .keys()
1571                    .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1572                    .collect(),
1573                None,
1574            )
1575        }
1576        DispatcherType::NoShuffle => {
1577            let no_shuffle_map = resolve_no_shuffle_actor_mapping(
1578                source_fragment_distribution,
1579                source_fragment_actors
1580                    .iter()
1581                    .map(|(&id, bitmap)| (id, bitmap)),
1582                target_fragment_distribution,
1583                target_fragment_actors
1584                    .iter()
1585                    .map(|(&id, bitmap)| (id, bitmap)),
1586            );
1587            let dispatchers = no_shuffle_map
1588                .iter()
1589                .map(|(&upstream_actor_id, &downstream_actor_id)| {
1590                    (
1591                        upstream_actor_id,
1592                        PbDispatcher {
1593                            r#type: PbDispatcherType::NoShuffle as _,
1594                            dist_key_indices: dist_key_indices.clone(),
1595                            output_mapping: output_mapping.clone().into(),
1596                            hash_mapping: None,
1597                            dispatcher_id: target_fragment_id,
1598                            downstream_actor_id: vec![downstream_actor_id],
1599                        },
1600                    )
1601                })
1602                .collect();
1603            (dispatchers, Some(no_shuffle_map))
1604        }
1605    }
1606}
1607
/// Resolve 1:1 actor mapping between two no-shuffle-connected fragments.
///
/// Returns a `HashMap` from upstream actor id to its paired downstream actor id. The
/// actor id type is generic so the same logic can be reused both for real `ActorId`
/// mapping and for alignment checks with synthetic ids (e.g. `(WorkerId, actor_idx)`).
///
/// For `Single` distribution the single actors are paired directly.
/// For `Hash` distribution actors are matched by their bitmap's first vnode and
/// full bitmap equality is asserted.
///
/// # Panics
/// Panics if the two fragments' distributions differ, if either side violates its
/// distribution's invariants (singleton bitmap, presence of hash bitmaps), or if the
/// actors cannot be paired one-to-one.
pub fn resolve_no_shuffle_actor_mapping<
    'a,
    ActorId: Copy + Eq + std::hash::Hash + std::fmt::Debug,
>(
    source_fragment_distribution: DistributionType,
    source_fragment_actors: impl IntoIterator<Item = (ActorId, &'a Option<Bitmap>)>,
    target_fragment_distribution: DistributionType,
    target_fragment_actors: impl IntoIterator<Item = (ActorId, &'a Option<Bitmap>)>,
) -> HashMap<ActorId, ActorId> {
    // No-shuffle requires both sides to share the same distribution kind.
    assert_eq!(source_fragment_distribution, target_fragment_distribution);

    match source_fragment_distribution {
        DistributionType::Single => {
            // A singleton actor either has no bitmap at all, or a bitmap covering
            // every vnode.
            let assert_singleton = |bitmap: &Option<Bitmap>| {
                assert!(
                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
                    "not singleton: {:?}",
                    bitmap
                );
            };
            let (source_actor_id, bitmap) = source_fragment_actors
                .into_iter()
                .exactly_one()
                .ok()
                .expect("Single distribution should have exactly one source actor");
            assert_singleton(bitmap);
            let (target_actor_id, bitmap) = target_fragment_actors
                .into_iter()
                .exactly_one()
                .ok()
                .expect("Single distribution should have exactly one target actor");
            assert_singleton(bitmap);
            HashMap::from([(source_actor_id, target_actor_id)])
        }
        DistributionType::Hash => {
            // Build target index keyed by first vnode (one pass over target).
            let mut target_by_vnode: HashMap<_, _> = target_fragment_actors
                .into_iter()
                .map(|(actor_id, bitmap)| {
                    let bitmap = bitmap
                        .as_ref()
                        .expect("hash distribution should have bitmap");
                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
                    (first_vnode, (actor_id, bitmap))
                })
                .collect();

            // Record the target actor count before the map is drained below.
            let target_count = target_by_vnode.len();

            // One pass over source, matching against the target index.
            // `remove` (rather than `get`) ensures each target actor is matched at
            // most once; leftover source actors then fail the lookup and panic.
            let mapping: HashMap<_, _> = source_fragment_actors
                .into_iter()
                .map(|(source_actor_id, source_bitmap)| {
                    let source_bitmap = source_bitmap
                        .as_ref()
                        .expect("hash distribution should have bitmap");
                    let first_vnode = source_bitmap
                        .iter_vnodes()
                        .next()
                        .expect("non-empty bitmap");
                    let (target_actor_id, target_bitmap) =
                        target_by_vnode.remove(&first_vnode).unwrap_or_else(|| {
                            panic!(
                                "cannot find matched target actor: {:?} first_vnode {:?}",
                                source_actor_id, first_vnode,
                            )
                        });
                    // Matching on the first vnode alone is not enough: the whole
                    // bitmap must agree for the pairing to be valid.
                    assert_eq!(
                        source_bitmap, target_bitmap,
                        "bitmap mismatch for source {:?} target {:?} at first_vnode {:?}",
                        source_actor_id, target_actor_id, first_vnode,
                    );
                    (source_actor_id, target_actor_id)
                })
                .collect();

            // Every target actor must have been claimed by exactly one source actor.
            assert_eq!(
                mapping.len(),
                target_count,
                "no-shuffle should have equal upstream downstream actor count: {} vs {}",
                mapping.len(),
                target_count,
            );

            mapping
        }
    }
}
1705
1706pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1707    let fragment_worker_slot_mapping = match fragment.distribution_type {
1708        DistributionType::Single => {
1709            let actor = fragment.actors.values().exactly_one().unwrap();
1710            WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1711        }
1712        DistributionType::Hash => {
1713            let actor_bitmaps: HashMap<_, _> = fragment
1714                .actors
1715                .iter()
1716                .map(|(actor_id, actor_info)| {
1717                    let vnode_bitmap = actor_info
1718                        .vnode_bitmap
1719                        .as_ref()
1720                        .cloned()
1721                        .expect("actor bitmap shouldn't be none in hash fragment");
1722
1723                    (*actor_id as hash::ActorId, vnode_bitmap)
1724                })
1725                .collect();
1726
1727            let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1728
1729            let actor_locations = fragment
1730                .actors
1731                .iter()
1732                .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
1733                .collect();
1734
1735            actor_mapping.to_worker_slot(&actor_locations)
1736        }
1737    };
1738
1739    PbFragmentWorkerSlotMapping {
1740        fragment_id: fragment.fragment_id,
1741        mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1742    }
1743}
1744
1745/// For the given streaming jobs, returns
1746/// - All source fragments
1747/// - All sink fragments
1748/// - All actors
1749/// - All fragments
1750pub async fn get_fragments_for_jobs<C>(
1751    db: &C,
1752    streaming_jobs: Vec<JobId>,
1753) -> MetaResult<(
1754    HashMap<SourceId, BTreeSet<FragmentId>>,
1755    HashSet<FragmentId>,
1756    HashSet<FragmentId>,
1757)>
1758where
1759    C: ConnectionTrait,
1760{
1761    if streaming_jobs.is_empty() {
1762        return Ok((HashMap::default(), HashSet::default(), HashSet::default()));
1763    }
1764
1765    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1766        .select_only()
1767        .columns([
1768            fragment::Column::FragmentId,
1769            fragment::Column::FragmentTypeMask,
1770            fragment::Column::StreamNode,
1771        ])
1772        .filter(fragment::Column::JobId.is_in(streaming_jobs))
1773        .into_tuple()
1774        .all(db)
1775        .await?;
1776
1777    let fragment_ids: HashSet<_> = fragments
1778        .iter()
1779        .map(|(fragment_id, _, _)| *fragment_id)
1780        .collect();
1781
1782    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1783    let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1784    for (fragment_id, mask, stream_node) in fragments {
1785        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1786            && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1787        {
1788            source_fragment_ids
1789                .entry(source_id)
1790                .or_default()
1791                .insert(fragment_id);
1792        }
1793        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1794            sink_fragment_ids.insert(fragment_id);
1795        }
1796    }
1797
1798    Ok((source_fragment_ids, sink_fragment_ids, fragment_ids))
1799}
1800
1801/// Build a object group for notifying the deletion of the given objects.
1802///
1803/// Note that only id fields are filled in the object info, as the arguments are partial objects.
1804/// As a result, the returned notification info should only be used for deletion.
1805pub(crate) fn build_object_group_for_delete(
1806    partial_objects: Vec<PartialObject>,
1807) -> NotificationInfo {
1808    let mut objects = vec![];
1809    for obj in partial_objects {
1810        match obj.obj_type {
1811            ObjectType::Database => objects.push(PbObject {
1812                object_info: Some(PbObjectInfo::Database(PbDatabase {
1813                    id: obj.oid.as_database_id(),
1814                    ..Default::default()
1815                })),
1816            }),
1817            ObjectType::Schema => objects.push(PbObject {
1818                object_info: Some(PbObjectInfo::Schema(PbSchema {
1819                    id: obj.oid.as_schema_id(),
1820                    database_id: obj.database_id.unwrap(),
1821                    ..Default::default()
1822                })),
1823            }),
1824            ObjectType::Table => objects.push(PbObject {
1825                object_info: Some(PbObjectInfo::Table(PbTable {
1826                    id: obj.oid.as_table_id(),
1827                    schema_id: obj.schema_id.unwrap(),
1828                    database_id: obj.database_id.unwrap(),
1829                    ..Default::default()
1830                })),
1831            }),
1832            ObjectType::Source => objects.push(PbObject {
1833                object_info: Some(PbObjectInfo::Source(PbSource {
1834                    id: obj.oid.as_source_id(),
1835                    schema_id: obj.schema_id.unwrap(),
1836                    database_id: obj.database_id.unwrap(),
1837                    ..Default::default()
1838                })),
1839            }),
1840            ObjectType::Sink => objects.push(PbObject {
1841                object_info: Some(PbObjectInfo::Sink(PbSink {
1842                    id: obj.oid.as_sink_id(),
1843                    schema_id: obj.schema_id.unwrap(),
1844                    database_id: obj.database_id.unwrap(),
1845                    ..Default::default()
1846                })),
1847            }),
1848            ObjectType::Subscription => objects.push(PbObject {
1849                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1850                    id: obj.oid.as_subscription_id(),
1851                    schema_id: obj.schema_id.unwrap(),
1852                    database_id: obj.database_id.unwrap(),
1853                    ..Default::default()
1854                })),
1855            }),
1856            ObjectType::View => objects.push(PbObject {
1857                object_info: Some(PbObjectInfo::View(PbView {
1858                    id: obj.oid.as_view_id(),
1859                    schema_id: obj.schema_id.unwrap(),
1860                    database_id: obj.database_id.unwrap(),
1861                    ..Default::default()
1862                })),
1863            }),
1864            ObjectType::Index => {
1865                objects.push(PbObject {
1866                    object_info: Some(PbObjectInfo::Index(PbIndex {
1867                        id: obj.oid.as_index_id(),
1868                        schema_id: obj.schema_id.unwrap(),
1869                        database_id: obj.database_id.unwrap(),
1870                        ..Default::default()
1871                    })),
1872                });
1873                objects.push(PbObject {
1874                    object_info: Some(PbObjectInfo::Table(PbTable {
1875                        id: obj.oid.as_table_id(),
1876                        schema_id: obj.schema_id.unwrap(),
1877                        database_id: obj.database_id.unwrap(),
1878                        ..Default::default()
1879                    })),
1880                });
1881            }
1882            ObjectType::Function => objects.push(PbObject {
1883                object_info: Some(PbObjectInfo::Function(PbFunction {
1884                    id: obj.oid.as_function_id(),
1885                    schema_id: obj.schema_id.unwrap(),
1886                    database_id: obj.database_id.unwrap(),
1887                    ..Default::default()
1888                })),
1889            }),
1890            ObjectType::Connection => objects.push(PbObject {
1891                object_info: Some(PbObjectInfo::Connection(PbConnection {
1892                    id: obj.oid.as_connection_id(),
1893                    schema_id: obj.schema_id.unwrap(),
1894                    database_id: obj.database_id.unwrap(),
1895                    ..Default::default()
1896                })),
1897            }),
1898            ObjectType::Secret => objects.push(PbObject {
1899                object_info: Some(PbObjectInfo::Secret(PbSecret {
1900                    id: obj.oid.as_secret_id(),
1901                    schema_id: obj.schema_id.unwrap(),
1902                    database_id: obj.database_id.unwrap(),
1903                    ..Default::default()
1904                })),
1905            }),
1906        }
1907    }
1908    NotificationInfo::ObjectGroup(PbObjectGroup {
1909        objects,
1910        dependencies: vec![],
1911    })
1912}
1913
1914pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1915    let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1916        .context("unable to parse table definition")
1917        .inspect_err(|e| {
1918            tracing::error!(
1919                target: "auto_schema_change",
1920                error = %e.as_report(),
1921                "failed to parse table definition")
1922        })
1923        .unwrap()
1924        .try_into()
1925        .unwrap();
1926    if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1927        cdc_table_info
1928            .clone()
1929            .map(|cdc_table_info| cdc_table_info.external_table_name)
1930    } else {
1931        None
1932    }
1933}
1934
/// `rename_relation` renames the target relation and its definition,
/// it commits the changes to the transaction and returns the updated relations and the old name.
pub async fn rename_relation(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
) -> MetaResult<(Vec<PbObject>, String)> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename;

    let mut to_update_relations = vec![];
    // rename relation.
    // Helper macro: loads the relation row plus its catalog object, renames it
    // (rewriting the stored SQL definition unless the object is a view), persists
    // only the changed columns via a partial ActiveModel, and records the updated
    // catalog object for notification. Evaluates to the relation's old name.
    macro_rules! rename_relation {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let obj = obj.unwrap();
            let old_name = relation.name.clone();
            relation.name = object_name.into();
            // Views keep their original definition text; for other relations the
            // relation's own name inside the definition is rewritten as well.
            if obj.obj_type != ObjectType::View {
                relation.definition = alter_relation_rename(&relation.definition, object_name);
            }
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                name: Set(object_name.into()),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
                .one(txn)
                .await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj, streaming_job).into(),
                )),
            });
            old_name
        }};
    }
    // TODO: check is there any thing to change for shared source?
    let old_name = match object_type {
        ObjectType::Table => {
            // A table created with a connector has an associated source that must be
            // renamed in lockstep with the table itself.
            let associated_source_id: Option<SourceId> = Source::find()
                .select_only()
                .column(source::Column::SourceId)
                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
                .into_tuple()
                .one(txn)
                .await?;
            if let Some(source_id) = associated_source_id {
                rename_relation!(Source, source, source_id, source_id);
            }
            rename_relation!(Table, table, table_id, object_id.as_table_id())
        }
        ObjectType::Source => {
            rename_relation!(Source, source, source_id, object_id.as_source_id())
        }
        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
        ObjectType::Subscription => {
            rename_relation!(
                Subscription,
                subscription,
                subscription_id,
                object_id.as_subscription_id()
            )
        }
        ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
        ObjectType::Index => {
            // An index is renamed via its internal index table (whose name must stay
            // in sync), then the index row itself is updated separately below.
            let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            let streaming_job = streaming_job::Entity::find_by_id(index.index_id.as_job_id())
                .one(txn)
                .await?;
            index.name = object_name.into();
            let index_table_id = index.index_table_id;
            let old_name = rename_relation!(Table, table, table_id, index_table_id);

            // the name of index and its associated table is the same.
            let active_model = index::ActiveModel {
                index_id: sea_orm::ActiveValue::Set(index.index_id),
                name: sea_orm::ActiveValue::Set(object_name.into()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::Index(
                    ObjectModel(index, obj.unwrap(), streaming_job).into(),
                )),
            });
            old_name
        }
        _ => unreachable!("only relation name can be altered."),
    };

    Ok((to_update_relations, old_name))
}
2040
2041pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
2042where
2043    C: ConnectionTrait,
2044{
2045    let database_resource_group: Option<String> = Database::find_by_id(database_id)
2046        .select_only()
2047        .column(database::Column::ResourceGroup)
2048        .into_tuple()
2049        .one(txn)
2050        .await?
2051        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
2052
2053    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
2054}
2055
2056pub async fn get_existing_job_resource_group<C>(
2057    txn: &C,
2058    streaming_job_id: JobId,
2059) -> MetaResult<String>
2060where
2061    C: ConnectionTrait,
2062{
2063    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
2064        StreamingJob::find_by_id(streaming_job_id)
2065            .select_only()
2066            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2067            .join(JoinType::InnerJoin, object::Relation::Database2.def())
2068            .column(streaming_job::Column::SpecificResourceGroup)
2069            .column(database::Column::ResourceGroup)
2070            .into_tuple()
2071            .one(txn)
2072            .await?
2073            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
2074
2075    Ok(job_specific_resource_group.unwrap_or_else(|| {
2076        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
2077    }))
2078}
2079
2080pub fn filter_workers_by_resource_group(
2081    workers: &HashMap<WorkerId, WorkerNode>,
2082    resource_group: &str,
2083) -> BTreeSet<WorkerId> {
2084    workers
2085        .iter()
2086        .filter(|&(_, worker)| {
2087            worker
2088                .resource_group()
2089                .map(|node_label| node_label.as_str() == resource_group)
2090                .unwrap_or(false)
2091        })
2092        .map(|(id, _)| *id)
2093        .collect()
2094}
2095
/// `rename_relation_refer` updates the definition of relations that refer to the target one,
/// it commits the changes to the transaction and returns all the updated relations.
pub async fn rename_relation_refer(
    txn: &DatabaseTransaction,
    object_type: ObjectType,
    object_id: ObjectId,
    object_name: &str,
    old_name: &str,
) -> MetaResult<Vec<PbObject>> {
    use sea_orm::ActiveModelTrait;

    use crate::controller::rename::alter_relation_rename_refs;

    let mut to_update_relations = vec![];
    // Helper macro: loads a referring relation, rewrites every occurrence of the old
    // name in its stored SQL definition, persists only the definition column via a
    // partial ActiveModel, and records the updated catalog object for notification.
    macro_rules! rename_relation_ref {
        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
            let (mut relation, obj) = $entity::find_by_id($object_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .unwrap();
            relation.definition =
                alter_relation_rename_refs(&relation.definition, old_name, object_name);
            let active_model = $table::ActiveModel {
                $identity: Set(relation.$identity),
                definition: Set(relation.definition.clone()),
                ..Default::default()
            };
            active_model.update(txn).await?;
            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
                .one(txn)
                .await?;
            to_update_relations.push(PbObject {
                object_info: Some(PbObjectInfo::$entity(
                    ObjectModel(relation, obj.unwrap(), streaming_job).into(),
                )),
            });
        }};
    }
    let mut objs = get_referring_objects(object_id, txn).await?;
    if object_type == ObjectType::Table {
        // Sinks that write INTO this table reference it by name in their definition,
        // but are not captured by the plain object-dependency lookup above.
        let incoming_sinks: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.eq(object_id))
            .into_tuple()
            .all(txn)
            .await?;

        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
            oid: id.as_object_id(),
            obj_type: ObjectType::Sink,
            schema_id: None,
            database_id: None,
        }));
    }

    for obj in objs {
        match obj.obj_type {
            ObjectType::Table => {
                rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
            }
            ObjectType::Sink => {
                rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
            }
            ObjectType::Subscription => {
                rename_relation_ref!(
                    Subscription,
                    subscription,
                    subscription_id,
                    obj.oid.as_subscription_id()
                )
            }
            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
            ObjectType::Index => {
                // An index's definition lives on its internal index table, so resolve
                // that table id first and rewrite the definition there.
                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
                    .select_only()
                    .column(index::Column::IndexTableId)
                    .into_tuple()
                    .one(txn)
                    .await?;
                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
            }
            _ => {
                bail!(
                    "only the table, sink, subscription, view and index will depend on other objects."
                )
            }
        }
    }

    Ok(to_update_relations)
}
2189
/// Validate that subscription can be safely deleted, meeting any of the following conditions:
/// 1. The upstream table is not referred to by any cross-db mv.
/// 2. After deleting the subscription, the upstream table still has at least one subscription.
///
/// Returns a `permission_denied` error when neither condition holds, and
/// `catalog_id_not_found` when the subscription does not exist.
pub async fn validate_subscription_deletion<C>(
    txn: &C,
    subscription_id: SubscriptionId,
) -> MetaResult<()>
where
    C: ConnectionTrait,
{
    // Resolve the table this subscription is attached to.
    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
        .select_only()
        .column(subscription::Column::DependentTableId)
        .into_tuple()
        .one(txn)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;

    let cnt = Subscription::find()
        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
        .count(txn)
        .await?;
    if cnt > 1 {
        // Ensure that at least one subscription is remained for the upstream table
        // once the subscription is dropped.
        return Ok(());
    }

    // Ensure that the upstream table is not referred by any cross-db mv.
    // Self-join the object table twice: `o1` is the referred object (the upstream
    // table), `o2` is the referring object; a dependency is "cross-db" when the two
    // sides live in different databases.
    let obj_alias = Alias::new("o1");
    let used_by_alias = Alias::new("o2");
    let count = ObjectDependency::find()
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object2.def(),
            obj_alias.clone(),
        )
        .join_as(
            JoinType::InnerJoin,
            object_dependency::Relation::Object1.def(),
            used_by_alias.clone(),
        )
        .filter(
            object_dependency::Column::Oid
                .eq(upstream_table_id)
                // The subscription being dropped does not count as a referrer.
                .and(object_dependency::Column::UsedBy.ne(subscription_id))
                .and(
                    Expr::col((obj_alias, object::Column::DatabaseId))
                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
                ),
        )
        .count(txn)
        .await?;

    if count != 0 {
        return Err(MetaError::permission_denied(format!(
            "Referenced by {} cross-db objects.",
            count
        )));
    }

    Ok(())
}
2253
2254pub async fn fetch_target_fragments<C>(
2255    txn: &C,
2256    src_fragment_id: impl IntoIterator<Item = FragmentId>,
2257) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
2258where
2259    C: ConnectionTrait,
2260{
2261    let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
2262        .select_only()
2263        .columns([
2264            fragment_relation::Column::SourceFragmentId,
2265            fragment_relation::Column::TargetFragmentId,
2266        ])
2267        .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
2268        .into_tuple()
2269        .all(txn)
2270        .await?;
2271
2272    let source_target_fragments = source_target_fragments.into_iter().into_group_map();
2273
2274    Ok(source_target_fragments)
2275}
2276
2277pub async fn get_sink_fragment_by_ids<C>(
2278    txn: &C,
2279    sink_ids: Vec<SinkId>,
2280) -> MetaResult<HashMap<SinkId, FragmentId>>
2281where
2282    C: ConnectionTrait,
2283{
2284    let sink_num = sink_ids.len();
2285    let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
2286        .select_only()
2287        .columns([fragment::Column::JobId, fragment::Column::FragmentId])
2288        .filter(
2289            fragment::Column::JobId
2290                .is_in(sink_ids)
2291                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2292        )
2293        .into_tuple()
2294        .all(txn)
2295        .await?;
2296
2297    if sink_fragment_ids.len() != sink_num {
2298        return Err(anyhow::anyhow!(
2299            "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2300            sink_fragment_ids.len(),
2301            sink_num
2302        )
2303        .into());
2304    }
2305
2306    Ok(sink_fragment_ids.into_iter().collect())
2307}
2308
2309pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2310where
2311    C: ConnectionTrait,
2312{
2313    let mview_fragment: Vec<i32> = Fragment::find()
2314        .select_only()
2315        .column(fragment::Column::FragmentTypeMask)
2316        .filter(
2317            fragment::Column::JobId
2318                .eq(table_id)
2319                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2320        )
2321        .into_tuple()
2322        .all(txn)
2323        .await?;
2324
2325    let mview_fragment_len = mview_fragment.len();
2326    if mview_fragment_len != 1 {
2327        bail!(
2328            "expected exactly one mview fragment for table {}, found {}",
2329            table_id,
2330            mview_fragment_len
2331        );
2332    }
2333
2334    let mview_fragment = mview_fragment.into_iter().next().unwrap();
2335    let migrated =
2336        FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2337
2338    Ok(migrated)
2339}
2340
2341pub async fn try_get_iceberg_table_by_downstream_sink<C>(
2342    txn: &C,
2343    sink_id: SinkId,
2344) -> MetaResult<Option<TableId>>
2345where
2346    C: ConnectionTrait,
2347{
2348    let sink = Sink::find_by_id(sink_id).one(txn).await?;
2349    let Some(sink) = sink else {
2350        return Ok(None);
2351    };
2352
2353    if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
2354        let object_ids: Vec<ObjectId> = ObjectDependency::find()
2355            .select_only()
2356            .column(object_dependency::Column::Oid)
2357            .filter(object_dependency::Column::UsedBy.eq(sink_id))
2358            .into_tuple()
2359            .all(txn)
2360            .await?;
2361        let mut iceberg_table_ids = vec![];
2362        for object_id in object_ids {
2363            let table_id = object_id.as_table_id();
2364            if let Some(table_engine) = Table::find_by_id(table_id)
2365                .select_only()
2366                .column(table::Column::Engine)
2367                .into_tuple::<table::Engine>()
2368                .one(txn)
2369                .await?
2370                && table_engine == table::Engine::Iceberg
2371            {
2372                iceberg_table_ids.push(table_id);
2373            }
2374        }
2375        if iceberg_table_ids.len() == 1 {
2376            return Ok(Some(iceberg_table_ids[0]));
2377        }
2378    }
2379    Ok(None)
2380}
2381
2382pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
2383where
2384    C: ConnectionTrait,
2385{
2386    if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
2387        .select_only()
2388        .column(table::Column::Engine)
2389        .into_tuple::<table::Engine>()
2390        .one(txn)
2391        .await?
2392        && engine == table::Engine::Iceberg
2393    {
2394        return Ok(true);
2395    }
2396    if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
2397        .select_only()
2398        .column(sink::Column::Name)
2399        .into_tuple::<String>()
2400        .one(txn)
2401        .await?
2402        && sink_name.starts_with(ICEBERG_SINK_PREFIX)
2403    {
2404        return Ok(true);
2405    }
2406    Ok(false)
2407}
2408
2409pub async fn find_dirty_iceberg_table_jobs<C>(
2410    txn: &C,
2411    database_id: Option<DatabaseId>,
2412) -> MetaResult<Vec<PartialObject>>
2413where
2414    C: ConnectionTrait,
2415{
2416    let mut filter_condition = streaming_job::Column::JobStatus
2417        .ne(JobStatus::Created)
2418        .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
2419        .and(streaming_job::Column::CreateType.eq(CreateType::Background));
2420    if let Some(database_id) = database_id {
2421        filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
2422    }
2423    let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
2424        .select_only()
2425        .columns([
2426            object::Column::Oid,
2427            object::Column::ObjType,
2428            object::Column::SchemaId,
2429            object::Column::DatabaseId,
2430        ])
2431        .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2432        .filter(filter_condition)
2433        .into_partial_model()
2434        .all(txn)
2435        .await?;
2436
2437    let mut dirty_iceberg_table_jobs = vec![];
2438    for job in creating_table_sink_jobs {
2439        if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
2440            tracing::info!("Found dirty iceberg job with id: {}", job.oid);
2441            dirty_iceberg_table_jobs.push(job);
2442        }
2443    }
2444
2445    Ok(dirty_iceberg_table_jobs)
2446}
2447
2448pub fn build_select_node_list(
2449    from: &[ColumnCatalog],
2450    to: &[ColumnCatalog],
2451) -> MetaResult<Vec<PbExprNode>> {
2452    let mut exprs = Vec::with_capacity(to.len());
2453    let idx_by_col_id = from
2454        .iter()
2455        .enumerate()
2456        .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2457        .collect::<HashMap<_, _>>();
2458
2459    for to_col in to {
2460        let to_col = to_col.column_desc.as_ref().unwrap();
2461        let to_col_type_ref = to_col.column_type.as_ref().unwrap();
2462        let to_col_type = DataType::from(to_col_type_ref);
2463        if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2464            let from_col_type = DataType::from(
2465                from[*from_idx]
2466                    .column_desc
2467                    .as_ref()
2468                    .unwrap()
2469                    .column_type
2470                    .as_ref()
2471                    .unwrap(),
2472            );
2473            if !to_col_type.equals_datatype(&from_col_type) {
2474                return Err(anyhow!(
2475                    "Column type mismatch: {:?} != {:?}",
2476                    from_col_type,
2477                    to_col_type
2478                )
2479                .into());
2480            }
2481            exprs.push(PbExprNode {
2482                function_type: expr_node::Type::Unspecified.into(),
2483                return_type: Some(to_col_type_ref.clone()),
2484                rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2485            });
2486        } else {
2487            let to_default_node =
2488                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2489                    expr,
2490                    ..
2491                })) = &to_col.generated_or_default_column
2492                {
2493                    expr.clone().unwrap()
2494                } else {
2495                    let null = Datum::None.to_protobuf();
2496                    PbExprNode {
2497                        function_type: expr_node::Type::Unspecified.into(),
2498                        return_type: Some(to_col_type_ref.clone()),
2499                        rex_node: Some(expr_node::RexNode::Constant(null)),
2500                    }
2501                };
2502            exprs.push(to_default_node);
2503        }
2504    }
2505
2506    Ok(exprs)
2507}
2508
/// Auxiliary per-job metadata loaded from the `streaming_job` table plus the
/// resolved job definition (see `get_streaming_job_extra_info`).
#[derive(Clone, Debug, Default)]
pub struct StreamingJobExtraInfo {
    // `streaming_job.timezone`; `None` when the column is NULL.
    pub timezone: Option<String>,
    // `streaming_job.config_override`; empty string when the column is NULL.
    pub config_override: Arc<str>,
    // Parsed from `streaming_job.adaptive_parallelism_strategy`; `None` when unset.
    pub adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
    // Resolved job definition; empty string when resolution yields nothing.
    pub job_definition: String,
    // `streaming_job.backfill_orders`, passed through as stored.
    pub backfill_orders: Option<BackfillOrders>,
}
2517
impl StreamingJobExtraInfo {
    /// Build a `StreamContext` from the timezone, config override and
    /// adaptive parallelism strategy carried by this extra info.
    pub fn stream_context(&self) -> StreamContext {
        StreamContext {
            timezone: self.timezone.clone(),
            // Arc<str> clone is a cheap refcount bump, not a string copy.
            config_override: self.config_override.clone(),
            adaptive_parallelism_strategy: self.adaptive_parallelism_strategy,
        }
    }
}
2527
/// Raw row shape of the extra-info query in `get_streaming_job_extra_info`:
/// tuple of (`job_id`, `timezone`, `config_override`,
/// `adaptive_parallelism_strategy`, `backfill_orders`).
type StreamingJobExtraInfoRow = (
    JobId,
    Option<String>,
    Option<String>,
    Option<String>,
    Option<BackfillOrders>,
);
2536
2537pub async fn get_streaming_job_extra_info<C>(
2538    txn: &C,
2539    job_ids: Vec<JobId>,
2540) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2541where
2542    C: ConnectionTrait,
2543{
2544    let pairs: Vec<StreamingJobExtraInfoRow> = StreamingJob::find()
2545        .select_only()
2546        .columns([
2547            streaming_job::Column::JobId,
2548            streaming_job::Column::Timezone,
2549            streaming_job::Column::ConfigOverride,
2550            streaming_job::Column::AdaptiveParallelismStrategy,
2551            streaming_job::Column::BackfillOrders,
2552        ])
2553        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2554        .into_tuple()
2555        .all(txn)
2556        .await?;
2557
2558    let job_ids = job_ids.into_iter().collect();
2559
2560    let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2561
2562    let result = pairs
2563        .into_iter()
2564        .map(
2565            |(job_id, timezone, config_override, strategy, backfill_orders)| {
2566                let job_definition = definitions.remove(&job_id).unwrap_or_default();
2567                let adaptive_parallelism_strategy = strategy.as_deref().map(|s| {
2568                    parse_strategy(s).expect("strategy should be validated before storing")
2569                });
2570                (
2571                    job_id,
2572                    StreamingJobExtraInfo {
2573                        timezone,
2574                        config_override: config_override.unwrap_or_default().into(),
2575                        adaptive_parallelism_strategy,
2576                        job_definition,
2577                        backfill_orders,
2578                    },
2579                )
2580            },
2581        )
2582        .collect();
2583
2584    Ok(result)
2585}
2586
#[cfg(test)]
mod tests {
    use super::*;

    /// Table-driven check that the external table name is extracted from the
    /// `TABLE '...'` clause of a CDC `CREATE TABLE ... FROM <source>` DDL.
    #[test]
    fn test_extract_cdc_table_name() {
        let cases = [
            (
                "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'",
                "public.t1",
            ),
            (
                "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'",
                "mydb.t2",
            ),
        ];
        for (definition, expected) in cases {
            assert_eq!(
                extract_external_table_name_from_definition(definition),
                Some(expected.into())
            );
        }
    }
}