risingwave_meta/controller/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
23use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
24use risingwave_common::{bail, hash};
25use risingwave_meta_model::actor::ActorStatus;
26use risingwave_meta_model::actor_dispatcher::DispatcherType;
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::object::ObjectType;
29use risingwave_meta_model::prelude::*;
30use risingwave_meta_model::table::TableType;
31use risingwave_meta_model::{
32    ActorId, DataTypeArray, DatabaseId, FragmentId, I32Array, JobStatus, ObjectId, PrivilegeId,
33    SchemaId, SourceId, StreamNode, StreamSourceInfo, TableId, UserId, VnodeBitmap, WorkerId,
34    actor, connection, database, fragment, fragment_relation, function, index, object,
35    object_dependency, schema, secret, sink, source, streaming_job, subscription, table, user,
36    user_privilege, view,
37};
38use risingwave_meta_model_migration::WithQuery;
39use risingwave_pb::catalog::{
40    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
41    PbSubscription, PbTable, PbView,
42};
43use risingwave_pb::common::WorkerNode;
44use risingwave_pb::meta::object::PbObjectInfo;
45use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
46use risingwave_pb::meta::{
47    FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
48};
49use risingwave_pb::stream_plan::{PbDispatcher, PbDispatcherType, PbFragmentTypeFlag};
50use risingwave_pb::user::grant_privilege::{
51    PbAction, PbActionWithGrantOption, PbObject as PbGrantObject,
52};
53use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
54use risingwave_sqlparser::ast::Statement as SqlStatement;
55use risingwave_sqlparser::parser::Parser;
56use sea_orm::sea_query::{
57    Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
58    WithClause,
59};
60use sea_orm::{
61    ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
62    FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
63    RelationTrait, Set, Statement,
64};
65use thiserror_ext::AsReport;
66
67use crate::controller::ObjectModel;
68use crate::model::{FragmentActorDispatchers, FragmentDownstreamRelation};
69use crate::{MetaError, MetaResult};
70
71/// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object.
72///
73/// # Examples
74///
75/// ```
76/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
77/// use sea_orm::sea_query::*;
78/// use sea_orm::*;
79///
80/// let query = construct_obj_dependency_query(1);
81///
82/// assert_eq!(
83///     query.to_string(MysqlQueryBuilder),
84///     r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `oid` FROM `object` WHERE `object`.`database_id` = 1 OR `object`.`schema_id` = 1) UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `oid`, `obj_type`, `schema_id`, `database_id` FROM `used_by_object_ids` INNER JOIN `object` ON `used_by_object_ids`.`used_by` = `oid` ORDER BY `oid` DESC"#
85/// );
86/// assert_eq!(
87///     query.to_string(PostgresQueryBuilder),
88///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1) UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
89/// );
90/// assert_eq!(
91///     query.to_string(SqliteQueryBuilder),
92///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
93/// );
94/// ```
95pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
96    let cte_alias = Alias::new("used_by_object_ids");
97    let cte_return_alias = Alias::new("used_by");
98
99    let mut base_query = SelectStatement::new()
100        .column(object_dependency::Column::UsedBy)
101        .from(ObjectDependency)
102        .and_where(object_dependency::Column::Oid.eq(obj_id))
103        .to_owned();
104
105    let belonged_obj_query = SelectStatement::new()
106        .column(object::Column::Oid)
107        .from(Object)
108        .and_where(
109            object::Column::DatabaseId
110                .eq(obj_id)
111                .or(object::Column::SchemaId.eq(obj_id)),
112        )
113        .to_owned();
114
115    let cte_referencing = Query::select()
116        .column((ObjectDependency, object_dependency::Column::UsedBy))
117        .from(ObjectDependency)
118        .inner_join(
119            cte_alias.clone(),
120            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
121                .equals(object_dependency::Column::Oid),
122        )
123        .to_owned();
124
125    let common_table_expr = CommonTableExpression::new()
126        .query(
127            base_query
128                .union(UnionType::All, belonged_obj_query)
129                .union(UnionType::All, cte_referencing)
130                .to_owned(),
131        )
132        .column(cte_return_alias.clone())
133        .table_name(cte_alias.clone())
134        .to_owned();
135
136    SelectStatement::new()
137        .distinct()
138        .columns([
139            object::Column::Oid,
140            object::Column::ObjType,
141            object::Column::SchemaId,
142            object::Column::DatabaseId,
143        ])
144        .from(cte_alias.clone())
145        .inner_join(
146            Object,
147            Expr::col((cte_alias, cte_return_alias.clone())).equals(object::Column::Oid),
148        )
149        .order_by(object::Column::Oid, Order::Desc)
150        .to_owned()
151        .with(
152            WithClause::new()
153                .recursive(true)
154                .cte(common_table_expr)
155                .to_owned(),
156        )
157        .to_owned()
158}
159
160/// This function will construct a query using recursive cte to find if dependent objects are already relying on the target table.
161///
162/// # Examples
163///
164/// ```
165/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
166/// use sea_orm::sea_query::*;
167/// use sea_orm::*;
168///
169/// let query = construct_sink_cycle_check_query(1, vec![2, 3]);
170///
171/// assert_eq!(
172///     query.to_string(MysqlQueryBuilder),
173///     r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
174/// );
175/// assert_eq!(
176///     query.to_string(PostgresQueryBuilder),
177///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
178/// );
179/// assert_eq!(
180///     query.to_string(SqliteQueryBuilder),
181///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
182/// );
183/// ```
184pub fn construct_sink_cycle_check_query(
185    target_table: ObjectId,
186    dependent_objects: Vec<ObjectId>,
187) -> WithQuery {
188    let cte_alias = Alias::new("used_by_object_ids_with_sink");
189    let depend_alias = Alias::new("obj_dependency_with_sink");
190
191    let mut base_query = SelectStatement::new()
192        .columns([
193            object_dependency::Column::Oid,
194            object_dependency::Column::UsedBy,
195        ])
196        .from(ObjectDependency)
197        .and_where(object_dependency::Column::Oid.eq(target_table))
198        .to_owned();
199
200    let query_sink_deps = SelectStatement::new()
201        .columns([sink::Column::SinkId, sink::Column::TargetTable])
202        .from(Sink)
203        .and_where(sink::Column::TargetTable.is_not_null())
204        .to_owned();
205
206    let cte_referencing = Query::select()
207        .column((depend_alias.clone(), object_dependency::Column::Oid))
208        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
209        .from_subquery(
210            SelectStatement::new()
211                .columns([
212                    object_dependency::Column::Oid,
213                    object_dependency::Column::UsedBy,
214                ])
215                .from(ObjectDependency)
216                .union(UnionType::All, query_sink_deps)
217                .to_owned(),
218            depend_alias.clone(),
219        )
220        .inner_join(
221            cte_alias.clone(),
222            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).eq(Expr::col((
223                depend_alias.clone(),
224                object_dependency::Column::Oid,
225            ))),
226        )
227        .and_where(
228            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
229                cte_alias.clone(),
230                object_dependency::Column::Oid,
231            ))),
232        )
233        .to_owned();
234
235    let common_table_expr = CommonTableExpression::new()
236        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
237        .columns([
238            object_dependency::Column::Oid,
239            object_dependency::Column::UsedBy,
240        ])
241        .table_name(cte_alias.clone())
242        .to_owned();
243
244    SelectStatement::new()
245        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
246        .from(cte_alias.clone())
247        .and_where(
248            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
249                .is_in(dependent_objects),
250        )
251        .to_owned()
252        .with(
253            WithClause::new()
254                .recursive(true)
255                .cte(common_table_expr)
256                .to_owned(),
257        )
258        .to_owned()
259}
260
261#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
262#[sea_orm(entity = "Object")]
263pub struct PartialObject {
264    pub oid: ObjectId,
265    pub obj_type: ObjectType,
266    pub schema_id: Option<SchemaId>,
267    pub database_id: Option<DatabaseId>,
268}
269
270#[derive(Clone, DerivePartialModel, FromQueryResult)]
271#[sea_orm(entity = "Fragment")]
272pub struct PartialFragmentStateTables {
273    pub fragment_id: FragmentId,
274    pub job_id: ObjectId,
275    pub state_table_ids: I32Array,
276}
277
278#[derive(Clone, DerivePartialModel, FromQueryResult)]
279#[sea_orm(entity = "Actor")]
280pub struct PartialActorLocation {
281    pub actor_id: ActorId,
282    pub fragment_id: FragmentId,
283    pub worker_id: WorkerId,
284    pub status: ActorStatus,
285}
286
287#[derive(FromQueryResult)]
288pub struct FragmentDesc {
289    pub fragment_id: FragmentId,
290    pub job_id: ObjectId,
291    pub fragment_type_mask: i32,
292    pub distribution_type: DistributionType,
293    pub state_table_ids: I32Array,
294    pub parallelism: i64,
295    pub vnode_count: i32,
296    pub stream_node: StreamNode,
297}
298
299/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
300pub async fn get_referring_objects_cascade<C>(
301    obj_id: ObjectId,
302    db: &C,
303) -> MetaResult<Vec<PartialObject>>
304where
305    C: ConnectionTrait,
306{
307    let query = construct_obj_dependency_query(obj_id);
308    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
309    let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
310        db.get_database_backend(),
311        sql,
312        values,
313    ))
314    .all(db)
315    .await?;
316    Ok(objects)
317}
318
319/// Check if create a sink with given dependent objects into the target table will cause a cycle, return true if it will.
320pub async fn check_sink_into_table_cycle<C>(
321    target_table: ObjectId,
322    dependent_objs: Vec<ObjectId>,
323    db: &C,
324) -> MetaResult<bool>
325where
326    C: ConnectionTrait,
327{
328    if dependent_objs.is_empty() {
329        return Ok(false);
330    }
331
332    // special check for self referencing
333    if dependent_objs.contains(&target_table) {
334        return Ok(true);
335    }
336
337    let query = construct_sink_cycle_check_query(target_table, dependent_objs);
338    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
339
340    let res = db
341        .query_one(Statement::from_sql_and_values(
342            db.get_database_backend(),
343            sql,
344            values,
345        ))
346        .await?
347        .unwrap();
348
349    let cnt: i64 = res.try_get_by(0)?;
350
351    Ok(cnt != 0)
352}
353
354/// `ensure_object_id` ensures the existence of target object in the cluster.
355pub async fn ensure_object_id<C>(
356    object_type: ObjectType,
357    obj_id: ObjectId,
358    db: &C,
359) -> MetaResult<()>
360where
361    C: ConnectionTrait,
362{
363    let count = Object::find_by_id(obj_id).count(db).await?;
364    if count == 0 {
365        return Err(MetaError::catalog_id_not_found(
366            object_type.as_str(),
367            obj_id,
368        ));
369    }
370    Ok(())
371}
372
373/// `ensure_user_id` ensures the existence of target user in the cluster.
374pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
375where
376    C: ConnectionTrait,
377{
378    let count = User::find_by_id(user_id).count(db).await?;
379    if count == 0 {
380        return Err(anyhow!("user {} was concurrently dropped", user_id).into());
381    }
382    Ok(())
383}
384
385/// `check_database_name_duplicate` checks whether the database name is already used in the cluster.
386pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
387where
388    C: ConnectionTrait,
389{
390    let count = Database::find()
391        .filter(database::Column::Name.eq(name))
392        .count(db)
393        .await?;
394    if count > 0 {
395        assert_eq!(count, 1);
396        return Err(MetaError::catalog_duplicated("database", name));
397    }
398    Ok(())
399}
400
401/// `check_function_signature_duplicate` checks whether the function name and its signature is already used in the target namespace.
402pub async fn check_function_signature_duplicate<C>(
403    pb_function: &PbFunction,
404    db: &C,
405) -> MetaResult<()>
406where
407    C: ConnectionTrait,
408{
409    let count = Function::find()
410        .inner_join(Object)
411        .filter(
412            object::Column::DatabaseId
413                .eq(pb_function.database_id as DatabaseId)
414                .and(object::Column::SchemaId.eq(pb_function.schema_id as SchemaId))
415                .and(function::Column::Name.eq(&pb_function.name))
416                .and(
417                    function::Column::ArgTypes
418                        .eq(DataTypeArray::from(pb_function.arg_types.clone())),
419                ),
420        )
421        .count(db)
422        .await?;
423    if count > 0 {
424        assert_eq!(count, 1);
425        return Err(MetaError::catalog_duplicated("function", &pb_function.name));
426    }
427    Ok(())
428}
429
430/// `check_connection_name_duplicate` checks whether the connection name is already used in the target namespace.
431pub async fn check_connection_name_duplicate<C>(
432    pb_connection: &PbConnection,
433    db: &C,
434) -> MetaResult<()>
435where
436    C: ConnectionTrait,
437{
438    let count = Connection::find()
439        .inner_join(Object)
440        .filter(
441            object::Column::DatabaseId
442                .eq(pb_connection.database_id as DatabaseId)
443                .and(object::Column::SchemaId.eq(pb_connection.schema_id as SchemaId))
444                .and(connection::Column::Name.eq(&pb_connection.name)),
445        )
446        .count(db)
447        .await?;
448    if count > 0 {
449        assert_eq!(count, 1);
450        return Err(MetaError::catalog_duplicated(
451            "connection",
452            &pb_connection.name,
453        ));
454    }
455    Ok(())
456}
457
458pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
459where
460    C: ConnectionTrait,
461{
462    let count = Secret::find()
463        .inner_join(Object)
464        .filter(
465            object::Column::DatabaseId
466                .eq(pb_secret.database_id as DatabaseId)
467                .and(object::Column::SchemaId.eq(pb_secret.schema_id as SchemaId))
468                .and(secret::Column::Name.eq(&pb_secret.name)),
469        )
470        .count(db)
471        .await?;
472    if count > 0 {
473        assert_eq!(count, 1);
474        return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
475    }
476    Ok(())
477}
478
479pub async fn check_subscription_name_duplicate<C>(
480    pb_subscription: &PbSubscription,
481    db: &C,
482) -> MetaResult<()>
483where
484    C: ConnectionTrait,
485{
486    let count = Subscription::find()
487        .inner_join(Object)
488        .filter(
489            object::Column::DatabaseId
490                .eq(pb_subscription.database_id as DatabaseId)
491                .and(object::Column::SchemaId.eq(pb_subscription.schema_id as SchemaId))
492                .and(subscription::Column::Name.eq(&pb_subscription.name)),
493        )
494        .count(db)
495        .await?;
496    if count > 0 {
497        assert_eq!(count, 1);
498        return Err(MetaError::catalog_duplicated(
499            "subscription",
500            &pb_subscription.name,
501        ));
502    }
503    Ok(())
504}
505
506/// `check_user_name_duplicate` checks whether the user is already existed in the cluster.
507pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
508where
509    C: ConnectionTrait,
510{
511    let count = User::find()
512        .filter(user::Column::Name.eq(name))
513        .count(db)
514        .await?;
515    if count > 0 {
516        assert_eq!(count, 1);
517        return Err(MetaError::catalog_duplicated("user", name));
518    }
519    Ok(())
520}
521
522/// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace.
523pub async fn check_relation_name_duplicate<C>(
524    name: &str,
525    database_id: DatabaseId,
526    schema_id: SchemaId,
527    db: &C,
528) -> MetaResult<()>
529where
530    C: ConnectionTrait,
531{
532    macro_rules! check_duplicated {
533        ($obj_type:expr, $entity:ident, $table:ident) => {
534            let object_id = Object::find()
535                .select_only()
536                .column(object::Column::Oid)
537                .inner_join($entity)
538                .filter(
539                    object::Column::DatabaseId
540                        .eq(Some(database_id))
541                        .and(object::Column::SchemaId.eq(Some(schema_id)))
542                        .and($table::Column::Name.eq(name)),
543                )
544                .into_tuple::<ObjectId>()
545                .one(db)
546                .await?;
547            if let Some(oid) = object_id {
548                let check_creation = if $obj_type == ObjectType::View {
549                    false
550                } else if $obj_type == ObjectType::Source {
551                    let source_info = Source::find_by_id(oid)
552                        .select_only()
553                        .column(source::Column::SourceInfo)
554                        .into_tuple::<Option<StreamSourceInfo>>()
555                        .one(db)
556                        .await?
557                        .unwrap();
558                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
559                } else {
560                    true
561                };
562                return if check_creation
563                    && !matches!(
564                        StreamingJob::find_by_id(oid)
565                            .select_only()
566                            .column(streaming_job::Column::JobStatus)
567                            .into_tuple::<JobStatus>()
568                            .one(db)
569                            .await?,
570                        Some(JobStatus::Created)
571                    ) {
572                    Err(MetaError::catalog_under_creation($obj_type.as_str(), name))
573                } else {
574                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
575                };
576            }
577        };
578    }
579    check_duplicated!(ObjectType::Table, Table, table);
580    check_duplicated!(ObjectType::Source, Source, source);
581    check_duplicated!(ObjectType::Sink, Sink, sink);
582    check_duplicated!(ObjectType::Index, Index, index);
583    check_duplicated!(ObjectType::View, View, view);
584
585    Ok(())
586}
587
588/// `check_schema_name_duplicate` checks whether the schema name is already used in the target database.
589pub async fn check_schema_name_duplicate<C>(
590    name: &str,
591    database_id: DatabaseId,
592    db: &C,
593) -> MetaResult<()>
594where
595    C: ConnectionTrait,
596{
597    let count = Object::find()
598        .inner_join(Schema)
599        .filter(
600            object::Column::ObjType
601                .eq(ObjectType::Schema)
602                .and(object::Column::DatabaseId.eq(Some(database_id)))
603                .and(schema::Column::Name.eq(name)),
604        )
605        .count(db)
606        .await?;
607    if count != 0 {
608        return Err(MetaError::catalog_duplicated("schema", name));
609    }
610
611    Ok(())
612}
613
614/// `ensure_object_not_refer` ensures that object is not used by any other ones except indexes.
615pub async fn ensure_object_not_refer<C>(
616    object_type: ObjectType,
617    object_id: ObjectId,
618    db: &C,
619) -> MetaResult<()>
620where
621    C: ConnectionTrait,
622{
623    // Ignore indexes.
624    let count = if object_type == ObjectType::Table {
625        ObjectDependency::find()
626            .join(
627                JoinType::InnerJoin,
628                object_dependency::Relation::Object1.def(),
629            )
630            .filter(
631                object_dependency::Column::Oid
632                    .eq(object_id)
633                    .and(object::Column::ObjType.ne(ObjectType::Index)),
634            )
635            .count(db)
636            .await?
637    } else {
638        ObjectDependency::find()
639            .filter(object_dependency::Column::Oid.eq(object_id))
640            .count(db)
641            .await?
642    };
643    if count != 0 {
644        return Err(MetaError::permission_denied(format!(
645            "{} used by {} other objects.",
646            object_type.as_str(),
647            count
648        )));
649    }
650    Ok(())
651}
652
653/// List all objects that are using the given one.
654pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
655where
656    C: ConnectionTrait,
657{
658    let objs = ObjectDependency::find()
659        .filter(object_dependency::Column::Oid.eq(object_id))
660        .join(
661            JoinType::InnerJoin,
662            object_dependency::Relation::Object1.def(),
663        )
664        .into_partial_model()
665        .all(db)
666        .await?;
667
668    Ok(objs)
669}
670
671/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
672pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
673where
674    C: ConnectionTrait,
675{
676    let count = Object::find()
677        .filter(object::Column::SchemaId.eq(Some(schema_id)))
678        .count(db)
679        .await?;
680    if count != 0 {
681        return Err(MetaError::permission_denied("schema is not empty"));
682    }
683
684    Ok(())
685}
686
687/// `list_user_info_by_ids` lists all users' info by their ids.
688pub async fn list_user_info_by_ids<C>(user_ids: Vec<UserId>, db: &C) -> MetaResult<Vec<PbUserInfo>>
689where
690    C: ConnectionTrait,
691{
692    let mut user_infos = vec![];
693    for user_id in user_ids {
694        let user = User::find_by_id(user_id)
695            .one(db)
696            .await?
697            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
698        let mut user_info: PbUserInfo = user.into();
699        user_info.grant_privileges = get_user_privilege(user_id, db).await?;
700        user_infos.push(user_info);
701    }
702    Ok(user_infos)
703}
704
705/// `get_object_owner` returns the owner of the given object.
706pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
707where
708    C: ConnectionTrait,
709{
710    let obj_owner: UserId = Object::find_by_id(object_id)
711        .select_only()
712        .column(object::Column::OwnerId)
713        .into_tuple()
714        .one(db)
715        .await?
716        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
717    Ok(obj_owner)
718}
719
720/// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given one.
721///
722/// # Examples
723///
724/// ```
725/// use risingwave_meta::controller::utils::construct_privilege_dependency_query;
726/// use sea_orm::sea_query::*;
727/// use sea_orm::*;
728///
729/// let query = construct_privilege_dependency_query(vec![1, 2, 3]);
730///
731/// assert_eq!(
732///    query.to_string(MysqlQueryBuilder),
733///   r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"#
734/// );
735/// assert_eq!(
736///   query.to_string(PostgresQueryBuilder),
737///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""#
738/// );
739/// assert_eq!(
740///  query.to_string(SqliteQueryBuilder),
741///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""#
742/// );
743/// ```
744pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
745    let cte_alias = Alias::new("granted_privilege_ids");
746    let cte_return_privilege_alias = Alias::new("id");
747    let cte_return_user_alias = Alias::new("user_id");
748
749    let mut base_query = SelectStatement::new()
750        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
751        .from(UserPrivilege)
752        .and_where(user_privilege::Column::Id.is_in(ids))
753        .to_owned();
754
755    let cte_referencing = Query::select()
756        .columns([
757            (UserPrivilege, user_privilege::Column::Id),
758            (UserPrivilege, user_privilege::Column::UserId),
759        ])
760        .from(UserPrivilege)
761        .inner_join(
762            cte_alias.clone(),
763            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
764                .equals(user_privilege::Column::DependentId),
765        )
766        .to_owned();
767
768    let common_table_expr = CommonTableExpression::new()
769        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
770        .columns([
771            cte_return_privilege_alias.clone(),
772            cte_return_user_alias.clone(),
773        ])
774        .table_name(cte_alias.clone())
775        .to_owned();
776
777    SelectStatement::new()
778        .columns([cte_return_privilege_alias, cte_return_user_alias])
779        .from(cte_alias.clone())
780        .to_owned()
781        .with(
782            WithClause::new()
783                .recursive(true)
784                .cte(common_table_expr)
785                .to_owned(),
786        )
787        .to_owned()
788}
789
790pub async fn get_internal_tables_by_id<C>(job_id: ObjectId, db: &C) -> MetaResult<Vec<TableId>>
791where
792    C: ConnectionTrait,
793{
794    let table_ids: Vec<TableId> = Table::find()
795        .select_only()
796        .column(table::Column::TableId)
797        .filter(
798            table::Column::TableType
799                .eq(TableType::Internal)
800                .and(table::Column::BelongsToJobId.eq(job_id)),
801        )
802        .into_tuple()
803        .all(db)
804        .await?;
805    Ok(table_ids)
806}
807
808pub async fn get_index_state_tables_by_table_id<C>(
809    table_id: TableId,
810    db: &C,
811) -> MetaResult<Vec<TableId>>
812where
813    C: ConnectionTrait,
814{
815    let mut index_table_ids: Vec<TableId> = Index::find()
816        .select_only()
817        .column(index::Column::IndexTableId)
818        .filter(index::Column::PrimaryTableId.eq(table_id))
819        .into_tuple()
820        .all(db)
821        .await?;
822
823    if !index_table_ids.is_empty() {
824        let internal_table_ids: Vec<TableId> = Table::find()
825            .select_only()
826            .column(table::Column::TableId)
827            .filter(
828                table::Column::TableType
829                    .eq(TableType::Internal)
830                    .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
831            )
832            .into_tuple()
833            .all(db)
834            .await?;
835
836        index_table_ids.extend(internal_table_ids.into_iter());
837    }
838
839    Ok(index_table_ids)
840}
841
842#[derive(Clone, DerivePartialModel, FromQueryResult)]
843#[sea_orm(entity = "UserPrivilege")]
844pub struct PartialUserPrivilege {
845    pub id: PrivilegeId,
846    pub user_id: UserId,
847}
848
849pub async fn get_referring_privileges_cascade<C>(
850    ids: Vec<PrivilegeId>,
851    db: &C,
852) -> MetaResult<Vec<PartialUserPrivilege>>
853where
854    C: ConnectionTrait,
855{
856    let query = construct_privilege_dependency_query(ids);
857    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
858    let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
859        db.get_database_backend(),
860        sql,
861        values,
862    ))
863    .all(db)
864    .await?;
865
866    Ok(privileges)
867}
868
869/// `ensure_privileges_not_referred` ensures that the privileges are not granted to any other users.
870pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
871where
872    C: ConnectionTrait,
873{
874    let count = UserPrivilege::find()
875        .filter(user_privilege::Column::DependentId.is_in(ids))
876        .count(db)
877        .await?;
878    if count != 0 {
879        return Err(MetaError::permission_denied(format!(
880            "privileges granted to {} other ones.",
881            count
882        )));
883    }
884    Ok(())
885}
886
887/// `get_user_privilege` returns the privileges of the given user.
888pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
889where
890    C: ConnectionTrait,
891{
892    let user_privileges = UserPrivilege::find()
893        .find_also_related(Object)
894        .filter(user_privilege::Column::UserId.eq(user_id))
895        .all(db)
896        .await?;
897    Ok(user_privileges
898        .into_iter()
899        .map(|(privilege, object)| {
900            let object = object.unwrap();
901            let oid = object.oid as _;
902            let obj = match object.obj_type {
903                ObjectType::Database => PbGrantObject::DatabaseId(oid),
904                ObjectType::Schema => PbGrantObject::SchemaId(oid),
905                ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
906                ObjectType::Source => PbGrantObject::SourceId(oid),
907                ObjectType::Sink => PbGrantObject::SinkId(oid),
908                ObjectType::View => PbGrantObject::ViewId(oid),
909                ObjectType::Function => PbGrantObject::FunctionId(oid),
910                ObjectType::Connection => unreachable!("connection is not supported yet"),
911                ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
912                ObjectType::Secret => unreachable!("secret is not supported yet"),
913            };
914            PbGrantPrivilege {
915                action_with_opts: vec![PbActionWithGrantOption {
916                    action: PbAction::from(privilege.action) as _,
917                    with_grant_option: privilege.with_grant_option,
918                    granted_by: privilege.granted_by as _,
919                }],
920                object: Some(obj),
921            }
922        })
923        .collect())
924}
925
926// todo: remove it after migrated to sql backend.
927pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
928    match object {
929        PbGrantObject::DatabaseId(id)
930        | PbGrantObject::SchemaId(id)
931        | PbGrantObject::TableId(id)
932        | PbGrantObject::SourceId(id)
933        | PbGrantObject::SinkId(id)
934        | PbGrantObject::ViewId(id)
935        | PbGrantObject::FunctionId(id)
936        | PbGrantObject::SubscriptionId(id)
937        | PbGrantObject::ConnectionId(id)
938        | PbGrantObject::SecretId(id) => *id as _,
939    }
940}
941
942pub async fn insert_fragment_relations(
943    db: &impl ConnectionTrait,
944    downstream_fragment_relations: &FragmentDownstreamRelation,
945) -> MetaResult<()> {
946    for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
947        for downstream in downstreams {
948            let relation = fragment_relation::Model {
949                source_fragment_id: *upstream_fragment_id as _,
950                target_fragment_id: downstream.downstream_fragment_id as _,
951                dispatcher_type: downstream.dispatcher_type,
952                dist_key_indices: downstream
953                    .dist_key_indices
954                    .iter()
955                    .map(|idx| *idx as i32)
956                    .collect_vec()
957                    .into(),
958                output_indices: downstream
959                    .output_indices
960                    .iter()
961                    .map(|idx| *idx as i32)
962                    .collect_vec()
963                    .into(),
964            };
965            FragmentRelation::insert(relation.into_active_model())
966                .exec(db)
967                .await?;
968        }
969    }
970    Ok(())
971}
972
973pub async fn get_fragment_actor_dispatchers<C>(
974    db: &C,
975    fragment_ids: Vec<FragmentId>,
976) -> MetaResult<FragmentActorDispatchers>
977where
978    C: ConnectionTrait,
979{
980    type FragmentActorInfo = (
981        DistributionType,
982        Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
983    );
984    let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
985    let get_fragment_actors = |fragment_id: FragmentId| async move {
986        let result: MetaResult<FragmentActorInfo> = try {
987            let mut fragment_actors = Fragment::find_by_id(fragment_id)
988                .find_with_related(Actor)
989                .filter(actor::Column::Status.eq(ActorStatus::Running))
990                .all(db)
991                .await?;
992            if fragment_actors.is_empty() {
993                return Err(anyhow!("failed to find fragment: {}", fragment_id).into());
994            }
995            assert_eq!(
996                fragment_actors.len(),
997                1,
998                "find multiple fragment {:?}",
999                fragment_actors
1000            );
1001            let (fragment, actors) = fragment_actors.pop().unwrap();
1002            (
1003                fragment.distribution_type,
1004                Arc::new(
1005                    actors
1006                        .into_iter()
1007                        .map(|actor| {
1008                            (
1009                                actor.actor_id as _,
1010                                actor
1011                                    .vnode_bitmap
1012                                    .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
1013                            )
1014                        })
1015                        .collect(),
1016                ),
1017            )
1018        };
1019        result
1020    };
1021    let fragment_relations = FragmentRelation::find()
1022        .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
1023        .all(db)
1024        .await?;
1025
1026    let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
1027    for fragment_relation::Model {
1028        source_fragment_id,
1029        target_fragment_id,
1030        dispatcher_type,
1031        dist_key_indices,
1032        output_indices,
1033    } in fragment_relations
1034    {
1035        let (source_fragment_distribution, source_fragment_actors) = {
1036            let (distribution, actors) = {
1037                match fragment_actor_cache.entry(source_fragment_id) {
1038                    Entry::Occupied(entry) => entry.into_mut(),
1039                    Entry::Vacant(entry) => {
1040                        entry.insert(get_fragment_actors(source_fragment_id).await?)
1041                    }
1042                }
1043            };
1044            (*distribution, actors.clone())
1045        };
1046        let (target_fragment_distribution, target_fragment_actors) = {
1047            let (distribution, actors) = {
1048                match fragment_actor_cache.entry(target_fragment_id) {
1049                    Entry::Occupied(entry) => entry.into_mut(),
1050                    Entry::Vacant(entry) => {
1051                        entry.insert(get_fragment_actors(target_fragment_id).await?)
1052                    }
1053                }
1054            };
1055            (*distribution, actors.clone())
1056        };
1057        let dispatchers = compose_dispatchers(
1058            source_fragment_distribution,
1059            &source_fragment_actors,
1060            target_fragment_id as _,
1061            target_fragment_distribution,
1062            &target_fragment_actors,
1063            dispatcher_type,
1064            dist_key_indices.into_u32_array(),
1065            output_indices.into_u32_array(),
1066        );
1067        let actor_dispatchers_map = actor_dispatchers_map
1068            .entry(source_fragment_id as _)
1069            .or_default();
1070        for (actor_id, dispatchers) in dispatchers {
1071            actor_dispatchers_map
1072                .entry(actor_id as _)
1073                .or_default()
1074                .push(dispatchers);
1075        }
1076    }
1077    Ok(actor_dispatchers_map)
1078}
1079
1080pub fn compose_dispatchers(
1081    source_fragment_distribution: DistributionType,
1082    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1083    target_fragment_id: crate::model::FragmentId,
1084    target_fragment_distribution: DistributionType,
1085    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1086    dispatcher_type: DispatcherType,
1087    dist_key_indices: Vec<u32>,
1088    output_indices: Vec<u32>,
1089) -> HashMap<crate::model::ActorId, PbDispatcher> {
1090    match dispatcher_type {
1091        DispatcherType::Hash => {
1092            let dispatcher = PbDispatcher {
1093                r#type: PbDispatcherType::from(dispatcher_type) as _,
1094                dist_key_indices: dist_key_indices.clone(),
1095                output_indices: output_indices.clone(),
1096                hash_mapping: Some(
1097                    ActorMapping::from_bitmaps(
1098                        &target_fragment_actors
1099                            .iter()
1100                            .map(|(actor_id, bitmap)| {
1101                                (
1102                                    *actor_id as _,
1103                                    bitmap
1104                                        .clone()
1105                                        .expect("downstream hash dispatch must have distribution"),
1106                                )
1107                            })
1108                            .collect(),
1109                    )
1110                    .to_protobuf(),
1111                ),
1112                dispatcher_id: target_fragment_id as _,
1113                downstream_actor_id: target_fragment_actors
1114                    .keys()
1115                    .map(|actor_id| *actor_id as _)
1116                    .collect(),
1117            };
1118            source_fragment_actors
1119                .keys()
1120                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1121                .collect()
1122        }
1123        DispatcherType::Broadcast | DispatcherType::Simple => {
1124            let dispatcher = PbDispatcher {
1125                r#type: PbDispatcherType::from(dispatcher_type) as _,
1126                dist_key_indices: dist_key_indices.clone(),
1127                output_indices: output_indices.clone(),
1128                hash_mapping: None,
1129                dispatcher_id: target_fragment_id as _,
1130                downstream_actor_id: target_fragment_actors
1131                    .keys()
1132                    .map(|actor_id| *actor_id as _)
1133                    .collect(),
1134            };
1135            source_fragment_actors
1136                .keys()
1137                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1138                .collect()
1139        }
1140        DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1141            source_fragment_distribution,
1142            source_fragment_actors,
1143            target_fragment_distribution,
1144            target_fragment_actors,
1145        )
1146        .into_iter()
1147        .map(|(upstream_actor_id, downstream_actor_id)| {
1148            (
1149                upstream_actor_id,
1150                PbDispatcher {
1151                    r#type: PbDispatcherType::NoShuffle as _,
1152                    dist_key_indices: dist_key_indices.clone(),
1153                    output_indices: output_indices.clone(),
1154                    hash_mapping: None,
1155                    dispatcher_id: target_fragment_id as _,
1156                    downstream_actor_id: vec![downstream_actor_id as _],
1157                },
1158            )
1159        })
1160        .collect(),
1161    }
1162}
1163
1164/// return (`upstream_actor_id` -> `downstream_actor_id`)
1165pub fn resolve_no_shuffle_actor_dispatcher(
1166    source_fragment_distribution: DistributionType,
1167    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1168    target_fragment_distribution: DistributionType,
1169    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1170) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1171    assert_eq!(source_fragment_distribution, target_fragment_distribution);
1172    assert_eq!(
1173        source_fragment_actors.len(),
1174        target_fragment_actors.len(),
1175        "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1176        source_fragment_actors,
1177        target_fragment_actors
1178    );
1179    match source_fragment_distribution {
1180        DistributionType::Single => {
1181            let assert_singleton = |bitmap: &Option<Bitmap>| {
1182                assert!(
1183                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1184                    "not singleton: {:?}",
1185                    bitmap
1186                );
1187            };
1188            assert_eq!(
1189                source_fragment_actors.len(),
1190                1,
1191                "singleton distribution actor count not 1: {:?}",
1192                source_fragment_distribution
1193            );
1194            assert_eq!(
1195                target_fragment_actors.len(),
1196                1,
1197                "singleton distribution actor count not 1: {:?}",
1198                target_fragment_distribution
1199            );
1200            let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1201            assert_singleton(bitmap);
1202            let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1203            assert_singleton(bitmap);
1204            vec![(*source_actor_id, *target_actor_id)]
1205        }
1206        DistributionType::Hash => {
1207            let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1208                .iter()
1209                .map(|(actor_id, bitmap)| {
1210                    let bitmap = bitmap
1211                        .as_ref()
1212                        .expect("hash distribution should have bitmap");
1213                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1214                    (first_vnode, (*actor_id, bitmap))
1215                })
1216                .collect();
1217            source_fragment_actors
1218                .iter()
1219                .map(|(source_actor_id, bitmap)| {
1220                    let bitmap = bitmap
1221                        .as_ref()
1222                        .expect("hash distribution should have bitmap");
1223                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1224                    let (target_actor_id, target_bitmap) =
1225                        target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1226                            panic!(
1227                                "cannot find matched target actor: {} {:?} {:?} {:?}",
1228                                source_actor_id,
1229                                first_vnode,
1230                                source_fragment_actors,
1231                                target_fragment_actors
1232                            );
1233                        });
1234                    assert_eq!(
1235                        bitmap,
1236                        target_bitmap,
1237                        "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1238                        source_actor_id,
1239                        first_vnode,
1240                        source_fragment_actors,
1241                        target_fragment_actors
1242                    );
1243                    (*source_actor_id, target_actor_id)
1244                }).collect()
1245        }
1246    }
1247}
1248
1249/// `get_fragment_mappings` returns the fragment vnode mappings of the given job.
1250pub async fn get_fragment_mappings<C>(
1251    db: &C,
1252    job_id: ObjectId,
1253) -> MetaResult<Vec<PbFragmentWorkerSlotMapping>>
1254where
1255    C: ConnectionTrait,
1256{
1257    let job_actors: Vec<(
1258        FragmentId,
1259        DistributionType,
1260        ActorId,
1261        Option<VnodeBitmap>,
1262        WorkerId,
1263        ActorStatus,
1264    )> = Actor::find()
1265        .select_only()
1266        .columns([
1267            fragment::Column::FragmentId,
1268            fragment::Column::DistributionType,
1269        ])
1270        .columns([
1271            actor::Column::ActorId,
1272            actor::Column::VnodeBitmap,
1273            actor::Column::WorkerId,
1274            actor::Column::Status,
1275        ])
1276        .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1277        .filter(fragment::Column::JobId.eq(job_id))
1278        .into_tuple()
1279        .all(db)
1280        .await?;
1281
1282    Ok(rebuild_fragment_mapping_from_actors(job_actors))
1283}
1284
1285pub fn rebuild_fragment_mapping_from_actors(
1286    job_actors: Vec<(
1287        FragmentId,
1288        DistributionType,
1289        ActorId,
1290        Option<VnodeBitmap>,
1291        WorkerId,
1292        ActorStatus,
1293    )>,
1294) -> Vec<FragmentWorkerSlotMapping> {
1295    let mut all_actor_locations = HashMap::new();
1296    let mut actor_bitmaps = HashMap::new();
1297    let mut fragment_actors = HashMap::new();
1298    let mut fragment_dist = HashMap::new();
1299
1300    for (fragment_id, dist, actor_id, bitmap, worker_id, actor_status) in job_actors {
1301        if actor_status == ActorStatus::Inactive {
1302            continue;
1303        }
1304
1305        all_actor_locations
1306            .entry(fragment_id)
1307            .or_insert(HashMap::new())
1308            .insert(actor_id as hash::ActorId, worker_id as u32);
1309        actor_bitmaps.insert(actor_id, bitmap);
1310        fragment_actors
1311            .entry(fragment_id)
1312            .or_insert_with(Vec::new)
1313            .push(actor_id);
1314        fragment_dist.insert(fragment_id, dist);
1315    }
1316
1317    let mut result = vec![];
1318    for (fragment_id, dist) in fragment_dist {
1319        let mut actor_locations = all_actor_locations.remove(&fragment_id).unwrap();
1320        let fragment_worker_slot_mapping = match dist {
1321            DistributionType::Single => {
1322                let actor = fragment_actors
1323                    .remove(&fragment_id)
1324                    .unwrap()
1325                    .into_iter()
1326                    .exactly_one()
1327                    .unwrap() as hash::ActorId;
1328                let actor_location = actor_locations.remove(&actor).unwrap();
1329
1330                WorkerSlotMapping::new_single(WorkerSlotId::new(actor_location, 0))
1331            }
1332            DistributionType::Hash => {
1333                let actors = fragment_actors.remove(&fragment_id).unwrap();
1334
1335                let all_actor_bitmaps: HashMap<_, _> = actors
1336                    .iter()
1337                    .map(|actor_id| {
1338                        let vnode_bitmap = actor_bitmaps
1339                            .remove(actor_id)
1340                            .flatten()
1341                            .expect("actor bitmap shouldn't be none in hash fragment");
1342
1343                        let bitmap = Bitmap::from(&vnode_bitmap.to_protobuf());
1344                        (*actor_id as hash::ActorId, bitmap)
1345                    })
1346                    .collect();
1347
1348                let actor_mapping = ActorMapping::from_bitmaps(&all_actor_bitmaps);
1349
1350                actor_mapping.to_worker_slot(&actor_locations)
1351            }
1352        };
1353
1354        result.push(PbFragmentWorkerSlotMapping {
1355            fragment_id: fragment_id as u32,
1356            mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1357        })
1358    }
1359    result
1360}
1361
1362/// `get_fragment_actor_ids` returns the fragment actor ids of the given fragments.
1363pub async fn get_fragment_actor_ids<C>(
1364    db: &C,
1365    fragment_ids: Vec<FragmentId>,
1366) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>>
1367where
1368    C: ConnectionTrait,
1369{
1370    let fragment_actors: Vec<(FragmentId, ActorId)> = Actor::find()
1371        .select_only()
1372        .columns([actor::Column::FragmentId, actor::Column::ActorId])
1373        .filter(actor::Column::FragmentId.is_in(fragment_ids))
1374        .into_tuple()
1375        .all(db)
1376        .await?;
1377
1378    Ok(fragment_actors.into_iter().into_group_map())
1379}
1380
1381/// For the given streaming jobs, returns
1382/// - All source fragments
1383/// - All actors
1384/// - All fragments
1385pub async fn get_fragments_for_jobs<C>(
1386    db: &C,
1387    streaming_jobs: Vec<ObjectId>,
1388) -> MetaResult<(
1389    HashMap<SourceId, BTreeSet<FragmentId>>,
1390    HashSet<ActorId>,
1391    HashSet<FragmentId>,
1392)>
1393where
1394    C: ConnectionTrait,
1395{
1396    if streaming_jobs.is_empty() {
1397        return Ok((HashMap::default(), HashSet::default(), HashSet::default()));
1398    }
1399
1400    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1401        .select_only()
1402        .columns([
1403            fragment::Column::FragmentId,
1404            fragment::Column::FragmentTypeMask,
1405            fragment::Column::StreamNode,
1406        ])
1407        .filter(fragment::Column::JobId.is_in(streaming_jobs))
1408        .into_tuple()
1409        .all(db)
1410        .await?;
1411    let actors: Vec<ActorId> = Actor::find()
1412        .select_only()
1413        .column(actor::Column::ActorId)
1414        .filter(
1415            actor::Column::FragmentId.is_in(fragments.iter().map(|(id, _, _)| *id).collect_vec()),
1416        )
1417        .into_tuple()
1418        .all(db)
1419        .await?;
1420
1421    let fragment_ids = fragments
1422        .iter()
1423        .map(|(fragment_id, _, _)| *fragment_id)
1424        .collect();
1425
1426    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1427    for (fragment_id, mask, stream_node) in fragments {
1428        if mask & PbFragmentTypeFlag::Source as i32 == 0 {
1429            continue;
1430        }
1431        if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1432            source_fragment_ids
1433                .entry(source_id as _)
1434                .or_default()
1435                .insert(fragment_id);
1436        }
1437    }
1438
1439    Ok((
1440        source_fragment_ids,
1441        actors.into_iter().collect(),
1442        fragment_ids,
1443    ))
1444}
1445
1446/// Build a object group for notifying the deletion of the given objects.
1447///
1448/// Note that only id fields are filled in the object info, as the arguments are partial objects.
1449/// As a result, the returned notification info should only be used for deletion.
1450pub(crate) fn build_object_group_for_delete(
1451    partial_objects: Vec<PartialObject>,
1452) -> NotificationInfo {
1453    let mut objects = vec![];
1454    for obj in partial_objects {
1455        match obj.obj_type {
1456            ObjectType::Database => objects.push(PbObject {
1457                object_info: Some(PbObjectInfo::Database(PbDatabase {
1458                    id: obj.oid as _,
1459                    ..Default::default()
1460                })),
1461            }),
1462            ObjectType::Schema => objects.push(PbObject {
1463                object_info: Some(PbObjectInfo::Schema(PbSchema {
1464                    id: obj.oid as _,
1465                    database_id: obj.database_id.unwrap() as _,
1466                    ..Default::default()
1467                })),
1468            }),
1469            ObjectType::Table => objects.push(PbObject {
1470                object_info: Some(PbObjectInfo::Table(PbTable {
1471                    id: obj.oid as _,
1472                    schema_id: obj.schema_id.unwrap() as _,
1473                    database_id: obj.database_id.unwrap() as _,
1474                    ..Default::default()
1475                })),
1476            }),
1477            ObjectType::Source => objects.push(PbObject {
1478                object_info: Some(PbObjectInfo::Source(PbSource {
1479                    id: obj.oid as _,
1480                    schema_id: obj.schema_id.unwrap() as _,
1481                    database_id: obj.database_id.unwrap() as _,
1482                    ..Default::default()
1483                })),
1484            }),
1485            ObjectType::Sink => objects.push(PbObject {
1486                object_info: Some(PbObjectInfo::Sink(PbSink {
1487                    id: obj.oid as _,
1488                    schema_id: obj.schema_id.unwrap() as _,
1489                    database_id: obj.database_id.unwrap() as _,
1490                    ..Default::default()
1491                })),
1492            }),
1493            ObjectType::Subscription => objects.push(PbObject {
1494                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1495                    id: obj.oid as _,
1496                    schema_id: obj.schema_id.unwrap() as _,
1497                    database_id: obj.database_id.unwrap() as _,
1498                    ..Default::default()
1499                })),
1500            }),
1501            ObjectType::View => objects.push(PbObject {
1502                object_info: Some(PbObjectInfo::View(PbView {
1503                    id: obj.oid as _,
1504                    schema_id: obj.schema_id.unwrap() as _,
1505                    database_id: obj.database_id.unwrap() as _,
1506                    ..Default::default()
1507                })),
1508            }),
1509            ObjectType::Index => {
1510                objects.push(PbObject {
1511                    object_info: Some(PbObjectInfo::Index(PbIndex {
1512                        id: obj.oid as _,
1513                        schema_id: obj.schema_id.unwrap() as _,
1514                        database_id: obj.database_id.unwrap() as _,
1515                        ..Default::default()
1516                    })),
1517                });
1518                objects.push(PbObject {
1519                    object_info: Some(PbObjectInfo::Table(PbTable {
1520                        id: obj.oid as _,
1521                        schema_id: obj.schema_id.unwrap() as _,
1522                        database_id: obj.database_id.unwrap() as _,
1523                        ..Default::default()
1524                    })),
1525                });
1526            }
1527            ObjectType::Function => objects.push(PbObject {
1528                object_info: Some(PbObjectInfo::Function(PbFunction {
1529                    id: obj.oid as _,
1530                    schema_id: obj.schema_id.unwrap() as _,
1531                    database_id: obj.database_id.unwrap() as _,
1532                    ..Default::default()
1533                })),
1534            }),
1535            ObjectType::Connection => objects.push(PbObject {
1536                object_info: Some(PbObjectInfo::Connection(PbConnection {
1537                    id: obj.oid as _,
1538                    schema_id: obj.schema_id.unwrap() as _,
1539                    database_id: obj.database_id.unwrap() as _,
1540                    ..Default::default()
1541                })),
1542            }),
1543            ObjectType::Secret => objects.push(PbObject {
1544                object_info: Some(PbObjectInfo::Secret(PbSecret {
1545                    id: obj.oid as _,
1546                    schema_id: obj.schema_id.unwrap() as _,
1547                    database_id: obj.database_id.unwrap() as _,
1548                    ..Default::default()
1549                })),
1550            }),
1551        }
1552    }
1553    NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1554}
1555
1556pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1557    let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1558        .context("unable to parse table definition")
1559        .inspect_err(|e| {
1560            tracing::error!(
1561                target: "auto_schema_change",
1562                error = %e.as_report(),
1563                "failed to parse table definition")
1564        })
1565        .unwrap()
1566        .try_into()
1567        .unwrap();
1568    if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1569        cdc_table_info
1570            .clone()
1571            .map(|cdc_table_info| cdc_table_info.external_table_name)
1572    } else {
1573        None
1574    }
1575}
1576
1577/// `rename_relation` renames the target relation and its definition,
1578/// it commits the changes to the transaction and returns the updated relations and the old name.
1579pub async fn rename_relation(
1580    txn: &DatabaseTransaction,
1581    object_type: ObjectType,
1582    object_id: ObjectId,
1583    object_name: &str,
1584) -> MetaResult<(Vec<PbObject>, String)> {
1585    use sea_orm::ActiveModelTrait;
1586
1587    use crate::controller::rename::alter_relation_rename;
1588
1589    let mut to_update_relations = vec![];
1590    // rename relation.
1591    macro_rules! rename_relation {
1592        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1593            let (mut relation, obj) = $entity::find_by_id($object_id)
1594                .find_also_related(Object)
1595                .one(txn)
1596                .await?
1597                .unwrap();
1598            let obj = obj.unwrap();
1599            let old_name = relation.name.clone();
1600            relation.name = object_name.into();
1601            if obj.obj_type != ObjectType::View {
1602                relation.definition = alter_relation_rename(&relation.definition, object_name);
1603            }
1604            let active_model = $table::ActiveModel {
1605                $identity: Set(relation.$identity),
1606                name: Set(object_name.into()),
1607                definition: Set(relation.definition.clone()),
1608                ..Default::default()
1609            };
1610            active_model.update(txn).await?;
1611            to_update_relations.push(PbObject {
1612                object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
1613            });
1614            old_name
1615        }};
1616    }
1617    // TODO: check is there any thing to change for shared source?
1618    let old_name = match object_type {
1619        ObjectType::Table => rename_relation!(Table, table, table_id, object_id),
1620        ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
1621        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id),
1622        ObjectType::Subscription => {
1623            rename_relation!(Subscription, subscription, subscription_id, object_id)
1624        }
1625        ObjectType::View => rename_relation!(View, view, view_id, object_id),
1626        ObjectType::Index => {
1627            let (mut index, obj) = Index::find_by_id(object_id)
1628                .find_also_related(Object)
1629                .one(txn)
1630                .await?
1631                .unwrap();
1632            index.name = object_name.into();
1633            let index_table_id = index.index_table_id;
1634            let old_name = rename_relation!(Table, table, table_id, index_table_id);
1635
1636            // the name of index and its associated table is the same.
1637            let active_model = index::ActiveModel {
1638                index_id: sea_orm::ActiveValue::Set(index.index_id),
1639                name: sea_orm::ActiveValue::Set(object_name.into()),
1640                ..Default::default()
1641            };
1642            active_model.update(txn).await?;
1643            to_update_relations.push(PbObject {
1644                object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1645            });
1646            old_name
1647        }
1648        _ => unreachable!("only relation name can be altered."),
1649    };
1650
1651    Ok((to_update_relations, old_name))
1652}
1653
1654pub async fn get_database_resource_group<C>(txn: &C, database_id: ObjectId) -> MetaResult<String>
1655where
1656    C: ConnectionTrait,
1657{
1658    let database_resource_group: Option<String> = Database::find_by_id(database_id)
1659        .select_only()
1660        .column(database::Column::ResourceGroup)
1661        .into_tuple()
1662        .one(txn)
1663        .await?
1664        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1665
1666    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1667}
1668
1669pub async fn get_existing_job_resource_group<C>(
1670    txn: &C,
1671    streaming_job_id: ObjectId,
1672) -> MetaResult<String>
1673where
1674    C: ConnectionTrait,
1675{
1676    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1677        StreamingJob::find_by_id(streaming_job_id)
1678            .select_only()
1679            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1680            .join(JoinType::InnerJoin, object::Relation::Database2.def())
1681            .column(streaming_job::Column::SpecificResourceGroup)
1682            .column(database::Column::ResourceGroup)
1683            .into_tuple()
1684            .one(txn)
1685            .await?
1686            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1687
1688    Ok(job_specific_resource_group.unwrap_or_else(|| {
1689        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1690    }))
1691}
1692
1693pub fn filter_workers_by_resource_group(
1694    workers: &HashMap<u32, WorkerNode>,
1695    resource_group: &str,
1696) -> BTreeSet<WorkerId> {
1697    workers
1698        .iter()
1699        .filter(|&(_, worker)| {
1700            worker
1701                .resource_group()
1702                .map(|node_label| node_label.as_str() == resource_group)
1703                .unwrap_or(false)
1704        })
1705        .map(|(id, _)| (*id as WorkerId))
1706        .collect()
1707}
1708
1709/// `rename_relation_refer` updates the definition of relations that refer to the target one,
1710/// it commits the changes to the transaction and returns all the updated relations.
1711pub async fn rename_relation_refer(
1712    txn: &DatabaseTransaction,
1713    object_type: ObjectType,
1714    object_id: ObjectId,
1715    object_name: &str,
1716    old_name: &str,
1717) -> MetaResult<Vec<PbObject>> {
1718    use sea_orm::ActiveModelTrait;
1719
1720    use crate::controller::rename::alter_relation_rename_refs;
1721
1722    let mut to_update_relations = vec![];
1723    macro_rules! rename_relation_ref {
1724        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1725            let (mut relation, obj) = $entity::find_by_id($object_id)
1726                .find_also_related(Object)
1727                .one(txn)
1728                .await?
1729                .unwrap();
1730            relation.definition =
1731                alter_relation_rename_refs(&relation.definition, old_name, object_name);
1732            let active_model = $table::ActiveModel {
1733                $identity: Set(relation.$identity),
1734                definition: Set(relation.definition.clone()),
1735                ..Default::default()
1736            };
1737            active_model.update(txn).await?;
1738            to_update_relations.push(PbObject {
1739                object_info: Some(PbObjectInfo::$entity(
1740                    ObjectModel(relation, obj.unwrap()).into(),
1741                )),
1742            });
1743        }};
1744    }
1745    let mut objs = get_referring_objects(object_id, txn).await?;
1746    if object_type == ObjectType::Table {
1747        let incoming_sinks: I32Array = Table::find_by_id(object_id)
1748            .select_only()
1749            .column(table::Column::IncomingSinks)
1750            .into_tuple()
1751            .one(txn)
1752            .await?
1753            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1754
1755        objs.extend(
1756            incoming_sinks
1757                .into_inner()
1758                .into_iter()
1759                .map(|id| PartialObject {
1760                    oid: id,
1761                    obj_type: ObjectType::Sink,
1762                    schema_id: None,
1763                    database_id: None,
1764                }),
1765        );
1766    }
1767
1768    for obj in objs {
1769        match obj.obj_type {
1770            ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid),
1771            ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid),
1772            ObjectType::Subscription => {
1773                rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid)
1774            }
1775            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid),
1776            ObjectType::Index => {
1777                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid)
1778                    .select_only()
1779                    .column(index::Column::IndexTableId)
1780                    .into_tuple()
1781                    .one(txn)
1782                    .await?;
1783                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
1784            }
1785            _ => {
1786                bail!("only table, sink, subscription, view and index depend on other objects.")
1787            }
1788        }
1789    }
1790
1791    Ok(to_update_relations)
1792}
1793
1794/// Validate that subscription can be safely deleted, meeting any of the following conditions:
1795/// 1. The upstream table is not referred to by any cross-db mv.
1796/// 2. After deleting the subscription, the upstream table still has at least one subscription.
1797pub async fn validate_subscription_deletion<C>(txn: &C, subscription_id: ObjectId) -> MetaResult<()>
1798where
1799    C: ConnectionTrait,
1800{
1801    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1802        .select_only()
1803        .column(subscription::Column::DependentTableId)
1804        .into_tuple()
1805        .one(txn)
1806        .await?
1807        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1808
1809    let cnt = Subscription::find()
1810        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1811        .count(txn)
1812        .await?;
1813    if cnt > 1 {
1814        // Ensure that at least one subscription is remained for the upstream table
1815        // once the subscription is dropped.
1816        return Ok(());
1817    }
1818
1819    // Ensure that the upstream table is not referred by any cross-db mv.
1820    let obj_alias = Alias::new("o1");
1821    let used_by_alias = Alias::new("o2");
1822    let count = ObjectDependency::find()
1823        .join_as(
1824            JoinType::InnerJoin,
1825            object_dependency::Relation::Object2.def(),
1826            obj_alias.clone(),
1827        )
1828        .join_as(
1829            JoinType::InnerJoin,
1830            object_dependency::Relation::Object1.def(),
1831            used_by_alias.clone(),
1832        )
1833        .filter(
1834            object_dependency::Column::Oid
1835                .eq(upstream_table_id)
1836                .and(object_dependency::Column::UsedBy.ne(subscription_id))
1837                .and(
1838                    Expr::col((obj_alias, object::Column::DatabaseId))
1839                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1840                ),
1841        )
1842        .count(txn)
1843        .await?;
1844
1845    if count != 0 {
1846        return Err(MetaError::permission_denied(format!(
1847            "Referenced by {} cross-db objects.",
1848            count
1849        )));
1850    }
1851
1852    Ok(())
1853}
1854
1855#[cfg(test)]
1856mod tests {
1857    use super::*;
1858
1859    #[test]
1860    fn test_extract_cdc_table_name() {
1861        let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
1862        let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
1863        assert_eq!(
1864            extract_external_table_name_from_definition(ddl1),
1865            Some("public.t1".into())
1866        );
1867        assert_eq!(
1868            extract_external_table_name_from_definition(ddl2),
1869            Some("mydb.t2".into())
1870        );
1871    }
1872}