risingwave_meta/controller/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping};
23use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
24use risingwave_common::{bail, hash};
25use risingwave_meta_model::actor::ActorStatus;
26use risingwave_meta_model::fragment::DistributionType;
27use risingwave_meta_model::object::ObjectType;
28use risingwave_meta_model::prelude::*;
29use risingwave_meta_model::table::TableType;
30use risingwave_meta_model::{
31    ActorId, DataTypeArray, DatabaseId, DispatcherType, FragmentId, I32Array, JobStatus, ObjectId,
32    PrivilegeId, SchemaId, SourceId, StreamNode, StreamSourceInfo, TableId, UserId, VnodeBitmap,
33    WorkerId, actor, connection, database, fragment, fragment_relation, function, index, object,
34    object_dependency, schema, secret, sink, source, streaming_job, subscription, table, user,
35    user_privilege, view,
36};
37use risingwave_meta_model_migration::WithQuery;
38use risingwave_pb::catalog::{
39    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
40    PbSubscription, PbTable, PbView,
41};
42use risingwave_pb::common::WorkerNode;
43use risingwave_pb::meta::object::PbObjectInfo;
44use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
45use risingwave_pb::meta::{
46    FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup,
47};
48use risingwave_pb::stream_plan::{
49    PbDispatchOutputMapping, PbDispatcher, PbDispatcherType, PbFragmentTypeFlag,
50};
51use risingwave_pb::user::grant_privilege::{
52    PbAction, PbActionWithGrantOption, PbObject as PbGrantObject,
53};
54use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
55use risingwave_sqlparser::ast::Statement as SqlStatement;
56use risingwave_sqlparser::parser::Parser;
57use sea_orm::sea_query::{
58    Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
59    WithClause,
60};
61use sea_orm::{
62    ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait,
63    FromQueryResult, IntoActiveModel, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect,
64    RelationTrait, Set, Statement,
65};
66use thiserror_ext::AsReport;
67
68use crate::controller::ObjectModel;
69use crate::model::{FragmentActorDispatchers, FragmentDownstreamRelation};
70use crate::{MetaError, MetaResult};
71
/// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are using the given object, directly or transitively, plus the objects that belong to it when it is a database or schema.
73///
74/// # Examples
75///
76/// ```
77/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
78/// use sea_orm::sea_query::*;
79/// use sea_orm::*;
80///
81/// let query = construct_obj_dependency_query(1);
82///
83/// assert_eq!(
84///     query.to_string(MysqlQueryBuilder),
85///     r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `oid` FROM `object` WHERE `object`.`database_id` = 1 OR `object`.`schema_id` = 1) UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `oid`, `obj_type`, `schema_id`, `database_id` FROM `used_by_object_ids` INNER JOIN `object` ON `used_by_object_ids`.`used_by` = `oid` ORDER BY `oid` DESC"#
86/// );
87/// assert_eq!(
88///     query.to_string(PostgresQueryBuilder),
89///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1) UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
90/// );
91/// assert_eq!(
92///     query.to_string(SqliteQueryBuilder),
93///     r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "oid" FROM "object" WHERE "object"."database_id" = 1 OR "object"."schema_id" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "oid", "obj_type", "schema_id", "database_id" FROM "used_by_object_ids" INNER JOIN "object" ON "used_by_object_ids"."used_by" = "oid" ORDER BY "oid" DESC"#
94/// );
95/// ```
96pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
97    let cte_alias = Alias::new("used_by_object_ids");
98    let cte_return_alias = Alias::new("used_by");
99
100    let mut base_query = SelectStatement::new()
101        .column(object_dependency::Column::UsedBy)
102        .from(ObjectDependency)
103        .and_where(object_dependency::Column::Oid.eq(obj_id))
104        .to_owned();
105
106    let belonged_obj_query = SelectStatement::new()
107        .column(object::Column::Oid)
108        .from(Object)
109        .and_where(
110            object::Column::DatabaseId
111                .eq(obj_id)
112                .or(object::Column::SchemaId.eq(obj_id)),
113        )
114        .to_owned();
115
116    let cte_referencing = Query::select()
117        .column((ObjectDependency, object_dependency::Column::UsedBy))
118        .from(ObjectDependency)
119        .inner_join(
120            cte_alias.clone(),
121            Expr::col((cte_alias.clone(), cte_return_alias.clone()))
122                .equals(object_dependency::Column::Oid),
123        )
124        .to_owned();
125
126    let common_table_expr = CommonTableExpression::new()
127        .query(
128            base_query
129                .union(UnionType::All, belonged_obj_query)
130                .union(UnionType::All, cte_referencing)
131                .to_owned(),
132        )
133        .column(cte_return_alias.clone())
134        .table_name(cte_alias.clone())
135        .to_owned();
136
137    SelectStatement::new()
138        .distinct()
139        .columns([
140            object::Column::Oid,
141            object::Column::ObjType,
142            object::Column::SchemaId,
143            object::Column::DatabaseId,
144        ])
145        .from(cte_alias.clone())
146        .inner_join(
147            Object,
148            Expr::col((cte_alias, cte_return_alias.clone())).equals(object::Column::Oid),
149        )
150        .order_by(object::Column::Oid, Order::Desc)
151        .to_owned()
152        .with(
153            WithClause::new()
154                .recursive(true)
155                .cte(common_table_expr)
156                .to_owned(),
157        )
158        .to_owned()
159}
160
161/// This function will construct a query using recursive cte to find if dependent objects are already relying on the target table.
162///
163/// # Examples
164///
165/// ```
166/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
167/// use sea_orm::sea_query::*;
168/// use sea_orm::*;
169///
170/// let query = construct_sink_cycle_check_query(1, vec![2, 3]);
171///
172/// assert_eq!(
173///     query.to_string(MysqlQueryBuilder),
174///     r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
175/// );
176/// assert_eq!(
177///     query.to_string(PostgresQueryBuilder),
178///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
179/// );
180/// assert_eq!(
181///     query.to_string(SqliteQueryBuilder),
182///     r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
183/// );
184/// ```
185pub fn construct_sink_cycle_check_query(
186    target_table: ObjectId,
187    dependent_objects: Vec<ObjectId>,
188) -> WithQuery {
189    let cte_alias = Alias::new("used_by_object_ids_with_sink");
190    let depend_alias = Alias::new("obj_dependency_with_sink");
191
192    let mut base_query = SelectStatement::new()
193        .columns([
194            object_dependency::Column::Oid,
195            object_dependency::Column::UsedBy,
196        ])
197        .from(ObjectDependency)
198        .and_where(object_dependency::Column::Oid.eq(target_table))
199        .to_owned();
200
201    let query_sink_deps = SelectStatement::new()
202        .columns([sink::Column::SinkId, sink::Column::TargetTable])
203        .from(Sink)
204        .and_where(sink::Column::TargetTable.is_not_null())
205        .to_owned();
206
207    let cte_referencing = Query::select()
208        .column((depend_alias.clone(), object_dependency::Column::Oid))
209        .column((depend_alias.clone(), object_dependency::Column::UsedBy))
210        .from_subquery(
211            SelectStatement::new()
212                .columns([
213                    object_dependency::Column::Oid,
214                    object_dependency::Column::UsedBy,
215                ])
216                .from(ObjectDependency)
217                .union(UnionType::All, query_sink_deps)
218                .to_owned(),
219            depend_alias.clone(),
220        )
221        .inner_join(
222            cte_alias.clone(),
223            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).eq(Expr::col((
224                depend_alias.clone(),
225                object_dependency::Column::Oid,
226            ))),
227        )
228        .and_where(
229            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
230                cte_alias.clone(),
231                object_dependency::Column::Oid,
232            ))),
233        )
234        .to_owned();
235
236    let common_table_expr = CommonTableExpression::new()
237        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
238        .columns([
239            object_dependency::Column::Oid,
240            object_dependency::Column::UsedBy,
241        ])
242        .table_name(cte_alias.clone())
243        .to_owned();
244
245    SelectStatement::new()
246        .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
247        .from(cte_alias.clone())
248        .and_where(
249            Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
250                .is_in(dependent_objects),
251        )
252        .to_owned()
253        .with(
254            WithClause::new()
255                .recursive(true)
256                .cte(common_table_expr)
257                .to_owned(),
258        )
259        .to_owned()
260}
261
/// Partial model over the `Object` entity carrying only the identity and
/// namespace columns. It is the row type returned by the referring-object
/// lookups in this file.
#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
#[sea_orm(entity = "Object")]
pub struct PartialObject {
    /// Id of the object.
    pub oid: ObjectId,
    /// Kind of the object.
    pub obj_type: ObjectType,
    /// Schema the object belongs to; optional in the model.
    pub schema_id: Option<SchemaId>,
    /// Database the object belongs to; optional in the model.
    pub database_id: Option<DatabaseId>,
}
270
/// Partial model over the `Fragment` entity that selects only the fragment id,
/// its job id and its state table ids.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Fragment")]
pub struct PartialFragmentStateTables {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Id of the job the fragment is recorded under.
    pub job_id: ObjectId,
    /// Ids stored in the fragment's `state_table_ids` column.
    pub state_table_ids: I32Array,
}
278
/// Partial model over the `Actor` entity that selects only the columns naming
/// an actor's fragment, worker and status.
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Actor")]
pub struct PartialActorLocation {
    /// Id of the actor.
    pub actor_id: ActorId,
    /// Fragment the actor is recorded under.
    pub fragment_id: FragmentId,
    /// Worker recorded for the actor.
    pub worker_id: WorkerId,
    /// Status recorded for the actor.
    pub status: ActorStatus,
}
287
/// Row type for a fragment description, built from a raw query result.
/// It is not tied to a single entity; the query that produces it is not part
/// of this section of the file.
#[derive(FromQueryResult)]
pub struct FragmentDesc {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Id of the job the fragment is recorded under.
    pub job_id: ObjectId,
    /// Fragment type flags as a raw integer mask.
    pub fragment_type_mask: i32,
    /// Distribution type of the fragment.
    pub distribution_type: DistributionType,
    /// Ids stored in the fragment's `state_table_ids` column.
    pub state_table_ids: I32Array,
    /// NOTE(review): presumably the number of actors of the fragment, computed
    /// by the producing query — confirm against that query.
    pub parallelism: i64,
    /// Vnode count recorded for the fragment.
    pub vnode_count: i32,
    /// Stream plan node stored for the fragment.
    pub stream_node: StreamNode,
}
299
300/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
301pub async fn get_referring_objects_cascade<C>(
302    obj_id: ObjectId,
303    db: &C,
304) -> MetaResult<Vec<PartialObject>>
305where
306    C: ConnectionTrait,
307{
308    let query = construct_obj_dependency_query(obj_id);
309    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
310    let objects = PartialObject::find_by_statement(Statement::from_sql_and_values(
311        db.get_database_backend(),
312        sql,
313        values,
314    ))
315    .all(db)
316    .await?;
317    Ok(objects)
318}
319
320/// Check if create a sink with given dependent objects into the target table will cause a cycle, return true if it will.
321pub async fn check_sink_into_table_cycle<C>(
322    target_table: ObjectId,
323    dependent_objs: Vec<ObjectId>,
324    db: &C,
325) -> MetaResult<bool>
326where
327    C: ConnectionTrait,
328{
329    if dependent_objs.is_empty() {
330        return Ok(false);
331    }
332
333    // special check for self referencing
334    if dependent_objs.contains(&target_table) {
335        return Ok(true);
336    }
337
338    let query = construct_sink_cycle_check_query(target_table, dependent_objs);
339    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
340
341    let res = db
342        .query_one(Statement::from_sql_and_values(
343            db.get_database_backend(),
344            sql,
345            values,
346        ))
347        .await?
348        .unwrap();
349
350    let cnt: i64 = res.try_get_by(0)?;
351
352    Ok(cnt != 0)
353}
354
355/// `ensure_object_id` ensures the existence of target object in the cluster.
356pub async fn ensure_object_id<C>(
357    object_type: ObjectType,
358    obj_id: ObjectId,
359    db: &C,
360) -> MetaResult<()>
361where
362    C: ConnectionTrait,
363{
364    let count = Object::find_by_id(obj_id).count(db).await?;
365    if count == 0 {
366        return Err(MetaError::catalog_id_not_found(
367            object_type.as_str(),
368            obj_id,
369        ));
370    }
371    Ok(())
372}
373
374/// `ensure_user_id` ensures the existence of target user in the cluster.
375pub async fn ensure_user_id<C>(user_id: UserId, db: &C) -> MetaResult<()>
376where
377    C: ConnectionTrait,
378{
379    let count = User::find_by_id(user_id).count(db).await?;
380    if count == 0 {
381        return Err(anyhow!("user {} was concurrently dropped", user_id).into());
382    }
383    Ok(())
384}
385
386/// `check_database_name_duplicate` checks whether the database name is already used in the cluster.
387pub async fn check_database_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
388where
389    C: ConnectionTrait,
390{
391    let count = Database::find()
392        .filter(database::Column::Name.eq(name))
393        .count(db)
394        .await?;
395    if count > 0 {
396        assert_eq!(count, 1);
397        return Err(MetaError::catalog_duplicated("database", name));
398    }
399    Ok(())
400}
401
402/// `check_function_signature_duplicate` checks whether the function name and its signature is already used in the target namespace.
403pub async fn check_function_signature_duplicate<C>(
404    pb_function: &PbFunction,
405    db: &C,
406) -> MetaResult<()>
407where
408    C: ConnectionTrait,
409{
410    let count = Function::find()
411        .inner_join(Object)
412        .filter(
413            object::Column::DatabaseId
414                .eq(pb_function.database_id as DatabaseId)
415                .and(object::Column::SchemaId.eq(pb_function.schema_id as SchemaId))
416                .and(function::Column::Name.eq(&pb_function.name))
417                .and(
418                    function::Column::ArgTypes
419                        .eq(DataTypeArray::from(pb_function.arg_types.clone())),
420                ),
421        )
422        .count(db)
423        .await?;
424    if count > 0 {
425        assert_eq!(count, 1);
426        return Err(MetaError::catalog_duplicated("function", &pb_function.name));
427    }
428    Ok(())
429}
430
431/// `check_connection_name_duplicate` checks whether the connection name is already used in the target namespace.
432pub async fn check_connection_name_duplicate<C>(
433    pb_connection: &PbConnection,
434    db: &C,
435) -> MetaResult<()>
436where
437    C: ConnectionTrait,
438{
439    let count = Connection::find()
440        .inner_join(Object)
441        .filter(
442            object::Column::DatabaseId
443                .eq(pb_connection.database_id as DatabaseId)
444                .and(object::Column::SchemaId.eq(pb_connection.schema_id as SchemaId))
445                .and(connection::Column::Name.eq(&pb_connection.name)),
446        )
447        .count(db)
448        .await?;
449    if count > 0 {
450        assert_eq!(count, 1);
451        return Err(MetaError::catalog_duplicated(
452            "connection",
453            &pb_connection.name,
454        ));
455    }
456    Ok(())
457}
458
459pub async fn check_secret_name_duplicate<C>(pb_secret: &PbSecret, db: &C) -> MetaResult<()>
460where
461    C: ConnectionTrait,
462{
463    let count = Secret::find()
464        .inner_join(Object)
465        .filter(
466            object::Column::DatabaseId
467                .eq(pb_secret.database_id as DatabaseId)
468                .and(object::Column::SchemaId.eq(pb_secret.schema_id as SchemaId))
469                .and(secret::Column::Name.eq(&pb_secret.name)),
470        )
471        .count(db)
472        .await?;
473    if count > 0 {
474        assert_eq!(count, 1);
475        return Err(MetaError::catalog_duplicated("secret", &pb_secret.name));
476    }
477    Ok(())
478}
479
480pub async fn check_subscription_name_duplicate<C>(
481    pb_subscription: &PbSubscription,
482    db: &C,
483) -> MetaResult<()>
484where
485    C: ConnectionTrait,
486{
487    let count = Subscription::find()
488        .inner_join(Object)
489        .filter(
490            object::Column::DatabaseId
491                .eq(pb_subscription.database_id as DatabaseId)
492                .and(object::Column::SchemaId.eq(pb_subscription.schema_id as SchemaId))
493                .and(subscription::Column::Name.eq(&pb_subscription.name)),
494        )
495        .count(db)
496        .await?;
497    if count > 0 {
498        assert_eq!(count, 1);
499        return Err(MetaError::catalog_duplicated(
500            "subscription",
501            &pb_subscription.name,
502        ));
503    }
504    Ok(())
505}
506
507/// `check_user_name_duplicate` checks whether the user is already existed in the cluster.
508pub async fn check_user_name_duplicate<C>(name: &str, db: &C) -> MetaResult<()>
509where
510    C: ConnectionTrait,
511{
512    let count = User::find()
513        .filter(user::Column::Name.eq(name))
514        .count(db)
515        .await?;
516    if count > 0 {
517        assert_eq!(count, 1);
518        return Err(MetaError::catalog_duplicated("user", name));
519    }
520    Ok(())
521}
522
523/// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace.
524pub async fn check_relation_name_duplicate<C>(
525    name: &str,
526    database_id: DatabaseId,
527    schema_id: SchemaId,
528    db: &C,
529) -> MetaResult<()>
530where
531    C: ConnectionTrait,
532{
533    macro_rules! check_duplicated {
534        ($obj_type:expr, $entity:ident, $table:ident) => {
535            let object_id = Object::find()
536                .select_only()
537                .column(object::Column::Oid)
538                .inner_join($entity)
539                .filter(
540                    object::Column::DatabaseId
541                        .eq(Some(database_id))
542                        .and(object::Column::SchemaId.eq(Some(schema_id)))
543                        .and($table::Column::Name.eq(name)),
544                )
545                .into_tuple::<ObjectId>()
546                .one(db)
547                .await?;
548            if let Some(oid) = object_id {
549                let check_creation = if $obj_type == ObjectType::View {
550                    false
551                } else if $obj_type == ObjectType::Source {
552                    let source_info = Source::find_by_id(oid)
553                        .select_only()
554                        .column(source::Column::SourceInfo)
555                        .into_tuple::<Option<StreamSourceInfo>>()
556                        .one(db)
557                        .await?
558                        .unwrap();
559                    source_info.map_or(false, |info| info.to_protobuf().is_shared())
560                } else {
561                    true
562                };
563                return if check_creation
564                    && !matches!(
565                        StreamingJob::find_by_id(oid)
566                            .select_only()
567                            .column(streaming_job::Column::JobStatus)
568                            .into_tuple::<JobStatus>()
569                            .one(db)
570                            .await?,
571                        Some(JobStatus::Created)
572                    ) {
573                    Err(MetaError::catalog_under_creation(
574                        $obj_type.as_str(),
575                        name,
576                        oid,
577                    ))
578                } else {
579                    Err(MetaError::catalog_duplicated($obj_type.as_str(), name))
580                };
581            }
582        };
583    }
584    check_duplicated!(ObjectType::Table, Table, table);
585    check_duplicated!(ObjectType::Source, Source, source);
586    check_duplicated!(ObjectType::Sink, Sink, sink);
587    check_duplicated!(ObjectType::Index, Index, index);
588    check_duplicated!(ObjectType::View, View, view);
589
590    Ok(())
591}
592
593/// `check_schema_name_duplicate` checks whether the schema name is already used in the target database.
594pub async fn check_schema_name_duplicate<C>(
595    name: &str,
596    database_id: DatabaseId,
597    db: &C,
598) -> MetaResult<()>
599where
600    C: ConnectionTrait,
601{
602    let count = Object::find()
603        .inner_join(Schema)
604        .filter(
605            object::Column::ObjType
606                .eq(ObjectType::Schema)
607                .and(object::Column::DatabaseId.eq(Some(database_id)))
608                .and(schema::Column::Name.eq(name)),
609        )
610        .count(db)
611        .await?;
612    if count != 0 {
613        return Err(MetaError::catalog_duplicated("schema", name));
614    }
615
616    Ok(())
617}
618
619/// `ensure_object_not_refer` ensures that object is not used by any other ones except indexes.
620pub async fn ensure_object_not_refer<C>(
621    object_type: ObjectType,
622    object_id: ObjectId,
623    db: &C,
624) -> MetaResult<()>
625where
626    C: ConnectionTrait,
627{
628    // Ignore indexes.
629    let count = if object_type == ObjectType::Table {
630        ObjectDependency::find()
631            .join(
632                JoinType::InnerJoin,
633                object_dependency::Relation::Object1.def(),
634            )
635            .filter(
636                object_dependency::Column::Oid
637                    .eq(object_id)
638                    .and(object::Column::ObjType.ne(ObjectType::Index)),
639            )
640            .count(db)
641            .await?
642    } else {
643        ObjectDependency::find()
644            .filter(object_dependency::Column::Oid.eq(object_id))
645            .count(db)
646            .await?
647    };
648    if count != 0 {
649        return Err(MetaError::permission_denied(format!(
650            "{} used by {} other objects.",
651            object_type.as_str(),
652            count
653        )));
654    }
655    Ok(())
656}
657
658/// List all objects that are using the given one.
659pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
660where
661    C: ConnectionTrait,
662{
663    let objs = ObjectDependency::find()
664        .filter(object_dependency::Column::Oid.eq(object_id))
665        .join(
666            JoinType::InnerJoin,
667            object_dependency::Relation::Object1.def(),
668        )
669        .into_partial_model()
670        .all(db)
671        .await?;
672
673    Ok(objs)
674}
675
676/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
677pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
678where
679    C: ConnectionTrait,
680{
681    let count = Object::find()
682        .filter(object::Column::SchemaId.eq(Some(schema_id)))
683        .count(db)
684        .await?;
685    if count != 0 {
686        return Err(MetaError::permission_denied("schema is not empty"));
687    }
688
689    Ok(())
690}
691
692/// `list_user_info_by_ids` lists all users' info by their ids.
693pub async fn list_user_info_by_ids<C>(user_ids: Vec<UserId>, db: &C) -> MetaResult<Vec<PbUserInfo>>
694where
695    C: ConnectionTrait,
696{
697    let mut user_infos = vec![];
698    for user_id in user_ids {
699        let user = User::find_by_id(user_id)
700            .one(db)
701            .await?
702            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
703        let mut user_info: PbUserInfo = user.into();
704        user_info.grant_privileges = get_user_privilege(user_id, db).await?;
705        user_infos.push(user_info);
706    }
707    Ok(user_infos)
708}
709
710/// `get_object_owner` returns the owner of the given object.
711pub async fn get_object_owner<C>(object_id: ObjectId, db: &C) -> MetaResult<UserId>
712where
713    C: ConnectionTrait,
714{
715    let obj_owner: UserId = Object::find_by_id(object_id)
716        .select_only()
717        .column(object::Column::OwnerId)
718        .into_tuple()
719        .one(db)
720        .await?
721        .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
722    Ok(obj_owner)
723}
724
725/// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given one.
726///
727/// # Examples
728///
729/// ```
730/// use risingwave_meta::controller::utils::construct_privilege_dependency_query;
731/// use sea_orm::sea_query::*;
732/// use sea_orm::*;
733///
734/// let query = construct_privilege_dependency_query(vec![1, 2, 3]);
735///
736/// assert_eq!(
737///    query.to_string(MysqlQueryBuilder),
738///   r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"#
739/// );
740/// assert_eq!(
741///   query.to_string(PostgresQueryBuilder),
742///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""#
743/// );
744/// assert_eq!(
745///  query.to_string(SqliteQueryBuilder),
746///  r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""#
747/// );
748/// ```
749pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery {
750    let cte_alias = Alias::new("granted_privilege_ids");
751    let cte_return_privilege_alias = Alias::new("id");
752    let cte_return_user_alias = Alias::new("user_id");
753
754    let mut base_query = SelectStatement::new()
755        .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
756        .from(UserPrivilege)
757        .and_where(user_privilege::Column::Id.is_in(ids))
758        .to_owned();
759
760    let cte_referencing = Query::select()
761        .columns([
762            (UserPrivilege, user_privilege::Column::Id),
763            (UserPrivilege, user_privilege::Column::UserId),
764        ])
765        .from(UserPrivilege)
766        .inner_join(
767            cte_alias.clone(),
768            Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone()))
769                .equals(user_privilege::Column::DependentId),
770        )
771        .to_owned();
772
773    let common_table_expr = CommonTableExpression::new()
774        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
775        .columns([
776            cte_return_privilege_alias.clone(),
777            cte_return_user_alias.clone(),
778        ])
779        .table_name(cte_alias.clone())
780        .to_owned();
781
782    SelectStatement::new()
783        .columns([cte_return_privilege_alias, cte_return_user_alias])
784        .from(cte_alias.clone())
785        .to_owned()
786        .with(
787            WithClause::new()
788                .recursive(true)
789                .cte(common_table_expr)
790                .to_owned(),
791        )
792        .to_owned()
793}
794
795pub async fn get_internal_tables_by_id<C>(job_id: ObjectId, db: &C) -> MetaResult<Vec<TableId>>
796where
797    C: ConnectionTrait,
798{
799    let table_ids: Vec<TableId> = Table::find()
800        .select_only()
801        .column(table::Column::TableId)
802        .filter(
803            table::Column::TableType
804                .eq(TableType::Internal)
805                .and(table::Column::BelongsToJobId.eq(job_id)),
806        )
807        .into_tuple()
808        .all(db)
809        .await?;
810    Ok(table_ids)
811}
812
813pub async fn get_index_state_tables_by_table_id<C>(
814    table_id: TableId,
815    db: &C,
816) -> MetaResult<Vec<TableId>>
817where
818    C: ConnectionTrait,
819{
820    let mut index_table_ids: Vec<TableId> = Index::find()
821        .select_only()
822        .column(index::Column::IndexTableId)
823        .filter(index::Column::PrimaryTableId.eq(table_id))
824        .into_tuple()
825        .all(db)
826        .await?;
827
828    if !index_table_ids.is_empty() {
829        let internal_table_ids: Vec<TableId> = Table::find()
830            .select_only()
831            .column(table::Column::TableId)
832            .filter(
833                table::Column::TableType
834                    .eq(TableType::Internal)
835                    .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
836            )
837            .into_tuple()
838            .all(db)
839            .await?;
840
841        index_table_ids.extend(internal_table_ids.into_iter());
842    }
843
844    Ok(index_table_ids)
845}
846
/// Partial projection of a `UserPrivilege` row that carries only the privilege id and the id of
/// the user it belongs to.
///
/// This is the row type produced by [`get_referring_privileges_cascade`].
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "UserPrivilege")]
pub struct PartialUserPrivilege {
    pub id: PrivilegeId,
    pub user_id: UserId,
}
853
854pub async fn get_referring_privileges_cascade<C>(
855    ids: Vec<PrivilegeId>,
856    db: &C,
857) -> MetaResult<Vec<PartialUserPrivilege>>
858where
859    C: ConnectionTrait,
860{
861    let query = construct_privilege_dependency_query(ids);
862    let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());
863    let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values(
864        db.get_database_backend(),
865        sql,
866        values,
867    ))
868    .all(db)
869    .await?;
870
871    Ok(privileges)
872}
873
874/// `ensure_privileges_not_referred` ensures that the privileges are not granted to any other users.
875pub async fn ensure_privileges_not_referred<C>(ids: Vec<PrivilegeId>, db: &C) -> MetaResult<()>
876where
877    C: ConnectionTrait,
878{
879    let count = UserPrivilege::find()
880        .filter(user_privilege::Column::DependentId.is_in(ids))
881        .count(db)
882        .await?;
883    if count != 0 {
884        return Err(MetaError::permission_denied(format!(
885            "privileges granted to {} other ones.",
886            count
887        )));
888    }
889    Ok(())
890}
891
892/// `get_user_privilege` returns the privileges of the given user.
893pub async fn get_user_privilege<C>(user_id: UserId, db: &C) -> MetaResult<Vec<PbGrantPrivilege>>
894where
895    C: ConnectionTrait,
896{
897    let user_privileges = UserPrivilege::find()
898        .find_also_related(Object)
899        .filter(user_privilege::Column::UserId.eq(user_id))
900        .all(db)
901        .await?;
902    Ok(user_privileges
903        .into_iter()
904        .map(|(privilege, object)| {
905            let object = object.unwrap();
906            let oid = object.oid as _;
907            let obj = match object.obj_type {
908                ObjectType::Database => PbGrantObject::DatabaseId(oid),
909                ObjectType::Schema => PbGrantObject::SchemaId(oid),
910                ObjectType::Table | ObjectType::Index => PbGrantObject::TableId(oid),
911                ObjectType::Source => PbGrantObject::SourceId(oid),
912                ObjectType::Sink => PbGrantObject::SinkId(oid),
913                ObjectType::View => PbGrantObject::ViewId(oid),
914                ObjectType::Function => PbGrantObject::FunctionId(oid),
915                ObjectType::Connection => unreachable!("connection is not supported yet"),
916                ObjectType::Subscription => PbGrantObject::SubscriptionId(oid),
917                ObjectType::Secret => unreachable!("secret is not supported yet"),
918            };
919            PbGrantPrivilege {
920                action_with_opts: vec![PbActionWithGrantOption {
921                    action: PbAction::from(privilege.action) as _,
922                    with_grant_option: privilege.with_grant_option,
923                    granted_by: privilege.granted_by as _,
924                }],
925                object: Some(obj),
926            }
927        })
928        .collect())
929}
930
931// todo: remove it after migrated to sql backend.
932pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId {
933    match object {
934        PbGrantObject::DatabaseId(id)
935        | PbGrantObject::SchemaId(id)
936        | PbGrantObject::TableId(id)
937        | PbGrantObject::SourceId(id)
938        | PbGrantObject::SinkId(id)
939        | PbGrantObject::ViewId(id)
940        | PbGrantObject::FunctionId(id)
941        | PbGrantObject::SubscriptionId(id)
942        | PbGrantObject::ConnectionId(id)
943        | PbGrantObject::SecretId(id) => *id as _,
944    }
945}
946
947pub async fn insert_fragment_relations(
948    db: &impl ConnectionTrait,
949    downstream_fragment_relations: &FragmentDownstreamRelation,
950) -> MetaResult<()> {
951    for (upstream_fragment_id, downstreams) in downstream_fragment_relations {
952        for downstream in downstreams {
953            let relation = fragment_relation::Model {
954                source_fragment_id: *upstream_fragment_id as _,
955                target_fragment_id: downstream.downstream_fragment_id as _,
956                dispatcher_type: downstream.dispatcher_type,
957                dist_key_indices: downstream
958                    .dist_key_indices
959                    .iter()
960                    .map(|idx| *idx as i32)
961                    .collect_vec()
962                    .into(),
963                output_indices: downstream
964                    .output_mapping
965                    .indices
966                    .iter()
967                    .map(|idx| *idx as i32)
968                    .collect_vec()
969                    .into(),
970                output_type_mapping: Some(downstream.output_mapping.types.clone().into()),
971            };
972            FragmentRelation::insert(relation.into_active_model())
973                .exec(db)
974                .await?;
975        }
976    }
977    Ok(())
978}
979
/// Builds the dispatchers of the actors of the given upstream fragments.
///
/// Loads every `fragment_relation` row whose source fragment is in `fragment_ids` and, for each
/// relation, composes the dispatchers with [`compose_dispatchers`] from the running actors of
/// the source and target fragments.
///
/// Returns a map of source fragment id -> source actor id -> dispatchers of that actor.
///
/// # Errors
/// Returns an error if a database query fails, or if the lookup of a fragment together with
/// its running actors yields no row (presumably also when the fragment exists but has no
/// running actor; confirm against the join semantics of `find_with_related`).
///
/// # Panics
/// Panics if the lookup of a single fragment id yields more than one fragment; further panics
/// may come from [`compose_dispatchers`].
pub async fn get_fragment_actor_dispatchers<C>(
    db: &C,
    fragment_ids: Vec<FragmentId>,
) -> MetaResult<FragmentActorDispatchers>
where
    C: ConnectionTrait,
{
    // Distribution type of a fragment plus its running actors (actor id -> optional vnode bitmap).
    type FragmentActorInfo = (
        DistributionType,
        Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
    );
    // Per-call cache so that a fragment appearing in several relations is only queried once.
    let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
    // Loads the distribution type and the running actors of one fragment from the database.
    let get_fragment_actors = |fragment_id: FragmentId| async move {
        let result: MetaResult<FragmentActorInfo> = try {
            let mut fragment_actors = Fragment::find_by_id(fragment_id)
                .find_with_related(Actor)
                .filter(actor::Column::Status.eq(ActorStatus::Running))
                .all(db)
                .await?;
            if fragment_actors.is_empty() {
                // This `return` leaves the enclosing async block, not just the `try` block.
                return Err(anyhow!("failed to find fragment: {}", fragment_id).into());
            }
            assert_eq!(
                fragment_actors.len(),
                1,
                "find multiple fragment {:?}",
                fragment_actors
            );
            let (fragment, actors) = fragment_actors.pop().unwrap();
            (
                fragment.distribution_type,
                Arc::new(
                    actors
                        .into_iter()
                        .map(|actor| {
                            (
                                actor.actor_id as _,
                                actor
                                    .vnode_bitmap
                                    .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                            )
                        })
                        .collect(),
                ),
            )
        };
        result
    };
    let fragment_relations = FragmentRelation::find()
        .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
        .all(db)
        .await?;

    // source fragment id -> source actor id -> dispatchers.
    let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
    for fragment_relation::Model {
        source_fragment_id,
        target_fragment_id,
        dispatcher_type,
        dist_key_indices,
        output_indices,
        output_type_mapping,
    } in fragment_relations
    {
        // Resolve the source fragment through the cache; the actor map is shared via `Arc`, so
        // the `clone` below only bumps a reference count.
        let (source_fragment_distribution, source_fragment_actors) = {
            let (distribution, actors) = {
                match fragment_actor_cache.entry(source_fragment_id) {
                    Entry::Occupied(entry) => entry.into_mut(),
                    Entry::Vacant(entry) => {
                        entry.insert(get_fragment_actors(source_fragment_id).await?)
                    }
                }
            };
            (*distribution, actors.clone())
        };
        // Same lookup for the target fragment.
        let (target_fragment_distribution, target_fragment_actors) = {
            let (distribution, actors) = {
                match fragment_actor_cache.entry(target_fragment_id) {
                    Entry::Occupied(entry) => entry.into_mut(),
                    Entry::Vacant(entry) => {
                        entry.insert(get_fragment_actors(target_fragment_id).await?)
                    }
                }
            };
            (*distribution, actors.clone())
        };
        // A missing stored type mapping falls back to its default value.
        let output_mapping = PbDispatchOutputMapping {
            indices: output_indices.into_u32_array(),
            types: output_type_mapping.unwrap_or_default().to_protobuf(),
        };
        let dispatchers = compose_dispatchers(
            source_fragment_distribution,
            &source_fragment_actors,
            target_fragment_id as _,
            target_fragment_distribution,
            &target_fragment_actors,
            dispatcher_type,
            dist_key_indices.into_u32_array(),
            output_mapping,
        );
        // Shadows the outer map with the per-source-fragment entry for the rest of this iteration.
        let actor_dispatchers_map = actor_dispatchers_map
            .entry(source_fragment_id as _)
            .or_default();
        for (actor_id, dispatchers) in dispatchers {
            actor_dispatchers_map
                .entry(actor_id as _)
                .or_default()
                .push(dispatchers);
        }
    }
    Ok(actor_dispatchers_map)
}
1091
1092pub fn compose_dispatchers(
1093    source_fragment_distribution: DistributionType,
1094    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1095    target_fragment_id: crate::model::FragmentId,
1096    target_fragment_distribution: DistributionType,
1097    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1098    dispatcher_type: DispatcherType,
1099    dist_key_indices: Vec<u32>,
1100    output_mapping: PbDispatchOutputMapping,
1101) -> HashMap<crate::model::ActorId, PbDispatcher> {
1102    match dispatcher_type {
1103        DispatcherType::Hash => {
1104            let dispatcher = PbDispatcher {
1105                r#type: PbDispatcherType::from(dispatcher_type) as _,
1106                dist_key_indices: dist_key_indices.clone(),
1107                output_mapping: output_mapping.into(),
1108                hash_mapping: Some(
1109                    ActorMapping::from_bitmaps(
1110                        &target_fragment_actors
1111                            .iter()
1112                            .map(|(actor_id, bitmap)| {
1113                                (
1114                                    *actor_id as _,
1115                                    bitmap
1116                                        .clone()
1117                                        .expect("downstream hash dispatch must have distribution"),
1118                                )
1119                            })
1120                            .collect(),
1121                    )
1122                    .to_protobuf(),
1123                ),
1124                dispatcher_id: target_fragment_id as _,
1125                downstream_actor_id: target_fragment_actors
1126                    .keys()
1127                    .map(|actor_id| *actor_id as _)
1128                    .collect(),
1129            };
1130            source_fragment_actors
1131                .keys()
1132                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1133                .collect()
1134        }
1135        DispatcherType::Broadcast | DispatcherType::Simple => {
1136            let dispatcher = PbDispatcher {
1137                r#type: PbDispatcherType::from(dispatcher_type) as _,
1138                dist_key_indices: dist_key_indices.clone(),
1139                output_mapping: output_mapping.into(),
1140                hash_mapping: None,
1141                dispatcher_id: target_fragment_id as _,
1142                downstream_actor_id: target_fragment_actors
1143                    .keys()
1144                    .map(|actor_id| *actor_id as _)
1145                    .collect(),
1146            };
1147            source_fragment_actors
1148                .keys()
1149                .map(|source_actor_id| (*source_actor_id, dispatcher.clone()))
1150                .collect()
1151        }
1152        DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher(
1153            source_fragment_distribution,
1154            source_fragment_actors,
1155            target_fragment_distribution,
1156            target_fragment_actors,
1157        )
1158        .into_iter()
1159        .map(|(upstream_actor_id, downstream_actor_id)| {
1160            (
1161                upstream_actor_id,
1162                PbDispatcher {
1163                    r#type: PbDispatcherType::NoShuffle as _,
1164                    dist_key_indices: dist_key_indices.clone(),
1165                    output_mapping: output_mapping.clone().into(),
1166                    hash_mapping: None,
1167                    dispatcher_id: target_fragment_id as _,
1168                    downstream_actor_id: vec![downstream_actor_id as _],
1169                },
1170            )
1171        })
1172        .collect(),
1173    }
1174}
1175
1176/// return (`upstream_actor_id` -> `downstream_actor_id`)
1177pub fn resolve_no_shuffle_actor_dispatcher(
1178    source_fragment_distribution: DistributionType,
1179    source_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1180    target_fragment_distribution: DistributionType,
1181    target_fragment_actors: &HashMap<crate::model::ActorId, Option<Bitmap>>,
1182) -> Vec<(crate::model::ActorId, crate::model::ActorId)> {
1183    assert_eq!(source_fragment_distribution, target_fragment_distribution);
1184    assert_eq!(
1185        source_fragment_actors.len(),
1186        target_fragment_actors.len(),
1187        "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
1188        source_fragment_actors,
1189        target_fragment_actors
1190    );
1191    match source_fragment_distribution {
1192        DistributionType::Single => {
1193            let assert_singleton = |bitmap: &Option<Bitmap>| {
1194                assert!(
1195                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
1196                    "not singleton: {:?}",
1197                    bitmap
1198                );
1199            };
1200            assert_eq!(
1201                source_fragment_actors.len(),
1202                1,
1203                "singleton distribution actor count not 1: {:?}",
1204                source_fragment_distribution
1205            );
1206            assert_eq!(
1207                target_fragment_actors.len(),
1208                1,
1209                "singleton distribution actor count not 1: {:?}",
1210                target_fragment_distribution
1211            );
1212            let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
1213            assert_singleton(bitmap);
1214            let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
1215            assert_singleton(bitmap);
1216            vec![(*source_actor_id, *target_actor_id)]
1217        }
1218        DistributionType::Hash => {
1219            let mut target_fragment_actor_index: HashMap<_, _> = target_fragment_actors
1220                .iter()
1221                .map(|(actor_id, bitmap)| {
1222                    let bitmap = bitmap
1223                        .as_ref()
1224                        .expect("hash distribution should have bitmap");
1225                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1226                    (first_vnode, (*actor_id, bitmap))
1227                })
1228                .collect();
1229            source_fragment_actors
1230                .iter()
1231                .map(|(source_actor_id, bitmap)| {
1232                    let bitmap = bitmap
1233                        .as_ref()
1234                        .expect("hash distribution should have bitmap");
1235                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
1236                    let (target_actor_id, target_bitmap) =
1237                        target_fragment_actor_index.remove(&first_vnode).unwrap_or_else(|| {
1238                            panic!(
1239                                "cannot find matched target actor: {} {:?} {:?} {:?}",
1240                                source_actor_id,
1241                                first_vnode,
1242                                source_fragment_actors,
1243                                target_fragment_actors
1244                            );
1245                        });
1246                    assert_eq!(
1247                        bitmap,
1248                        target_bitmap,
1249                        "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
1250                        source_actor_id,
1251                        first_vnode,
1252                        source_fragment_actors,
1253                        target_fragment_actors
1254                    );
1255                    (*source_actor_id, target_actor_id)
1256                }).collect()
1257        }
1258    }
1259}
1260
1261/// `get_fragment_mappings` returns the fragment vnode mappings of the given job.
1262pub async fn get_fragment_mappings<C>(
1263    db: &C,
1264    job_id: ObjectId,
1265) -> MetaResult<Vec<PbFragmentWorkerSlotMapping>>
1266where
1267    C: ConnectionTrait,
1268{
1269    let job_actors: Vec<(
1270        FragmentId,
1271        DistributionType,
1272        ActorId,
1273        Option<VnodeBitmap>,
1274        WorkerId,
1275        ActorStatus,
1276    )> = Actor::find()
1277        .select_only()
1278        .columns([
1279            fragment::Column::FragmentId,
1280            fragment::Column::DistributionType,
1281        ])
1282        .columns([
1283            actor::Column::ActorId,
1284            actor::Column::VnodeBitmap,
1285            actor::Column::WorkerId,
1286            actor::Column::Status,
1287        ])
1288        .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1289        .filter(fragment::Column::JobId.eq(job_id))
1290        .into_tuple()
1291        .all(db)
1292        .await?;
1293
1294    Ok(rebuild_fragment_mapping_from_actors(job_actors))
1295}
1296
1297pub fn rebuild_fragment_mapping_from_actors(
1298    job_actors: Vec<(
1299        FragmentId,
1300        DistributionType,
1301        ActorId,
1302        Option<VnodeBitmap>,
1303        WorkerId,
1304        ActorStatus,
1305    )>,
1306) -> Vec<FragmentWorkerSlotMapping> {
1307    let mut all_actor_locations = HashMap::new();
1308    let mut actor_bitmaps = HashMap::new();
1309    let mut fragment_actors = HashMap::new();
1310    let mut fragment_dist = HashMap::new();
1311
1312    for (fragment_id, dist, actor_id, bitmap, worker_id, actor_status) in job_actors {
1313        if actor_status == ActorStatus::Inactive {
1314            continue;
1315        }
1316
1317        all_actor_locations
1318            .entry(fragment_id)
1319            .or_insert(HashMap::new())
1320            .insert(actor_id as hash::ActorId, worker_id as u32);
1321        actor_bitmaps.insert(actor_id, bitmap);
1322        fragment_actors
1323            .entry(fragment_id)
1324            .or_insert_with(Vec::new)
1325            .push(actor_id);
1326        fragment_dist.insert(fragment_id, dist);
1327    }
1328
1329    let mut result = vec![];
1330    for (fragment_id, dist) in fragment_dist {
1331        let mut actor_locations = all_actor_locations.remove(&fragment_id).unwrap();
1332        let fragment_worker_slot_mapping = match dist {
1333            DistributionType::Single => {
1334                let actor = fragment_actors
1335                    .remove(&fragment_id)
1336                    .unwrap()
1337                    .into_iter()
1338                    .exactly_one()
1339                    .unwrap() as hash::ActorId;
1340                let actor_location = actor_locations.remove(&actor).unwrap();
1341
1342                WorkerSlotMapping::new_single(WorkerSlotId::new(actor_location, 0))
1343            }
1344            DistributionType::Hash => {
1345                let actors = fragment_actors.remove(&fragment_id).unwrap();
1346
1347                let all_actor_bitmaps: HashMap<_, _> = actors
1348                    .iter()
1349                    .map(|actor_id| {
1350                        let vnode_bitmap = actor_bitmaps
1351                            .remove(actor_id)
1352                            .flatten()
1353                            .expect("actor bitmap shouldn't be none in hash fragment");
1354
1355                        let bitmap = Bitmap::from(&vnode_bitmap.to_protobuf());
1356                        (*actor_id as hash::ActorId, bitmap)
1357                    })
1358                    .collect();
1359
1360                let actor_mapping = ActorMapping::from_bitmaps(&all_actor_bitmaps);
1361
1362                actor_mapping.to_worker_slot(&actor_locations)
1363            }
1364        };
1365
1366        result.push(PbFragmentWorkerSlotMapping {
1367            fragment_id: fragment_id as u32,
1368            mapping: Some(fragment_worker_slot_mapping.to_protobuf()),
1369        })
1370    }
1371    result
1372}
1373
1374/// `get_fragment_actor_ids` returns the fragment actor ids of the given fragments.
1375pub async fn get_fragment_actor_ids<C>(
1376    db: &C,
1377    fragment_ids: Vec<FragmentId>,
1378) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>>
1379where
1380    C: ConnectionTrait,
1381{
1382    let fragment_actors: Vec<(FragmentId, ActorId)> = Actor::find()
1383        .select_only()
1384        .columns([actor::Column::FragmentId, actor::Column::ActorId])
1385        .filter(actor::Column::FragmentId.is_in(fragment_ids))
1386        .into_tuple()
1387        .all(db)
1388        .await?;
1389
1390    Ok(fragment_actors.into_iter().into_group_map())
1391}
1392
1393/// For the given streaming jobs, returns
1394/// - All source fragments
1395/// - All actors
1396/// - All fragments
1397pub async fn get_fragments_for_jobs<C>(
1398    db: &C,
1399    streaming_jobs: Vec<ObjectId>,
1400) -> MetaResult<(
1401    HashMap<SourceId, BTreeSet<FragmentId>>,
1402    HashSet<ActorId>,
1403    HashSet<FragmentId>,
1404)>
1405where
1406    C: ConnectionTrait,
1407{
1408    if streaming_jobs.is_empty() {
1409        return Ok((HashMap::default(), HashSet::default(), HashSet::default()));
1410    }
1411
1412    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1413        .select_only()
1414        .columns([
1415            fragment::Column::FragmentId,
1416            fragment::Column::FragmentTypeMask,
1417            fragment::Column::StreamNode,
1418        ])
1419        .filter(fragment::Column::JobId.is_in(streaming_jobs))
1420        .into_tuple()
1421        .all(db)
1422        .await?;
1423    let actors: Vec<ActorId> = Actor::find()
1424        .select_only()
1425        .column(actor::Column::ActorId)
1426        .filter(
1427            actor::Column::FragmentId.is_in(fragments.iter().map(|(id, _, _)| *id).collect_vec()),
1428        )
1429        .into_tuple()
1430        .all(db)
1431        .await?;
1432
1433    let fragment_ids = fragments
1434        .iter()
1435        .map(|(fragment_id, _, _)| *fragment_id)
1436        .collect();
1437
1438    let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
1439    for (fragment_id, mask, stream_node) in fragments {
1440        if mask & PbFragmentTypeFlag::Source as i32 == 0 {
1441            continue;
1442        }
1443        if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1444            source_fragment_ids
1445                .entry(source_id as _)
1446                .or_default()
1447                .insert(fragment_id);
1448        }
1449    }
1450
1451    Ok((
1452        source_fragment_ids,
1453        actors.into_iter().collect(),
1454        fragment_ids,
1455    ))
1456}
1457
1458/// Build a object group for notifying the deletion of the given objects.
1459///
1460/// Note that only id fields are filled in the object info, as the arguments are partial objects.
1461/// As a result, the returned notification info should only be used for deletion.
1462pub(crate) fn build_object_group_for_delete(
1463    partial_objects: Vec<PartialObject>,
1464) -> NotificationInfo {
1465    let mut objects = vec![];
1466    for obj in partial_objects {
1467        match obj.obj_type {
1468            ObjectType::Database => objects.push(PbObject {
1469                object_info: Some(PbObjectInfo::Database(PbDatabase {
1470                    id: obj.oid as _,
1471                    ..Default::default()
1472                })),
1473            }),
1474            ObjectType::Schema => objects.push(PbObject {
1475                object_info: Some(PbObjectInfo::Schema(PbSchema {
1476                    id: obj.oid as _,
1477                    database_id: obj.database_id.unwrap() as _,
1478                    ..Default::default()
1479                })),
1480            }),
1481            ObjectType::Table => objects.push(PbObject {
1482                object_info: Some(PbObjectInfo::Table(PbTable {
1483                    id: obj.oid as _,
1484                    schema_id: obj.schema_id.unwrap() as _,
1485                    database_id: obj.database_id.unwrap() as _,
1486                    ..Default::default()
1487                })),
1488            }),
1489            ObjectType::Source => objects.push(PbObject {
1490                object_info: Some(PbObjectInfo::Source(PbSource {
1491                    id: obj.oid as _,
1492                    schema_id: obj.schema_id.unwrap() as _,
1493                    database_id: obj.database_id.unwrap() as _,
1494                    ..Default::default()
1495                })),
1496            }),
1497            ObjectType::Sink => objects.push(PbObject {
1498                object_info: Some(PbObjectInfo::Sink(PbSink {
1499                    id: obj.oid as _,
1500                    schema_id: obj.schema_id.unwrap() as _,
1501                    database_id: obj.database_id.unwrap() as _,
1502                    ..Default::default()
1503                })),
1504            }),
1505            ObjectType::Subscription => objects.push(PbObject {
1506                object_info: Some(PbObjectInfo::Subscription(PbSubscription {
1507                    id: obj.oid as _,
1508                    schema_id: obj.schema_id.unwrap() as _,
1509                    database_id: obj.database_id.unwrap() as _,
1510                    ..Default::default()
1511                })),
1512            }),
1513            ObjectType::View => objects.push(PbObject {
1514                object_info: Some(PbObjectInfo::View(PbView {
1515                    id: obj.oid as _,
1516                    schema_id: obj.schema_id.unwrap() as _,
1517                    database_id: obj.database_id.unwrap() as _,
1518                    ..Default::default()
1519                })),
1520            }),
1521            ObjectType::Index => {
1522                objects.push(PbObject {
1523                    object_info: Some(PbObjectInfo::Index(PbIndex {
1524                        id: obj.oid as _,
1525                        schema_id: obj.schema_id.unwrap() as _,
1526                        database_id: obj.database_id.unwrap() as _,
1527                        ..Default::default()
1528                    })),
1529                });
1530                objects.push(PbObject {
1531                    object_info: Some(PbObjectInfo::Table(PbTable {
1532                        id: obj.oid as _,
1533                        schema_id: obj.schema_id.unwrap() as _,
1534                        database_id: obj.database_id.unwrap() as _,
1535                        ..Default::default()
1536                    })),
1537                });
1538            }
1539            ObjectType::Function => objects.push(PbObject {
1540                object_info: Some(PbObjectInfo::Function(PbFunction {
1541                    id: obj.oid as _,
1542                    schema_id: obj.schema_id.unwrap() as _,
1543                    database_id: obj.database_id.unwrap() as _,
1544                    ..Default::default()
1545                })),
1546            }),
1547            ObjectType::Connection => objects.push(PbObject {
1548                object_info: Some(PbObjectInfo::Connection(PbConnection {
1549                    id: obj.oid as _,
1550                    schema_id: obj.schema_id.unwrap() as _,
1551                    database_id: obj.database_id.unwrap() as _,
1552                    ..Default::default()
1553                })),
1554            }),
1555            ObjectType::Secret => objects.push(PbObject {
1556                object_info: Some(PbObjectInfo::Secret(PbSecret {
1557                    id: obj.oid as _,
1558                    schema_id: obj.schema_id.unwrap() as _,
1559                    database_id: obj.database_id.unwrap() as _,
1560                    ..Default::default()
1561                })),
1562            }),
1563        }
1564    }
1565    NotificationInfo::ObjectGroup(PbObjectGroup { objects })
1566}
1567
1568pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option<String> {
1569    let [mut definition]: [_; 1] = Parser::parse_sql(table_definition)
1570        .context("unable to parse table definition")
1571        .inspect_err(|e| {
1572            tracing::error!(
1573                target: "auto_schema_change",
1574                error = %e.as_report(),
1575                "failed to parse table definition")
1576        })
1577        .unwrap()
1578        .try_into()
1579        .unwrap();
1580    if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition {
1581        cdc_table_info
1582            .clone()
1583            .map(|cdc_table_info| cdc_table_info.external_table_name)
1584    } else {
1585        None
1586    }
1587}
1588
1589/// `rename_relation` renames the target relation and its definition,
1590/// it commits the changes to the transaction and returns the updated relations and the old name.
1591pub async fn rename_relation(
1592    txn: &DatabaseTransaction,
1593    object_type: ObjectType,
1594    object_id: ObjectId,
1595    object_name: &str,
1596) -> MetaResult<(Vec<PbObject>, String)> {
1597    use sea_orm::ActiveModelTrait;
1598
1599    use crate::controller::rename::alter_relation_rename;
1600
1601    let mut to_update_relations = vec![];
1602    // rename relation.
1603    macro_rules! rename_relation {
1604        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1605            let (mut relation, obj) = $entity::find_by_id($object_id)
1606                .find_also_related(Object)
1607                .one(txn)
1608                .await?
1609                .unwrap();
1610            let obj = obj.unwrap();
1611            let old_name = relation.name.clone();
1612            relation.name = object_name.into();
1613            if obj.obj_type != ObjectType::View {
1614                relation.definition = alter_relation_rename(&relation.definition, object_name);
1615            }
1616            let active_model = $table::ActiveModel {
1617                $identity: Set(relation.$identity),
1618                name: Set(object_name.into()),
1619                definition: Set(relation.definition.clone()),
1620                ..Default::default()
1621            };
1622            active_model.update(txn).await?;
1623            to_update_relations.push(PbObject {
1624                object_info: Some(PbObjectInfo::$entity(ObjectModel(relation, obj).into())),
1625            });
1626            old_name
1627        }};
1628    }
1629    // TODO: check is there any thing to change for shared source?
1630    let old_name = match object_type {
1631        ObjectType::Table => {
1632            let associated_source_id: Option<SourceId> = Source::find()
1633                .select_only()
1634                .column(source::Column::SourceId)
1635                .filter(source::Column::OptionalAssociatedTableId.eq(object_id))
1636                .into_tuple()
1637                .one(txn)
1638                .await?;
1639            if let Some(source_id) = associated_source_id {
1640                rename_relation!(Source, source, source_id, source_id);
1641            }
1642            rename_relation!(Table, table, table_id, object_id)
1643        }
1644        ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
1645        ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id),
1646        ObjectType::Subscription => {
1647            rename_relation!(Subscription, subscription, subscription_id, object_id)
1648        }
1649        ObjectType::View => rename_relation!(View, view, view_id, object_id),
1650        ObjectType::Index => {
1651            let (mut index, obj) = Index::find_by_id(object_id)
1652                .find_also_related(Object)
1653                .one(txn)
1654                .await?
1655                .unwrap();
1656            index.name = object_name.into();
1657            let index_table_id = index.index_table_id;
1658            let old_name = rename_relation!(Table, table, table_id, index_table_id);
1659
1660            // the name of index and its associated table is the same.
1661            let active_model = index::ActiveModel {
1662                index_id: sea_orm::ActiveValue::Set(index.index_id),
1663                name: sea_orm::ActiveValue::Set(object_name.into()),
1664                ..Default::default()
1665            };
1666            active_model.update(txn).await?;
1667            to_update_relations.push(PbObject {
1668                object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1669            });
1670            old_name
1671        }
1672        _ => unreachable!("only relation name can be altered."),
1673    };
1674
1675    Ok((to_update_relations, old_name))
1676}
1677
1678pub async fn get_database_resource_group<C>(txn: &C, database_id: ObjectId) -> MetaResult<String>
1679where
1680    C: ConnectionTrait,
1681{
1682    let database_resource_group: Option<String> = Database::find_by_id(database_id)
1683        .select_only()
1684        .column(database::Column::ResourceGroup)
1685        .into_tuple()
1686        .one(txn)
1687        .await?
1688        .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1689
1690    Ok(database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned()))
1691}
1692
1693pub async fn get_existing_job_resource_group<C>(
1694    txn: &C,
1695    streaming_job_id: ObjectId,
1696) -> MetaResult<String>
1697where
1698    C: ConnectionTrait,
1699{
1700    let (job_specific_resource_group, database_resource_group): (Option<String>, Option<String>) =
1701        StreamingJob::find_by_id(streaming_job_id)
1702            .select_only()
1703            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1704            .join(JoinType::InnerJoin, object::Relation::Database2.def())
1705            .column(streaming_job::Column::SpecificResourceGroup)
1706            .column(database::Column::ResourceGroup)
1707            .into_tuple()
1708            .one(txn)
1709            .await?
1710            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
1711
1712    Ok(job_specific_resource_group.unwrap_or_else(|| {
1713        database_resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1714    }))
1715}
1716
1717pub fn filter_workers_by_resource_group(
1718    workers: &HashMap<u32, WorkerNode>,
1719    resource_group: &str,
1720) -> BTreeSet<WorkerId> {
1721    workers
1722        .iter()
1723        .filter(|&(_, worker)| {
1724            worker
1725                .resource_group()
1726                .map(|node_label| node_label.as_str() == resource_group)
1727                .unwrap_or(false)
1728        })
1729        .map(|(id, _)| (*id as WorkerId))
1730        .collect()
1731}
1732
1733/// `rename_relation_refer` updates the definition of relations that refer to the target one,
1734/// it commits the changes to the transaction and returns all the updated relations.
1735pub async fn rename_relation_refer(
1736    txn: &DatabaseTransaction,
1737    object_type: ObjectType,
1738    object_id: ObjectId,
1739    object_name: &str,
1740    old_name: &str,
1741) -> MetaResult<Vec<PbObject>> {
1742    use sea_orm::ActiveModelTrait;
1743
1744    use crate::controller::rename::alter_relation_rename_refs;
1745
1746    let mut to_update_relations = vec![];
1747    macro_rules! rename_relation_ref {
1748        ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
1749            let (mut relation, obj) = $entity::find_by_id($object_id)
1750                .find_also_related(Object)
1751                .one(txn)
1752                .await?
1753                .unwrap();
1754            relation.definition =
1755                alter_relation_rename_refs(&relation.definition, old_name, object_name);
1756            let active_model = $table::ActiveModel {
1757                $identity: Set(relation.$identity),
1758                definition: Set(relation.definition.clone()),
1759                ..Default::default()
1760            };
1761            active_model.update(txn).await?;
1762            to_update_relations.push(PbObject {
1763                object_info: Some(PbObjectInfo::$entity(
1764                    ObjectModel(relation, obj.unwrap()).into(),
1765                )),
1766            });
1767        }};
1768    }
1769    let mut objs = get_referring_objects(object_id, txn).await?;
1770    if object_type == ObjectType::Table {
1771        let incoming_sinks: I32Array = Table::find_by_id(object_id)
1772            .select_only()
1773            .column(table::Column::IncomingSinks)
1774            .into_tuple()
1775            .one(txn)
1776            .await?
1777            .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
1778
1779        objs.extend(
1780            incoming_sinks
1781                .into_inner()
1782                .into_iter()
1783                .map(|id| PartialObject {
1784                    oid: id,
1785                    obj_type: ObjectType::Sink,
1786                    schema_id: None,
1787                    database_id: None,
1788                }),
1789        );
1790    }
1791
1792    for obj in objs {
1793        match obj.obj_type {
1794            ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid),
1795            ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid),
1796            ObjectType::Subscription => {
1797                rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid)
1798            }
1799            ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid),
1800            ObjectType::Index => {
1801                let index_table_id: Option<TableId> = Index::find_by_id(obj.oid)
1802                    .select_only()
1803                    .column(index::Column::IndexTableId)
1804                    .into_tuple()
1805                    .one(txn)
1806                    .await?;
1807                rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
1808            }
1809            _ => {
1810                bail!("only table, sink, subscription, view and index depend on other objects.")
1811            }
1812        }
1813    }
1814
1815    Ok(to_update_relations)
1816}
1817
1818/// Validate that subscription can be safely deleted, meeting any of the following conditions:
1819/// 1. The upstream table is not referred to by any cross-db mv.
1820/// 2. After deleting the subscription, the upstream table still has at least one subscription.
1821pub async fn validate_subscription_deletion<C>(txn: &C, subscription_id: ObjectId) -> MetaResult<()>
1822where
1823    C: ConnectionTrait,
1824{
1825    let upstream_table_id: ObjectId = Subscription::find_by_id(subscription_id)
1826        .select_only()
1827        .column(subscription::Column::DependentTableId)
1828        .into_tuple()
1829        .one(txn)
1830        .await?
1831        .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1832
1833    let cnt = Subscription::find()
1834        .filter(subscription::Column::DependentTableId.eq(upstream_table_id))
1835        .count(txn)
1836        .await?;
1837    if cnt > 1 {
1838        // Ensure that at least one subscription is remained for the upstream table
1839        // once the subscription is dropped.
1840        return Ok(());
1841    }
1842
1843    // Ensure that the upstream table is not referred by any cross-db mv.
1844    let obj_alias = Alias::new("o1");
1845    let used_by_alias = Alias::new("o2");
1846    let count = ObjectDependency::find()
1847        .join_as(
1848            JoinType::InnerJoin,
1849            object_dependency::Relation::Object2.def(),
1850            obj_alias.clone(),
1851        )
1852        .join_as(
1853            JoinType::InnerJoin,
1854            object_dependency::Relation::Object1.def(),
1855            used_by_alias.clone(),
1856        )
1857        .filter(
1858            object_dependency::Column::Oid
1859                .eq(upstream_table_id)
1860                .and(object_dependency::Column::UsedBy.ne(subscription_id))
1861                .and(
1862                    Expr::col((obj_alias, object::Column::DatabaseId))
1863                        .ne(Expr::col((used_by_alias, object::Column::DatabaseId))),
1864                ),
1865        )
1866        .count(txn)
1867        .await?;
1868
1869    if count != 0 {
1870        return Err(MetaError::permission_denied(format!(
1871            "Referenced by {} cross-db objects.",
1872            count
1873        )));
1874    }
1875
1876    Ok(())
1877}
1878
#[cfg(test)]
mod tests {
    use super::*;

    /// The external table name is taken from the `TABLE '...'` clause of a CDC table
    /// definition.
    #[test]
    fn test_extract_cdc_table_name() {
        let cases = [
            ("CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'", "public.t1"),
            ("CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'", "mydb.t2"),
        ];
        for (ddl, expected) in cases {
            assert_eq!(
                extract_external_table_name_from_definition(ddl),
                Some(expected.to_owned())
            );
        }
    }
}
1896}