risingwave_meta/controller/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
23use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
24use risingwave_common::{bail, hash};
25use risingwave_meta_model::actor::ActorStatus;
26use risingwave_meta_model::fragment::DistributionType;
27use risingwave_meta_model::object::ObjectType;
28use risingwave_meta_model::prelude::*;
29use risingwave_meta_model::table::TableType;
30use risingwave_meta_model::{
31    ActorId, DataTypeArray, DatabaseId, DispatcherType, FragmentId, I32Array, JobStatus, ObjectId,
32    PrivilegeId, SchemaId, SourceId, StreamNode, StreamSourceInfo, TableId, UserId, VnodeBitmap,
33    WorkerId, actor, connection, database, fragment, fragment_relation, function, index, object,
34    object_dependency, schema, secret, sink, source, streaming_job, subscription, table, user,
35    user_privilege, view,
36};
37use risingwave_meta_model_migration::WithQuery;
38use risingwave_pb::catalog::{
39    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
40    PbSubscription, PbTable, PbView,
41};
42use risingwave_pb::common::WorkerNode;
43use risingwave_pb::meta::object::PbObjectInfo;
44use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
45use risingwave_pb::meta::{
46    FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
47};
48use risingwave_pb::stream_plan::{PbDispatcher, PbDispatcherType, PbFragmentTypeFlag};
49use risingwave_pb::user::grant_privilege::{
50    PbAction, PbActionWithGrantOption, PbObject as PbGrantObject,
51};
52use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
53use risingwave_sqlparser::ast::Statement as SqlStatement;
54use risingwave_sqlparser::parser::Parser;
55use sea_orm::sea_query::{
56    Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
57    WithClause,
58};
59use sea_orm::{
60    ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
61    FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
62    RelationTrait, Set, Statement,
63};
64use thiserror_ext::AsReport;
65
66use crate::controller::ObjectModel;
67use crate::model::{FragmentActorDispatchers, FragmentDownstreamRelation};
68use crate::{MetaError, MetaResult};
69
70/// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object.
71///
72/// # Examples
73///
74/// ```
75/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
76/// use sea_orm::sea_query::*;
77/// use sea_orm::*;
78///
79/// let query = construct_obj_dependency_query(1);
80///
81/// assert_eq!(
82///     query.to_string(MysqlQueryBuilder),
83///     r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `oid` FROM `object` WHERE `object`.`database_id` = 1 OR `object`.`schema_id` = 1) UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `oid`, `obj_type`, `schema_id`, `database_id` FROM `used_by_object_ids` INNER JOIN `object` ON `used_by_object_ids`.`used_by` = `oid` ORDER BY `oid` DESC"#
84/// );
85/// assert_eq!(
86///     query.to_string(PostgresQueryBuilder),
87///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1) UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
88/// );
89/// assert_eq!(
90///     query.to_string(SqliteQueryBuilder),
91///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
92/// );
93/// ```
94pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
95    let cte_alias = Alias::new("used_by_object_ids");
96    let cte_return_alias = Alias::new("used_by");
97
98    let mut base_query = SelectStatement::new()
99        .column(object_dependency::Column::UsedBy)
100        .from(ObjectDependency)
101        .and_where(object_dependency::Column::Oid.eq(obj_id))
102        .to_owned();
103
104    let belonged_obj_query = SelectStatement::new()
105        .column(object::Column::Oid)
106        .from(Object)
107        .and_where(
108            object::Column::DatabaseId
109                .eq(obj_id)
110                .or(object::Column::SchemaId.eq(obj_id)),
111        )
112        .to_owned();
113
114    let cte_referencing = Query::select()
115        .column((ObjectDependency, object_dependency::Column::UsedBy))
116        .from(ObjectDependency)
117        .inner_join(
118            cte_alias.clone(),
119            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
120                .equals(object_dependency::Column::Oid),
121        )
122        .to_owned();
123
124    let common_table_expr = CommonTableExpression::new()
125        .query(
126            base_query
127                .union(UnionType::All, belonged_obj_query)
128                .union(UnionType::All, cte_referencing)
129                .to_owned(),
130        )
131        .column(cte_return_alias.clone())
132        .table_name(cte_alias.clone())
133        .to_owned();
134
135    SelectStatement::new()
136        .distinct()
137        .columns([
138            object::Column::Oid,
139            object::Column::ObjType,
140            object::Column::SchemaId,
141            object::Column::DatabaseId,
142        ])
143        .from(cte_alias.clone())
144        .inner_join(
145            Object,
146            Expr::col((cte_alias, cte_return_alias.clone())).equals(object::Column::Oid),
147        )
148        .order_by(object::Column::Oid, Order::Desc)
149        .to_owned()
150        .with(
151            WithClause::new()
152                .recursive(true)
153                .cte(common_table_expr)
154                .to_owned(),
155        )
156        .to_owned()
157}
158
159/// This function will construct a query using recursive cte to find if dependent objects are already relying on the target table.
160///
161/// # Examples
162///
163/// ```
164/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
165/// use sea_orm::sea_query::*;
166/// use sea_orm::*;
167///
168/// let query = construct_sink_cycle_check_query(1, vec![2, 3]);
169///
170/// assert_eq!(
171///     query.to_string(MysqlQueryBuilder),
172///     r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
173/// );
174/// assert_eq!(
175///     query.to_string(PostgresQueryBuilder),
176///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
177/// );
178/// assert_eq!(
179///     query.to_string(SqliteQueryBuilder),
180///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
181/// );
182/// ```
183pub fn construct_sink_cycle_check_query(
184    target_table: ObjectId,
185    dependent_objects: Vec<ObjectId>,
186) -> WithQuery {
187    let cte_alias = Alias::new("used_by_object_ids_with_sink");
188    let depend_alias = Alias::new("obj_dependency_with_sink");
189
190    let mut base_query = SelectStatement::new()
191        .columns([
192            object_dependency::Column::Oid,
193            object_dependency::Column::UsedBy,
194        ])
195        .from(ObjectDependency)
196        .and_where(object_dependency::Column::Oid.eq(target_table))
197        .to_owned();
198
199    let query_sink_deps = SelectStatement::new()
200        .columns([sink::Column::SinkId, sink::Column::TargetTable])
201        .from(Sink)
202        .and_where(sink::Column::TargetTable.is_not_null())
203        .to_owned();
204
205    let cte_referencing = Query::select()
206        .column((depend_alias.clone(), object_dependency::Column::Oid))
207        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
208        .from_subquery(
209            SelectStatement::new()
210                .columns([
211                    object_dependency::Column::Oid,
212                    object_dependency::Column::UsedBy,
213                ])
214                .from(ObjectDependency)
215                .union(UnionType::All, query_sink_deps)
216                .to_owned(),
217            depend_alias.clone(),
218        )
219        .inner_join(
220            cte_alias.clone(),
221            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).eq(Expr::col((
222                depend_alias.clone(),
223                object_dependency::Column::Oid,
224            ))),
225        )
226        .and_where(
227            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
228                cte_alias.clone(),
229                object_dependency::Column::Oid,
230            ))),
231        )
232        .to_owned();
233
234    let common_table_expr = CommonTableExpression::new()
235        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
236        .columns([
237            object_dependency::Column::Oid,
238            object_dependency::Column::UsedBy,
239        ])
240        .table_name(cte_alias.clone())
241        .to_owned();
242
243    SelectStatement::new()
244        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
245        .from(cte_alias.clone())
246        .and_where(
247            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
248                .is_in(dependent_objects),
249        )
250        .to_owned()
251        .with(
252            WithClause::new()
253                .recursive(true)
254                .cte(common_table_expr)
255                .to_owned(),
256        )
257        .to_owned()
258}
259
260#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
261#[sea_orm(entity = "Object")]
262pub struct PartialObject {
263    pub oid: ObjectId,
264    pub obj_type: ObjectType,
265    pub schema_id: Option<SchemaId>,
266    pub database_id: Option<DatabaseId>,
267}
268
269#[derive(Clone, DerivePartialModel, FromQueryResult)]
270#[sea_orm(entity = "Fragment")]
271pub struct PartialFragmentStateTables {
272    pub fragment_id: FragmentId,
273    pub job_id: ObjectId,
274    pub state_table_ids: I32Array,
275}
276
277#[derive(Clone, DerivePartialModel, FromQueryResult)]
278#[sea_orm(entity = "Actor")]
279pub struct PartialActorLocation {
280    pub actor_id: ActorId,
281    pub fragment_id: FragmentId,
282    pub worker_id: WorkerId,
283    pub status: ActorStatus,
284}
285
286#[derive(FromQueryResult)]
287pub struct FragmentDesc {
288    pub fragment_id: FragmentId,
289    pub job_id: ObjectId,
290    pub fragment_type_mask: i32,
291    pub distribution_type: DistributionType,
292    pub state_table_ids: I32Array,
293    pub parallelism: i64,
294    pub vnode_count: i32,
295    pub stream_node: StreamNode,
296}
297
298/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
299pub async fn get_referring_objects_cascade<C>(
300    obj_id: ObjectId,
301    db: &C,
302) -> MetaResult<Vec<PartialObject>>
303where
304    C: ConnectionTrait,
305{
306    let query = construct_obj_dependency_query(obj_id);
307    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
308    let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
309        db.get_database_backend(),
310        sql,
311        values,
312    ))
313    .all(db)
314    .await?;
315    Ok(objects)
316}
317
318/// Check if create a sink with given dependent objects into the target table will cause a cycle, return true if it will.
319pub async fn check_sink_into_table_cycle<C>(
320    target_table: ObjectId,
321    dependent_objs: Vec<ObjectId>,
322    db: &C,
323) -> MetaResult<bool>
324where
325    C: ConnectionTrait,
326{
327    if dependent_objs.is_empty() {
328        return Ok(false);
329    }
330
331    // special check for self referencing
332    if dependent_objs.contains(&target_table) {
333        return Ok(true);
334    }
335
336    let query = construct_sink_cycle_check_query(target_table, dependent_objs);
337    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
338
339    let res = db
340        .query_one(Statement::from_sql_and_values(
341            db.get_database_backend(),
342            sql,
343            values,
344        ))
345        .await?
346        .unwrap();
347
348    let cnt: i64 = res.try_get_by(0)?;
349
350    Ok(cnt != 0)
351}
352
353/// `ensure_object_id` ensures the existence of target object in the cluster.
354pub async fn ensure_object_id<C>(
355    object_type: ObjectType,
356    obj_id: ObjectId,
357    db: &C,
358) -> MetaResult<()>
359where
360    C: ConnectionTrait,
361{
362    let count = Object::find_by_id(obj_id).count(db).await?;
363    if count == 0 {
364        return Err(MetaError::catalog_id_not_found(
365            object_type.as_str(),
366            obj_id,
367        ));
368    }
369    Ok(())
370}
371
372/// `ensure_user_id` ensures the existence of target user in the cluster.
373pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
374where
375    C: ConnectionTrait,
376{
377    let count = User::find_by_id(user_id).count(db).await?;
378    if count == 0 {
379        return Err(anyhow!("user {} was concurrently dropped", user_id).into());
380    }
381    Ok(())
382}
383
384/// `check_database_name_duplicate` checks whether the database name is already used in the cluster.
385pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
386where
387    C: ConnectionTrait,
388{
389    let count = Database::find()
390        .filter(database::Column::Name.eq(name))
391        .count(db)
392        .await?;
393    if count > 0 {
394        assert_eq!(count, 1);
395        return Err(MetaError::catalog_duplicated("database", name));
396    }
397    Ok(())
398}
399
400/// `check_function_signature_duplicate` checks whether the function name and its signature is already used in the target namespace.
401pub async fn check_function_signature_duplicate<C>(
402    pb_function: &PbFunction,
403    db: &C,
404) -> MetaResult<()>
405where
406    C: ConnectionTrait,
407{
408    let count = Function::find()
409        .inner_join(Object)
410        .filter(
411            object::Column::DatabaseId
412                .eq(pb_function.database_id as DatabaseId)
413                .and(object::Column::SchemaId.eq(pb_function.schema_id as SchemaId))
414                .and(function::Column::Name.eq(&pb_function.name))
415                .and(
416                    function::Column::ArgTypes
417                        .eq(DataTypeArray::from(pb_function.arg_types.clone())),
418                ),
419        )
420        .count(db)
421        .await?;
422    if count > 0 {
423        assert_eq!(count, 1);
424        return Err(MetaError::catalog_duplicated("function", &pb_function.name));
425    }
426    Ok(())
427}
428
429/// `check_connection_name_duplicate` checks whether the connection name is already used in the target namespace.
430pub async fn check_connection_name_duplicate<C>(
431    pb_connection: &PbConnection,
432    db: &C,
433) -> MetaResult<()>
434where
435    C: ConnectionTrait,
436{
437    let count = Connection::find()
438        .inner_join(Object)
439        .filter(
440            object::Column::DatabaseId
441                .eq(pb_connection.database_id as DatabaseId)
442                .and(object::Column::SchemaId.eq(pb_connection.schema_id as SchemaId))
443                .and(connection::Column::Name.eq(&pb_connection.name)),
444        )
445        .count(db)
446        .await?;
447    if count > 0 {
448        assert_eq!(count, 1);
449        return Err(MetaError::catalog_duplicated(
450            "connection",
451            &pb_connection.name,
452        ));
453    }
454    Ok(())
455}
456
457pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
458where
459    C: ConnectionTrait,
460{
461    let count = Secret::find()
462        .inner_join(Object)
463        .filter(
464            object::Column::DatabaseId
465                .eq(pb_secret.database_id as DatabaseId)
466                .and(object::Column::SchemaId.eq(pb_secret.schema_id as SchemaId))
467                .and(secret::Column::Name.eq(&pb_secret.name)),
468        )
469        .count(db)
470        .await?;
471    if count > 0 {
472        assert_eq!(count, 1);
473        return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
474    }
475    Ok(())
476}
477
478pub async fn check_subscription_name_duplicate<C>(
479    pb_subscription: &PbSubscription,
480    db: &C,
481) -> MetaResult<()>
482where
483    C: ConnectionTrait,
484{
485    let count = Subscription::find()
486        .inner_join(Object)
487        .filter(
488            object::Column::DatabaseId
489                .eq(pb_subscription.database_id as DatabaseId)
490                .and(object::Column::SchemaId.eq(pb_subscription.schema_id as SchemaId))
491                .and(subscription::Column::Name.eq(&pb_subscription.name)),
492        )
493        .count(db)
494        .await?;
495    if count > 0 {
496        assert_eq!(count, 1);
497        return Err(MetaError::catalog_duplicated(
498            "subscription",
499            &pb_subscription.name,
500        ));
501    }
502    Ok(())
503}
504
505/// `check_user_name_duplicate` checks whether the user is already existed in the cluster.
506pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
507where
508    C: ConnectionTrait,
509{
510    let count = User::find()
511        .filter(user::Column::Name.eq(name))
512        .count(db)
513        .await?;
514    if count > 0 {
515        assert_eq!(count, 1);
516        return Err(MetaError::catalog_duplicated("user", name));
517    }
518    Ok(())
519}
520
521/// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace.
522pub async fn check_relation_name_duplicate<C>(
523    name: &str,
524    database_id: DatabaseId,
525    schema_id: SchemaId,
526    db: &C,
527) -> MetaResult<()>
528where
529    C: ConnectionTrait,
530{
531    macro_rules! check_duplicated {
532        ($obj_type:expr, $entity:ident, $table:ident) => {
533            let object_id = Object::find()
534                .select_only()
535                .column(object::Column::Oid)
536                .inner_join($entity)
537                .filter(
538                    object::Column::DatabaseId
539                        .eq(Some(database_id))
540                        .and(object::Column::SchemaId.eq(Some(schema_id)))
541                        .and($table::Column::Name.eq(name)),
542                )
543                .into_tuple::<ObjectId>()
544                .one(db)
545                .await?;
546            if let Some(oid) = object_id {
547                let check_creation = if $obj_type == ObjectType::View {
548                    false
549                } else if $obj_type == ObjectType::Source {
550                    let source_info = Source::find_by_id(oid)
551                        .select_only()
552                        .column(source::Column::SourceInfo)
553                        .into_tuple::<Option<StreamSourceInfo>>()
554                        .one(db)
555                        .await?
556                        .unwrap();
557                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
558                } else {
559                    true
560                };
561                return if check_creation
562                    && !matches!(
563                        StreamingJob::find_by_id(oid)
564                            .select_only()
565                            .column(streaming_job::Column::JobStatus)
566                            .into_tuple::<JobStatus>()
567                            .one(db)
568                            .await?,
569                        Some(JobStatus::Created)
570                    ) {
571                    Err(MetaError::catalog_under_creation($obj_type.as_str(), name))
572                } else {
573                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
574                };
575            }
576        };
577    }
578    check_duplicated!(ObjectType::Table, Table, table);
579    check_duplicated!(ObjectType::Source, Source, source);
580    check_duplicated!(ObjectType::Sink, Sink, sink);
581    check_duplicated!(ObjectType::Index, Index, index);
582    check_duplicated!(ObjectType::View, View, view);
583
584    Ok(())
585}
586
587/// `check_schema_name_duplicate` checks whether the schema name is already used in the target database.
588pub async fn check_schema_name_duplicate<C>(
589    name: &str,
590    database_id: DatabaseId,
591    db: &C,
592) -> MetaResult<()>
593where
594    C: ConnectionTrait,
595{
596    let count = Object::find()
597        .inner_join(Schema)
598        .filter(
599            object::Column::ObjType
600                .eq(ObjectType::Schema)
601                .and(object::Column::DatabaseId.eq(Some(database_id)))
602                .and(schema::Column::Name.eq(name)),
603        )
604        .count(db)
605        .await?;
606    if count != 0 {
607        return Err(MetaError::catalog_duplicated("schema", name));
608    }
609
610    Ok(())
611}
612
613/// `ensure_object_not_refer` ensures that object is not used by any other ones except indexes.
614pub async fn ensure_object_not_refer<C>(
615    object_type: ObjectType,
616    object_id: ObjectId,
617    db: &C,
618) -> MetaResult<()>
619where
620    C: ConnectionTrait,
621{
622    // Ignore indexes.
623    let count = if object_type == ObjectType::Table {
624        ObjectDependency::find()
625            .join(
626                JoinType::InnerJoin,
627                object_dependency::Relation::Object1.def(),
628            )
629            .filter(
630                object_dependency::Column::Oid
631                    .eq(object_id)
632                    .and(object::Column::ObjType.ne(ObjectType::Index)),
633            )
634            .count(db)
635            .await?
636    } else {
637        ObjectDependency::find()
638            .filter(object_dependency::Column::Oid.eq(object_id))
639            .count(db)
640            .await?
641    };
642    if count != 0 {
643        return Err(MetaError::permission_denied(format!(
644            "{} used by {} other objects.",
645            object_type.as_str(),
646            count
647        )));
648    }
649    Ok(())
650}
651
652/// List all objects that are using the given one.
653pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
654where
655    C: ConnectionTrait,
656{
657    let objs = ObjectDependency::find()
658        .filter(object_dependency::Column::Oid.eq(object_id))
659        .join(
660            JoinType::InnerJoin,
661            object_dependency::Relation::Object1.def(),
662        )
663        .into_partial_model()
664        .all(db)
665        .await?;
666
667    Ok(objs)
668}
669
670/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
671pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
672where
673    C: ConnectionTrait,
674{
675    let count = Object::find()
676        .filter(object::Column::SchemaId.eq(Some(schema_id)))
677        .count(db)
678        .await?;
679    if count != 0 {
680        return Err(MetaError::permission_denied("schema is not empty"));
681    }
682
683    Ok(())
684}
685
686/// `list_user_info_by_ids` lists all users' info by their ids.
687pub async fn list_user_info_by_ids<C>(user_ids: Vec<UserId>, db: &C) -> MetaResult<Vec<PbUserInfo>>
688where
689    C: ConnectionTrait,
690{
691    let mut user_infos = vec![];
692    for user_id in user_ids {
693        let user = User::find_by_id(user_id)
694            .one(db)
695            .await?
696            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
697        let mut user_info: PbUserInfo = user.into();
698        user_info.grant_privileges = get_user_privilege(user_id, db).await?;
699        user_infos.push(user_info);
700    }
701    Ok(user_infos)
702}
703
704/// `get_object_owner` returns the owner of the given object.
705pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
706where
707    C: ConnectionTrait,
708{
709    let obj_owner: UserId = Object::find_by_id(object_id)
710        .select_only()
711        .column(object::Column::OwnerId)
712        .into_tuple()
713        .one(db)
714        .await?
715        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
716    Ok(obj_owner)
717}
718
719/// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given one.
720///
721/// # Examples
722///
723/// ```
724/// use risingwave_meta::controller::utils::construct_privilege_dependency_query;
725/// use sea_orm::sea_query::*;
726/// use sea_orm::*;
727///
728/// let query = construct_privilege_dependency_query(vec![1, 2, 3]);
729///
730/// assert_eq!(
731///    query.to_string(MysqlQueryBuilder),
732///   r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"#
733/// );
734/// assert_eq!(
735///   query.to_string(PostgresQueryBuilder),
736///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""#
737/// );
738/// assert_eq!(
739///  query.to_string(SqliteQueryBuilder),
740///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""#
741/// );
742/// ```
743pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
744    let cte_alias = Alias::new("granted_privilege_ids");
745    let cte_return_privilege_alias = Alias::new("id");
746    let cte_return_user_alias = Alias::new("user_id");
747
748    let mut base_query = SelectStatement::new()
749        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
750        .from(UserPrivilege)
751        .and_where(user_privilege::Column::Id.is_in(ids))
752        .to_owned();
753
754    let cte_referencing = Query::select()
755        .columns([
756            (UserPrivilege, user_privilege::Column::Id),
757            (UserPrivilege, user_privilege::Column::UserId),
758        ])
759        .from(UserPrivilege)
760        .inner_join(
761            cte_alias.clone(),
762            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
763                .equals(user_privilege::Column::DependentId),
764        )
765        .to_owned();
766
767    let common_table_expr = CommonTableExpression::new()
768        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
769        .columns([
770            cte_return_privilege_alias.clone(),
771            cte_return_user_alias.clone(),
772        ])
773        .table_name(cte_alias.clone())
774        .to_owned();
775
776    SelectStatement::new()
777        .columns([cte_return_privilege_alias, cte_return_user_alias])
778        .from(cte_alias.clone())
779        .to_owned()
780        .with(
781            WithClause::new()
782                .recursive(true)
783                .cte(common_table_expr)
784                .to_owned(),
785        )
786        .to_owned()
787}
788
789pub async fn get_internal_tables_by_id<C>(job_id: ObjectId, db: &C) -> MetaResult<Vec<TableId>>
790where
791    C: ConnectionTrait,
792{
793    let table_ids: Vec<TableId> = Table::find()
794        .select_only()
795        .column(table::Column::TableId)
796        .filter(
797            table::Column::TableType
798                .eq(TableType::Internal)
799                .and(table::Column::BelongsToJobId.eq(job_id)),
800        )
801        .into_tuple()
802        .all(db)
803        .await?;
804    Ok(table_ids)
805}
806
807pub async fn get_index_state_tables_by_table_id<C>(
808    table_id: TableId,
809    db: &C,
810) -> MetaResult<Vec<TableId>>
811where
812    C: ConnectionTrait,
813{
814    let mut index_table_ids: Vec<TableId> = Index::find()
815        .select_only()
816        .column(index::Column::IndexTableId)
817        .filter(index::Column::PrimaryTableId.eq(table_id))
818        .into_tuple()
819        .all(db)
820        .await?;
821
822    if !index_table_ids.is_empty() {
823        let internal_table_ids: Vec<TableId> = Table::find()
824            .select_only()
825            .column(table::Column::TableId)
826            .filter(
827                table::Column::TableType
828                    .eq(TableType::Internal)
829                    .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
830            )
831            .into_tuple()
832            .all(db)
833            .await?;
834
835        index_table_ids.extend(internal_table_ids.into_iter());
836    }
837
838    Ok(index_table_ids)
839}
840
841#[derive(Clone, DerivePartialModel, FromQueryResult)]
842#[sea_orm(entity = "UserPrivilege")]
843pub struct PartialUserPrivilege {
844    pub id: PrivilegeId,
845    pub user_id: UserId,
846}
847
848pub async fn get_referring_privileges_cascade<C>(
849    ids: Vec<PrivilegeId>,
850    db: &C,
851) -> MetaResult<Vec<PartialUserPrivilege>>
852where
853    C: ConnectionTrait,
854{
855    let query = construct_privilege_dependency_query(ids);
856    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
857    let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
858        db.get_database_backend(),
859        sql,
860        values,
861    ))
862    .all(db)
863    .await?;
864
865    Ok(privileges)
866}
867
868/// `ensure_privileges_not_referred` ensures that the privileges are not granted to any other users.
869pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
870where
871    C: ConnectionTrait,
872{
873    let count = UserPrivilege::find()
874        .filter(user_privilege::Column::DependentId.is_in(ids))
875        .count(db)
876        .await?;
877    if count != 0 {
878        return Err(MetaError::permission_denied(format!(
879            "privileges granted to {} other ones.",
880            count
881        )));
882    }
883    Ok(())
884}
885
886/// `get_user_privilege` returns the privileges of the given user.
887pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
888where
889    C: ConnectionTrait,
890{
891    let user_privileges = UserPrivilege::find()
892        .find_also_related(Object)
893        .filter(user_privilege::Column::UserId.eq(user_id))
894        .all(db)
895        .await?;
896    Ok(user_privileges
897        .into_iter()
898        .map(|(privilege, object)| {
899            let object = object.unwrap();
900            let oid = object.oid as _;
901            let obj = match object.obj_type {
902                ObjectType::Database => PbGrantObject::DatabaseId(oid),
903                ObjectType::Schema => PbGrantObject::SchemaId(oid),
904                ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
905                ObjectType::Source => PbGrantObject::SourceId(oid),
906                ObjectType::Sink => PbGrantObject::SinkId(oid),
907                ObjectType::View => PbGrantObject::ViewId(oid),
908                ObjectType::Function => PbGrantObject::FunctionId(oid),
909                ObjectType::Connection => unreachable!("connection is not supported yet"),
910                ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
911                ObjectType::Secret => unreachable!("secret is not supported yet"),
912            };
913            PbGrantPrivilege {
914                action_with_opts: vec![PbActionWithGrantOption {
915                    action: PbAction::from(privilege.action) as _,
916                    with_grant_option: privilege.with_grant_option,
917                    granted_by: privilege.granted_by as _,
918                }],
919                object: Some(obj),
920            }
921        })
922        .collect())
923}
924
925// todo: remove it after migrated to sql backend.
926pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
927    match object {
928        PbGrantObject::DatabaseId(id)
929        | PbGrantObject::SchemaId(id)
930        | PbGrantObject::TableId(id)
931        | PbGrantObject::SourceId(id)
932        | PbGrantObject::SinkId(id)
933        | PbGrantObject::ViewId(id)
934        | PbGrantObject::FunctionId(id)
935        | PbGrantObject::SubscriptionId(id)
936        | PbGrantObject::ConnectionId(id)
937        | PbGrantObject::SecretId(id) => *id as _,
938    }
939}
940
941pub async fn insert_fragment_relations(
942    db: &impl ConnectionTrait,
943    downstream_fragment_relations: &FragmentDownstreamRelation,
944) -> MetaResult<()> {
945    for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
946        for downstream in downstreams {
947            let relation = fragment_relation::Model {
948                source_fragment_id: *upstream_fragment_id as _,
949                target_fragment_id: downstream.downstream_fragment_id as _,
950                dispatcher_type: downstream.dispatcher_type,
951                dist_key_indices: downstream
952                    .dist_key_indices
953                    .iter()
954                    .map(|idx| *idx as i32)
955                    .collect_vec()
956                    .into(),
957                output_indices: downstream
958                    .output_indices
959                    .iter()
960                    .map(|idx| *idx as i32)
961                    .collect_vec()
962                    .into(),
963            };
964            FragmentRelation::insert(relation.into_active_model())
965                .exec(db)
966                .await?;
967        }
968    }
969    Ok(())
970}
971
972pub async fn get_fragment_actor_dispatchers<C>(
973    db: &C,
974    fragment_ids: Vec<FragmentId>,
975) -> MetaResult<FragmentActorDispatchers>
976where
977    C: ConnectionTrait,
978{
979    type FragmentActorInfo = (
980        DistributionType,
981        Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
982    );
983    let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
984    let get_fragment_actors = |fragment_id: FragmentId| async move {
985        let result: MetaResult<FragmentActorInfo> = try {
986            let mut fragment_actors = Fragment::find_by_id(fragment_id)
987                .find_with_related(Actor)
988                .filter(actor::Column::Status.eq(ActorStatus::Running))
989                .all(db)
990                .await?;
991            if fragment_actors.is_empty() {
992                return Err(anyhow!("failed to find fragment: {}", fragment_id).into());
993            }
994            assert_eq!(
995                fragment_actors.len(),
996                1,
997                "find multiple fragment {:?}",
998                fragment_actors
999            );
1000            let (fragment, actors) = fragment_actors.pop().unwrap();
1001            (
1002                fragment.distribution_type,
1003                Arc::new(
1004                    actors
1005                        .into_iter()
1006                        .map(|actor| {
1007                            (
1008                                actor.actor_id as _,
1009                                actor
1010                                    .vnode_bitmap
1011                                    .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
1012                            )
1013                        })
1014                        .collect(),
1015                ),
1016            )
1017        };
1018        result
1019    };
1020    let fragment_relations = FragmentRelation::find()
1021        .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
1022        .all(db)
1023        .await?;
1024
1025    let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
1026    for fragment_relation::Model {
1027        source_fragment_id,
1028        target_fragment_id,
1029        dispatcher_type,
1030        dist_key_indices,
1031        output_indices,
1032    } in fragment_relations
1033    {
1034        let (source_fragment_distribution, source_fragment_actors) = {
1035            let (distribution, actors) = {
1036                match fragment_actor_cache.entry(source_fragment_id) {
1037                    Entry::Occupied(entry) => entry.into_mut(),
1038                    Entry::Vacant(entry) => {
1039                        entry.insert(get_fragment_actors(source_fragment_id).await?)
1040                    }
1041                }
1042            };
1043            (*distribution, actors.clone())
1044        };
1045        let (target_fragment_distribution, target_fragment_actors) = {
1046            let (distribution, actors) = {
1047                match fragment_actor_cache.entry(target_fragment_id) {
1048                    Entry::Occupied(entry) => entry.into_mut(),
1049                    Entry::Vacant(entry) => {
1050                        entry.insert(get_fragment_actors(target_fragment_id).await?)
1051                    }
1052                }
1053            };
1054            (*distribution, actors.clone())
1055        };
1056        let dispatchers = compose_dispatchers(
1057            source_fragment_distribution,
1058            &source_fragment_actors,
1059            target_fragment_id as _,
1060            target_fragment_distribution,
1061            &target_fragment_actors,
1062            dispatcher_type,
1063            dist_key_indices.into_u32_array(),
1064            output_indices.into_u32_array(),
1065        );
1066        let actor_dispatchers_map = actor_dispatchers_map
1067            .entry(source_fragment_id as _)
1068            .or_default();
1069        for (actor_id, dispatchers) in dispatchers {
1070            actor_dispatchers_map
1071                .entry(actor_id as _)
1072                .or_default()
1073                .push(dispatchers);
1074        }
1075    }
1076    Ok(actor_dispatchers_map)
1077}
1078
1079pub fn compose_dispatchers(
1080    source_fragment_distribution: DistributionType,
1081    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1082    target_fragment_id: crate::model::FragmentId,
1083    target_fragment_distribution: DistributionType,
1084    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1085    dispatcher_type: DispatcherType,
1086    dist_key_indices: Vec<u32>,
1087    output_indices: Vec<u32>,
1088) -> HashMap<crate::model::ActorId, PbDispatcher> {
1089    match dispatcher_type {
1090        DispatcherType::Hash => {
1091            let dispatcher = PbDispatcher {
1092                r#type: PbDispatcherType::from(dispatcher_type) as _,
1093                dist_key_indices: dist_key_indices.clone(),
1094                output_indices: output_indices.clone(),
1095                hash_mapping: Some(
1096                    ActorMapping::from_bitmaps(
1097                        &target_fragment_actors
1098                            .iter()
1099                            .map(|(actor_id, bitmap)| {
1100                                (
1101                                    *actor_id as _,
1102                                    bitmap
1103                                        .clone()
1104                                        .expect("downstream hash dispatch must have distribution"),
1105                                )
1106                            })
1107                            .collect(),
1108                    )
1109                    .to_protobuf(),
1110                ),
1111                dispatcher_id: target_fragment_id as _,
1112                downstream_actor_id: target_fragment_actors
1113                    .keys()
1114                    .map(|actor_id| *actor_id as _)
1115                    .collect(),
1116            };
1117            source_fragment_actors
1118                .keys()
1119                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1120                .collect()
1121        }
1122        DispatcherType::Broadcast | DispatcherType::Simple => {
1123            let dispatcher = PbDispatcher {
1124                r#type: PbDispatcherType::from(dispatcher_type) as _,
1125                dist_key_indices: dist_key_indices.clone(),
1126                output_indices: output_indices.clone(),
1127                hash_mapping: None,
1128                dispatcher_id: target_fragment_id as _,
1129                downstream_actor_id: target_fragment_actors
1130                    .keys()
1131                    .map(|actor_id| *actor_id as _)
1132                    .collect(),
1133            };
1134            source_fragment_actors
1135                .keys()
1136                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1137                .collect()
1138        }
1139        DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1140            source_fragment_distribution,
1141            source_fragment_actors,
1142            target_fragment_distribution,
1143            target_fragment_actors,
1144        )
1145        .into_iter()
1146        .map(|(upstream_actor_id, downstream_actor_id)| {
1147            (
1148                upstream_actor_id,
1149                PbDispatcher {
1150                    r#type: PbDispatcherType::NoShuffle as _,
1151                    dist_key_indices: dist_key_indices.clone(),
1152                    output_indices: output_indices.clone(),
1153                    hash_mapping: None,
1154                    dispatcher_id: target_fragment_id as _,
1155                    downstream_actor_id: vec![downstream_actor_id as _],
1156                },
1157            )
1158        })
1159        .collect(),
1160    }
1161}
1162
1163/// return (`upstream_actor_id` -> `downstream_actor_id`)
1164pub fn resolve_no_shuffle_actor_dispatcher(
1165    source_fragment_distribution: DistributionType,
1166    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1167    target_fragment_distribution: DistributionType,
1168    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1169) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1170    assert_eq!(source_fragment_distribution, target_fragment_distribution);
1171    assert_eq!(
1172        source_fragment_actors.len(),
1173        target_fragment_actors.len(),
1174        "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1175        source_fragment_actors,
1176        target_fragment_actors
1177    );
1178    match source_fragment_distribution {
1179        DistributionType::Single => {
1180            let assert_singleton = |bitmap: &Option<Bitmap>| {
1181                assert!(
1182                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1183                    "not singleton: {:?}",
1184                    bitmap
1185                );
1186            };
1187            assert_eq!(
1188                source_fragment_actors.len(),
1189                1,
1190                "singleton distribution actor count not 1: {:?}",
1191                source_fragment_distribution
1192            );
1193            assert_eq!(
1194                target_fragment_actors.len(),
1195                1,
1196                "singleton distribution actor count not 1: {:?}",
1197                target_fragment_distribution
1198            );
1199            let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1200            assert_singleton(bitmap);
1201            let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1202            assert_singleton(bitmap);
1203            vec![(*source_actor_id, *target_actor_id)]
1204        }
1205        DistributionType::Hash => {
1206            let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1207                .iter()
1208                .map(|(actor_id, bitmap)| {
1209                    let bitmap = bitmap
1210                        .as_ref()
1211                        .expect("hash distribution should have bitmap");
1212                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1213                    (first_vnode, (*actor_id, bitmap))
1214                })
1215                .collect();
1216            source_fragment_actors
1217                .iter()
1218                .map(|(source_actor_id, bitmap)| {
1219                    let bitmap = bitmap
1220                        .as_ref()
1221                        .expect("hash distribution should have bitmap");
1222                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1223                    let (target_actor_id, target_bitmap) =
1224                        target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1225                            panic!(
1226                                "cannot find matched target actor: {} {:?} {:?} {:?}",
1227                                source_actor_id,
1228                                first_vnode,
1229                                source_fragment_actors,
1230                                target_fragment_actors
1231                            );
1232                        });
1233                    assert_eq!(
1234                        bitmap,
1235                        target_bitmap,
1236                        "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1237                        source_actor_id,
1238                        first_vnode,
1239                        source_fragment_actors,
1240                        target_fragment_actors
1241                    );
1242                    (*source_actor_id, target_actor_id)
1243                }).collect()
1244        }
1245    }
1246}
1247
1248/// `get_fragment_mappings` returns the fragment vnode mappings of the given job.
1249pub async fn get_fragment_mappings<C>(
1250    db: &C,
1251    job_id: ObjectId,
1252) -> MetaResult<Vec<PbFragmentWorkerSlotMapping>>
1253where
1254    C: ConnectionTrait,
1255{
1256    let job_actors: Vec<(
1257        FragmentId,
1258        DistributionType,
1259        ActorId,
1260        Option<VnodeBitmap>,
1261        WorkerId,
1262        ActorStatus,
1263    )> = Actor::find()
1264        .select_only()
1265        .columns([
1266            fragment::Column::FragmentId,
1267            fragment::Column::DistributionType,
1268        ])
1269        .columns([
1270            actor::Column::ActorId,
1271            actor::Column::VnodeBitmap,
1272            actor::Column::WorkerId,
1273            actor::Column::Status,
1274        ])
1275        .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1276        .filter(fragment::Column::JobId.eq(job_id))
1277        .into_tuple()
1278        .all(db)
1279        .await?;
1280
1281    Ok(rebuild_fragment_mapping_from_actors(job_actors))
1282}
1283
1284pub fn rebuild_fragment_mapping_from_actors(
1285    job_actors: Vec<(
1286        FragmentId,
1287        DistributionType,
1288        ActorId,
1289        Option<VnodeBitmap>,
1290        WorkerId,
1291        ActorStatus,
1292    )>,
1293) -> Vec<FragmentWorkerSlotMapping> {
1294    let mut all_actor_locations = HashMap::new();
1295    let mut actor_bitmaps = HashMap::new();
1296    let mut fragment_actors = HashMap::new();
1297    let mut fragment_dist = HashMap::new();
1298
1299    for (fragment_id, dist, actor_id, bitmap, worker_id, actor_status) in job_actors {
1300        if actor_status == ActorStatus::Inactive {
1301            continue;
1302        }
1303
1304        all_actor_locations
1305            .entry(fragment_id)
1306            .or_insert(HashMap::new())
1307            .insert(actor_id as hash::ActorId, worker_id as u32);
1308        actor_bitmaps.insert(actor_id, bitmap);
1309        fragment_actors
1310            .entry(fragment_id)
1311            .or_insert_with(Vec::new)
1312            .push(actor_id);
1313        fragment_dist.insert(fragment_id, dist);
1314    }
1315
1316    let mut result = vec![];
1317    for (fragment_id, dist) in fragment_dist {
1318        let mut actor_locations = all_actor_locations.remove(&fragment_id).unwrap();
1319        let fragment_worker_slot_mapping = match dist {
1320            DistributionType::Single => {
1321                let actor = fragment_actors
1322                    .remove(&fragment_id)
1323                    .unwrap()
1324                    .into_iter()
1325                    .exactly_one()
1326                    .unwrap() as hash::ActorId;
1327                let actor_location = actor_locations.remove(&actor).unwrap();
1328
1329                WorkerSlotMapping::new_single(WorkerSlotId::new(actor_location, 0))
1330            }
1331            DistributionType::Hash => {
1332                let actors = fragment_actors.remove(&fragment_id).unwrap();
1333
1334                let all_actor_bitmaps: HashMap<_, _> = actors
1335                    .iter()
1336                    .map(|actor_id| {
1337                        let vnode_bitmap = actor_bitmaps
1338                            .remove(actor_id)
1339                            .flatten()
1340                            .expect("actor bitmap shouldn't be none in hash fragment");
1341
1342                        let bitmap = Bitmap::from(&vnode_bitmap.to_protobuf());
1343                        (*actor_id as hash::ActorId, bitmap)
1344                    })
1345                    .collect();
1346
1347                let actor_mapping = ActorMapping::from_bitmaps(&all_actor_bitmaps);
1348
1349                actor_mapping.to_worker_slot(&actor_locations)
1350            }
1351        };
1352
1353        result.push(PbFragmentWorkerSlotMapping {
1354            fragment_id: fragment_id as u32,
1355            mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1356        })
1357    }
1358    result
1359}
1360
1361/// `get_fragment_actor_ids` returns the fragment actor ids of the given fragments.
1362pub async fn get_fragment_actor_ids<C>(
1363    db: &C,
1364    fragment_ids: Vec<FragmentId>,
1365) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>>
1366where
1367    C: ConnectionTrait,
1368{
1369    let fragment_actors: Vec<(FragmentId, ActorId)> = Actor::find()
1370        .select_only()
1371        .columns([actor::Column::FragmentId, actor::Column::ActorId])
1372        .filter(actor::Column::FragmentId.is_in(fragment_ids))
1373        .into_tuple()
1374        .all(db)
1375        .await?;
1376
1377    Ok(fragment_actors.into_iter().into_group_map())
1378}
1379
1380/// For the given streaming jobs, returns
1381/// - All source fragments
1382/// - All actors
1383/// - All fragments
1384pub async fn get_fragments_for_jobs<C>(
1385    db: &C,
1386    streaming_jobs: Vec<ObjectId>,
1387) -> MetaResult<(
1388    HashMap<SourceId, BTreeSet<FragmentId>>,
1389    HashSet<ActorId>,
1390    HashSet<FragmentId>,
1391)>
1392where
1393    C: ConnectionTrait,
1394{
1395    if streaming_jobs.is_empty() {
1396        return Ok((HashMap::default(), HashSet::default(), HashSet::default()));
1397    }
1398
1399    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1400        .select_only()
1401        .columns([
1402            fragment::Column::FragmentId,
1403            fragment::Column::FragmentTypeMask,
1404            fragment::Column::StreamNode,
1405        ])
1406        .filter(fragment::Column::JobId.is_in(streaming_jobs))
1407        .into_tuple()
1408        .all(db)
1409        .await?;
1410    let actors: Vec<ActorId> = Actor::find()
1411        .select_only()
1412        .column(actor::Column::ActorId)
1413        .filter(
1414            actor::Column::FragmentId.is_in(fragments.iter().map(|(id, _, _)| *id).collect_vec()),
1415        )
1416        .into_tuple()
1417        .all(db)
1418        .await?;
1419
1420    let fragment_ids = fragments
1421        .iter()
1422        .map(|(fragment_id, _, _)| *fragment_id)
1423        .collect();
1424
1425    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1426    for (fragment_id, mask, stream_node) in fragments {
1427        if mask & PbFragmentTypeFlag::Source as i32 == 0 {
1428            continue;
1429        }
1430        if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1431            source_fragment_ids
1432                .entry(source_id as _)
1433                .or_default()
1434                .insert(fragment_id);
1435        }
1436    }
1437
1438    Ok((
1439        source_fragment_ids,
1440        actors.into_iter().collect(),
1441        fragment_ids,
1442    ))
1443}
1444
1445/// Build a object group for notifying the deletion of the given objects.
1446///
1447/// Note that only id fields are filled in the object info, as the arguments are partial objects.
1448/// As a result, the returned notification info should only be used for deletion.
1449pub(crate) fn build_object_group_for_delete(
1450    partial_objects: Vec<PartialObject>,
1451) -> NotificationInfo {
1452    let mut objects = vec![];
1453    for obj in partial_objects {
1454        match obj.obj_type {
1455            ObjectType::Database => objects.push(PbObject {
1456                object_info: Some(PbObjectInfo::Database(PbDatabase {
1457                    id: obj.oid as _,
1458                    ..Default::default()
1459                })),
1460            }),
1461            ObjectType::Schema => objects.push(PbObject {
1462                object_info: Some(PbObjectInfo::Schema(PbSchema {
1463                    id: obj.oid as _,
1464                    database_id: obj.database_id.unwrap() as _,
1465                    ..Default::default()
1466                })),
1467            }),
1468            ObjectType::Table => objects.push(PbObject {
1469                object_info: Some(PbObjectInfo::Table(PbTable {
1470                    id: obj.oid as _,
1471                    schema_id: obj.schema_id.unwrap() as _,
1472                    database_id: obj.database_id.unwrap() as _,
1473                    ..Default::default()
1474                })),
1475            }),
1476            ObjectType::Source => objects.push(PbObject {
1477                object_info: Some(PbObjectInfo::Source(PbSource {
1478                    id: obj.oid as _,
1479                    schema_id: obj.schema_id.unwrap() as _,
1480                    database_id: obj.database_id.unwrap() as _,
1481                    ..Default::default()
1482                })),
1483            }),
1484            ObjectType::Sink => objects.push(PbObject {
1485                object_info: Some(PbObjectInfo::Sink(PbSink {
1486                    id: obj.oid as _,
1487                    schema_id: obj.schema_id.unwrap() as _,
1488                    database_id: obj.database_id.unwrap() as _,
1489                    ..Default::default()
1490                })),
1491            }),
1492            ObjectType::Subscription => objects.push(PbObject {
1493                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1494                    id: obj.oid as _,
1495                    schema_id: obj.schema_id.unwrap() as _,
1496                    database_id: obj.database_id.unwrap() as _,
1497                    ..Default::default()
1498                })),
1499            }),
1500            ObjectType::View => objects.push(PbObject {
1501                object_info: Some(PbObjectInfo::View(PbView {
1502                    id: obj.oid as _,
1503                    schema_id: obj.schema_id.unwrap() as _,
1504                    database_id: obj.database_id.unwrap() as _,
1505                    ..Default::default()
1506                })),
1507            }),
1508            ObjectType::Index => {
1509                objects.push(PbObject {
1510                    object_info: Some(PbObjectInfo::Index(PbIndex {
1511                        id: obj.oid as _,
1512                        schema_id: obj.schema_id.unwrap() as _,
1513                        database_id: obj.database_id.unwrap() as _,
1514                        ..Default::default()
1515                    })),
1516                });
1517                objects.push(PbObject {
1518                    object_info: Some(PbObjectInfo::Table(PbTable {
1519                        id: obj.oid as _,
1520                        schema_id: obj.schema_id.unwrap() as _,
1521                        database_id: obj.database_id.unwrap() as _,
1522                        ..Default::default()
1523                    })),
1524                });
1525            }
1526            ObjectType::Function => objects.push(PbObject {
1527                object_info: Some(PbObjectInfo::Function(PbFunction {
1528                    id: obj.oid as _,
1529                    schema_id: obj.schema_id.unwrap() as _,
1530                    database_id: obj.database_id.unwrap() as _,
1531                    ..Default::default()
1532                })),
1533            }),
1534            ObjectType::Connection => objects.push(PbObject {
1535                object_info: Some(PbObjectInfo::Connection(PbConnection {
1536                    id: obj.oid as _,
1537                    schema_id: obj.schema_id.unwrap() as _,
1538                    database_id: obj.database_id.unwrap() as _,
1539                    ..Default::default()
1540                })),
1541            }),
1542            ObjectType::Secret => objects.push(PbObject {
1543                object_info: Some(PbObjectInfo::Secret(PbSecret {
1544                    id: obj.oid as _,
1545                    schema_id: obj.schema_id.unwrap() as _,
1546                    database_id: obj.database_id.unwrap() as _,
1547                    ..Default::default()
1548                })),
1549            }),
1550        }
1551    }
1552    NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1553}
1554
1555pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1556    let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1557        .context("unable to parse table definition")
1558        .inspect_err(|e| {
1559            tracing::error!(
1560                target: "auto_schema_change",
1561                error = %e.as_report(),
1562                "failed to parse table definition")
1563        })
1564        .unwrap()
1565        .try_into()
1566        .unwrap();
1567    if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1568        cdc_table_info
1569            .clone()
1570            .map(|cdc_table_info| cdc_table_info.external_table_name)
1571    } else {
1572        None
1573    }
1574}
1575
1576/// `rename_relation` renames the target relation and its definition,
1577/// it commits the changes to the transaction and returns the updated relations and the old name.
1578pub async fn rename_relation(
1579    txn: &DatabaseTransaction,
1580    object_type: ObjectType,
1581    object_id: ObjectId,
1582    object_name: &str,
1583) -> MetaResult<(Vec<PbObject>, String)> {
1584    use sea_orm::ActiveModelTrait;
1585
1586    use crate::controller::rename::alter_relation_rename;
1587
1588    let mut to_update_relations = vec![];
1589    // rename relation.
1590    macro_rules! rename_relation {
1591        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1592            let (mut relation, obj) = $entity::find_by_id($object_id)
1593                .find_also_related(Object)
1594                .one(txn)
1595                .await?
1596                .unwrap();
1597            let obj = obj.unwrap();
1598            let old_name = relation.name.clone();
1599            relation.name = object_name.into();
1600            if obj.obj_type != ObjectType::View {
1601                relation.definition = alter_relation_rename(&relation.definition, object_name);
1602            }
1603            let active_model = $table::ActiveModel {
1604                $identity: Set(relation.$identity),
1605                name: Set(object_name.into()),
1606                definition: Set(relation.definition.clone()),
1607                ..Default::default()
1608            };
1609            active_model.update(txn).await?;
1610            to_update_relations.push(PbObject {
1611                object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
1612            });
1613            old_name
1614        }};
1615    }
1616    // TODO: check is there any thing to change for shared source?
1617    let old_name = match object_type {
1618        ObjectType::Table => {
1619            let associated_source_id: Option<SourceId> = Source::find()
1620                .select_only()
1621                .column(source::Column::SourceId)
1622                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
1623                .into_tuple()
1624                .one(txn)
1625                .await?;
1626            if let Some(source_id) = associated_source_id {
1627                rename_relation!(Source, source, source_id, source_id);
1628            }
1629            rename_relation!(Table, table, table_id, object_id)
1630        }
1631        ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
1632        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id),
1633        ObjectType::Subscription => {
1634            rename_relation!(Subscription, subscription, subscription_id, object_id)
1635        }
1636        ObjectType::View => rename_relation!(View, view, view_id, object_id),
1637        ObjectType::Index => {
1638            let (mut index, obj) = Index::find_by_id(object_id)
1639                .find_also_related(Object)
1640                .one(txn)
1641                .await?
1642                .unwrap();
1643            index.name = object_name.into();
1644            let index_table_id = index.index_table_id;
1645            let old_name = rename_relation!(Table, table, table_id, index_table_id);
1646
1647            // the name of index and its associated table is the same.
1648            let active_model = index::ActiveModel {
1649                index_id: sea_orm::ActiveValue::Set(index.index_id),
1650                name: sea_orm::ActiveValue::Set(object_name.into()),
1651                ..Default::default()
1652            };
1653            active_model.update(txn).await?;
1654            to_update_relations.push(PbObject {
1655                object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1656            });
1657            old_name
1658        }
1659        _ => unreachable!("only relation name can be altered."),
1660    };
1661
1662    Ok((to_update_relations, old_name))
1663}
1664
1665pub async fn get_database_resource_group<C>(txn: &C, database_id: ObjectId) -> MetaResult<String>
1666where
1667    C: ConnectionTrait,
1668{
1669    let database_resource_group: Option<String> = Database::find_by_id(database_id)
1670        .select_only()
1671        .column(database::Column::ResourceGroup)
1672        .into_tuple()
1673        .one(txn)
1674        .await?
1675        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1676
1677    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1678}
1679
1680pub async fn get_existing_job_resource_group<C>(
1681    txn: &C,
1682    streaming_job_id: ObjectId,
1683) -> MetaResult<String>
1684where
1685    C: ConnectionTrait,
1686{
1687    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1688        StreamingJob::find_by_id(streaming_job_id)
1689            .select_only()
1690            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1691            .join(JoinType::InnerJoin, object::Relation::Database2.def())
1692            .column(streaming_job::Column::SpecificResourceGroup)
1693            .column(database::Column::ResourceGroup)
1694            .into_tuple()
1695            .one(txn)
1696            .await?
1697            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1698
1699    Ok(job_specific_resource_group.unwrap_or_else(|| {
1700        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1701    }))
1702}
1703
1704pub fn filter_workers_by_resource_group(
1705    workers: &HashMap<u32, WorkerNode>,
1706    resource_group: &str,
1707) -> BTreeSet<WorkerId> {
1708    workers
1709        .iter()
1710        .filter(|&(_, worker)| {
1711            worker
1712                .resource_group()
1713                .map(|node_label| node_label.as_str() == resource_group)
1714                .unwrap_or(false)
1715        })
1716        .map(|(id, _)| (*id as WorkerId))
1717        .collect()
1718}
1719
1720/// `rename_relation_refer` updates the definition of relations that refer to the target one,
1721/// it commits the changes to the transaction and returns all the updated relations.
1722pub async fn rename_relation_refer(
1723    txn: &DatabaseTransaction,
1724    object_type: ObjectType,
1725    object_id: ObjectId,
1726    object_name: &str,
1727    old_name: &str,
1728) -> MetaResult<Vec<PbObject>> {
1729    use sea_orm::ActiveModelTrait;
1730
1731    use crate::controller::rename::alter_relation_rename_refs;
1732
1733    let mut to_update_relations = vec![];
1734    macro_rules! rename_relation_ref {
1735        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1736            let (mut relation, obj) = $entity::find_by_id($object_id)
1737                .find_also_related(Object)
1738                .one(txn)
1739                .await?
1740                .unwrap();
1741            relation.definition =
1742                alter_relation_rename_refs(&relation.definition, old_name, object_name);
1743            let active_model = $table::ActiveModel {
1744                $identity: Set(relation.$identity),
1745                definition: Set(relation.definition.clone()),
1746                ..Default::default()
1747            };
1748            active_model.update(txn).await?;
1749            to_update_relations.push(PbObject {
1750                object_info: Some(PbObjectInfo::$entity(
1751                    ObjectModel(relation, obj.unwrap()).into(),
1752                )),
1753            });
1754        }};
1755    }
1756    let mut objs = get_referring_objects(object_id, txn).await?;
1757    if object_type == ObjectType::Table {
1758        let incoming_sinks: I32Array = Table::find_by_id(object_id)
1759            .select_only()
1760            .column(table::Column::IncomingSinks)
1761            .into_tuple()
1762            .one(txn)
1763            .await?
1764            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1765
1766        objs.extend(
1767            incoming_sinks
1768                .into_inner()
1769                .into_iter()
1770                .map(|id| PartialObject {
1771                    oid: id,
1772                    obj_type: ObjectType::Sink,
1773                    schema_id: None,
1774                    database_id: None,
1775                }),
1776        );
1777    }
1778
1779    for obj in objs {
1780        match obj.obj_type {
1781            ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid),
1782            ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid),
1783            ObjectType::Subscription => {
1784                rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid)
1785            }
1786            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid),
1787            ObjectType::Index => {
1788                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid)
1789                    .select_only()
1790                    .column(index::Column::IndexTableId)
1791                    .into_tuple()
1792                    .one(txn)
1793                    .await?;
1794                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
1795            }
1796            _ => {
1797                bail!("only table, sink, subscription, view and index depend on other objects.")
1798            }
1799        }
1800    }
1801
1802    Ok(to_update_relations)
1803}
1804
1805/// Validate that subscription can be safely deleted, meeting any of the following conditions:
1806/// 1. The upstream table is not referred to by any cross-db mv.
1807/// 2. After deleting the subscription, the upstream table still has at least one subscription.
1808pub async fn validate_subscription_deletion<C>(txn: &C, subscription_id: ObjectId) -> MetaResult<()>
1809where
1810    C: ConnectionTrait,
1811{
1812    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1813        .select_only()
1814        .column(subscription::Column::DependentTableId)
1815        .into_tuple()
1816        .one(txn)
1817        .await?
1818        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1819
1820    let cnt = Subscription::find()
1821        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1822        .count(txn)
1823        .await?;
1824    if cnt > 1 {
1825        // Ensure that at least one subscription is remained for the upstream table
1826        // once the subscription is dropped.
1827        return Ok(());
1828    }
1829
1830    // Ensure that the upstream table is not referred by any cross-db mv.
1831    let obj_alias = Alias::new("o1");
1832    let used_by_alias = Alias::new("o2");
1833    let count = ObjectDependency::find()
1834        .join_as(
1835            JoinType::InnerJoin,
1836            object_dependency::Relation::Object2.def(),
1837            obj_alias.clone(),
1838        )
1839        .join_as(
1840            JoinType::InnerJoin,
1841            object_dependency::Relation::Object1.def(),
1842            used_by_alias.clone(),
1843        )
1844        .filter(
1845            object_dependency::Column::Oid
1846                .eq(upstream_table_id)
1847                .and(object_dependency::Column::UsedBy.ne(subscription_id))
1848                .and(
1849                    Expr::col((obj_alias, object::Column::DatabaseId))
1850                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1851                ),
1852        )
1853        .count(txn)
1854        .await?;
1855
1856    if count != 0 {
1857        return Err(MetaError::permission_denied(format!(
1858            "Referenced by {} cross-db objects.",
1859            count
1860        )));
1861    }
1862
1863    Ok(())
1864}
1865
1866#[cfg(test)]
1867mod tests {
1868    use super::*;
1869
1870    #[test]
1871    fn test_extract_cdc_table_name() {
1872        let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
1873        let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
1874        assert_eq!(
1875            extract_external_table_name_from_definition(ddl1),
1876            Some("public.t1".into())
1877        );
1878        assert_eq!(
1879            extract_external_table_name_from_definition(ddl2),
1880            Some("mydb.t2".into())
1881        );
1882    }
1883}