risingwave_meta/controller/utils.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{
22    FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23};
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
25use risingwave_common::id::{JobId, SubscriptionId};
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
28use risingwave_common::types::{DataType, Datum};
29use risingwave_common::util::value_encoding::DatumToProtoExt;
30use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
31use risingwave_common::{bail, hash};
32use risingwave_meta_model::fragment::DistributionType;
33use risingwave_meta_model::object::ObjectType;
34use risingwave_meta_model::prelude::*;
35use risingwave_meta_model::streaming_job::BackfillOrders;
36use risingwave_meta_model::table::TableType;
37use risingwave_meta_model::user_privilege::Action;
38use risingwave_meta_model::{
39    ActorId, ColumnCatalogArray, CreateType, DataTypeArray, DatabaseId, DispatcherType, FragmentId,
40    JobStatus, ObjectId, PrivilegeId, SchemaId, SinkId, SourceId, StreamNode, StreamSourceInfo,
41    TableId, TableIdArray, UserId, WorkerId, connection, database, fragment, fragment_relation,
42    function, index, object, object_dependency, schema, secret, sink, source, streaming_job,
43    subscription, table, user, user_default_privilege, user_privilege, view,
44};
45use risingwave_meta_model_migration::WithQuery;
46use risingwave_pb::catalog::{
47    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
48    PbSubscription, PbTable, PbView,
49};
50use risingwave_pb::common::{PbObjectType, WorkerNode};
51use risingwave_pb::expr::{PbExprNode, expr_node};
52use risingwave_pb::meta::object::PbObjectInfo;
53use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
54use risingwave_pb::meta::{
55    ObjectDependency as PbObjectDependency, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
56};
57use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
58use risingwave_pb::plan_common::{ColumnCatalog, DefaultColumnDesc};
59use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher, PbDispatcherType};
60use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject as PbGrantObject};
61use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
62use risingwave_sqlparser::ast::Statement as SqlStatement;
63use risingwave_sqlparser::parser::Parser;
64use sea_orm::sea_query::{
65    Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
66    WithClause,
67};
68use sea_orm::{
69    ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
70    FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
71    RelationTrait, Set, Statement,
72};
73use thiserror_ext::AsReport;
74use tracing::warn;
75
76use crate::barrier::{SharedActorInfos, SharedFragmentInfo};
77use crate::controller::ObjectModel;
78use crate::controller::fragment::FragmentTypeMaskExt;
79use crate::controller::scale::resolve_streaming_job_definition;
80use crate::model::{FragmentDownstreamRelation, StreamContext};
81use crate::{MetaError, MetaResult};
82
/// This function constructs a query using a recursive CTE to find all objects [(`oid`, `obj_type`)] that depend on (i.e. use) the given object.
84///
85/// # Examples
86///
87/// ```
88/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
89/// use sea_orm::sea_query::*;
90/// use sea_orm::*;
91///
92/// let query = construct_obj_dependency_query(1.into());
93///
94/// assert_eq!(
95///     query.to_string(MysqlQueryBuilder),
96///     r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `oid` FROM `object` WHERE `object`.`database_id` = 1 OR `object`.`schema_id` = 1) UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `oid`, `obj_type`, `schema_id`, `database_id` FROM `used_by_object_ids` INNER JOIN `object` ON `used_by_object_ids`.`used_by` = `oid` ORDER BY `oid` DESC"#
97/// );
98/// assert_eq!(
99///     query.to_string(PostgresQueryBuilder),
100///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1) UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
101/// );
102/// assert_eq!(
103///     query.to_string(SqliteQueryBuilder),
104///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
105/// );
106/// ```
107pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
108    let cte_alias = Alias::new("used_by_object_ids");
109    let cte_return_alias = Alias::new("used_by");
110
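    // The recursive CTE is assembled from three parts:
    //   1. objects that directly use `obj_id` (base case);
    //   2. objects that belong to `obj_id` when it is a database or schema;
    //   3. objects that transitively use anything collected so far (recursive step).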
111    let mut base_query = SelectStatement::new()
112        .column(object_dependency::Column::UsedBy)
113        .from(ObjectDependency)
114        .and_where(object_dependency::Column::Oid.eq(obj_id))
115        .to_owned();
116
117    let belonged_obj_query = SelectStatement::new()
118        .column(object::Column::Oid)
119        .from(Object)
120        .and_where(
121            object::Column::DatabaseId
122                .eq(obj_id)
123                .or(object::Column::SchemaId.eq(obj_id)),
124        )
125        .to_owned();
126
127    let cte_referencing = Query::select()
128        .column((ObjectDependency, object_dependency::Column::UsedBy))
129        .from(ObjectDependency)
130        .inner_join(
131            cte_alias.clone(),
132            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
133                .equals(object_dependency::Column::Oid),
134        )
135        .to_owned();
136
137    let mut common_table_expr = CommonTableExpression::new();
138    common_table_expr
139        .query(
140            base_query
141                .union(UnionType::All, belonged_obj_query)
142                .union(UnionType::All, cte_referencing)
143                .to_owned(),
144        )
145        .column(cte_return_alias.clone())
146        .table_name(cte_alias.clone());
147
148    SelectStatement::new()
149        .distinct()
150        .columns([
151            object::Column::Oid,
152            object::Column::ObjType,
153            object::Column::SchemaId,
154            object::Column::DatabaseId,
155        ])
156        .from(cte_alias.clone())
157        .inner_join(
158            Object,
159            Expr::col((cte_alias, cte_return_alias)).equals(object::Column::Oid),
160        )
161        .order_by(object::Column::Oid, Order::Desc)
162        .to_owned()
163        .with(
164            WithClause::new()
165                .recursive(true)
166                .cte(common_table_expr)
167                .to_owned(),
168        )
169}
170
171fn to_pb_object_type(obj_type: ObjectType) -> PbObjectType {
172    match obj_type {
173        ObjectType::Database => PbObjectType::Database,
174        ObjectType::Schema => PbObjectType::Schema,
175        ObjectType::Table => PbObjectType::Table,
176        ObjectType::Source => PbObjectType::Source,
177        ObjectType::Sink => PbObjectType::Sink,
178        ObjectType::View => PbObjectType::View,
179        ObjectType::Index => PbObjectType::Index,
180        ObjectType::Function => PbObjectType::Function,
181        ObjectType::Connection => PbObjectType::Connection,
182        ObjectType::Subscription => PbObjectType::Subscription,
183        ObjectType::Secret => PbObjectType::Secret,
184    }
185}
186
/// Collect object dependencies, optionally filtered by object id, and optionally excluding dependencies whose depending streaming job has not been created yet.
188async fn list_object_dependencies_impl(
189    txn: &DatabaseTransaction,
190    object_id: Option<ObjectId>,
191    include_creating: bool,
192) -> MetaResult<Vec<PbObjectDependency>> {
193    let referenced_alias = Alias::new("referenced_obj");
194    let mut query = ObjectDependency::find()
195        .select_only()
196        .columns([
197            object_dependency::Column::Oid,
198            object_dependency::Column::UsedBy,
199        ])
200        .column_as(
201            Expr::col((referenced_alias.clone(), object::Column::ObjType)),
202            "referenced_obj_type",
203        )
204        .join(
205            JoinType::InnerJoin,
206            object_dependency::Relation::Object1.def(),
207        )
208        .join_as(
209            JoinType::InnerJoin,
210            object_dependency::Relation::Object2.def(),
211            referenced_alias.clone(),
212        );
213    if let Some(object_id) = object_id {
214        query = query.filter(object_dependency::Column::UsedBy.eq(object_id));
215    }
216    let mut obj_dependencies: Vec<PbObjectDependency> = query
217        .into_tuple()
218        .all(txn)
219        .await?
220        .into_iter()
221        .map(|(oid, used_by, referenced_type)| PbObjectDependency {
222            object_id: used_by,
223            referenced_object_id: oid,
224            referenced_object_type: to_pb_object_type(referenced_type) as i32,
225        })
226        .collect();
227
228    let mut sink_query = Sink::find()
229        .select_only()
230        .columns([sink::Column::SinkId, sink::Column::TargetTable])
231        .filter(sink::Column::TargetTable.is_not_null());
232    if let Some(object_id) = object_id {
233        sink_query = sink_query.filter(
234            sink::Column::SinkId
235                .eq(object_id)
236                .or(sink::Column::TargetTable.eq(object_id)),
237        );
238    }
239    let sink_dependencies: Vec<(SinkId, TableId)> = sink_query.into_tuple().all(txn).await?;
240    obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
241        PbObjectDependency {
242            object_id: table_id.into(),
243            referenced_object_id: sink_id.into(),
244            referenced_object_type: PbObjectType::Sink as i32,
245        }
246    }));
247
248    if !include_creating {
249        let mut streaming_job_ids = obj_dependencies
250            .iter()
251            .map(|dependency| dependency.object_id)
252            .collect_vec();
253        streaming_job_ids.sort_unstable();
254        streaming_job_ids.dedup();
255
256        if !streaming_job_ids.is_empty() {
257            let non_created_jobs: HashSet<JobId> = StreamingJob::find()
258                .select_only()
259                .columns([streaming_job::Column::JobId])
260                .filter(
261                    streaming_job::Column::JobId
262                        .is_in(streaming_job_ids)
263                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
264                )
265                .into_tuple()
266                .all(txn)
267                .await?
268                .into_iter()
269                .collect();
270
271            if !non_created_jobs.is_empty() {
272                obj_dependencies.retain(|dependency| {
273                    !non_created_jobs.contains(&dependency.object_id.as_job_id())
274                });
275            }
276        }
277    }
278
279    Ok(obj_dependencies)
280}
281
282/// List object dependencies with optional filtering of non-created streaming jobs.
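///
/// A minimal usage sketch (not compiled as a doctest; assumes an open transaction `txn`):
///
/// ```ignore
/// // Only dependencies whose depending streaming job is fully created.
/// let created_only = list_object_dependencies(&txn, false).await?;
/// // Also include dependencies of jobs that are still being created.
/// let all = list_object_dependencies(&txn, true).await?;
/// ```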
283pub async fn list_object_dependencies(
284    txn: &DatabaseTransaction,
285    include_creating: bool,
286) -> MetaResult<Vec<PbObjectDependency>> {
287    list_object_dependencies_impl(txn, None, include_creating).await
288}
289
290/// List object dependencies for the specified object id without filtering by job status.
291pub async fn list_object_dependencies_by_object_id(
292    txn: &DatabaseTransaction,
293    object_id: ObjectId,
294) -> MetaResult<Vec<PbObjectDependency>> {
295    list_object_dependencies_impl(txn, Some(object_id), true).await
296}
297
/// This function constructs a query using a recursive CTE to check whether any of the given dependent objects already (transitively) relies on the target table.
299///
300/// # Examples
301///
302/// ```
303/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
304/// use sea_orm::sea_query::*;
305/// use sea_orm::*;
306///
307/// let query = construct_sink_cycle_check_query(1.into(), vec![2.into(), 3.into()]);
308///
309/// assert_eq!(
310///     query.to_string(MysqlQueryBuilder),
311///     r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
312/// );
313/// assert_eq!(
314///     query.to_string(PostgresQueryBuilder),
315///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
316/// );
317/// assert_eq!(
318///     query.to_string(SqliteQueryBuilder),
319///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
320/// );
321/// ```
322pub fn construct_sink_cycle_check_query(
323    target_table: ObjectId,
324    dependent_objects: Vec<ObjectId>,
325) -> WithQuery {
326    let cte_alias = Alias::new("used_by_object_ids_with_sink");
327    let depend_alias = Alias::new("obj_dependency_with_sink");
328
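    // Dependency edges are the plain `object_dependency` rows unioned with the implicit
    // sink -> target-table edges, so sink-into-table relationships also participate in the
    // cycle check. The CTE walks these edges starting from the users of `target_table`,
    // and the outer query counts how many of the `dependent_objects` are reachable.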
329    let mut base_query = SelectStatement::new()
330        .columns([
331            object_dependency::Column::Oid,
332            object_dependency::Column::UsedBy,
333        ])
334        .from(ObjectDependency)
335        .and_where(object_dependency::Column::Oid.eq(target_table))
336        .to_owned();
337
338    let query_sink_deps = SelectStatement::new()
339        .columns([sink::Column::SinkId, sink::Column::TargetTable])
340        .from(Sink)
341        .and_where(sink::Column::TargetTable.is_not_null())
342        .to_owned();
343
344    let cte_referencing = Query::select()
345        .column((depend_alias.clone(), object_dependency::Column::Oid))
346        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
347        .from_subquery(
348            SelectStatement::new()
349                .columns([
350                    object_dependency::Column::Oid,
351                    object_dependency::Column::UsedBy,
352                ])
353                .from(ObjectDependency)
354                .union(UnionType::All, query_sink_deps)
355                .to_owned(),
356            depend_alias.clone(),
357        )
358        .inner_join(
359            cte_alias.clone(),
360            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
361                .eq(Expr::col((depend_alias, object_dependency::Column::Oid))),
362        )
363        .and_where(
364            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
365                cte_alias.clone(),
366                object_dependency::Column::Oid,
367            ))),
368        )
369        .to_owned();
370
371    let mut common_table_expr = CommonTableExpression::new();
372    common_table_expr
373        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
374        .columns([
375            object_dependency::Column::Oid,
376            object_dependency::Column::UsedBy,
377        ])
378        .table_name(cte_alias.clone());
379
380    SelectStatement::new()
381        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
382        .from(cte_alias.clone())
383        .and_where(
384            Expr::col((cte_alias, object_dependency::Column::UsedBy)).is_in(dependent_objects),
385        )
386        .to_owned()
387        .with(
388            WithClause::new()
389                .recursive(true)
390                .cte(common_table_expr)
391                .to_owned(),
392        )
393}
394
395#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
396#[sea_orm(entity = "Object")]
397pub struct PartialObject {
398    pub oid: ObjectId,
399    pub obj_type: ObjectType,
400    pub schema_id: Option<SchemaId>,
401    pub database_id: Option<DatabaseId>,
402}
403
404#[derive(Clone, DerivePartialModel, FromQueryResult)]
405#[sea_orm(entity = "Fragment")]
406pub struct PartialFragmentStateTables {
407    pub fragment_id: FragmentId,
408    pub job_id: ObjectId,
409    pub state_table_ids: TableIdArray,
410}
411
412#[derive(Clone, Eq, PartialEq, Debug)]
413pub struct PartialActorLocation {
414    pub actor_id: ActorId,
415    pub fragment_id: FragmentId,
416    pub worker_id: WorkerId,
417}
418
419#[derive(FromQueryResult, Debug, Eq, PartialEq, Clone)]
420pub struct FragmentDesc {
421    pub fragment_id: FragmentId,
422    pub job_id: JobId,
423    pub fragment_type_mask: i32,
424    pub distribution_type: DistributionType,
425    pub state_table_ids: TableIdArray,
426    pub parallelism: i64,
427    pub vnode_count: i32,
428    pub stream_node: StreamNode,
429    pub parallelism_policy: String,
430}
431
/// List all objects that are using the given one, transitively. It runs a recursive CTE to find all the transitive dependents.
433pub async fn get_referring_objects_cascade<C>(
434    obj_id: ObjectId,
435    db: &C,
436) -> MetaResult<Vec<PartialObject>>
437where
438    C: ConnectionTrait,
439{
440    let query = construct_obj_dependency_query(obj_id);
441    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
442    let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
443        db.get_database_backend(),
444        sql,
445        values,
446    ))
447    .all(db)
448    .await?;
449    Ok(objects)
450}
451
/// Check whether creating a sink with the given dependent objects into the target table would cause a cycle. Returns `true` if it would.
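///
/// A minimal usage sketch (not compiled as a doctest; `table_obj_id` and `upstream_ids` are hypothetical):
///
/// ```ignore
/// if check_sink_into_table_cycle(table_obj_id, upstream_ids, &txn).await? {
///     bail!("creating this sink would form a dependency cycle");
/// }
/// ```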
453pub async fn check_sink_into_table_cycle<C>(
454    target_table: ObjectId,
455    dependent_objs: Vec<ObjectId>,
456    db: &C,
457) -> MetaResult<bool>
458where
459    C: ConnectionTrait,
460{
461    if dependent_objs.is_empty() {
462        return Ok(false);
463    }
464
465    // special check for self referencing
466    if dependent_objs.contains(&target_table) {
467        return Ok(true);
468    }
469
470    let query = construct_sink_cycle_check_query(target_table, dependent_objs);
471    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
472
473    let res = db
474        .query_one(Statement::from_sql_and_values(
475            db.get_database_backend(),
476            sql,
477            values,
478        ))
479        .await?
480        .unwrap();
481
482    let cnt: i64 = res.try_get_by(0)?;
483
484    Ok(cnt != 0)
485}
486
/// `ensure_object_id` ensures that the target object exists in the cluster.
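///
/// A minimal usage sketch (not compiled as a doctest; `schema_obj_id` is a hypothetical `ObjectId`):
///
/// ```ignore
/// // e.g. verify the target schema still exists before creating an object under it.
/// ensure_object_id(ObjectType::Schema, schema_obj_id, &txn).await?;
/// ```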
488pub async fn ensure_object_id<C>(
489    object_type: ObjectType,
490    obj_id: impl Into<ObjectId>,
491    db: &C,
492) -> MetaResult<()>
493where
494    C: ConnectionTrait,
495{
496    let obj_id = obj_id.into();
497    let count = Object::find_by_id(obj_id).count(db).await?;
498    if count == 0 {
499        return Err(MetaError::catalog_id_not_found(
500            object_type.as_str(),
501            obj_id,
502        ));
503    }
504    Ok(())
505}
506
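/// `ensure_job_not_canceled` ensures that the streaming job object still exists, returning a
/// "cancelled" error if it has been dropped (e.g. cancelled manually or cleaned up by recovery).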
507pub async fn ensure_job_not_canceled<C>(job_id: JobId, db: &C) -> MetaResult<()>
508where
509    C: ConnectionTrait,
510{
511    let count = Object::find_by_id(job_id).count(db).await?;
512    if count == 0 {
513        return Err(MetaError::cancelled(format!(
514            "job {} might be cancelled manually or by recovery",
515            job_id
516        )));
517    }
518    Ok(())
519}
520
/// `ensure_user_id` ensures that the target user exists in the cluster.
522pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
523where
524    C: ConnectionTrait,
525{
526    let count = User::find_by_id(user_id).count(db).await?;
527    if count == 0 {
528        return Err(anyhow!("user {} was concurrently dropped", user_id).into());
529    }
530    Ok(())
531}
532
533/// `check_database_name_duplicate` checks whether the database name is already used in the cluster.
534pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
535where
536    C: ConnectionTrait,
537{
538    let count = Database::find()
539        .filter(database::Column::Name.eq(name))
540        .count(db)
541        .await?;
542    if count > 0 {
543        assert_eq!(count, 1);
544        return Err(MetaError::catalog_duplicated("database", name));
545    }
546    Ok(())
547}
548
/// `check_function_signature_duplicate` checks whether a function with the same name and signature already exists in the target namespace.
550pub async fn check_function_signature_duplicate<C>(
551    pb_function: &PbFunction,
552    db: &C,
553) -> MetaResult<()>
554where
555    C: ConnectionTrait,
556{
557    let count = Function::find()
558        .inner_join(Object)
559        .filter(
560            object::Column::DatabaseId
561                .eq(pb_function.database_id)
562                .and(object::Column::SchemaId.eq(pb_function.schema_id))
563                .and(function::Column::Name.eq(&pb_function.name))
564                .and(
565                    function::Column::ArgTypes
566                        .eq(DataTypeArray::from(pb_function.arg_types.clone())),
567                ),
568        )
569        .count(db)
570        .await?;
571    if count > 0 {
572        assert_eq!(count, 1);
573        return Err(MetaError::catalog_duplicated("function", &pb_function.name));
574    }
575    Ok(())
576}
577
578/// `check_connection_name_duplicate` checks whether the connection name is already used in the target namespace.
579pub async fn check_connection_name_duplicate<C>(
580    pb_connection: &PbConnection,
581    db: &C,
582) -> MetaResult<()>
583where
584    C: ConnectionTrait,
585{
586    let count = Connection::find()
587        .inner_join(Object)
588        .filter(
589            object::Column::DatabaseId
590                .eq(pb_connection.database_id)
591                .and(object::Column::SchemaId.eq(pb_connection.schema_id))
592                .and(connection::Column::Name.eq(&pb_connection.name)),
593        )
594        .count(db)
595        .await?;
596    if count > 0 {
597        assert_eq!(count, 1);
598        return Err(MetaError::catalog_duplicated(
599            "connection",
600            &pb_connection.name,
601        ));
602    }
603    Ok(())
604}
605
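/// `check_secret_name_duplicate` checks whether the secret name is already used in the target namespace.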
606pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
607where
608    C: ConnectionTrait,
609{
610    let count = Secret::find()
611        .inner_join(Object)
612        .filter(
613            object::Column::DatabaseId
614                .eq(pb_secret.database_id)
615                .and(object::Column::SchemaId.eq(pb_secret.schema_id))
616                .and(secret::Column::Name.eq(&pb_secret.name)),
617        )
618        .count(db)
619        .await?;
620    if count > 0 {
621        assert_eq!(count, 1);
622        return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
623    }
624    Ok(())
625}
626
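/// `check_subscription_name_duplicate` checks whether the subscription name is already used in the target namespace.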
627pub async fn check_subscription_name_duplicate<C>(
628    pb_subscription: &PbSubscription,
629    db: &C,
630) -> MetaResult<()>
631where
632    C: ConnectionTrait,
633{
634    let count = Subscription::find()
635        .inner_join(Object)
636        .filter(
637            object::Column::DatabaseId
638                .eq(pb_subscription.database_id)
639                .and(object::Column::SchemaId.eq(pb_subscription.schema_id))
640                .and(subscription::Column::Name.eq(&pb_subscription.name)),
641        )
642        .count(db)
643        .await?;
644    if count > 0 {
645        assert_eq!(count, 1);
646        return Err(MetaError::catalog_duplicated(
647            "subscription",
648            &pb_subscription.name,
649        ));
650    }
651    Ok(())
652}
653
/// `check_user_name_duplicate` checks whether a user with the same name already exists in the cluster.
655pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
656where
657    C: ConnectionTrait,
658{
659    let count = User::find()
660        .filter(user::Column::Name.eq(name))
661        .count(db)
662        .await?;
663    if count > 0 {
664        assert_eq!(count, 1);
665        return Err(MetaError::catalog_duplicated("user", name));
666    }
667    Ok(())
668}
669
670/// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace.
671pub async fn check_relation_name_duplicate<C>(
672    name: &str,
673    database_id: DatabaseId,
674    schema_id: SchemaId,
675    db: &C,
676) -> MetaResult<()>
677where
678    C: ConnectionTrait,
679{
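    // For each relation kind, look up an object with the same name under the given database and
    // schema. If one exists and it is a streaming job that has not reached the `Created` state
    // (views are never treated as creating, and sources only when they are shared), report it as
    // "under creation"; otherwise report a plain name duplication.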
680    macro_rules! check_duplicated {
681        ($obj_type:expr, $entity:ident, $table:ident) => {
682            let object_id = Object::find()
683                .select_only()
684                .column(object::Column::Oid)
685                .inner_join($entity)
686                .filter(
687                    object::Column::DatabaseId
688                        .eq(Some(database_id))
689                        .and(object::Column::SchemaId.eq(Some(schema_id)))
690                        .and($table::Column::Name.eq(name)),
691                )
692                .into_tuple::<ObjectId>()
693                .one(db)
694                .await?;
695            if let Some(oid) = object_id {
696                let check_creation = if $obj_type == ObjectType::View {
697                    false
698                } else if $obj_type == ObjectType::Source {
699                    let source_info = Source::find_by_id(oid.as_source_id())
700                        .select_only()
701                        .column(source::Column::SourceInfo)
702                        .into_tuple::<Option<StreamSourceInfo>>()
703                        .one(db)
704                        .await?
705                        .unwrap();
706                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
707                } else {
708                    true
709                };
710                let job_id = oid.as_job_id();
711                return if check_creation
712                    && !matches!(
713                        StreamingJob::find_by_id(job_id)
714                            .select_only()
715                            .column(streaming_job::Column::JobStatus)
716                            .into_tuple::<JobStatus>()
717                            .one(db)
718                            .await?,
719                        Some(JobStatus::Created)
720                    ) {
721                    Err(MetaError::catalog_under_creation(
722                        $obj_type.as_str(),
723                        name,
724                        job_id,
725                    ))
726                } else {
727                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
728                };
729            }
730        };
731    }
732    check_duplicated!(ObjectType::Table, Table, table);
733    check_duplicated!(ObjectType::Source, Source, source);
734    check_duplicated!(ObjectType::Sink, Sink, sink);
735    check_duplicated!(ObjectType::Index, Index, index);
736    check_duplicated!(ObjectType::View, View, view);
737
738    Ok(())
739}
740
741/// `check_schema_name_duplicate` checks whether the schema name is already used in the target database.
742pub async fn check_schema_name_duplicate<C>(
743    name: &str,
744    database_id: DatabaseId,
745    db: &C,
746) -> MetaResult<()>
747where
748    C: ConnectionTrait,
749{
750    let count = Object::find()
751        .inner_join(Schema)
752        .filter(
753            object::Column::ObjType
754                .eq(ObjectType::Schema)
755                .and(object::Column::DatabaseId.eq(Some(database_id)))
756                .and(schema::Column::Name.eq(name)),
757        )
758        .count(db)
759        .await?;
760    if count != 0 {
761        return Err(MetaError::catalog_duplicated("schema", name));
762    }
763
764    Ok(())
765}
766
/// `check_object_refer_for_drop` checks whether the object is referenced by other objects, ignoring indexes.
/// If it is, an error containing the details of the referring objects is returned.
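///
/// A minimal usage sketch (not compiled as a doctest; `table_obj_id` is hypothetical):
///
/// ```ignore
/// // Reject a plain `DROP TABLE` (without CASCADE) while other relations still read from it.
/// check_object_refer_for_drop(ObjectType::Table, table_obj_id, &txn).await?;
/// ```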
769pub async fn check_object_refer_for_drop<C>(
770    object_type: ObjectType,
771    object_id: ObjectId,
772    db: &C,
773) -> MetaResult<()>
774where
775    C: ConnectionTrait,
776{
777    // Ignore indexes.
778    let count = if object_type == ObjectType::Table {
779        ObjectDependency::find()
780            .join(
781                JoinType::InnerJoin,
782                object_dependency::Relation::Object1.def(),
783            )
784            .filter(
785                object_dependency::Column::Oid
786                    .eq(object_id)
787                    .and(object::Column::ObjType.ne(ObjectType::Index)),
788            )
789            .count(db)
790            .await?
791    } else {
792        ObjectDependency::find()
793            .filter(object_dependency::Column::Oid.eq(object_id))
794            .count(db)
795            .await?
796    };
797    if count != 0 {
798        // find the name of all objects that are using the given one.
799        let referring_objects = get_referring_objects(object_id, db).await?;
800        let referring_objs_map = referring_objects
801            .into_iter()
802            .filter(|o| o.obj_type != ObjectType::Index)
803            .into_group_map_by(|o| o.obj_type);
804        let mut details = vec![];
805        for (obj_type, objs) in referring_objs_map {
806            match obj_type {
807                ObjectType::Table => {
808                    let tables: Vec<(String, String)> = Object::find()
809                        .join(JoinType::InnerJoin, object::Relation::Table.def())
810                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
811                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
812                        .select_only()
813                        .column(schema::Column::Name)
814                        .column(table::Column::Name)
815                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
816                        .into_tuple()
817                        .all(db)
818                        .await?;
819                    details.extend(tables.into_iter().map(|(schema_name, table_name)| {
820                        format!(
821                            "materialized view {}.{} depends on it",
822                            schema_name, table_name
823                        )
824                    }));
825                }
826                ObjectType::Sink => {
827                    let sinks: Vec<(String, String)> = Object::find()
828                        .join(JoinType::InnerJoin, object::Relation::Sink.def())
829                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
830                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
831                        .select_only()
832                        .column(schema::Column::Name)
833                        .column(sink::Column::Name)
834                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
835                        .into_tuple()
836                        .all(db)
837                        .await?;
838                    if object_type == ObjectType::Table {
839                        let engine = Table::find_by_id(object_id.as_table_id())
840                            .select_only()
841                            .column(table::Column::Engine)
842                            .into_tuple::<table::Engine>()
843                            .one(db)
844                            .await?;
845                        if engine == Some(table::Engine::Iceberg) && sinks.len() == 1 {
846                            continue;
847                        }
848                    }
849                    details.extend(sinks.into_iter().map(|(schema_name, sink_name)| {
850                        format!("sink {}.{} depends on it", schema_name, sink_name)
851                    }));
852                }
853                ObjectType::View => {
854                    let views: Vec<(String, String)> = Object::find()
855                        .join(JoinType::InnerJoin, object::Relation::View.def())
856                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
857                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
858                        .select_only()
859                        .column(schema::Column::Name)
860                        .column(view::Column::Name)
861                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
862                        .into_tuple()
863                        .all(db)
864                        .await?;
865                    details.extend(views.into_iter().map(|(schema_name, view_name)| {
866                        format!("view {}.{} depends on it", schema_name, view_name)
867                    }));
868                }
869                ObjectType::Subscription => {
870                    let subscriptions: Vec<(String, String)> = Object::find()
871                        .join(JoinType::InnerJoin, object::Relation::Subscription.def())
872                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
873                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
874                        .select_only()
875                        .column(schema::Column::Name)
876                        .column(subscription::Column::Name)
877                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
878                        .into_tuple()
879                        .all(db)
880                        .await?;
881                    details.extend(subscriptions.into_iter().map(
882                        |(schema_name, subscription_name)| {
883                            format!(
884                                "subscription {}.{} depends on it",
885                                schema_name, subscription_name
886                            )
887                        },
888                    ));
889                }
890                ObjectType::Source => {
891                    let sources: Vec<(String, String)> = Object::find()
892                        .join(JoinType::InnerJoin, object::Relation::Source.def())
893                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
894                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
895                        .select_only()
896                        .column(schema::Column::Name)
897                        .column(source::Column::Name)
898                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
899                        .into_tuple()
900                        .all(db)
901                        .await?;
902                    details.extend(sources.into_iter().map(|(schema_name, view_name)| {
903                        format!("source {}.{} depends on it", schema_name, view_name)
904                    }));
905                }
906                ObjectType::Connection => {
907                    let connections: Vec<(String, String)> = Object::find()
908                        .join(JoinType::InnerJoin, object::Relation::Connection.def())
909                        .join(JoinType::InnerJoin, object::Relation::Database2.def())
910                        .join(JoinType::InnerJoin, object::Relation::Schema2.def())
911                        .select_only()
912                        .column(schema::Column::Name)
913                        .column(connection::Column::Name)
914                        .filter(object::Column::Oid.is_in(objs.iter().map(|o| o.oid)))
915                        .into_tuple()
916                        .all(db)
917                        .await?;
918                    details.extend(connections.into_iter().map(|(schema_name, view_name)| {
919                        format!("connection {}.{} depends on it", schema_name, view_name)
920                    }));
921                }
                // Only tables, sources, sinks, subscriptions, views, connections and indexes can depend on other objects.
923                _ => bail!("unexpected referring object type: {}", obj_type.as_str()),
924            }
925        }
926        if details.is_empty() {
927            return Ok(());
928        }
929
930        return Err(MetaError::permission_denied(format!(
931            "{} used by {} other objects. \nDETAIL: {}\n\
932            {}",
933            object_type.as_str(),
934            details.len(),
935            details.join("\n"),
936            match object_type {
937                ObjectType::Function | ObjectType::Connection | ObjectType::Secret =>
938                    "HINT: DROP the dependent objects first.",
939                ObjectType::Database | ObjectType::Schema => unreachable!(),
                _ => "HINT: Use DROP ... CASCADE to drop the dependent objects too.",
941            }
942        )));
943    }
944    Ok(())
945}
946
947/// List all objects that are using the given one.
948pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
949where
950    C: ConnectionTrait,
951{
952    let objs = ObjectDependency::find()
953        .filter(object_dependency::Column::Oid.eq(object_id))
954        .join(
955            JoinType::InnerJoin,
956            object_dependency::Relation::Object1.def(),
957        )
958        .into_partial_model()
959        .all(db)
960        .await?;
961
962    Ok(objs)
963}
964
965/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
966pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
967where
968    C: ConnectionTrait,
969{
970    let count = Object::find()
971        .filter(object::Column::SchemaId.eq(Some(schema_id)))
972        .count(db)
973        .await?;
974    if count != 0 {
975        return Err(MetaError::permission_denied("schema is not empty"));
976    }
977
978    Ok(())
979}
980
981/// `list_user_info_by_ids` lists all users' info by their ids.
982pub async fn list_user_info_by_ids<C>(
983    user_ids: impl IntoIterator<Item = UserId>,
984    db: &C,
985) -> MetaResult<Vec<PbUserInfo>>
986where
987    C: ConnectionTrait,
988{
989    let mut user_infos = vec![];
990    for user_id in user_ids {
991        let user = User::find_by_id(user_id)
992            .one(db)
993            .await?
994            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
995        let mut user_info: PbUserInfo = user.into();
996        user_info.grant_privileges = get_user_privilege(user_id, db).await?;
997        user_infos.push(user_info);
998    }
999    Ok(user_infos)
1000}
1001
1002/// `get_object_owner` returns the owner of the given object.
1003pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
1004where
1005    C: ConnectionTrait,
1006{
1007    let obj_owner: UserId = Object::find_by_id(object_id)
1008        .select_only()
1009        .column(object::Column::OwnerId)
1010        .into_tuple()
1011        .one(db)
1012        .await?
1013        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1014    Ok(obj_owner)
1015}
1016
/// `construct_privilege_dependency_query` constructs a query to find all privileges that are (transitively) dependent on the given ones.
1018///
1019/// # Examples
1020///
1021/// ```
1022/// use risingwave_meta::controller::utils::construct_privilege_dependency_query;
1023/// use sea_orm::sea_query::*;
1024/// use sea_orm::*;
1025///
1026/// let query = construct_privilege_dependency_query(vec![1, 2, 3]);
1027///
1028/// assert_eq!(
1029///    query.to_string(MysqlQueryBuilder),
1030///   r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"#
1031/// );
1032/// assert_eq!(
1033///   query.to_string(PostgresQueryBuilder),
1034///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""#
1035/// );
1036/// assert_eq!(
1037///  query.to_string(SqliteQueryBuilder),
1038///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""#
1039/// );
1040/// ```
1041pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
1042    let cte_alias = Alias::new("granted_privilege_ids");
1043    let cte_return_privilege_alias = Alias::new("id");
1044    let cte_return_user_alias = Alias::new("user_id");
1045
1046    let mut base_query = SelectStatement::new()
1047        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
1048        .from(UserPrivilege)
1049        .and_where(user_privilege::Column::Id.is_in(ids))
1050        .to_owned();
1051
1052    let cte_referencing = Query::select()
1053        .columns([
1054            (UserPrivilege, user_privilege::Column::Id),
1055            (UserPrivilege, user_privilege::Column::UserId),
1056        ])
1057        .from(UserPrivilege)
1058        .inner_join(
1059            cte_alias.clone(),
1060            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
1061                .equals(user_privilege::Column::DependentId),
1062        )
1063        .to_owned();
1064
1065    let mut common_table_expr = CommonTableExpression::new();
1066    common_table_expr
1067        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
1068        .columns([
1069            cte_return_privilege_alias.clone(),
1070            cte_return_user_alias.clone(),
1071        ])
1072        .table_name(cte_alias.clone());
1073
1074    SelectStatement::new()
1075        .columns([cte_return_privilege_alias, cte_return_user_alias])
1076        .from(cte_alias)
1077        .to_owned()
1078        .with(
1079            WithClause::new()
1080                .recursive(true)
1081                .cte(common_table_expr)
1082                .to_owned(),
1083        )
1084}
1085
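/// `get_internal_tables_by_id` returns the ids of all internal state tables that belong to the given streaming job.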
1086pub async fn get_internal_tables_by_id<C>(job_id: JobId, db: &C) -> MetaResult<Vec<TableId>>
1087where
1088    C: ConnectionTrait,
1089{
1090    let table_ids: Vec<TableId> = Table::find()
1091        .select_only()
1092        .column(table::Column::TableId)
1093        .filter(
1094            table::Column::TableType
1095                .eq(TableType::Internal)
1096                .and(table::Column::BelongsToJobId.eq(job_id)),
1097        )
1098        .into_tuple()
1099        .all(db)
1100        .await?;
1101    Ok(table_ids)
1102}
1103
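/// `get_index_state_tables_by_table_id` returns the index table ids of all indexes on the given table,
/// together with the internal state tables belonging to those index tables.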
1104pub async fn get_index_state_tables_by_table_id<C>(
1105    table_id: TableId,
1106    db: &C,
1107) -> MetaResult<Vec<TableId>>
1108where
1109    C: ConnectionTrait,
1110{
1111    let mut index_table_ids: Vec<TableId> = Index::find()
1112        .select_only()
1113        .column(index::Column::IndexTableId)
1114        .filter(index::Column::PrimaryTableId.eq(table_id))
1115        .into_tuple()
1116        .all(db)
1117        .await?;
1118
1119    if !index_table_ids.is_empty() {
1120        let internal_table_ids: Vec<TableId> = Table::find()
1121            .select_only()
1122            .column(table::Column::TableId)
1123            .filter(
1124                table::Column::TableType
1125                    .eq(TableType::Internal)
1126                    .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
1127            )
1128            .into_tuple()
1129            .all(db)
1130            .await?;
1131
1132        index_table_ids.extend(internal_table_ids.into_iter());
1133    }
1134
1135    Ok(index_table_ids)
1136}
1137
/// `get_iceberg_related_object_ids` returns the object ids of an iceberg table's auxiliary objects: its iceberg sink (plus that sink's internal tables) and its iceberg source.
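///
/// The auxiliary objects are located by naming convention: for an iceberg table `t`, the companion
/// sink is named `{ICEBERG_SINK_PREFIX}t` and the companion source `{ICEBERG_SOURCE_PREFIX}t`, both
/// living in the same database and schema as the table.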
1139pub async fn get_iceberg_related_object_ids<C>(
1140    object_id: ObjectId,
1141    db: &C,
1142) -> MetaResult<Vec<ObjectId>>
1143where
1144    C: ConnectionTrait,
1145{
1146    let object = Object::find_by_id(object_id)
1147        .one(db)
1148        .await?
1149        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1150    if object.obj_type != ObjectType::Table {
1151        return Ok(vec![]);
1152    }
1153
1154    let table = Table::find_by_id(object_id.as_table_id())
1155        .one(db)
1156        .await?
1157        .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1158    if !matches!(table.engine, Some(table::Engine::Iceberg)) {
1159        return Ok(vec![]);
1160    }
1161
1162    let database_id = object.database_id.unwrap();
1163    let schema_id = object.schema_id.unwrap();
1164
1165    let mut related_objects = vec![];
1166
1167    let iceberg_sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name);
1168    let iceberg_sink_id = Sink::find()
1169        .inner_join(Object)
1170        .select_only()
1171        .column(sink::Column::SinkId)
1172        .filter(
1173            object::Column::DatabaseId
1174                .eq(database_id)
1175                .and(object::Column::SchemaId.eq(schema_id))
1176                .and(sink::Column::Name.eq(&iceberg_sink_name)),
1177        )
1178        .into_tuple::<SinkId>()
1179        .one(db)
1180        .await?;
1181    if let Some(sink_id) = iceberg_sink_id {
1182        related_objects.push(sink_id.as_object_id());
1183        let sink_internal_tables = get_internal_tables_by_id(sink_id.as_job_id(), db).await?;
1184        related_objects.extend(
1185            sink_internal_tables
1186                .into_iter()
1187                .map(|tid| tid.as_object_id()),
1188        );
1189    } else {
1190        warn!(
1191            "iceberg table {} missing sink {}",
1192            table.name, iceberg_sink_name
1193        );
1194    }
1195
1196    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
1197    let iceberg_source_id = Source::find()
1198        .inner_join(Object)
1199        .select_only()
1200        .column(source::Column::SourceId)
1201        .filter(
1202            object::Column::DatabaseId
1203                .eq(database_id)
1204                .and(object::Column::SchemaId.eq(schema_id))
1205                .and(source::Column::Name.eq(&iceberg_source_name)),
1206        )
1207        .into_tuple::<SourceId>()
1208        .one(db)
1209        .await?;
1210    if let Some(source_id) = iceberg_source_id {
1211        related_objects.push(source_id.as_object_id());
1212    } else {
1213        warn!(
1214            "iceberg table {} missing source {}",
1215            table.name, iceberg_source_name
1216        );
1217    }
1218
1219    Ok(related_objects)
1220}
1221
1222/// Load streaming jobs by job ids.
1223pub(crate) async fn load_streaming_jobs_by_ids<C>(
1224    txn: &C,
1225    job_ids: impl IntoIterator<Item = JobId>,
1226) -> MetaResult<HashMap<JobId, streaming_job::Model>>
1227where
1228    C: ConnectionTrait,
1229{
1230    let job_ids: HashSet<JobId> = job_ids.into_iter().collect();
1231    if job_ids.is_empty() {
1232        return Ok(HashMap::new());
1233    }
1234    let jobs = streaming_job::Entity::find()
1235        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
1236        .all(txn)
1237        .await?;
1238    Ok(jobs.into_iter().map(|job| (job.job_id, job)).collect())
1239}
1240
1241#[derive(Clone, DerivePartialModel, FromQueryResult)]
1242#[sea_orm(entity = "UserPrivilege")]
1243pub struct PartialUserPrivilege {
1244    pub id: PrivilegeId,
1245    pub user_id: UserId,
1246}
1247
1248pub async fn get_referring_privileges_cascade<C>(
1249    ids: Vec<PrivilegeId>,
1250    db: &C,
1251) -> MetaResult<Vec<PartialUserPrivilege>>
1252where
1253    C: ConnectionTrait,
1254{
1255    let query = construct_privilege_dependency_query(ids);
1256    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
1257    let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
1258        db.get_database_backend(),
1259        sql,
1260        values,
1261    ))
1262    .all(db)
1263    .await?;
1264
1265    Ok(privileges)
1266}
1267
/// `ensure_privileges_not_referred` ensures that the privileges have not been further granted to any other users.
1269pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
1270where
1271    C: ConnectionTrait,
1272{
1273    let count = UserPrivilege::find()
1274        .filter(user_privilege::Column::DependentId.is_in(ids))
1275        .count(db)
1276        .await?;
1277    if count != 0 {
1278        return Err(MetaError::permission_denied(format!(
1279            "privileges granted to {} other ones.",
1280            count
1281        )));
1282    }
1283    Ok(())
1284}
1285
1286/// `get_user_privilege` returns the privileges of the given user.
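///
/// A minimal usage sketch (not compiled as a doctest):
///
/// ```ignore
/// let grants: Vec<PbGrantPrivilege> = get_user_privilege(user_id, &txn).await?;
/// ```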
1287pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
1288where
1289    C: ConnectionTrait,
1290{
1291    let user_privileges = UserPrivilege::find()
1292        .find_also_related(Object)
1293        .filter(user_privilege::Column::UserId.eq(user_id))
1294        .all(db)
1295        .await?;
1296    Ok(user_privileges
1297        .into_iter()
1298        .map(|(privilege, object)| {
1299            let object = object.unwrap();
1300            let obj = match object.obj_type {
1301                ObjectType::Database => PbGrantObject::DatabaseId(object.oid.as_database_id()),
1302                ObjectType::Schema => PbGrantObject::SchemaId(object.oid.as_schema_id()),
1303                ObjectType::Table | ObjectType::Index => {
1304                    PbGrantObject::TableId(object.oid.as_table_id())
1305                }
1306                ObjectType::Source => PbGrantObject::SourceId(object.oid.as_source_id()),
1307                ObjectType::Sink => PbGrantObject::SinkId(object.oid.as_sink_id()),
1308                ObjectType::View => PbGrantObject::ViewId(object.oid.as_view_id()),
1309                ObjectType::Function => PbGrantObject::FunctionId(object.oid.as_function_id()),
1310                ObjectType::Connection => {
1311                    PbGrantObject::ConnectionId(object.oid.as_connection_id())
1312                }
1313                ObjectType::Subscription => {
1314                    PbGrantObject::SubscriptionId(object.oid.as_subscription_id())
1315                }
1316                ObjectType::Secret => PbGrantObject::SecretId(object.oid.as_secret_id()),
1317            };
1318            PbGrantPrivilege {
1319                action_with_opts: vec![PbActionWithGrantOption {
1320                    action: PbAction::from(privilege.action) as _,
1321                    with_grant_option: privilege.with_grant_option,
1322                    granted_by: privilege.granted_by as _,
1323                }],
1324                object: Some(obj),
1325            }
1326        })
1327        .collect())
1328}
1329
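/// `get_table_columns` returns the column catalog of the given table.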
1330pub async fn get_table_columns(
1331    txn: &impl ConnectionTrait,
1332    id: TableId,
1333) -> MetaResult<ColumnCatalogArray> {
1334    let columns = Table::find_by_id(id)
1335        .select_only()
1336        .columns([table::Column::Columns])
1337        .into_tuple::<ColumnCatalogArray>()
1338        .one(txn)
1339        .await?
1340        .ok_or_else(|| MetaError::catalog_id_not_found("table", id))?;
1341    Ok(columns)
1342}
1343
1344/// `grant_default_privileges_automatically` grants default privileges automatically
1345/// for the given new object. It returns the list of user infos whose privileges are updated.
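///
/// Default privileges are matched on the object's database and schema, its owner, its object type
/// and, for tables, whether the table is a materialized view. When the granted action is `SELECT`,
/// the privilege is also granted on the job's internal state tables and, for iceberg tables, on the
/// related iceberg objects.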
1346pub async fn grant_default_privileges_automatically<C>(
1347    db: &C,
1348    object_id: impl Into<ObjectId>,
1349) -> MetaResult<Vec<PbUserInfo>>
1350where
1351    C: ConnectionTrait,
1352{
1353    let object_id = object_id.into();
1354    let object = Object::find_by_id(object_id)
1355        .one(db)
1356        .await?
1357        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
1358    assert_ne!(object.obj_type, ObjectType::Database);
1359
1360    let for_mview_filter = if object.obj_type == ObjectType::Table {
1361        let table_type = Table::find_by_id(object_id.as_table_id())
1362            .select_only()
1363            .column(table::Column::TableType)
1364            .into_tuple::<TableType>()
1365            .one(db)
1366            .await?
1367            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1368        user_default_privilege::Column::ForMaterializedView
1369            .eq(table_type == TableType::MaterializedView)
1370    } else {
1371        user_default_privilege::Column::ForMaterializedView.eq(false)
1372    };
1373    let schema_filter = if let Some(schema_id) = &object.schema_id {
1374        user_default_privilege::Column::SchemaId.eq(*schema_id)
1375    } else {
1376        user_default_privilege::Column::SchemaId.is_null()
1377    };
1378
1379    let default_privileges: Vec<(UserId, UserId, Action, bool)> = UserDefaultPrivilege::find()
1380        .select_only()
1381        .columns([
1382            user_default_privilege::Column::Grantee,
1383            user_default_privilege::Column::GrantedBy,
1384            user_default_privilege::Column::Action,
1385            user_default_privilege::Column::WithGrantOption,
1386        ])
1387        .filter(
1388            user_default_privilege::Column::DatabaseId
1389                .eq(object.database_id.unwrap())
1390                .and(schema_filter)
1391                .and(user_default_privilege::Column::UserId.eq(object.owner_id))
1392                .and(user_default_privilege::Column::ObjectType.eq(object.obj_type))
1393                .and(for_mview_filter),
1394        )
1395        .into_tuple()
1396        .all(db)
1397        .await?;
1398    if default_privileges.is_empty() {
1399        return Ok(vec![]);
1400    }
1401
1402    let updated_user_ids = default_privileges
1403        .iter()
1404        .map(|(grantee, _, _, _)| *grantee)
1405        .collect::<HashSet<_>>();
1406
1407    for (grantee, granted_by, action, with_grant_option) in default_privileges {
1408        UserPrivilege::insert(user_privilege::ActiveModel {
1409            user_id: Set(grantee),
1410            oid: Set(object_id),
1411            granted_by: Set(granted_by),
1412            action: Set(action),
1413            with_grant_option: Set(with_grant_option),
1414            ..Default::default()
1415        })
1416        .exec(db)
1417        .await?;
1418        if action == Action::Select {
1419            // Grant SELECT privilege for internal tables if the action is SELECT.
1420            let internal_table_ids = get_internal_tables_by_id(object_id.as_job_id(), db).await?;
1421            if !internal_table_ids.is_empty() {
1422                for internal_table_id in &internal_table_ids {
1423                    UserPrivilege::insert(user_privilege::ActiveModel {
1424                        user_id: Set(grantee),
1425                        oid: Set(internal_table_id.as_object_id()),
1426                        granted_by: Set(granted_by),
1427                        action: Set(Action::Select),
1428                        with_grant_option: Set(with_grant_option),
1429                        ..Default::default()
1430                    })
1431                    .exec(db)
1432                    .await?;
1433                }
1434            }
1435
1436            // Additionally, grant the SELECT privilege on Iceberg-related objects if the action is SELECT.
1437            let iceberg_privilege_object_ids =
1438                get_iceberg_related_object_ids(object_id, db).await?;
1439            if !iceberg_privilege_object_ids.is_empty() {
1440                for iceberg_object_id in &iceberg_privilege_object_ids {
1441                    UserPrivilege::insert(user_privilege::ActiveModel {
1442                        user_id: Set(grantee),
1443                        oid: Set(*iceberg_object_id),
1444                        granted_by: Set(granted_by),
1445                        action: Set(action),
1446                        with_grant_option: Set(with_grant_option),
1447                        ..Default::default()
1448                    })
1449                    .exec(db)
1450                    .await?;
1451                }
1452            }
1453        }
1454    }
1455
1456    let updated_user_infos = list_user_info_by_ids(updated_user_ids, db).await?;
1457    Ok(updated_user_infos)
1458}
1459
1460// TODO: remove this after the migration to the SQL backend is complete.
1461pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
1462    match object {
1463        PbGrantObject::DatabaseId(id) => (*id).into(),
1464        PbGrantObject::SchemaId(id) => (*id).into(),
1465        PbGrantObject::TableId(id) => (*id).into(),
1466        PbGrantObject::SourceId(id) => (*id).into(),
1467        PbGrantObject::SinkId(id) => (*id).into(),
1468        PbGrantObject::ViewId(id) => (*id).into(),
1469        PbGrantObject::FunctionId(id) => (*id).into(),
1470        PbGrantObject::SubscriptionId(id) => (*id).into(),
1471        PbGrantObject::ConnectionId(id) => (*id).into(),
1472        PbGrantObject::SecretId(id) => (*id).into(),
1473    }
1474}
1475
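/// Persists the given upstream -> downstream fragment edges into the
/// `fragment_relation` table with a single batched insert (a no-op when the map
/// is empty).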
1476pub async fn insert_fragment_relations(
1477    db: &impl ConnectionTrait,
1478    downstream_fragment_relations: &FragmentDownstreamRelation,
1479) -> MetaResult<()> {
1480    let mut relations = vec![];
1481    for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
1482        for downstream in downstreams {
1483            relations.push(
1484                fragment_relation::Model {
1485                    source_fragment_id: *upstream_fragment_id as _,
1486                    target_fragment_id: downstream.downstream_fragment_id as _,
1487                    dispatcher_type: downstream.dispatcher_type,
1488                    dist_key_indices: downstream
1489                        .dist_key_indices
1490                        .iter()
1491                        .map(|idx| *idx as i32)
1492                        .collect_vec()
1493                        .into(),
1494                    output_indices: downstream
1495                        .output_mapping
1496                        .indices
1497                        .iter()
1498                        .map(|idx| *idx as i32)
1499                        .collect_vec()
1500                        .into(),
1501                    output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
1502                }
1503                .into_active_model(),
1504            );
1505        }
1506    }
1507    if !relations.is_empty() {
1508        FragmentRelation::insert_many(relations).exec(db).await?;
1509    }
1510    Ok(())
1511}
1512
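/// Builds the per-actor `PbDispatcher`s for one edge from a source (upstream)
/// fragment to a target (downstream) fragment.
///
/// A hedged sketch of its use (illustrative only; arguments mirror the signature
/// below):
///
/// ```ignore
/// let dispatchers = compose_dispatchers(
///     upstream_distribution,
///     &upstream_actors,        // actor id -> optional vnode bitmap
///     downstream_fragment_id,
///     downstream_distribution,
///     &downstream_actors,
///     DispatcherType::Hash,
///     dist_key_indices,
///     output_mapping,
/// );
/// // `Hash`, `Broadcast` and `Simple` dispatchers target all downstream actors;
/// // `NoShuffle` pairs upstream and downstream actors one-to-one.
/// ```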
1513pub fn compose_dispatchers(
1514    source_fragment_distribution: DistributionType,
1515    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1516    target_fragment_id: crate::model::FragmentId,
1517    target_fragment_distribution: DistributionType,
1518    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1519    dispatcher_type: DispatcherType,
1520    dist_key_indices: Vec<u32>,
1521    output_mapping: PbDispatchOutputMapping,
1522) -> HashMap<crate::model::ActorId, PbDispatcher> {
1523    match dispatcher_type {
1524        DispatcherType::Hash => {
1525            let dispatcher = PbDispatcher {
1526                r#type: PbDispatcherType::from(dispatcher_type) as _,
1527                dist_key_indices,
1528                output_mapping: output_mapping.into(),
1529                hash_mapping: Some(
1530                    ActorMapping::from_bitmaps(
1531                        &target_fragment_actors
1532                            .iter()
1533                            .map(|(actor_id, bitmap)| {
1534                                (
1535                                    *actor_id as _,
1536                                    bitmap
1537                                        .clone()
1538                                        .expect("downstream hash dispatch must have distribution"),
1539                                )
1540                            })
1541                            .collect(),
1542                    )
1543                    .to_protobuf(),
1544                ),
1545                dispatcher_id: target_fragment_id,
1546                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1547            };
1548            source_fragment_actors
1549                .keys()
1550                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1551                .collect()
1552        }
1553        DispatcherType::Broadcast | DispatcherType::Simple => {
1554            let dispatcher = PbDispatcher {
1555                r#type: PbDispatcherType::from(dispatcher_type) as _,
1556                dist_key_indices,
1557                output_mapping: output_mapping.into(),
1558                hash_mapping: None,
1559                dispatcher_id: target_fragment_id,
1560                downstream_actor_id: target_fragment_actors.keys().copied().collect(),
1561            };
1562            source_fragment_actors
1563                .keys()
1564                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1565                .collect()
1566        }
1567        DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1568            source_fragment_distribution,
1569            source_fragment_actors,
1570            target_fragment_distribution,
1571            target_fragment_actors,
1572        )
1573        .into_iter()
1574        .map(|(upstream_actor_id, downstream_actor_id)| {
1575            (
1576                upstream_actor_id,
1577                PbDispatcher {
1578                    r#type: PbDispatcherType::NoShuffle as _,
1579                    dist_key_indices: dist_key_indices.clone(),
1580                    output_mapping: output_mapping.clone().into(),
1581                    hash_mapping: None,
1582                    dispatcher_id: target_fragment_id,
1583                    downstream_actor_id: vec![downstream_actor_id],
1584                },
1585            )
1586        })
1587        .collect(),
1588    }
1589}
1590
1591/// Returns the pairs (`upstream_actor_id` -> `downstream_actor_id`) for a no-shuffle edge.
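///
/// For `Hash`-distributed fragments, actors are paired by matching identical vnode
/// bitmaps (indexed by the first vnode of each bitmap); for `Single` fragments,
/// there is exactly one actor on each side.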
1592pub fn resolve_no_shuffle_actor_dispatcher(
1593    source_fragment_distribution: DistributionType,
1594    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1595    target_fragment_distribution: DistributionType,
1596    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1597) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1598    assert_eq!(source_fragment_distribution, target_fragment_distribution);
1599    assert_eq!(
1600        source_fragment_actors.len(),
1601        target_fragment_actors.len(),
1602        "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1603        source_fragment_actors,
1604        target_fragment_actors
1605    );
1606    match source_fragment_distribution {
1607        DistributionType::Single => {
1608            let assert_singleton = |bitmap: &Option<Bitmap>| {
1609                assert!(
1610                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1611                    "not singleton: {:?}",
1612                    bitmap
1613                );
1614            };
1615            assert_eq!(
1616                source_fragment_actors.len(),
1617                1,
1618                "singleton distribution actor count not 1: {:?}",
1619                source_fragment_distribution
1620            );
1621            assert_eq!(
1622                target_fragment_actors.len(),
1623                1,
1624                "singleton distribution actor count not 1: {:?}",
1625                target_fragment_distribution
1626            );
1627            let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1628            assert_singleton(bitmap);
1629            let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1630            assert_singleton(bitmap);
1631            vec![(*source_actor_id, *target_actor_id)]
1632        }
1633        DistributionType::Hash => {
1634            let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1635                .iter()
1636                .map(|(actor_id, bitmap)| {
1637                    let bitmap = bitmap
1638                        .as_ref()
1639                        .expect("hash distribution should have bitmap");
1640                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1641                    (first_vnode, (*actor_id, bitmap))
1642                })
1643                .collect();
1644            source_fragment_actors
1645                .iter()
1646                .map(|(source_actor_id, bitmap)| {
1647                    let bitmap = bitmap
1648                        .as_ref()
1649                        .expect("hash distribution should have bitmap");
1650                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1651                    let (target_actor_id, target_bitmap) =
1652                        target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1653                            panic!(
1654                                "cannot find matched target actor: {} {:?} {:?} {:?}",
1655                                source_actor_id,
1656                                first_vnode,
1657                                source_fragment_actors,
1658                                target_fragment_actors
1659                            );
1660                        });
1661                    assert_eq!(
1662                        bitmap,
1663                        target_bitmap,
1664                        "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1665                        source_actor_id,
1666                        first_vnode,
1667                        source_fragment_actors,
1668                        target_fragment_actors
1669                    );
1670                    (*source_actor_id, target_actor_id)
1671                }).collect()
1672        }
1673    }
1674}
1675
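/// Rebuilds the `PbFragmentWorkerSlotMapping` of a fragment from its current actors
/// and their vnode bitmaps.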
1676pub fn rebuild_fragment_mapping(fragment: &SharedFragmentInfo) -> PbFragmentWorkerSlotMapping {
1677    let fragment_worker_slot_mapping = match fragment.distribution_type {
1678        DistributionType::Single => {
1679            let actor = fragment.actors.values().exactly_one().unwrap();
1680            WorkerSlotMapping::new_single(WorkerSlotId::new(actor.worker_id as _, 0))
1681        }
1682        DistributionType::Hash => {
1683            let actor_bitmaps: HashMap<_, _> = fragment
1684                .actors
1685                .iter()
1686                .map(|(actor_id, actor_info)| {
1687                    let vnode_bitmap = actor_info
1688                        .vnode_bitmap
1689                        .as_ref()
1690                        .cloned()
1691                        .expect("actor bitmap shouldn't be none in hash fragment");
1692
1693                    (*actor_id as hash::ActorId, vnode_bitmap)
1694                })
1695                .collect();
1696
1697            let actor_mapping = ActorMapping::from_bitmaps(&actor_bitmaps);
1698
1699            let actor_locations = fragment
1700                .actors
1701                .iter()
1702                .map(|(actor_id, actor_info)| (*actor_id as hash::ActorId, actor_info.worker_id))
1703                .collect();
1704
1705            actor_mapping.to_worker_slot(&actor_locations)
1706        }
1707    };
1708
1709    PbFragmentWorkerSlotMapping {
1710        fragment_id: fragment.fragment_id,
1711        mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1712    }
1713}
1714
1715/// For the given streaming jobs, returns
1716/// - All source fragment ids, grouped by source id
1717/// - All sink fragment ids
1718/// - All actor ids
1719/// - All fragment ids
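///
/// A minimal usage sketch (assumes a live connection `db` and the shared actor
/// info handle; illustrative only):
///
/// ```ignore
/// let (source_fragments, sink_fragments, actor_ids, fragment_ids) =
///     get_fragments_for_jobs(&db, &actor_info, job_ids).await?;
/// ```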
1720pub async fn get_fragments_for_jobs<C>(
1721    db: &C,
1722    actor_info: &SharedActorInfos,
1723    streaming_jobs: Vec<JobId>,
1724) -> MetaResult<(
1725    HashMap<SourceId, BTreeSet<FragmentId>>,
1726    HashSet<FragmentId>,
1727    HashSet<ActorId>,
1728    HashSet<FragmentId>,
1729)>
1730where
1731    C: ConnectionTrait,
1732{
1733    if streaming_jobs.is_empty() {
1734        return Ok((
1735            HashMap::default(),
1736            HashSet::default(),
1737            HashSet::default(),
1738            HashSet::default(),
1739        ));
1740    }
1741
1742    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1743        .select_only()
1744        .columns([
1745            fragment::Column::FragmentId,
1746            fragment::Column::FragmentTypeMask,
1747            fragment::Column::StreamNode,
1748        ])
1749        .filter(fragment::Column::JobId.is_in(streaming_jobs))
1750        .into_tuple()
1751        .all(db)
1752        .await?;
1753
1754    let fragment_ids: HashSet<_> = fragments
1755        .iter()
1756        .map(|(fragment_id, _, _)| *fragment_id)
1757        .collect();
1758
1759    let actors = {
1760        let guard = actor_info.read_guard();
1761        fragment_ids
1762            .iter()
1763            .flat_map(|id| guard.get_fragment(*id as _))
1764            .flat_map(|f| f.actors.keys().cloned().map(|id| id as _))
1765            .collect::<HashSet<_>>()
1766    };
1767
1768    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1769    let mut sink_fragment_ids: HashSet<FragmentId> = HashSet::new();
1770    for (fragment_id, mask, stream_node) in fragments {
1771        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Source)
1772            && let Some(source_id) = stream_node.to_protobuf().find_stream_source()
1773        {
1774            source_fragment_ids
1775                .entry(source_id)
1776                .or_default()
1777                .insert(fragment_id);
1778        }
1779        if FragmentTypeMask::from(mask).contains(FragmentTypeFlag::Sink) {
1780            sink_fragment_ids.insert(fragment_id);
1781        }
1782    }
1783
1784    Ok((
1785        source_fragment_ids,
1786        sink_fragment_ids,
1787        actors.into_iter().collect(),
1788        fragment_ids,
1789    ))
1790}
1791
1792/// Build an object group for notifying the deletion of the given objects.
1793///
1794/// Note that only id fields are filled in the object info, as the arguments are partial objects.
1795/// As a result, the returned notification info should only be used for deletion.
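///
/// A hedged usage sketch (assumes `dropped_objects` was collected while dropping a
/// set of catalog objects; illustrative only):
///
/// ```ignore
/// let notification = build_object_group_for_delete(dropped_objects);
/// // `notification` is then sent as a deletion event; all non-id fields of each
/// // object are left at their protobuf defaults.
/// ```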
1796pub(crate) fn build_object_group_for_delete(
1797    partial_objects: Vec<PartialObject>,
1798) -> NotificationInfo {
1799    let mut objects = vec![];
1800    for obj in partial_objects {
1801        match obj.obj_type {
1802            ObjectType::Database => objects.push(PbObject {
1803                object_info: Some(PbObjectInfo::Database(PbDatabase {
1804                    id: obj.oid.as_database_id(),
1805                    ..Default::default()
1806                })),
1807            }),
1808            ObjectType::Schema => objects.push(PbObject {
1809                object_info: Some(PbObjectInfo::Schema(PbSchema {
1810                    id: obj.oid.as_schema_id(),
1811                    database_id: obj.database_id.unwrap(),
1812                    ..Default::default()
1813                })),
1814            }),
1815            ObjectType::Table => objects.push(PbObject {
1816                object_info: Some(PbObjectInfo::Table(PbTable {
1817                    id: obj.oid.as_table_id(),
1818                    schema_id: obj.schema_id.unwrap(),
1819                    database_id: obj.database_id.unwrap(),
1820                    ..Default::default()
1821                })),
1822            }),
1823            ObjectType::Source => objects.push(PbObject {
1824                object_info: Some(PbObjectInfo::Source(PbSource {
1825                    id: obj.oid.as_source_id(),
1826                    schema_id: obj.schema_id.unwrap(),
1827                    database_id: obj.database_id.unwrap(),
1828                    ..Default::default()
1829                })),
1830            }),
1831            ObjectType::Sink => objects.push(PbObject {
1832                object_info: Some(PbObjectInfo::Sink(PbSink {
1833                    id: obj.oid.as_sink_id(),
1834                    schema_id: obj.schema_id.unwrap(),
1835                    database_id: obj.database_id.unwrap(),
1836                    ..Default::default()
1837                })),
1838            }),
1839            ObjectType::Subscription => objects.push(PbObject {
1840                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1841                    id: obj.oid.as_subscription_id(),
1842                    schema_id: obj.schema_id.unwrap(),
1843                    database_id: obj.database_id.unwrap(),
1844                    ..Default::default()
1845                })),
1846            }),
1847            ObjectType::View => objects.push(PbObject {
1848                object_info: Some(PbObjectInfo::View(PbView {
1849                    id: obj.oid.as_view_id(),
1850                    schema_id: obj.schema_id.unwrap(),
1851                    database_id: obj.database_id.unwrap(),
1852                    ..Default::default()
1853                })),
1854            }),
1855            ObjectType::Index => {
1856                objects.push(PbObject {
1857                    object_info: Some(PbObjectInfo::Index(PbIndex {
1858                        id: obj.oid.as_index_id(),
1859                        schema_id: obj.schema_id.unwrap(),
1860                        database_id: obj.database_id.unwrap(),
1861                        ..Default::default()
1862                    })),
1863                });
1864                objects.push(PbObject {
1865                    object_info: Some(PbObjectInfo::Table(PbTable {
1866                        id: obj.oid.as_table_id(),
1867                        schema_id: obj.schema_id.unwrap(),
1868                        database_id: obj.database_id.unwrap(),
1869                        ..Default::default()
1870                    })),
1871                });
1872            }
1873            ObjectType::Function => objects.push(PbObject {
1874                object_info: Some(PbObjectInfo::Function(PbFunction {
1875                    id: obj.oid.as_function_id(),
1876                    schema_id: obj.schema_id.unwrap(),
1877                    database_id: obj.database_id.unwrap(),
1878                    ..Default::default()
1879                })),
1880            }),
1881            ObjectType::Connection => objects.push(PbObject {
1882                object_info: Some(PbObjectInfo::Connection(PbConnection {
1883                    id: obj.oid.as_connection_id(),
1884                    schema_id: obj.schema_id.unwrap(),
1885                    database_id: obj.database_id.unwrap(),
1886                    ..Default::default()
1887                })),
1888            }),
1889            ObjectType::Secret => objects.push(PbObject {
1890                object_info: Some(PbObjectInfo::Secret(PbSecret {
1891                    id: obj.oid.as_secret_id(),
1892                    schema_id: obj.schema_id.unwrap(),
1893                    database_id: obj.database_id.unwrap(),
1894                    ..Default::default()
1895                })),
1896            }),
1897        }
1898    }
1899    NotificationInfo::ObjectGroup(PbObjectGroup {
1900        objects,
1901        dependencies: vec![],
1902    })
1903}
1904
1905pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1906    let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1907        .context("unable to parse table definition")
1908        .inspect_err(|e| {
1909            tracing::error!(
1910                target: "auto_schema_change",
1911                error = %e.as_report(),
1912                "failed to parse table definition")
1913        })
1914        .unwrap()
1915        .try_into()
1916        .unwrap();
1917    if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1918        cdc_table_info
1919            .clone()
1920            .map(|cdc_table_info| cdc_table_info.external_table_name)
1921    } else {
1922        None
1923    }
1924}
1925
1926/// `rename_relation` renames the target relation and rewrites its definition.
1927/// It applies the changes within the given transaction and returns the updated relations along with the old name.
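///
/// A hedged usage sketch (assumes an open transaction and a renameable relation;
/// illustrative only):
///
/// ```ignore
/// let (updated_objects, old_name) =
///     rename_relation(&txn, ObjectType::Table, object_id, "new_name").await?;
/// // Relations referring to the renamed one need a follow-up pass:
/// let more_updates =
///     rename_relation_refer(&txn, ObjectType::Table, object_id, "new_name", &old_name).await?;
/// ```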
1928pub async fn rename_relation(
1929    txn: &DatabaseTransaction,
1930    object_type: ObjectType,
1931    object_id: ObjectId,
1932    object_name: &str,
1933) -> MetaResult<(Vec<PbObject>, String)> {
1934    use sea_orm::ActiveModelTrait;
1935
1936    use crate::controller::rename::alter_relation_rename;
1937
1938    let mut to_update_relations = vec![];
1939    // rename relation.
1940    macro_rules! rename_relation {
1941        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1942            let (mut relation, obj) = $entity::find_by_id($object_id)
1943                .find_also_related(Object)
1944                .one(txn)
1945                .await?
1946                .unwrap();
1947            let obj = obj.unwrap();
1948            let old_name = relation.name.clone();
1949            relation.name = object_name.into();
1950            if obj.obj_type != ObjectType::View {
1951                relation.definition = alter_relation_rename(&relation.definition, object_name);
1952            }
1953            let active_model = $table::ActiveModel {
1954                $identity: Set(relation.$identity),
1955                name: Set(object_name.into()),
1956                definition: Set(relation.definition.clone()),
1957                ..Default::default()
1958            };
1959            active_model.update(txn).await?;
1960            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
1961                .one(txn)
1962                .await?;
1963            to_update_relations.push(PbObject {
1964                object_info: Some(PbObjectInfo::$entity(
1965                    ObjectModel(relation, obj, streaming_job).into(),
1966                )),
1967            });
1968            old_name
1969        }};
1970    }
1971    // TODO: check whether there is anything to change for shared sources.
1972    let old_name = match object_type {
1973        ObjectType::Table => {
1974            let associated_source_id: Option<SourceId> = Source::find()
1975                .select_only()
1976                .column(source::Column::SourceId)
1977                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
1978                .into_tuple()
1979                .one(txn)
1980                .await?;
1981            if let Some(source_id) = associated_source_id {
1982                rename_relation!(Source, source, source_id, source_id);
1983            }
1984            rename_relation!(Table, table, table_id, object_id.as_table_id())
1985        }
1986        ObjectType::Source => {
1987            rename_relation!(Source, source, source_id, object_id.as_source_id())
1988        }
1989        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id.as_sink_id()),
1990        ObjectType::Subscription => {
1991            rename_relation!(
1992                Subscription,
1993                subscription,
1994                subscription_id,
1995                object_id.as_subscription_id()
1996            )
1997        }
1998        ObjectType::View => rename_relation!(View, view, view_id, object_id.as_view_id()),
1999        ObjectType::Index => {
2000            let (mut index, obj) = Index::find_by_id(object_id.as_index_id())
2001                .find_also_related(Object)
2002                .one(txn)
2003                .await?
2004                .unwrap();
2005            let streaming_job = streaming_job::Entity::find_by_id(index.index_id.as_job_id())
2006                .one(txn)
2007                .await?;
2008            index.name = object_name.into();
2009            let index_table_id = index.index_table_id;
2010            let old_name = rename_relation!(Table, table, table_id, index_table_id);
2011
2012            // An index and its associated table share the same name, so rename the index record as well.
2013            let active_model = index::ActiveModel {
2014                index_id: sea_orm::ActiveValue::Set(index.index_id),
2015                name: sea_orm::ActiveValue::Set(object_name.into()),
2016                ..Default::default()
2017            };
2018            active_model.update(txn).await?;
2019            to_update_relations.push(PbObject {
2020                object_info: Some(PbObjectInfo::Index(
2021                    ObjectModel(index, obj.unwrap(), streaming_job).into(),
2022                )),
2023            });
2024            old_name
2025        }
2026        _ => unreachable!("only relations can be renamed."),
2027    };
2028
2029    Ok((to_update_relations, old_name))
2030}
2031
2032pub async fn get_database_resource_group<C>(txn: &C, database_id: DatabaseId) -> MetaResult<String>
2033where
2034    C: ConnectionTrait,
2035{
2036    let database_resource_group: Option<String> = Database::find_by_id(database_id)
2037        .select_only()
2038        .column(database::Column::ResourceGroup)
2039        .into_tuple()
2040        .one(txn)
2041        .await?
2042        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
2043
2044    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
2045}
2046
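/// Resolves the resource group of an existing streaming job, falling back from the
/// job-specific override to the database-level setting and finally to
/// `DEFAULT_RESOURCE_GROUP`.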
2047pub async fn get_existing_job_resource_group<C>(
2048    txn: &C,
2049    streaming_job_id: JobId,
2050) -> MetaResult<String>
2051where
2052    C: ConnectionTrait,
2053{
2054    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
2055        StreamingJob::find_by_id(streaming_job_id)
2056            .select_only()
2057            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2058            .join(JoinType::InnerJoin, object::Relation::Database2.def())
2059            .column(streaming_job::Column::SpecificResourceGroup)
2060            .column(database::Column::ResourceGroup)
2061            .into_tuple()
2062            .one(txn)
2063            .await?
2064            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
2065
2066    Ok(job_specific_resource_group.unwrap_or_else(|| {
2067        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
2068    }))
2069}
2070
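/// Returns the ids of workers whose `resource_group` label equals the given group;
/// workers without a label are excluded.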
2071pub fn filter_workers_by_resource_group(
2072    workers: &HashMap<WorkerId, WorkerNode>,
2073    resource_group: &str,
2074) -> BTreeSet<WorkerId> {
2075    workers
2076        .iter()
2077        .filter(|&(_, worker)| {
2078            worker
2079                .resource_group()
2080                .map(|node_label| node_label.as_str() == resource_group)
2081                .unwrap_or(false)
2082        })
2083        .map(|(id, _)| *id)
2084        .collect()
2085}
2086
2087/// `rename_relation_refer` updates the definitions of relations that refer to the target one.
2088/// It applies the changes within the given transaction and returns all the updated relations.
2089pub async fn rename_relation_refer(
2090    txn: &DatabaseTransaction,
2091    object_type: ObjectType,
2092    object_id: ObjectId,
2093    object_name: &str,
2094    old_name: &str,
2095) -> MetaResult<Vec<PbObject>> {
2096    use sea_orm::ActiveModelTrait;
2097
2098    use crate::controller::rename::alter_relation_rename_refs;
2099
2100    let mut to_update_relations = vec![];
2101    macro_rules! rename_relation_ref {
2102        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
2103            let (mut relation, obj) = $entity::find_by_id($object_id)
2104                .find_also_related(Object)
2105                .one(txn)
2106                .await?
2107                .unwrap();
2108            relation.definition =
2109                alter_relation_rename_refs(&relation.definition, old_name, object_name);
2110            let active_model = $table::ActiveModel {
2111                $identity: Set(relation.$identity),
2112                definition: Set(relation.definition.clone()),
2113                ..Default::default()
2114            };
2115            active_model.update(txn).await?;
2116            let streaming_job = streaming_job::Entity::find_by_id($object_id.as_raw_id())
2117                .one(txn)
2118                .await?;
2119            to_update_relations.push(PbObject {
2120                object_info: Some(PbObjectInfo::$entity(
2121                    ObjectModel(relation, obj.unwrap(), streaming_job).into(),
2122                )),
2123            });
2124        }};
2125    }
2126    let mut objs = get_referring_objects(object_id, txn).await?;
2127    if object_type == ObjectType::Table {
2128        let incoming_sinks: Vec<SinkId> = Sink::find()
2129            .select_only()
2130            .column(sink::Column::SinkId)
2131            .filter(sink::Column::TargetTable.eq(object_id))
2132            .into_tuple()
2133            .all(txn)
2134            .await?;
2135
2136        objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
2137            oid: id.as_object_id(),
2138            obj_type: ObjectType::Sink,
2139            schema_id: None,
2140            database_id: None,
2141        }));
2142    }
2143
2144    for obj in objs {
2145        match obj.obj_type {
2146            ObjectType::Table => {
2147                rename_relation_ref!(Table, table, table_id, obj.oid.as_table_id())
2148            }
2149            ObjectType::Sink => {
2150                rename_relation_ref!(Sink, sink, sink_id, obj.oid.as_sink_id())
2151            }
2152            ObjectType::Subscription => {
2153                rename_relation_ref!(
2154                    Subscription,
2155                    subscription,
2156                    subscription_id,
2157                    obj.oid.as_subscription_id()
2158                )
2159            }
2160            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid.as_view_id()),
2161            ObjectType::Index => {
2162                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid.as_index_id())
2163                    .select_only()
2164                    .column(index::Column::IndexTableId)
2165                    .into_tuple()
2166                    .one(txn)
2167                    .await?;
2168                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
2169            }
2170            _ => {
2171                bail!(
2172                    "only the table, sink, subscription, view and index will depend on other objects."
2173                )
2174            }
2175        }
2176    }
2177
2178    Ok(to_update_relations)
2179}
2180
2181/// Validate that the subscription can be safely deleted, i.e. at least one of the following conditions holds:
2182/// 1. The upstream table is not referred to by any cross-db materialized view.
2183/// 2. After deleting this subscription, the upstream table still has at least one subscription.
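///
/// A minimal usage sketch (assumes this runs inside the drop-subscription
/// transaction; illustrative only):
///
/// ```ignore
/// validate_subscription_deletion(&txn, subscription_id).await?;
/// // Safe to proceed with dropping the subscription.
/// ```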
2184pub async fn validate_subscription_deletion<C>(
2185    txn: &C,
2186    subscription_id: SubscriptionId,
2187) -> MetaResult<()>
2188where
2189    C: ConnectionTrait,
2190{
2191    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
2192        .select_only()
2193        .column(subscription::Column::DependentTableId)
2194        .into_tuple()
2195        .one(txn)
2196        .await?
2197        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
2198
2199    let cnt = Subscription::find()
2200        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
2201        .count(txn)
2202        .await?;
2203    if cnt > 1 {
2204        // At least one subscription remains on the upstream table
2205        // once this subscription is dropped.
2206        return Ok(());
2207    }
2208
2209    // Ensure that the upstream table is not referred to by any cross-db materialized view.
2210    let obj_alias = Alias::new("o1");
2211    let used_by_alias = Alias::new("o2");
2212    let count = ObjectDependency::find()
2213        .join_as(
2214            JoinType::InnerJoin,
2215            object_dependency::Relation::Object2.def(),
2216            obj_alias.clone(),
2217        )
2218        .join_as(
2219            JoinType::InnerJoin,
2220            object_dependency::Relation::Object1.def(),
2221            used_by_alias.clone(),
2222        )
2223        .filter(
2224            object_dependency::Column::Oid
2225                .eq(upstream_table_id)
2226                .and(object_dependency::Column::UsedBy.ne(subscription_id))
2227                .and(
2228                    Expr::col((obj_alias, object::Column::DatabaseId))
2229                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
2230                ),
2231        )
2232        .count(txn)
2233        .await?;
2234
2235    if count != 0 {
2236        return Err(MetaError::permission_denied(format!(
2237            "Referenced by {} cross-db objects.",
2238            count
2239        )));
2240    }
2241
2242    Ok(())
2243}
2244
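/// For each given source fragment id, fetches the ids of its downstream (target)
/// fragments from the `fragment_relation` table.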
2245pub async fn fetch_target_fragments<C>(
2246    txn: &C,
2247    src_fragment_id: impl IntoIterator<Item = FragmentId>,
2248) -> MetaResult<HashMap<FragmentId, Vec<FragmentId>>>
2249where
2250    C: ConnectionTrait,
2251{
2252    let source_target_fragments: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
2253        .select_only()
2254        .columns([
2255            fragment_relation::Column::SourceFragmentId,
2256            fragment_relation::Column::TargetFragmentId,
2257        ])
2258        .filter(fragment_relation::Column::SourceFragmentId.is_in(src_fragment_id))
2259        .into_tuple()
2260        .all(txn)
2261        .await?;
2262
2263    let source_target_fragments = source_target_fragments.into_iter().into_group_map();
2264
2265    Ok(source_target_fragments)
2266}
2267
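/// Looks up the sink fragment of each given sink job; returns an error unless every
/// sink maps to exactly one sink fragment.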
2268pub async fn get_sink_fragment_by_ids<C>(
2269    txn: &C,
2270    sink_ids: Vec<SinkId>,
2271) -> MetaResult<HashMap<SinkId, FragmentId>>
2272where
2273    C: ConnectionTrait,
2274{
2275    let sink_num = sink_ids.len();
2276    let sink_fragment_ids: Vec<(SinkId, FragmentId)> = Fragment::find()
2277        .select_only()
2278        .columns([fragment::Column::JobId, fragment::Column::FragmentId])
2279        .filter(
2280            fragment::Column::JobId
2281                .is_in(sink_ids)
2282                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2283        )
2284        .into_tuple()
2285        .all(txn)
2286        .await?;
2287
2288    if sink_fragment_ids.len() != sink_num {
2289        return Err(anyhow::anyhow!(
2290            "expected exactly one sink fragment for each sink, but got {} fragments for {} sinks",
2291            sink_fragment_ids.len(),
2292            sink_num
2293        )
2294        .into());
2295    }
2296
2297    Ok(sink_fragment_ids.into_iter().collect())
2298}
2299
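/// Returns whether the mview fragment of the given table already carries the
/// `UpstreamSinkUnion` fragment type flag, i.e. whether the table has been migrated.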
2300pub async fn has_table_been_migrated<C>(txn: &C, table_id: TableId) -> MetaResult<bool>
2301where
2302    C: ConnectionTrait,
2303{
2304    let mview_fragment: Vec<i32> = Fragment::find()
2305        .select_only()
2306        .column(fragment::Column::FragmentTypeMask)
2307        .filter(
2308            fragment::Column::JobId
2309                .eq(table_id)
2310                .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2311        )
2312        .into_tuple()
2313        .all(txn)
2314        .await?;
2315
2316    let mview_fragment_len = mview_fragment.len();
2317    if mview_fragment_len != 1 {
2318        bail!(
2319            "expected exactly one mview fragment for table {}, found {}",
2320            table_id,
2321            mview_fragment_len
2322        );
2323    }
2324
2325    let mview_fragment = mview_fragment.into_iter().next().unwrap();
2326    let migrated =
2327        FragmentTypeMask::from(mview_fragment).contains(FragmentTypeFlag::UpstreamSinkUnion);
2328
2329    Ok(migrated)
2330}
2331
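/// If the given sink looks like the internal sink of an Iceberg table engine table
/// (its name starts with `ICEBERG_SINK_PREFIX`) and it depends on exactly one
/// Iceberg-engine table, returns that table's id; otherwise returns `None`.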
2332pub async fn try_get_iceberg_table_by_downstream_sink<C>(
2333    txn: &C,
2334    sink_id: SinkId,
2335) -> MetaResult<Option<TableId>>
2336where
2337    C: ConnectionTrait,
2338{
2339    let sink = Sink::find_by_id(sink_id).one(txn).await?;
2340    let Some(sink) = sink else {
2341        return Ok(None);
2342    };
2343
2344    if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
2345        let object_ids: Vec<ObjectId> = ObjectDependency::find()
2346            .select_only()
2347            .column(object_dependency::Column::Oid)
2348            .filter(object_dependency::Column::UsedBy.eq(sink_id))
2349            .into_tuple()
2350            .all(txn)
2351            .await?;
2352        let mut iceberg_table_ids = vec![];
2353        for object_id in object_ids {
2354            let table_id = object_id.as_table_id();
2355            if let Some(table_engine) = Table::find_by_id(table_id)
2356                .select_only()
2357                .column(table::Column::Engine)
2358                .into_tuple::<table::Engine>()
2359                .one(txn)
2360                .await?
2361                && table_engine == table::Engine::Iceberg
2362            {
2363                iceberg_table_ids.push(table_id);
2364            }
2365        }
2366        if iceberg_table_ids.len() == 1 {
2367            return Ok(Some(iceberg_table_ids[0]));
2368        }
2369    }
2370    Ok(None)
2371}
2372
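/// Returns whether the given streaming job belongs to an Iceberg table engine table:
/// either its table uses the Iceberg engine, or it is a sink whose name starts with
/// `ICEBERG_SINK_PREFIX`.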
2373pub async fn check_if_belongs_to_iceberg_table<C>(txn: &C, job_id: JobId) -> MetaResult<bool>
2374where
2375    C: ConnectionTrait,
2376{
2377    if let Some(engine) = Table::find_by_id(job_id.as_mv_table_id())
2378        .select_only()
2379        .column(table::Column::Engine)
2380        .into_tuple::<table::Engine>()
2381        .one(txn)
2382        .await?
2383        && engine == table::Engine::Iceberg
2384    {
2385        return Ok(true);
2386    }
2387    if let Some(sink_name) = Sink::find_by_id(job_id.as_sink_id())
2388        .select_only()
2389        .column(sink::Column::Name)
2390        .into_tuple::<String>()
2391        .one(txn)
2392        .await?
2393        && sink_name.starts_with(ICEBERG_SINK_PREFIX)
2394    {
2395        return Ok(true);
2396    }
2397    Ok(false)
2398}
2399
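/// Finds background-created table/sink streaming jobs that are not yet in the
/// `Created` state and belong to an Iceberg table engine table, optionally scoped to
/// one database. Such jobs are considered dirty and are candidates for cleanup.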
2400pub async fn find_dirty_iceberg_table_jobs<C>(
2401    txn: &C,
2402    database_id: Option<DatabaseId>,
2403) -> MetaResult<Vec<PartialObject>>
2404where
2405    C: ConnectionTrait,
2406{
2407    let mut filter_condition = streaming_job::Column::JobStatus
2408        .ne(JobStatus::Created)
2409        .and(object::Column::ObjType.is_in([ObjectType::Table, ObjectType::Sink]))
2410        .and(streaming_job::Column::CreateType.eq(CreateType::Background));
2411    if let Some(database_id) = database_id {
2412        filter_condition = filter_condition.and(object::Column::DatabaseId.eq(database_id));
2413    }
2414    let creating_table_sink_jobs: Vec<PartialObject> = StreamingJob::find()
2415        .select_only()
2416        .columns([
2417            object::Column::Oid,
2418            object::Column::ObjType,
2419            object::Column::SchemaId,
2420            object::Column::DatabaseId,
2421        ])
2422        .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
2423        .filter(filter_condition)
2424        .into_partial_model()
2425        .all(txn)
2426        .await?;
2427
2428    let mut dirty_iceberg_table_jobs = vec![];
2429    for job in creating_table_sink_jobs {
2430        if check_if_belongs_to_iceberg_table(txn, job.oid.as_job_id()).await? {
2431            tracing::info!("Found dirty iceberg job with id: {}", job.oid);
2432            dirty_iceberg_table_jobs.push(job);
2433        }
2434    }
2435
2436    Ok(dirty_iceberg_table_jobs)
2437}
2438
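/// Builds the projection expressions that map rows of the `from` schema into the
/// `to` schema: columns matched by column id become `InputRef` expressions (their
/// types must be identical), while columns missing from `from` fall back to their
/// default expression or to NULL.
///
/// A hedged sketch of the intended use (illustrative only):
///
/// ```ignore
/// let exprs = build_select_node_list(&old_columns, &new_columns)?;
/// // `exprs[i]` produces the value of `new_columns[i]` from a row of the old schema.
/// ```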
2439pub fn build_select_node_list(
2440    from: &[ColumnCatalog],
2441    to: &[ColumnCatalog],
2442) -> MetaResult<Vec<PbExprNode>> {
2443    let mut exprs = Vec::with_capacity(to.len());
2444    let idx_by_col_id = from
2445        .iter()
2446        .enumerate()
2447        .map(|(idx, col)| (col.column_desc.as_ref().unwrap().column_id, idx))
2448        .collect::<HashMap<_, _>>();
2449
2450    for to_col in to {
2451        let to_col = to_col.column_desc.as_ref().unwrap();
2452        let to_col_type_ref = to_col.column_type.as_ref().unwrap();
2453        let to_col_type = DataType::from(to_col_type_ref);
2454        if let Some(from_idx) = idx_by_col_id.get(&to_col.column_id) {
2455            let from_col_type = DataType::from(
2456                from[*from_idx]
2457                    .column_desc
2458                    .as_ref()
2459                    .unwrap()
2460                    .column_type
2461                    .as_ref()
2462                    .unwrap(),
2463            );
2464            if !to_col_type.equals_datatype(&from_col_type) {
2465                return Err(anyhow!(
2466                    "Column type mismatch: {:?} != {:?}",
2467                    from_col_type,
2468                    to_col_type
2469                )
2470                .into());
2471            }
2472            exprs.push(PbExprNode {
2473                function_type: expr_node::Type::Unspecified.into(),
2474                return_type: Some(to_col_type_ref.clone()),
2475                rex_node: Some(expr_node::RexNode::InputRef(*from_idx as _)),
2476            });
2477        } else {
2478            let to_default_node =
2479                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
2480                    expr,
2481                    ..
2482                })) = &to_col.generated_or_default_column
2483                {
2484                    expr.clone().unwrap()
2485                } else {
2486                    let null = Datum::None.to_protobuf();
2487                    PbExprNode {
2488                        function_type: expr_node::Type::Unspecified.into(),
2489                        return_type: Some(to_col_type_ref.clone()),
2490                        rex_node: Some(expr_node::RexNode::Constant(null)),
2491                    }
2492                };
2493            exprs.push(to_default_node);
2494        }
2495    }
2496
2497    Ok(exprs)
2498}
2499
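/// Per-job metadata read from the `streaming_job` table that is needed to rebuild a
/// `StreamContext` as well as the job's definition and backfill ordering.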
2500#[derive(Clone, Debug, Default)]
2501pub struct StreamingJobExtraInfo {
2502    pub timezone: Option<String>,
2503    pub config_override: Arc<str>,
2504    pub adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
2505    pub job_definition: String,
2506    pub backfill_orders: Option<BackfillOrders>,
2507}
2508
2509impl StreamingJobExtraInfo {
2510    pub fn stream_context(&self) -> StreamContext {
2511        StreamContext {
2512            timezone: self.timezone.clone(),
2513            config_override: self.config_override.clone(),
2514            adaptive_parallelism_strategy: self.adaptive_parallelism_strategy,
2515        }
2516    }
2517}
2518
2519/// Tuple of (`job_id`, `timezone`, `config_override`, `adaptive_parallelism_strategy`, `backfill_orders`)
2520type StreamingJobExtraInfoRow = (
2521    JobId,
2522    Option<String>,
2523    Option<String>,
2524    Option<String>,
2525    Option<BackfillOrders>,
2526);
2527
2528pub async fn get_streaming_job_extra_info<C>(
2529    txn: &C,
2530    job_ids: Vec<JobId>,
2531) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
2532where
2533    C: ConnectionTrait,
2534{
2535    let pairs: Vec<StreamingJobExtraInfoRow> = StreamingJob::find()
2536        .select_only()
2537        .columns([
2538            streaming_job::Column::JobId,
2539            streaming_job::Column::Timezone,
2540            streaming_job::Column::ConfigOverride,
2541            streaming_job::Column::AdaptiveParallelismStrategy,
2542            streaming_job::Column::BackfillOrders,
2543        ])
2544        .filter(streaming_job::Column::JobId.is_in(job_ids.clone()))
2545        .into_tuple()
2546        .all(txn)
2547        .await?;
2548
2549    let job_ids = job_ids.into_iter().collect();
2550
2551    let mut definitions = resolve_streaming_job_definition(txn, &job_ids).await?;
2552
2553    let result = pairs
2554        .into_iter()
2555        .map(
2556            |(job_id, timezone, config_override, strategy, backfill_orders)| {
2557                let job_definition = definitions.remove(&job_id).unwrap_or_default();
2558                let adaptive_parallelism_strategy = strategy.as_deref().map(|s| {
2559                    parse_strategy(s).expect("strategy should be validated before storing")
2560                });
2561                (
2562                    job_id,
2563                    StreamingJobExtraInfo {
2564                        timezone,
2565                        config_override: config_override.unwrap_or_default().into(),
2566                        adaptive_parallelism_strategy,
2567                        job_definition,
2568                        backfill_orders,
2569                    },
2570                )
2571            },
2572        )
2573        .collect();
2574
2575    Ok(result)
2576}
2577
2578#[cfg(test)]
2579mod tests {
2580    use super::*;
2581
2582    #[test]
2583    fn test_extract_cdc_table_name() {
2584        let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
2585        let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
2586        assert_eq!(
2587            extract_external_table_name_from_definition(ddl1),
2588            Some("public.t1".into())
2589        );
2590        assert_eq!(
2591            extract_external_table_name_from_definition(ddl2),
2592            Some("mydb.t2".into())
2593        );
2594    }
2595}