risingwave_meta/controller/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
23use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
24use risingwave_common::{bail, hash};
25use risingwave_meta_model::actor::ActorStatus;
26use risingwave_meta_model::actor_dispatcher::DispatcherType;
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::object::ObjectType;
29use risingwave_meta_model::prelude::*;
30use risingwave_meta_model::table::TableType;
31use risingwave_meta_model::{
32    ActorId, DataTypeArray, DatabaseId, FragmentId, I32Array, ObjectId, PrivilegeId, SchemaId,
33    SourceId, StreamNode, TableId, UserId, VnodeBitmap, WorkerId, actor, connection, database,
34    fragment, fragment_relation, function, index, object, object_dependency, schema, secret, sink,
35    source, streaming_job, subscription, table, user, user_privilege, view,
36};
37use risingwave_meta_model_migration::WithQuery;
38use risingwave_pb::catalog::{
39    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
40    PbSubscription, PbTable, PbView,
41};
42use risingwave_pb::common::WorkerNode;
43use risingwave_pb::meta::object::PbObjectInfo;
44use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
45use risingwave_pb::meta::{
46    FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
47};
48use risingwave_pb::stream_plan::{PbDispatcher, PbDispatcherType, PbFragmentTypeFlag};
49use risingwave_pb::user::grant_privilege::{
50    PbAction, PbActionWithGrantOption, PbObject as PbGrantObject,
51};
52use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
53use risingwave_sqlparser::ast::Statement as SqlStatement;
54use risingwave_sqlparser::parser::Parser;
55use sea_orm::sea_query::{
56    Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
57    WithClause,
58};
59use sea_orm::{
60    ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
61    FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
62    RelationTrait, Set, Statement,
63};
64use thiserror_ext::AsReport;
65
66use crate::controller::ObjectModel;
67use crate::model::{FragmentActorDispatchers, FragmentDownstreamRelation};
68use crate::{MetaError, MetaResult};
69
70/// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object.
71///
72/// # Examples
73///
74/// ```
75/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
76/// use sea_orm::sea_query::*;
77/// use sea_orm::*;
78///
79/// let query = construct_obj_dependency_query(1);
80///
81/// assert_eq!(
82///     query.to_string(MysqlQueryBuilder),
83///     r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `oid` FROM `object` WHERE `object`.`database_id` = 1 OR `object`.`schema_id` = 1) UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `oid`, `obj_type`, `schema_id`, `database_id` FROM `used_by_object_ids` INNER JOIN `object` ON `used_by_object_ids`.`used_by` = `oid` ORDER BY `oid` DESC"#
84/// );
85/// assert_eq!(
86///     query.to_string(PostgresQueryBuilder),
87///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1) UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
88/// );
89/// assert_eq!(
90///     query.to_string(SqliteQueryBuilder),
91///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
92/// );
93/// ```
94pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
95    let cte_alias = Alias::new("used_by_object_ids");
96    let cte_return_alias = Alias::new("used_by");
97
98    let mut base_query = SelectStatement::new()
99        .column(object_dependency::Column::UsedBy)
100        .from(ObjectDependency)
101        .and_where(object_dependency::Column::Oid.eq(obj_id))
102        .to_owned();
103
104    let belonged_obj_query = SelectStatement::new()
105        .column(object::Column::Oid)
106        .from(Object)
107        .and_where(
108            object::Column::DatabaseId
109                .eq(obj_id)
110                .or(object::Column::SchemaId.eq(obj_id)),
111        )
112        .to_owned();
113
114    let cte_referencing = Query::select()
115        .column((ObjectDependency, object_dependency::Column::UsedBy))
116        .from(ObjectDependency)
117        .inner_join(
118            cte_alias.clone(),
119            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
120                .equals(object_dependency::Column::Oid),
121        )
122        .to_owned();
123
124    let common_table_expr = CommonTableExpression::new()
125        .query(
126            base_query
127                .union(UnionType::All, belonged_obj_query)
128                .union(UnionType::All, cte_referencing)
129                .to_owned(),
130        )
131        .column(cte_return_alias.clone())
132        .table_name(cte_alias.clone())
133        .to_owned();
134
135    SelectStatement::new()
136        .distinct()
137        .columns([
138            object::Column::Oid,
139            object::Column::ObjType,
140            object::Column::SchemaId,
141            object::Column::DatabaseId,
142        ])
143        .from(cte_alias.clone())
144        .inner_join(
145            Object,
146            Expr::col((cte_alias, cte_return_alias.clone())).equals(object::Column::Oid),
147        )
148        .order_by(object::Column::Oid, Order::Desc)
149        .to_owned()
150        .with(
151            WithClause::new()
152                .recursive(true)
153                .cte(common_table_expr)
154                .to_owned(),
155        )
156        .to_owned()
157}
158
159/// This function will construct a query using recursive cte to find if dependent objects are already relying on the target table.
160///
161/// # Examples
162///
163/// ```
164/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
165/// use sea_orm::sea_query::*;
166/// use sea_orm::*;
167///
168/// let query = construct_sink_cycle_check_query(1, vec![2, 3]);
169///
170/// assert_eq!(
171///     query.to_string(MysqlQueryBuilder),
172///     r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
173/// );
174/// assert_eq!(
175///     query.to_string(PostgresQueryBuilder),
176///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
177/// );
178/// assert_eq!(
179///     query.to_string(SqliteQueryBuilder),
180///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
181/// );
182/// ```
183pub fn construct_sink_cycle_check_query(
184    target_table: ObjectId,
185    dependent_objects: Vec<ObjectId>,
186) -> WithQuery {
187    let cte_alias = Alias::new("used_by_object_ids_with_sink");
188    let depend_alias = Alias::new("obj_dependency_with_sink");
189
190    let mut base_query = SelectStatement::new()
191        .columns([
192            object_dependency::Column::Oid,
193            object_dependency::Column::UsedBy,
194        ])
195        .from(ObjectDependency)
196        .and_where(object_dependency::Column::Oid.eq(target_table))
197        .to_owned();
198
199    let query_sink_deps = SelectStatement::new()
200        .columns([sink::Column::SinkId, sink::Column::TargetTable])
201        .from(Sink)
202        .and_where(sink::Column::TargetTable.is_not_null())
203        .to_owned();
204
205    let cte_referencing = Query::select()
206        .column((depend_alias.clone(), object_dependency::Column::Oid))
207        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
208        .from_subquery(
209            SelectStatement::new()
210                .columns([
211                    object_dependency::Column::Oid,
212                    object_dependency::Column::UsedBy,
213                ])
214                .from(ObjectDependency)
215                .union(UnionType::All, query_sink_deps)
216                .to_owned(),
217            depend_alias.clone(),
218        )
219        .inner_join(
220            cte_alias.clone(),
221            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).eq(Expr::col((
222                depend_alias.clone(),
223                object_dependency::Column::Oid,
224            ))),
225        )
226        .and_where(
227            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
228                cte_alias.clone(),
229                object_dependency::Column::Oid,
230            ))),
231        )
232        .to_owned();
233
234    let common_table_expr = CommonTableExpression::new()
235        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
236        .columns([
237            object_dependency::Column::Oid,
238            object_dependency::Column::UsedBy,
239        ])
240        .table_name(cte_alias.clone())
241        .to_owned();
242
243    SelectStatement::new()
244        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
245        .from(cte_alias.clone())
246        .and_where(
247            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
248                .is_in(dependent_objects),
249        )
250        .to_owned()
251        .with(
252            WithClause::new()
253                .recursive(true)
254                .cte(common_table_expr)
255                .to_owned(),
256        )
257        .to_owned()
258}
259
260#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
261#[sea_orm(entity = "Object")]
262pub struct PartialObject {
263    pub oid: ObjectId,
264    pub obj_type: ObjectType,
265    pub schema_id: Option<SchemaId>,
266    pub database_id: Option<DatabaseId>,
267}
268
269#[derive(Clone, DerivePartialModel, FromQueryResult)]
270#[sea_orm(entity = "Fragment")]
271pub struct PartialFragmentStateTables {
272    pub fragment_id: FragmentId,
273    pub job_id: ObjectId,
274    pub state_table_ids: I32Array,
275}
276
277#[derive(Clone, DerivePartialModel, FromQueryResult)]
278#[sea_orm(entity = "Actor")]
279pub struct PartialActorLocation {
280    pub actor_id: ActorId,
281    pub fragment_id: FragmentId,
282    pub worker_id: WorkerId,
283    pub status: ActorStatus,
284}
285
286#[derive(FromQueryResult)]
287pub struct FragmentDesc {
288    pub fragment_id: FragmentId,
289    pub job_id: ObjectId,
290    pub fragment_type_mask: i32,
291    pub distribution_type: DistributionType,
292    pub state_table_ids: I32Array,
293    pub parallelism: i64,
294    pub vnode_count: i32,
295    pub stream_node: StreamNode,
296}
297
298/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
299pub async fn get_referring_objects_cascade<C>(
300    obj_id: ObjectId,
301    db: &C,
302) -> MetaResult<Vec<PartialObject>>
303where
304    C: ConnectionTrait,
305{
306    let query = construct_obj_dependency_query(obj_id);
307    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
308    let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
309        db.get_database_backend(),
310        sql,
311        values,
312    ))
313    .all(db)
314    .await?;
315    Ok(objects)
316}
317
318/// Check if create a sink with given dependent objects into the target table will cause a cycle, return true if it will.
319pub async fn check_sink_into_table_cycle<C>(
320    target_table: ObjectId,
321    dependent_objs: Vec<ObjectId>,
322    db: &C,
323) -> MetaResult<bool>
324where
325    C: ConnectionTrait,
326{
327    if dependent_objs.is_empty() {
328        return Ok(false);
329    }
330
331    // special check for self referencing
332    if dependent_objs.contains(&target_table) {
333        return Ok(true);
334    }
335
336    let query = construct_sink_cycle_check_query(target_table, dependent_objs);
337    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
338
339    let res = db
340        .query_one(Statement::from_sql_and_values(
341            db.get_database_backend(),
342            sql,
343            values,
344        ))
345        .await?
346        .unwrap();
347
348    let cnt: i64 = res.try_get_by(0)?;
349
350    Ok(cnt != 0)
351}
352
353/// `ensure_object_id` ensures the existence of target object in the cluster.
354pub async fn ensure_object_id<C>(
355    object_type: ObjectType,
356    obj_id: ObjectId,
357    db: &C,
358) -> MetaResult<()>
359where
360    C: ConnectionTrait,
361{
362    let count = Object::find_by_id(obj_id).count(db).await?;
363    if count == 0 {
364        return Err(MetaError::catalog_id_not_found(
365            object_type.as_str(),
366            obj_id,
367        ));
368    }
369    Ok(())
370}
371
372/// `ensure_user_id` ensures the existence of target user in the cluster.
373pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
374where
375    C: ConnectionTrait,
376{
377    let count = User::find_by_id(user_id).count(db).await?;
378    if count == 0 {
379        return Err(anyhow!("user {} was concurrently dropped", user_id).into());
380    }
381    Ok(())
382}
383
384/// `check_database_name_duplicate` checks whether the database name is already used in the cluster.
385pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
386where
387    C: ConnectionTrait,
388{
389    let count = Database::find()
390        .filter(database::Column::Name.eq(name))
391        .count(db)
392        .await?;
393    if count > 0 {
394        assert_eq!(count, 1);
395        return Err(MetaError::catalog_duplicated("database", name));
396    }
397    Ok(())
398}
399
400/// `check_function_signature_duplicate` checks whether the function name and its signature is already used in the target namespace.
401pub async fn check_function_signature_duplicate<C>(
402    pb_function: &PbFunction,
403    db: &C,
404) -> MetaResult<()>
405where
406    C: ConnectionTrait,
407{
408    let count = Function::find()
409        .inner_join(Object)
410        .filter(
411            object::Column::DatabaseId
412                .eq(pb_function.database_id as DatabaseId)
413                .and(object::Column::SchemaId.eq(pb_function.schema_id as SchemaId))
414                .and(function::Column::Name.eq(&pb_function.name))
415                .and(
416                    function::Column::ArgTypes
417                        .eq(DataTypeArray::from(pb_function.arg_types.clone())),
418                ),
419        )
420        .count(db)
421        .await?;
422    if count > 0 {
423        assert_eq!(count, 1);
424        return Err(MetaError::catalog_duplicated("function", &pb_function.name));
425    }
426    Ok(())
427}
428
429/// `check_connection_name_duplicate` checks whether the connection name is already used in the target namespace.
430pub async fn check_connection_name_duplicate<C>(
431    pb_connection: &PbConnection,
432    db: &C,
433) -> MetaResult<()>
434where
435    C: ConnectionTrait,
436{
437    let count = Connection::find()
438        .inner_join(Object)
439        .filter(
440            object::Column::DatabaseId
441                .eq(pb_connection.database_id as DatabaseId)
442                .and(object::Column::SchemaId.eq(pb_connection.schema_id as SchemaId))
443                .and(connection::Column::Name.eq(&pb_connection.name)),
444        )
445        .count(db)
446        .await?;
447    if count > 0 {
448        assert_eq!(count, 1);
449        return Err(MetaError::catalog_duplicated(
450            "connection",
451            &pb_connection.name,
452        ));
453    }
454    Ok(())
455}
456
457pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
458where
459    C: ConnectionTrait,
460{
461    let count = Secret::find()
462        .inner_join(Object)
463        .filter(
464            object::Column::DatabaseId
465                .eq(pb_secret.database_id as DatabaseId)
466                .and(object::Column::SchemaId.eq(pb_secret.schema_id as SchemaId))
467                .and(secret::Column::Name.eq(&pb_secret.name)),
468        )
469        .count(db)
470        .await?;
471    if count > 0 {
472        assert_eq!(count, 1);
473        return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
474    }
475    Ok(())
476}
477
478pub async fn check_subscription_name_duplicate<C>(
479    pb_subscription: &PbSubscription,
480    db: &C,
481) -> MetaResult<()>
482where
483    C: ConnectionTrait,
484{
485    let count = Subscription::find()
486        .inner_join(Object)
487        .filter(
488            object::Column::DatabaseId
489                .eq(pb_subscription.database_id as DatabaseId)
490                .and(object::Column::SchemaId.eq(pb_subscription.schema_id as SchemaId))
491                .and(subscription::Column::Name.eq(&pb_subscription.name)),
492        )
493        .count(db)
494        .await?;
495    if count > 0 {
496        assert_eq!(count, 1);
497        return Err(MetaError::catalog_duplicated(
498            "subscription",
499            &pb_subscription.name,
500        ));
501    }
502    Ok(())
503}
504
505/// `check_user_name_duplicate` checks whether the user is already existed in the cluster.
506pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
507where
508    C: ConnectionTrait,
509{
510    let count = User::find()
511        .filter(user::Column::Name.eq(name))
512        .count(db)
513        .await?;
514    if count > 0 {
515        assert_eq!(count, 1);
516        return Err(MetaError::catalog_duplicated("user", name));
517    }
518    Ok(())
519}
520
521/// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace.
522pub async fn check_relation_name_duplicate<C>(
523    name: &str,
524    database_id: DatabaseId,
525    schema_id: SchemaId,
526    db: &C,
527) -> MetaResult<()>
528where
529    C: ConnectionTrait,
530{
531    macro_rules! check_duplicated {
532        ($obj_type:expr, $entity:ident, $table:ident) => {
533            let count = Object::find()
534                .inner_join($entity)
535                .filter(
536                    object::Column::DatabaseId
537                        .eq(Some(database_id))
538                        .and(object::Column::SchemaId.eq(Some(schema_id)))
539                        .and($table::Column::Name.eq(name)),
540                )
541                .count(db)
542                .await?;
543            if count != 0 {
544                return Err(MetaError::catalog_duplicated($obj_type.as_str(), name));
545            }
546        };
547    }
548    check_duplicated!(ObjectType::Table, Table, table);
549    check_duplicated!(ObjectType::Source, Source, source);
550    check_duplicated!(ObjectType::Sink, Sink, sink);
551    check_duplicated!(ObjectType::Index, Index, index);
552    check_duplicated!(ObjectType::View, View, view);
553
554    Ok(())
555}
556
557/// `check_schema_name_duplicate` checks whether the schema name is already used in the target database.
558pub async fn check_schema_name_duplicate<C>(
559    name: &str,
560    database_id: DatabaseId,
561    db: &C,
562) -> MetaResult<()>
563where
564    C: ConnectionTrait,
565{
566    let count = Object::find()
567        .inner_join(Schema)
568        .filter(
569            object::Column::ObjType
570                .eq(ObjectType::Schema)
571                .and(object::Column::DatabaseId.eq(Some(database_id)))
572                .and(schema::Column::Name.eq(name)),
573        )
574        .count(db)
575        .await?;
576    if count != 0 {
577        return Err(MetaError::catalog_duplicated("schema", name));
578    }
579
580    Ok(())
581}
582
583/// `ensure_object_not_refer` ensures that object is not used by any other ones except indexes.
584pub async fn ensure_object_not_refer<C>(
585    object_type: ObjectType,
586    object_id: ObjectId,
587    db: &C,
588) -> MetaResult<()>
589where
590    C: ConnectionTrait,
591{
592    // Ignore indexes.
593    let count = if object_type == ObjectType::Table {
594        ObjectDependency::find()
595            .join(
596                JoinType::InnerJoin,
597                object_dependency::Relation::Object1.def(),
598            )
599            .filter(
600                object_dependency::Column::Oid
601                    .eq(object_id)
602                    .and(object::Column::ObjType.ne(ObjectType::Index)),
603            )
604            .count(db)
605            .await?
606    } else {
607        ObjectDependency::find()
608            .filter(object_dependency::Column::Oid.eq(object_id))
609            .count(db)
610            .await?
611    };
612    if count != 0 {
613        return Err(MetaError::permission_denied(format!(
614            "{} used by {} other objects.",
615            object_type.as_str(),
616            count
617        )));
618    }
619    Ok(())
620}
621
622/// List all objects that are using the given one.
623pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
624where
625    C: ConnectionTrait,
626{
627    let objs = ObjectDependency::find()
628        .filter(object_dependency::Column::Oid.eq(object_id))
629        .join(
630            JoinType::InnerJoin,
631            object_dependency::Relation::Object1.def(),
632        )
633        .into_partial_model()
634        .all(db)
635        .await?;
636
637    Ok(objs)
638}
639
640/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
641pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
642where
643    C: ConnectionTrait,
644{
645    let count = Object::find()
646        .filter(object::Column::SchemaId.eq(Some(schema_id)))
647        .count(db)
648        .await?;
649    if count != 0 {
650        return Err(MetaError::permission_denied("schema is not empty"));
651    }
652
653    Ok(())
654}
655
656/// `list_user_info_by_ids` lists all users' info by their ids.
657pub async fn list_user_info_by_ids<C>(user_ids: Vec<UserId>, db: &C) -> MetaResult<Vec<PbUserInfo>>
658where
659    C: ConnectionTrait,
660{
661    let mut user_infos = vec![];
662    for user_id in user_ids {
663        let user = User::find_by_id(user_id)
664            .one(db)
665            .await?
666            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
667        let mut user_info: PbUserInfo = user.into();
668        user_info.grant_privileges = get_user_privilege(user_id, db).await?;
669        user_infos.push(user_info);
670    }
671    Ok(user_infos)
672}
673
674/// `get_object_owner` returns the owner of the given object.
675pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
676where
677    C: ConnectionTrait,
678{
679    let obj_owner: UserId = Object::find_by_id(object_id)
680        .select_only()
681        .column(object::Column::OwnerId)
682        .into_tuple()
683        .one(db)
684        .await?
685        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
686    Ok(obj_owner)
687}
688
689/// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given one.
690///
691/// # Examples
692///
693/// ```
694/// use risingwave_meta::controller::utils::construct_privilege_dependency_query;
695/// use sea_orm::sea_query::*;
696/// use sea_orm::*;
697///
698/// let query = construct_privilege_dependency_query(vec![1, 2, 3]);
699///
700/// assert_eq!(
701///    query.to_string(MysqlQueryBuilder),
702///   r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"#
703/// );
704/// assert_eq!(
705///   query.to_string(PostgresQueryBuilder),
706///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""#
707/// );
708/// assert_eq!(
709///  query.to_string(SqliteQueryBuilder),
710///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""#
711/// );
712/// ```
713pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
714    let cte_alias = Alias::new("granted_privilege_ids");
715    let cte_return_privilege_alias = Alias::new("id");
716    let cte_return_user_alias = Alias::new("user_id");
717
718    let mut base_query = SelectStatement::new()
719        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
720        .from(UserPrivilege)
721        .and_where(user_privilege::Column::Id.is_in(ids))
722        .to_owned();
723
724    let cte_referencing = Query::select()
725        .columns([
726            (UserPrivilege, user_privilege::Column::Id),
727            (UserPrivilege, user_privilege::Column::UserId),
728        ])
729        .from(UserPrivilege)
730        .inner_join(
731            cte_alias.clone(),
732            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
733                .equals(user_privilege::Column::DependentId),
734        )
735        .to_owned();
736
737    let common_table_expr = CommonTableExpression::new()
738        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
739        .columns([
740            cte_return_privilege_alias.clone(),
741            cte_return_user_alias.clone(),
742        ])
743        .table_name(cte_alias.clone())
744        .to_owned();
745
746    SelectStatement::new()
747        .columns([cte_return_privilege_alias, cte_return_user_alias])
748        .from(cte_alias.clone())
749        .to_owned()
750        .with(
751            WithClause::new()
752                .recursive(true)
753                .cte(common_table_expr)
754                .to_owned(),
755        )
756        .to_owned()
757}
758
759pub async fn get_internal_tables_by_id<C>(job_id: ObjectId, db: &C) -> MetaResult<Vec<TableId>>
760where
761    C: ConnectionTrait,
762{
763    let table_ids: Vec<TableId> = Table::find()
764        .select_only()
765        .column(table::Column::TableId)
766        .filter(
767            table::Column::TableType
768                .eq(TableType::Internal)
769                .and(table::Column::BelongsToJobId.eq(job_id)),
770        )
771        .into_tuple()
772        .all(db)
773        .await?;
774    Ok(table_ids)
775}
776
777pub async fn get_index_state_tables_by_table_id<C>(
778    table_id: TableId,
779    db: &C,
780) -> MetaResult<Vec<TableId>>
781where
782    C: ConnectionTrait,
783{
784    let mut index_table_ids: Vec<TableId> = Index::find()
785        .select_only()
786        .column(index::Column::IndexTableId)
787        .filter(index::Column::PrimaryTableId.eq(table_id))
788        .into_tuple()
789        .all(db)
790        .await?;
791
792    if !index_table_ids.is_empty() {
793        let internal_table_ids: Vec<TableId> = Table::find()
794            .select_only()
795            .column(table::Column::TableId)
796            .filter(
797                table::Column::TableType
798                    .eq(TableType::Internal)
799                    .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
800            )
801            .into_tuple()
802            .all(db)
803            .await?;
804
805        index_table_ids.extend(internal_table_ids.into_iter());
806    }
807
808    Ok(index_table_ids)
809}
810
811#[derive(Clone, DerivePartialModel, FromQueryResult)]
812#[sea_orm(entity = "UserPrivilege")]
813pub struct PartialUserPrivilege {
814    pub id: PrivilegeId,
815    pub user_id: UserId,
816}
817
818pub async fn get_referring_privileges_cascade<C>(
819    ids: Vec<PrivilegeId>,
820    db: &C,
821) -> MetaResult<Vec<PartialUserPrivilege>>
822where
823    C: ConnectionTrait,
824{
825    let query = construct_privilege_dependency_query(ids);
826    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
827    let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
828        db.get_database_backend(),
829        sql,
830        values,
831    ))
832    .all(db)
833    .await?;
834
835    Ok(privileges)
836}
837
838/// `ensure_privileges_not_referred` ensures that the privileges are not granted to any other users.
839pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
840where
841    C: ConnectionTrait,
842{
843    let count = UserPrivilege::find()
844        .filter(user_privilege::Column::DependentId.is_in(ids))
845        .count(db)
846        .await?;
847    if count != 0 {
848        return Err(MetaError::permission_denied(format!(
849            "privileges granted to {} other ones.",
850            count
851        )));
852    }
853    Ok(())
854}
855
856/// `get_user_privilege` returns the privileges of the given user.
857pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
858where
859    C: ConnectionTrait,
860{
861    let user_privileges = UserPrivilege::find()
862        .find_also_related(Object)
863        .filter(user_privilege::Column::UserId.eq(user_id))
864        .all(db)
865        .await?;
866    Ok(user_privileges
867        .into_iter()
868        .map(|(privilege, object)| {
869            let object = object.unwrap();
870            let oid = object.oid as _;
871            let obj = match object.obj_type {
872                ObjectType::Database => PbGrantObject::DatabaseId(oid),
873                ObjectType::Schema => PbGrantObject::SchemaId(oid),
874                ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
875                ObjectType::Source => PbGrantObject::SourceId(oid),
876                ObjectType::Sink => PbGrantObject::SinkId(oid),
877                ObjectType::View => PbGrantObject::ViewId(oid),
878                ObjectType::Function => PbGrantObject::FunctionId(oid),
879                ObjectType::Connection => unreachable!("connection is not supported yet"),
880                ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
881                ObjectType::Secret => unreachable!("secret is not supported yet"),
882            };
883            PbGrantPrivilege {
884                action_with_opts: vec![PbActionWithGrantOption {
885                    action: PbAction::from(privilege.action) as _,
886                    with_grant_option: privilege.with_grant_option,
887                    granted_by: privilege.granted_by as _,
888                }],
889                object: Some(obj),
890            }
891        })
892        .collect())
893}
894
895// todo: remove it after migrated to sql backend.
896pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
897    match object {
898        PbGrantObject::DatabaseId(id)
899        | PbGrantObject::SchemaId(id)
900        | PbGrantObject::TableId(id)
901        | PbGrantObject::SourceId(id)
902        | PbGrantObject::SinkId(id)
903        | PbGrantObject::ViewId(id)
904        | PbGrantObject::FunctionId(id)
905        | PbGrantObject::SubscriptionId(id)
906        | PbGrantObject::ConnectionId(id)
907        | PbGrantObject::SecretId(id) => *id as _,
908    }
909}
910
911pub async fn insert_fragment_relations(
912    db: &impl ConnectionTrait,
913    downstream_fragment_relations: &FragmentDownstreamRelation,
914) -> MetaResult<()> {
915    for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
916        for downstream in downstreams {
917            let relation = fragment_relation::Model {
918                source_fragment_id: *upstream_fragment_id as _,
919                target_fragment_id: downstream.downstream_fragment_id as _,
920                dispatcher_type: downstream.dispatcher_type,
921                dist_key_indices: downstream
922                    .dist_key_indices
923                    .iter()
924                    .map(|idx| *idx as i32)
925                    .collect_vec()
926                    .into(),
927                output_indices: downstream
928                    .output_indices
929                    .iter()
930                    .map(|idx| *idx as i32)
931                    .collect_vec()
932                    .into(),
933            };
934            FragmentRelation::insert(relation.into_active_model())
935                .exec(db)
936                .await?;
937        }
938    }
939    Ok(())
940}
941
942pub async fn get_fragment_actor_dispatchers<C>(
943    db: &C,
944    fragment_ids: Vec<FragmentId>,
945) -> MetaResult<FragmentActorDispatchers>
946where
947    C: ConnectionTrait,
948{
949    type FragmentActorInfo = (
950        DistributionType,
951        Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
952    );
953    let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
954    let get_fragment_actors = |fragment_id: FragmentId| async move {
955        let result: MetaResult<FragmentActorInfo> = try {
956            let mut fragment_actors = Fragment::find_by_id(fragment_id)
957                .find_with_related(Actor)
958                .filter(actor::Column::Status.eq(ActorStatus::Running))
959                .all(db)
960                .await?;
961            if fragment_actors.is_empty() {
962                return Err(anyhow!("failed to find fragment: {}", fragment_id).into());
963            }
964            assert_eq!(
965                fragment_actors.len(),
966                1,
967                "find multiple fragment {:?}",
968                fragment_actors
969            );
970            let (fragment, actors) = fragment_actors.pop().unwrap();
971            (
972                fragment.distribution_type,
973                Arc::new(
974                    actors
975                        .into_iter()
976                        .map(|actor| {
977                            (
978                                actor.actor_id as _,
979                                actor
980                                    .vnode_bitmap
981                                    .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
982                            )
983                        })
984                        .collect(),
985                ),
986            )
987        };
988        result
989    };
990    let fragment_relations = FragmentRelation::find()
991        .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
992        .all(db)
993        .await?;
994
995    let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
996    for fragment_relation::Model {
997        source_fragment_id,
998        target_fragment_id,
999        dispatcher_type,
1000        dist_key_indices,
1001        output_indices,
1002    } in fragment_relations
1003    {
1004        let (source_fragment_distribution, source_fragment_actors) = {
1005            let (distribution, actors) = {
1006                match fragment_actor_cache.entry(source_fragment_id) {
1007                    Entry::Occupied(entry) => entry.into_mut(),
1008                    Entry::Vacant(entry) => {
1009                        entry.insert(get_fragment_actors(source_fragment_id).await?)
1010                    }
1011                }
1012            };
1013            (*distribution, actors.clone())
1014        };
1015        let (target_fragment_distribution, target_fragment_actors) = {
1016            let (distribution, actors) = {
1017                match fragment_actor_cache.entry(target_fragment_id) {
1018                    Entry::Occupied(entry) => entry.into_mut(),
1019                    Entry::Vacant(entry) => {
1020                        entry.insert(get_fragment_actors(target_fragment_id).await?)
1021                    }
1022                }
1023            };
1024            (*distribution, actors.clone())
1025        };
1026        let dispatchers = compose_dispatchers(
1027            source_fragment_distribution,
1028            &source_fragment_actors,
1029            target_fragment_id as _,
1030            target_fragment_distribution,
1031            &target_fragment_actors,
1032            dispatcher_type,
1033            dist_key_indices.into_u32_array(),
1034            output_indices.into_u32_array(),
1035        );
1036        let actor_dispatchers_map = actor_dispatchers_map
1037            .entry(source_fragment_id as _)
1038            .or_default();
1039        for (actor_id, dispatchers) in dispatchers {
1040            actor_dispatchers_map
1041                .entry(actor_id as _)
1042                .or_default()
1043                .push(dispatchers);
1044        }
1045    }
1046    Ok(actor_dispatchers_map)
1047}
1048
1049pub fn compose_dispatchers(
1050    source_fragment_distribution: DistributionType,
1051    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1052    target_fragment_id: crate::model::FragmentId,
1053    target_fragment_distribution: DistributionType,
1054    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1055    dispatcher_type: DispatcherType,
1056    dist_key_indices: Vec<u32>,
1057    output_indices: Vec<u32>,
1058) -> HashMap<crate::model::ActorId, PbDispatcher> {
1059    match dispatcher_type {
1060        DispatcherType::Hash => {
1061            let dispatcher = PbDispatcher {
1062                r#type: PbDispatcherType::from(dispatcher_type) as _,
1063                dist_key_indices: dist_key_indices.clone(),
1064                output_indices: output_indices.clone(),
1065                hash_mapping: Some(
1066                    ActorMapping::from_bitmaps(
1067                        &target_fragment_actors
1068                            .iter()
1069                            .map(|(actor_id, bitmap)| {
1070                                (
1071                                    *actor_id as _,
1072                                    bitmap
1073                                        .clone()
1074                                        .expect("downstream hash dispatch must have distribution"),
1075                                )
1076                            })
1077                            .collect(),
1078                    )
1079                    .to_protobuf(),
1080                ),
1081                dispatcher_id: target_fragment_id as _,
1082                downstream_actor_id: target_fragment_actors
1083                    .keys()
1084                    .map(|actor_id| *actor_id as _)
1085                    .collect(),
1086            };
1087            source_fragment_actors
1088                .keys()
1089                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1090                .collect()
1091        }
1092        DispatcherType::Broadcast | DispatcherType::Simple => {
1093            let dispatcher = PbDispatcher {
1094                r#type: PbDispatcherType::from(dispatcher_type) as _,
1095                dist_key_indices: dist_key_indices.clone(),
1096                output_indices: output_indices.clone(),
1097                hash_mapping: None,
1098                dispatcher_id: target_fragment_id as _,
1099                downstream_actor_id: target_fragment_actors
1100                    .keys()
1101                    .map(|actor_id| *actor_id as _)
1102                    .collect(),
1103            };
1104            source_fragment_actors
1105                .keys()
1106                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1107                .collect()
1108        }
1109        DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1110            source_fragment_distribution,
1111            source_fragment_actors,
1112            target_fragment_distribution,
1113            target_fragment_actors,
1114        )
1115        .into_iter()
1116        .map(|(upstream_actor_id, downstream_actor_id)| {
1117            (
1118                upstream_actor_id,
1119                PbDispatcher {
1120                    r#type: PbDispatcherType::NoShuffle as _,
1121                    dist_key_indices: dist_key_indices.clone(),
1122                    output_indices: output_indices.clone(),
1123                    hash_mapping: None,
1124                    dispatcher_id: target_fragment_id as _,
1125                    downstream_actor_id: vec![downstream_actor_id as _],
1126                },
1127            )
1128        })
1129        .collect(),
1130    }
1131}
1132
1133/// return (`upstream_actor_id` -> `downstream_actor_id`)
1134pub fn resolve_no_shuffle_actor_dispatcher(
1135    source_fragment_distribution: DistributionType,
1136    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1137    target_fragment_distribution: DistributionType,
1138    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1139) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1140    assert_eq!(source_fragment_distribution, target_fragment_distribution);
1141    assert_eq!(
1142        source_fragment_actors.len(),
1143        target_fragment_actors.len(),
1144        "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1145        source_fragment_actors,
1146        target_fragment_actors
1147    );
1148    match source_fragment_distribution {
1149        DistributionType::Single => {
1150            let assert_singleton = |bitmap: &Option<Bitmap>| {
1151                assert!(
1152                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1153                    "not singleton: {:?}",
1154                    bitmap
1155                );
1156            };
1157            assert_eq!(
1158                source_fragment_actors.len(),
1159                1,
1160                "singleton distribution actor count not 1: {:?}",
1161                source_fragment_distribution
1162            );
1163            assert_eq!(
1164                target_fragment_actors.len(),
1165                1,
1166                "singleton distribution actor count not 1: {:?}",
1167                target_fragment_distribution
1168            );
1169            let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1170            assert_singleton(bitmap);
1171            let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1172            assert_singleton(bitmap);
1173            vec![(*source_actor_id, *target_actor_id)]
1174        }
1175        DistributionType::Hash => {
1176            let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1177                .iter()
1178                .map(|(actor_id, bitmap)| {
1179                    let bitmap = bitmap
1180                        .as_ref()
1181                        .expect("hash distribution should have bitmap");
1182                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1183                    (first_vnode, (*actor_id, bitmap))
1184                })
1185                .collect();
1186            source_fragment_actors
1187                .iter()
1188                .map(|(source_actor_id, bitmap)| {
1189                    let bitmap = bitmap
1190                        .as_ref()
1191                        .expect("hash distribution should have bitmap");
1192                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1193                    let (target_actor_id, target_bitmap) =
1194                        target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1195                            panic!(
1196                                "cannot find matched target actor: {} {:?} {:?} {:?}",
1197                                source_actor_id,
1198                                first_vnode,
1199                                source_fragment_actors,
1200                                target_fragment_actors
1201                            );
1202                        });
1203                    assert_eq!(
1204                        bitmap,
1205                        target_bitmap,
1206                        "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1207                        source_actor_id,
1208                        first_vnode,
1209                        source_fragment_actors,
1210                        target_fragment_actors
1211                    );
1212                    (*source_actor_id, target_actor_id)
1213                }).collect()
1214        }
1215    }
1216}
1217
1218/// `get_fragment_mappings` returns the fragment vnode mappings of the given job.
1219pub async fn get_fragment_mappings<C>(
1220    db: &C,
1221    job_id: ObjectId,
1222) -> MetaResult<Vec<PbFragmentWorkerSlotMapping>>
1223where
1224    C: ConnectionTrait,
1225{
1226    let job_actors: Vec<(
1227        FragmentId,
1228        DistributionType,
1229        ActorId,
1230        Option<VnodeBitmap>,
1231        WorkerId,
1232        ActorStatus,
1233    )> = Actor::find()
1234        .select_only()
1235        .columns([
1236            fragment::Column::FragmentId,
1237            fragment::Column::DistributionType,
1238        ])
1239        .columns([
1240            actor::Column::ActorId,
1241            actor::Column::VnodeBitmap,
1242            actor::Column::WorkerId,
1243            actor::Column::Status,
1244        ])
1245        .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1246        .filter(fragment::Column::JobId.eq(job_id))
1247        .into_tuple()
1248        .all(db)
1249        .await?;
1250
1251    Ok(rebuild_fragment_mapping_from_actors(job_actors))
1252}
1253
1254pub fn rebuild_fragment_mapping_from_actors(
1255    job_actors: Vec<(
1256        FragmentId,
1257        DistributionType,
1258        ActorId,
1259        Option<VnodeBitmap>,
1260        WorkerId,
1261        ActorStatus,
1262    )>,
1263) -> Vec<FragmentWorkerSlotMapping> {
1264    let mut all_actor_locations = HashMap::new();
1265    let mut actor_bitmaps = HashMap::new();
1266    let mut fragment_actors = HashMap::new();
1267    let mut fragment_dist = HashMap::new();
1268
1269    for (fragment_id, dist, actor_id, bitmap, worker_id, actor_status) in job_actors {
1270        if actor_status == ActorStatus::Inactive {
1271            continue;
1272        }
1273
1274        all_actor_locations
1275            .entry(fragment_id)
1276            .or_insert(HashMap::new())
1277            .insert(actor_id as hash::ActorId, worker_id as u32);
1278        actor_bitmaps.insert(actor_id, bitmap);
1279        fragment_actors
1280            .entry(fragment_id)
1281            .or_insert_with(Vec::new)
1282            .push(actor_id);
1283        fragment_dist.insert(fragment_id, dist);
1284    }
1285
1286    let mut result = vec![];
1287    for (fragment_id, dist) in fragment_dist {
1288        let mut actor_locations = all_actor_locations.remove(&fragment_id).unwrap();
1289        let fragment_worker_slot_mapping = match dist {
1290            DistributionType::Single => {
1291                let actor = fragment_actors
1292                    .remove(&fragment_id)
1293                    .unwrap()
1294                    .into_iter()
1295                    .exactly_one()
1296                    .unwrap() as hash::ActorId;
1297                let actor_location = actor_locations.remove(&actor).unwrap();
1298
1299                WorkerSlotMapping::new_single(WorkerSlotId::new(actor_location, 0))
1300            }
1301            DistributionType::Hash => {
1302                let actors = fragment_actors.remove(&fragment_id).unwrap();
1303
1304                let all_actor_bitmaps: HashMap<_, _> = actors
1305                    .iter()
1306                    .map(|actor_id| {
1307                        let vnode_bitmap = actor_bitmaps
1308                            .remove(actor_id)
1309                            .flatten()
1310                            .expect("actor bitmap shouldn't be none in hash fragment");
1311
1312                        let bitmap = Bitmap::from(&vnode_bitmap.to_protobuf());
1313                        (*actor_id as hash::ActorId, bitmap)
1314                    })
1315                    .collect();
1316
1317                let actor_mapping = ActorMapping::from_bitmaps(&all_actor_bitmaps);
1318
1319                actor_mapping.to_worker_slot(&actor_locations)
1320            }
1321        };
1322
1323        result.push(PbFragmentWorkerSlotMapping {
1324            fragment_id: fragment_id as u32,
1325            mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1326        })
1327    }
1328    result
1329}
1330
1331/// `get_fragment_actor_ids` returns the fragment actor ids of the given fragments.
1332pub async fn get_fragment_actor_ids<C>(
1333    db: &C,
1334    fragment_ids: Vec<FragmentId>,
1335) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>>
1336where
1337    C: ConnectionTrait,
1338{
1339    let fragment_actors: Vec<(FragmentId, ActorId)> = Actor::find()
1340        .select_only()
1341        .columns([actor::Column::FragmentId, actor::Column::ActorId])
1342        .filter(actor::Column::FragmentId.is_in(fragment_ids))
1343        .into_tuple()
1344        .all(db)
1345        .await?;
1346
1347    Ok(fragment_actors.into_iter().into_group_map())
1348}
1349
1350/// For the given streaming jobs, returns
1351/// - All source fragments
1352/// - All actors
1353/// - All fragments
1354pub async fn get_fragments_for_jobs<C>(
1355    db: &C,
1356    streaming_jobs: Vec<ObjectId>,
1357) -> MetaResult<(
1358    HashMap<SourceId, BTreeSet<FragmentId>>,
1359    HashSet<ActorId>,
1360    HashSet<FragmentId>,
1361)>
1362where
1363    C: ConnectionTrait,
1364{
1365    if streaming_jobs.is_empty() {
1366        return Ok((HashMap::default(), HashSet::default(), HashSet::default()));
1367    }
1368
1369    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1370        .select_only()
1371        .columns([
1372            fragment::Column::FragmentId,
1373            fragment::Column::FragmentTypeMask,
1374            fragment::Column::StreamNode,
1375        ])
1376        .filter(fragment::Column::JobId.is_in(streaming_jobs))
1377        .into_tuple()
1378        .all(db)
1379        .await?;
1380    let actors: Vec<ActorId> = Actor::find()
1381        .select_only()
1382        .column(actor::Column::ActorId)
1383        .filter(
1384            actor::Column::FragmentId.is_in(fragments.iter().map(|(id, _, _)| *id).collect_vec()),
1385        )
1386        .into_tuple()
1387        .all(db)
1388        .await?;
1389
1390    let fragment_ids = fragments
1391        .iter()
1392        .map(|(fragment_id, _, _)| *fragment_id)
1393        .collect();
1394
1395    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1396    for (fragment_id, mask, stream_node) in fragments {
1397        if mask & PbFragmentTypeFlag::Source as i32 == 0 {
1398            continue;
1399        }
1400        if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1401            source_fragment_ids
1402                .entry(source_id as _)
1403                .or_default()
1404                .insert(fragment_id);
1405        }
1406    }
1407
1408    Ok((
1409        source_fragment_ids,
1410        actors.into_iter().collect(),
1411        fragment_ids,
1412    ))
1413}
1414
1415/// Build a object group for notifying the deletion of the given objects.
1416///
1417/// Note that only id fields are filled in the object info, as the arguments are partial objects.
1418/// As a result, the returned notification info should only be used for deletion.
1419pub(crate) fn build_object_group_for_delete(
1420    partial_objects: Vec<PartialObject>,
1421) -> NotificationInfo {
1422    let mut objects = vec![];
1423    for obj in partial_objects {
1424        match obj.obj_type {
1425            ObjectType::Database => objects.push(PbObject {
1426                object_info: Some(PbObjectInfo::Database(PbDatabase {
1427                    id: obj.oid as _,
1428                    ..Default::default()
1429                })),
1430            }),
1431            ObjectType::Schema => objects.push(PbObject {
1432                object_info: Some(PbObjectInfo::Schema(PbSchema {
1433                    id: obj.oid as _,
1434                    database_id: obj.database_id.unwrap() as _,
1435                    ..Default::default()
1436                })),
1437            }),
1438            ObjectType::Table => objects.push(PbObject {
1439                object_info: Some(PbObjectInfo::Table(PbTable {
1440                    id: obj.oid as _,
1441                    schema_id: obj.schema_id.unwrap() as _,
1442                    database_id: obj.database_id.unwrap() as _,
1443                    ..Default::default()
1444                })),
1445            }),
1446            ObjectType::Source => objects.push(PbObject {
1447                object_info: Some(PbObjectInfo::Source(PbSource {
1448                    id: obj.oid as _,
1449                    schema_id: obj.schema_id.unwrap() as _,
1450                    database_id: obj.database_id.unwrap() as _,
1451                    ..Default::default()
1452                })),
1453            }),
1454            ObjectType::Sink => objects.push(PbObject {
1455                object_info: Some(PbObjectInfo::Sink(PbSink {
1456                    id: obj.oid as _,
1457                    schema_id: obj.schema_id.unwrap() as _,
1458                    database_id: obj.database_id.unwrap() as _,
1459                    ..Default::default()
1460                })),
1461            }),
1462            ObjectType::Subscription => objects.push(PbObject {
1463                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1464                    id: obj.oid as _,
1465                    schema_id: obj.schema_id.unwrap() as _,
1466                    database_id: obj.database_id.unwrap() as _,
1467                    ..Default::default()
1468                })),
1469            }),
1470            ObjectType::View => objects.push(PbObject {
1471                object_info: Some(PbObjectInfo::View(PbView {
1472                    id: obj.oid as _,
1473                    schema_id: obj.schema_id.unwrap() as _,
1474                    database_id: obj.database_id.unwrap() as _,
1475                    ..Default::default()
1476                })),
1477            }),
1478            ObjectType::Index => {
1479                objects.push(PbObject {
1480                    object_info: Some(PbObjectInfo::Index(PbIndex {
1481                        id: obj.oid as _,
1482                        schema_id: obj.schema_id.unwrap() as _,
1483                        database_id: obj.database_id.unwrap() as _,
1484                        ..Default::default()
1485                    })),
1486                });
1487                objects.push(PbObject {
1488                    object_info: Some(PbObjectInfo::Table(PbTable {
1489                        id: obj.oid as _,
1490                        schema_id: obj.schema_id.unwrap() as _,
1491                        database_id: obj.database_id.unwrap() as _,
1492                        ..Default::default()
1493                    })),
1494                });
1495            }
1496            ObjectType::Function => objects.push(PbObject {
1497                object_info: Some(PbObjectInfo::Function(PbFunction {
1498                    id: obj.oid as _,
1499                    schema_id: obj.schema_id.unwrap() as _,
1500                    database_id: obj.database_id.unwrap() as _,
1501                    ..Default::default()
1502                })),
1503            }),
1504            ObjectType::Connection => objects.push(PbObject {
1505                object_info: Some(PbObjectInfo::Connection(PbConnection {
1506                    id: obj.oid as _,
1507                    schema_id: obj.schema_id.unwrap() as _,
1508                    database_id: obj.database_id.unwrap() as _,
1509                    ..Default::default()
1510                })),
1511            }),
1512            ObjectType::Secret => objects.push(PbObject {
1513                object_info: Some(PbObjectInfo::Secret(PbSecret {
1514                    id: obj.oid as _,
1515                    schema_id: obj.schema_id.unwrap() as _,
1516                    database_id: obj.database_id.unwrap() as _,
1517                    ..Default::default()
1518                })),
1519            }),
1520        }
1521    }
1522    NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1523}
1524
1525pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1526    let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1527        .context("unable to parse table definition")
1528        .inspect_err(|e| {
1529            tracing::error!(
1530                target: "auto_schema_change",
1531                error = %e.as_report(),
1532                "failed to parse table definition")
1533        })
1534        .unwrap()
1535        .try_into()
1536        .unwrap();
1537    if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1538        cdc_table_info
1539            .clone()
1540            .map(|cdc_table_info| cdc_table_info.external_table_name)
1541    } else {
1542        None
1543    }
1544}
1545
1546/// `rename_relation` renames the target relation and its definition,
1547/// it commits the changes to the transaction and returns the updated relations and the old name.
1548pub async fn rename_relation(
1549    txn: &DatabaseTransaction,
1550    object_type: ObjectType,
1551    object_id: ObjectId,
1552    object_name: &str,
1553) -> MetaResult<(Vec<PbObject>, String)> {
1554    use sea_orm::ActiveModelTrait;
1555
1556    use crate::controller::rename::alter_relation_rename;
1557
1558    let mut to_update_relations = vec![];
1559    // rename relation.
1560    macro_rules! rename_relation {
1561        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1562            let (mut relation, obj) = $entity::find_by_id($object_id)
1563                .find_also_related(Object)
1564                .one(txn)
1565                .await?
1566                .unwrap();
1567            let obj = obj.unwrap();
1568            let old_name = relation.name.clone();
1569            relation.name = object_name.into();
1570            if obj.obj_type != ObjectType::View {
1571                relation.definition = alter_relation_rename(&relation.definition, object_name);
1572            }
1573            let active_model = $table::ActiveModel {
1574                $identity: Set(relation.$identity),
1575                name: Set(object_name.into()),
1576                definition: Set(relation.definition.clone()),
1577                ..Default::default()
1578            };
1579            active_model.update(txn).await?;
1580            to_update_relations.push(PbObject {
1581                object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
1582            });
1583            old_name
1584        }};
1585    }
1586    // TODO: check is there any thing to change for shared source?
1587    let old_name = match object_type {
1588        ObjectType::Table => rename_relation!(Table, table, table_id, object_id),
1589        ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
1590        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id),
1591        ObjectType::Subscription => {
1592            rename_relation!(Subscription, subscription, subscription_id, object_id)
1593        }
1594        ObjectType::View => rename_relation!(View, view, view_id, object_id),
1595        ObjectType::Index => {
1596            let (mut index, obj) = Index::find_by_id(object_id)
1597                .find_also_related(Object)
1598                .one(txn)
1599                .await?
1600                .unwrap();
1601            index.name = object_name.into();
1602            let index_table_id = index.index_table_id;
1603            let old_name = rename_relation!(Table, table, table_id, index_table_id);
1604
1605            // the name of index and its associated table is the same.
1606            let active_model = index::ActiveModel {
1607                index_id: sea_orm::ActiveValue::Set(index.index_id),
1608                name: sea_orm::ActiveValue::Set(object_name.into()),
1609                ..Default::default()
1610            };
1611            active_model.update(txn).await?;
1612            to_update_relations.push(PbObject {
1613                object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1614            });
1615            old_name
1616        }
1617        _ => unreachable!("only relation name can be altered."),
1618    };
1619
1620    Ok((to_update_relations, old_name))
1621}
1622
1623pub async fn get_database_resource_group<C>(txn: &C, database_id: ObjectId) -> MetaResult<String>
1624where
1625    C: ConnectionTrait,
1626{
1627    let database_resource_group: Option<String> = Database::find_by_id(database_id)
1628        .select_only()
1629        .column(database::Column::ResourceGroup)
1630        .into_tuple()
1631        .one(txn)
1632        .await?
1633        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1634
1635    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1636}
1637
1638pub async fn get_existing_job_resource_group<C>(
1639    txn: &C,
1640    streaming_job_id: ObjectId,
1641) -> MetaResult<String>
1642where
1643    C: ConnectionTrait,
1644{
1645    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1646        StreamingJob::find_by_id(streaming_job_id)
1647            .select_only()
1648            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1649            .join(JoinType::InnerJoin, object::Relation::Database2.def())
1650            .column(streaming_job::Column::SpecificResourceGroup)
1651            .column(database::Column::ResourceGroup)
1652            .into_tuple()
1653            .one(txn)
1654            .await?
1655            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1656
1657    Ok(job_specific_resource_group.unwrap_or_else(|| {
1658        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1659    }))
1660}
1661
1662pub fn filter_workers_by_resource_group(
1663    workers: &HashMap<u32, WorkerNode>,
1664    resource_group: &str,
1665) -> BTreeSet<WorkerId> {
1666    workers
1667        .iter()
1668        .filter(|&(_, worker)| {
1669            worker
1670                .resource_group()
1671                .map(|node_label| node_label.as_str() == resource_group)
1672                .unwrap_or(false)
1673        })
1674        .map(|(id, _)| (*id as WorkerId))
1675        .collect()
1676}
1677
1678/// `rename_relation_refer` updates the definition of relations that refer to the target one,
1679/// it commits the changes to the transaction and returns all the updated relations.
1680pub async fn rename_relation_refer(
1681    txn: &DatabaseTransaction,
1682    object_type: ObjectType,
1683    object_id: ObjectId,
1684    object_name: &str,
1685    old_name: &str,
1686) -> MetaResult<Vec<PbObject>> {
1687    use sea_orm::ActiveModelTrait;
1688
1689    use crate::controller::rename::alter_relation_rename_refs;
1690
1691    let mut to_update_relations = vec![];
1692    macro_rules! rename_relation_ref {
1693        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1694            let (mut relation, obj) = $entity::find_by_id($object_id)
1695                .find_also_related(Object)
1696                .one(txn)
1697                .await?
1698                .unwrap();
1699            relation.definition =
1700                alter_relation_rename_refs(&relation.definition, old_name, object_name);
1701            let active_model = $table::ActiveModel {
1702                $identity: Set(relation.$identity),
1703                definition: Set(relation.definition.clone()),
1704                ..Default::default()
1705            };
1706            active_model.update(txn).await?;
1707            to_update_relations.push(PbObject {
1708                object_info: Some(PbObjectInfo::$entity(
1709                    ObjectModel(relation, obj.unwrap()).into(),
1710                )),
1711            });
1712        }};
1713    }
1714    let mut objs = get_referring_objects(object_id, txn).await?;
1715    if object_type == ObjectType::Table {
1716        let incoming_sinks: I32Array = Table::find_by_id(object_id)
1717            .select_only()
1718            .column(table::Column::IncomingSinks)
1719            .into_tuple()
1720            .one(txn)
1721            .await?
1722            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1723
1724        objs.extend(
1725            incoming_sinks
1726                .into_inner()
1727                .into_iter()
1728                .map(|id| PartialObject {
1729                    oid: id,
1730                    obj_type: ObjectType::Sink,
1731                    schema_id: None,
1732                    database_id: None,
1733                }),
1734        );
1735    }
1736
1737    for obj in objs {
1738        match obj.obj_type {
1739            ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid),
1740            ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid),
1741            ObjectType::Subscription => {
1742                rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid)
1743            }
1744            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid),
1745            ObjectType::Index => {
1746                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid)
1747                    .select_only()
1748                    .column(index::Column::IndexTableId)
1749                    .into_tuple()
1750                    .one(txn)
1751                    .await?;
1752                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
1753            }
1754            _ => {
1755                bail!("only table, sink, subscription, view and index depend on other objects.")
1756            }
1757        }
1758    }
1759
1760    Ok(to_update_relations)
1761}
1762
1763/// Validate that subscription can be safely deleted, meeting any of the following conditions:
1764/// 1. The upstream table is not referred to by any cross-db mv.
1765/// 2. After deleting the subscription, the upstream table still has at least one subscription.
1766pub async fn validate_subscription_deletion<C>(txn: &C, subscription_id: ObjectId) -> MetaResult<()>
1767where
1768    C: ConnectionTrait,
1769{
1770    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1771        .select_only()
1772        .column(subscription::Column::DependentTableId)
1773        .into_tuple()
1774        .one(txn)
1775        .await?
1776        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1777
1778    let cnt = Subscription::find()
1779        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1780        .count(txn)
1781        .await?;
1782    if cnt > 1 {
1783        // Ensure that at least one subscription is remained for the upstream table
1784        // once the subscription is dropped.
1785        return Ok(());
1786    }
1787
1788    // Ensure that the upstream table is not referred by any cross-db mv.
1789    let obj_alias = Alias::new("o1");
1790    let used_by_alias = Alias::new("o2");
1791    let count = ObjectDependency::find()
1792        .join_as(
1793            JoinType::InnerJoin,
1794            object_dependency::Relation::Object2.def(),
1795            obj_alias.clone(),
1796        )
1797        .join_as(
1798            JoinType::InnerJoin,
1799            object_dependency::Relation::Object1.def(),
1800            used_by_alias.clone(),
1801        )
1802        .filter(
1803            object_dependency::Column::Oid
1804                .eq(upstream_table_id)
1805                .and(object_dependency::Column::UsedBy.ne(subscription_id))
1806                .and(
1807                    Expr::col((obj_alias, object::Column::DatabaseId))
1808                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1809                ),
1810        )
1811        .count(txn)
1812        .await?;
1813
1814    if count != 0 {
1815        return Err(MetaError::permission_denied(format!(
1816            "Referenced by {} cross-db objects.",
1817            count
1818        )));
1819    }
1820
1821    Ok(())
1822}
1823
1824#[cfg(test)]
1825mod tests {
1826    use super::*;
1827
1828    #[test]
1829    fn test_extract_cdc_table_name() {
1830        let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'";
1831        let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'";
1832        assert_eq!(
1833            extract_external_table_name_from_definition(ddl1),
1834            Some("public.t1".into())
1835        );
1836        assert_eq!(
1837            extract_external_table_name_from_definition(ddl2),
1838            Some("mydb.t2".into())
1839        );
1840    }
1841}