risingwave_meta/controller/catalog/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod alter_op;
mod create_op;
mod drop_op;
mod get_op;
mod list_op;
mod test;
mod util;

use std::collections::{BTreeSet, HashMap, HashSet};
use std::iter;
use std::mem::take;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{
    DEFAULT_SCHEMA_NAME, FragmentTypeFlag, FragmentTypeMask, SYSTEM_SCHEMAS, TableOption,
};
use risingwave_common::current_cluster_version;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::*;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::{
    ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
    IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
    StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, TableIdArray,
    UserId, ViewId, connection, database, fragment, function, index, object, object_dependency,
    pending_sink_state, schema, secret, sink, source, streaming_job, subscription, table,
    user_privilege, view,
};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::subscription::SubscriptionState;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::{
    PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
    PbStreamJobStatus, PbSubscription, PbTable, PbView,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::telemetry::PbTelemetryEventStage;
use risingwave_pb::user::PbUserInfo;
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
    IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
    SelectColumns, TransactionTrait, Value,
};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::info;

use super::utils::{
    check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
    rename_relation_refer,
};
use crate::controller::ObjectModel;
use crate::controller::catalog::util::update_internal_tables;
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::*;
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
    get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
};
use crate::rpc::ddl_controller::DropMode;
use crate::telemetry::{MetaTelemetryJobDesc, report_event};
use crate::{MetaError, MetaResult};

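/// The full catalog snapshot, in the fixed order the frontend expects:
/// databases, schemas, tables, sources, sinks, subscriptions, indexes, views,
/// functions, connections, and secrets.
///
/// A minimal sketch (hypothetical caller) of destructuring a snapshot:
/// ```ignore
/// let (catalog, users) = inner.snapshot().await?;
/// let (databases, schemas, tables, ..) = catalog;
/// ```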
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);

pub type CatalogControllerRef = Arc<CatalogController>;

/// `CatalogController` is the controller for catalog-related operations, including database, schema, table, view, etc.
pub struct CatalogController {
    pub(crate) env: MetaSrvEnv,
    pub(crate) inner: RwLock<CatalogControllerInner>,
}

#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    // we only apply one drop-connector action to one table at a time, so a single entry (rather than a `Vec`) suffices here
    pub(crate) to_change_streaming_job_id: JobId,
    pub(crate) to_remove_state_table_id: TableId,
    pub(crate) to_remove_source_id: SourceId,
}

#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    pub(crate) database_id: DatabaseId,
    pub(crate) removed_streaming_job_ids: Vec<JobId>,
    /// Dropped state tables, which need to be unregistered from hummock.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    /// Dropped secrets, which need to be removed from the secret manager.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    /// Dropped sources (on `DROP SOURCE`), which need to be unregistered from the source manager.
    pub(crate) removed_source_ids: Vec<SourceId>,
    /// Dropped source fragments (on `DROP MATERIALIZED VIEW` referencing sources),
    /// which need to be unregistered from the source manager.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    pub(crate) removed_actors: HashSet<ActorId>,
    pub(crate) removed_fragments: HashSet<FragmentId>,

    /// Removed sink fragments, keyed by their target fragment.
    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,

    /// Dropped iceberg table sinks.
    pub(crate) removed_iceberg_table_sinks: Vec<PbSink>,
}

impl CatalogController {
    pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
        let meta_store = env.meta_store();
        let catalog_controller = Self {
            env,
            inner: RwLock::new(CatalogControllerInner {
                db: meta_store.conn,
                creating_table_finish_notifier: HashMap::new(),
                dropped_tables: HashMap::new(),
            }),
        };

        catalog_controller.init().await?;
        Ok(catalog_controller)
    }

    /// Used in `NotificationService::subscribe`.
    /// Pay attention to the order in which locks are acquired to avoid deadlocks.
    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
        self.inner.read().await
    }

    pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
        self.inner.write().await
    }
}

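/// Inner state guarded by the `CatalogController::inner` lock: the SQL meta-store
/// connection plus in-memory bookkeeping for creating and dropped tables.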
pub struct CatalogControllerInner {
    pub(crate) db: DatabaseConnection,
    /// Registered finish notifiers for creating tables.
    ///
    /// `DdlController` will update this map, and pass the `tx` side to `CatalogController`.
    /// On notifying, we can remove the entry from this map.
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    /// Tables that have been dropped from the meta store but whose corresponding barrier has not yet finished.
    pub dropped_tables: HashMap<TableId, PbTable>,
}

impl CatalogController {
    pub(crate) async fn notify_frontend(
        &self,
        operation: NotificationOperation,
        info: NotificationInfo,
    ) -> NotificationVersion {
        self.env
            .notification_manager()
            .notify_frontend(operation, info)
            .await
    }

    pub(crate) async fn notify_frontend_relation_info(
        &self,
        operation: NotificationOperation,
        relation_info: PbObjectInfo,
    ) -> NotificationVersion {
        self.env
            .notification_manager()
            .notify_frontend_object_info(operation, relation_info)
            .await
    }

    pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
        self.env.notification_manager().current_version().await
    }
}

impl CatalogController {
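    /// Finalizes a subscription whose backfill has completed: stamps `created_at`
    /// and `created_at_cluster_version` on its object row, flips the subscription
    /// state to `Created`, and grants default privileges, all in one transaction.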
    pub async fn finish_create_subscription_catalog(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // update `created_at` to now() and `created_at_cluster_version` to the current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(subscription_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found(
                "subscription",
                subscription_id,
            ));
        }

        // mark the target subscription as `Created`.
        let job = subscription::ActiveModel {
            subscription_id: Set(subscription_id),
            subscription_state: Set(SubscriptionState::Created.into()),
            ..Default::default()
        };
        job.update(&txn).await?;

        let _ = grant_default_privileges_automatically(&txn, subscription_id).await?;

        txn.commit().await?;

        Ok(())
    }

    pub async fn notify_create_subscription(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.read().await;
        let (subscription, obj) = Subscription::find_by_id(subscription_id)
            .find_also_related(Object)
            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
            .one(&inner.db)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Add,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: PbObjectInfo::Subscription(
                            ObjectModel(subscription, obj.unwrap()).into(),
                        )
                        .into(),
                    }],
                }),
            )
            .await;

        // notify users whose default privileges cover the new subscription
        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.eq(subscription_id.as_object_id()))
            .into_tuple()
            .all(&inner.db)
            .await?;

        if !updated_user_ids.is_empty() {
            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
            version = self.notify_users_update(updated_user_infos).await;
        }

        Ok(version)
    }

    // for telemetry
    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
        // get connector usage by source/sink
        // the expected format looks like:
        // {
        //     "source": [{
        //         "$source_id": {
        //             "connector": "kafka",
        //             "format": "plain",
        //             "encode": "json"
        //         },
        //     }],
        //     "sink": [{
        //         "$sink_id": {
        //             "connector": "pulsar",
        //             "format": "upsert",
        //             "encode": "avro"
        //         },
        //     }],
        // }

        let inner = self.inner.read().await;
        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
            .select_only()
            .column(source::Column::SourceId)
            .column(source::Column::WithProperties)
            .column(source::Column::SourceInfo)
            .into_tuple()
            .all(&inner.db)
            .await?;
        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .column(sink::Column::Properties)
            .column(sink::Column::SinkFormatDesc)
            .into_tuple()
            .all(&inner.db)
            .await?;
        drop(inner);

        let get_connector_from_property = |property: &Property| -> String {
            property
                .0
                .get(UPSTREAM_SOURCE_KEY)
                .cloned()
                .unwrap_or_default()
        };

        let source_report: Vec<jsonbb::Value> = source_props_and_info
            .iter()
            .map(|(oid, property, info)| {
                let connector_name = get_connector_from_property(property);
                let mut format = None;
                let mut encode = None;
                if let Some(info) = info {
                    let pb_info = info.to_protobuf();
                    format = Some(pb_info.format().as_str_name());
                    encode = Some(pb_info.row_encode().as_str_name());
                }
                jsonbb::json!({
                    oid.to_string(): {
                        "connector": connector_name,
                        "format": format,
                        "encode": encode,
                    },
                })
            })
            .collect_vec();

        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
            .iter()
            .map(|(oid, property, info)| {
                let connector_name = get_connector_from_property(property);
                let mut format = None;
                let mut encode = None;
                if let Some(info) = info {
                    let pb_info = info.to_protobuf();
                    format = Some(pb_info.format().as_str_name());
                    encode = Some(pb_info.encode().as_str_name());
                }
                jsonbb::json!({
                    oid.to_string(): {
                        "connector": connector_name,
                        "format": format,
                        "encode": encode,
                    },
                })
            })
            .collect_vec();

        Ok(jsonbb::json!({
                "source": source_report,
                "sink": sink_report,
        }))
    }

    pub async fn clean_dirty_subscription(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
            object::Column::Oid.not_in_subquery(
                Query::select()
                    .column(subscription::Column::SubscriptionId)
                    .from(Subscription)
                    .and_where(
                        subscription::Column::SubscriptionState
                            .eq(SubscriptionState::Created as i32),
                    )
                    .take(),
            ),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        Object::delete_many()
            .filter(filter_condition)
            .exec(&txn)
            .await?;
        txn.commit().await?;
        // We don't need to notify the frontend, because subscriptions still in `Init` state were never sent to it.
        Ok(())
    }

    /// `clean_dirty_creating_jobs` cleans up jobs that are still being created in `Foreground` mode or are stuck in `Initial` status.
    pub async fn clean_dirty_creating_jobs(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<Vec<SourceId>> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
            streaming_job::Column::JobStatus
                .eq(JobStatus::Creating)
                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };

        let mut dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .filter(filter_condition)
            .into_partial_model()
            .all(&txn)
            .await?;

        // Check if there are any pending iceberg table jobs.
        let dirty_iceberg_jobs = find_dirty_iceberg_table_jobs(&txn, database_id).await?;
        if !dirty_iceberg_jobs.is_empty() {
            dirty_job_objs.extend(dirty_iceberg_jobs);
        }

        Self::clean_dirty_sink_downstreams(&txn).await?;

        if dirty_job_objs.is_empty() {
            return Ok(vec![]);
        }

        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;

        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();

        // Filter out dummy objs for replacement.
        // todo: we'd better introduce a new dummy object type for replacement.
        let all_dirty_table_ids = dirty_job_objs
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table)
            .map(|obj| obj.oid)
            .collect_vec();
        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .column(table::Column::TableType)
            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
            .into_tuple::<(ObjectId, TableType)>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobId
                    .is_in(dirty_job_ids.clone())
                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .into_tuple()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        // notify delete for failed materialized views and background jobs.
        let to_notify_objs = dirty_job_objs
            .into_iter()
            .filter(|obj| {
                matches!(
                    dirty_table_type_map.get(&obj.oid),
                    Some(TableType::MaterializedView)
                ) || dirty_background_jobs.contains(&obj.oid)
            })
            .collect_vec();

        // The source ids for dirty tables with connectors.
        // FIXME: we should also clean dirty sources.
        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(dirty_job_ids.clone())
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        let dirty_state_table_ids: Vec<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;

        let dirty_internal_table_objs = Object::find()
            .select_only()
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, object::Relation::Table.def())
            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
            .into_partial_model()
            .all(&txn)
            .await?;

        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
            .clone()
            .into_iter()
            .chain(
                dirty_state_table_ids
                    .into_iter()
                    .map(|table_id| table_id.as_object_id()),
            )
            .chain(
                dirty_associated_source_ids
                    .iter()
                    .map(|source_id| source_id.as_object_id()),
            )
            .collect();

        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(to_delete_objs))
            .exec(&txn)
            .await?;
        assert!(res.rows_affected > 0);

        txn.commit().await?;

        let object_group = build_object_group_for_delete(
            to_notify_objs
                .into_iter()
                .chain(dirty_internal_table_objs.into_iter())
                .collect_vec(),
        );

        let _version = self
            .notify_frontend(NotificationOperation::Delete, object_group)
            .await;

        Ok(dirty_associated_source_ids)
    }

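    /// Handles `COMMENT ON`: when `column_index` is set, the description is
    /// written into the corresponding entry of the column catalog; otherwise it
    /// becomes the table-level description. The frontend is notified afterwards.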
    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        ensure_object_id(ObjectType::Database, comment.database_id, &txn).await?;
        ensure_object_id(ObjectType::Schema, comment.schema_id, &txn).await?;
        let table_obj = Object::find_by_id(comment.table_id)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;

        let table = if let Some(col_idx) = comment.column_index {
            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id)
                .select_only()
                .column(table::Column::Columns)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
            let mut pb_columns = columns.to_protobuf();

            let column = pb_columns
                .get_mut(col_idx as usize)
                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
                anyhow!(
                    "column desc at index {} for table id {} not found",
                    col_idx,
                    comment.table_id
                )
            })?;
            column_desc.description = comment.description;
            table::ActiveModel {
                table_id: Set(comment.table_id),
                columns: Set(pb_columns.into()),
                ..Default::default()
            }
            .update(&txn)
            .await?
        } else {
            table::ActiveModel {
                table_id: Set(comment.table_id),
                description: Set(comment.description),
                ..Default::default()
            }
            .update(&txn)
            .await?
        };
        txn.commit().await?;

        let version = self
            .notify_frontend_relation_info(
                NotificationOperation::Update,
                PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
            )
            .await;

        Ok(version)
    }

    async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
        if tables.is_empty() {
            return;
        }
        let objects = tables
            .into_iter()
            .map(|t| PbObject {
                object_info: Some(PbObjectInfo::Table(t)),
            })
            .collect();
        let group = NotificationInfo::ObjectGroup(PbObjectGroup { objects });
        self.env
            .notification_manager()
            .notify_hummock(NotificationOperation::Delete, group.clone())
            .await;
        self.env
            .notification_manager()
            .notify_compactor(NotificationOperation::Delete, group)
            .await;
    }

    pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
        let mut inner = self.inner.write().await;
        let tables = inner.complete_dropped_tables(table_ids);
        self.notify_hummock_dropped_tables(tables).await;
    }

    pub async fn cleanup_dropped_tables(&self) {
        let mut inner = self.inner.write().await;
        let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
        self.notify_hummock_dropped_tables(tables).await;
    }

    pub async fn stats(&self) -> MetaResult<CatalogStats> {
        let inner = self.inner.read().await;

        let mut table_num_map: HashMap<_, _> = Table::find()
            .select_only()
            .column(table::Column::TableType)
            .column_as(table::Column::TableId.count(), "num")
            .group_by(table::Column::TableType)
            .having(table::Column::TableType.ne(TableType::Internal))
            .into_tuple::<(TableType, i64)>()
            .all(&inner.db)
            .await?
            .into_iter()
            .map(|(table_type, num)| (table_type, num as u64))
            .collect();

        let source_num = Source::find().count(&inner.db).await?;
        let sink_num = Sink::find().count(&inner.db).await?;
        let function_num = Function::find().count(&inner.db).await?;
        let streaming_job_num = StreamingJob::find().count(&inner.db).await?;

        let actor_num = {
            let guard = self.env.shared_actor_info.read_guard();
            guard
                .iter_over_fragments()
                .map(|(_, fragment)| fragment.actors.len() as u64)
                .sum::<u64>()
        };
        let database_num = Database::find().count(&inner.db).await?;

        Ok(CatalogStats {
            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
            mview_num: table_num_map
                .remove(&TableType::MaterializedView)
                .unwrap_or(0),
            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
            source_num,
            sink_num,
            function_num,
            streaming_job_num,
            actor_num,
            database_num,
        })
    }

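    /// Maps each given sink id to the state table ids of its `Sink` fragments.
    ///
    /// A minimal usage sketch (hypothetical caller):
    /// ```ignore
    /// let state_tables = controller
    ///     .fetch_sink_with_state_table_ids(HashSet::from([sink_id]))
    ///     .await?;
    /// ```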
    pub async fn fetch_sink_with_state_table_ids(
        &self,
        sink_ids: HashSet<SinkId>,
    ) -> MetaResult<HashMap<SinkId, Vec<TableId>>> {
        let inner = self.inner.read().await;

        let query = Fragment::find()
            .select_only()
            .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
            .filter(
                fragment::Column::JobId
                    .is_in(sink_ids)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
            );

        let rows: Vec<(JobId, TableIdArray)> = query.into_tuple().all(&inner.db).await?;

        debug_assert!(rows.iter().map(|(job_id, _)| job_id).all_unique());

        let result = rows
            .into_iter()
            .map(|(job_id, table_id_array)| (job_id.as_sink_id(), table_id_array.0))
            .collect::<HashMap<_, _>>();

        Ok(result)
    }

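    /// Returns the ids of sinks that still have `Pending` rows in
    /// `pending_sink_state`, optionally restricted to a single database.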
    pub async fn list_all_pending_sinks(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<SinkId>> {
        let inner = self.inner.read().await;

        let mut query = pending_sink_state::Entity::find()
            .select_only()
            .columns([pending_sink_state::Column::SinkId])
            .filter(
                pending_sink_state::Column::SinkState.eq(pending_sink_state::SinkState::Pending),
            )
            .distinct();

        if let Some(db_id) = database_id {
            query = query
                .join(
                    JoinType::InnerJoin,
                    pending_sink_state::Relation::Object.def(),
                )
                .filter(object::Column::DatabaseId.eq(db_id));
        }

        let result: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;

        Ok(result.into_iter().collect())
    }

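    /// For each sink, marks pending epochs newer than the given committed epoch
    /// as `Aborted`.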
    pub async fn abort_pending_sink_epochs(
        &self,
        sink_committed_epoch: HashMap<SinkId, u64>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        for (sink_id, committed_epoch) in sink_committed_epoch {
            pending_sink_state::Entity::update_many()
                .col_expr(
                    pending_sink_state::Column::SinkState,
                    Expr::value(pending_sink_state::SinkState::Aborted),
                )
                .filter(
                    pending_sink_state::Column::SinkId
                        .eq(sink_id)
                        .and(pending_sink_state::Column::Epoch.gt(committed_epoch as i64)),
                )
                .exec(&txn)
                .await?;
        }

        txn.commit().await?;
        Ok(())
    }
}

/// `CatalogStats` stores statistics for all catalog objects.
pub struct CatalogStats {
    pub table_num: u64,
    pub mview_num: u64,
    pub index_num: u64,
    pub source_num: u64,
    pub sink_num: u64,
    pub function_num: u64,
    pub streaming_job_num: u64,
    pub actor_num: u64,
    pub database_num: u64,
}

impl CatalogControllerInner {
    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
        let databases = self.list_databases().await?;
        let schemas = self.list_schemas().await?;
        let tables = self.list_tables().await?;
        let sources = self.list_sources().await?;
        let sinks = self.list_sinks().await?;
        let subscriptions = self.list_subscriptions().await?;
        let indexes = self.list_indexes().await?;
        let views = self.list_views().await?;
        let functions = self.list_functions().await?;
        let connections = self.list_connections().await?;
        let secrets = self.list_secrets().await?;

        let users = self.list_users().await?;

        Ok((
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
        ))
    }

    async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
        let db_objs = Database::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        Ok(db_objs
            .into_iter()
            .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
            .collect())
    }

    async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
        let schema_objs = Schema::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(schema_objs
            .into_iter()
            .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
            .collect())
    }

    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
        let mut user_infos: Vec<PbUserInfo> = User::find()
            .all(&self.db)
            .await?
            .into_iter()
            .map(Into::into)
            .collect();

        for user_info in &mut user_infos {
            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
        }
        Ok(user_infos)
    }

    /// `list_all_state_tables` returns all tables, including internal tables.
    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
        let table_objs = Table::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(table_objs
            .into_iter()
            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
            .collect())
    }

    /// `list_tables` returns all `CREATED` tables, creating materialized views, `BACKGROUND` jobs, and the internal tables that belong to them.
    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
        let table_objs = Table::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus.eq(JobStatus::Created).or(
                    table::Column::TableType
                        .eq(TableType::MaterializedView)
                        .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
                ),
            )
            .all(&self.db)
            .await?;

        let job_statuses: HashMap<JobId, JobStatus> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .column(streaming_job::Column::JobStatus)
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .into_tuple::<(JobId, JobStatus)>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        let sink_ids: HashSet<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .into_tuple::<SinkId>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        let job_ids: HashSet<JobId> = table_objs
            .iter()
            .map(|(t, _)| t.table_id.as_job_id())
            .chain(job_statuses.keys().cloned())
            .chain(sink_ids.into_iter().map(|s| s.as_job_id()))
            .collect();

        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(
                table::Column::TableType
                    .eq(TableType::Internal)
                    .and(table::Column::BelongsToJobId.is_in(job_ids)),
            )
            .all(&self.db)
            .await?;

        Ok(table_objs
            .into_iter()
            .chain(internal_table_objs.into_iter())
            .map(|(table, obj)| {
                // Correctly set the stream job status for creating materialized views and internal tables.
                // If the table is not contained in `job_statuses`, its job is a materialized view that is still being created.
                let status: PbStreamJobStatus = if table.table_type == TableType::Internal {
                    (*job_statuses
                        .get(&table.belongs_to_job_id.unwrap())
                        .unwrap_or(&JobStatus::Creating))
                    .into()
                } else {
                    (*job_statuses
                        .get(&table.table_id.as_job_id())
                        .unwrap_or(&JobStatus::Creating))
                    .into()
                };
                let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
                pb_table.stream_job_status = status.into();
                pb_table
            })
            .collect())
    }

    /// `list_sources` returns all sources; those backed by streaming jobs are included only once `CREATED`.
    async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        let mut source_objs = Source::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .is_null()
                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
            )
            .all(&self.db)
            .await?;

        // filter out internal connector sources whose associated tables are still being created.
        let created_table_ids: HashSet<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                table::Column::OptionalAssociatedSourceId
                    .is_not_null()
                    .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
            )
            .into_tuple()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();
        source_objs.retain_mut(|(source, _)| {
            source.optional_associated_table_id.is_none()
                || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
        });

        Ok(source_objs
            .into_iter()
            .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
            .collect())
    }

    /// `list_sinks` returns all sinks, with their stream job status attached.
    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
        let sink_objs = Sink::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .all(&self.db)
            .await?;

        let creating_sinks: HashSet<_> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Creating)
                    .and(
                        streaming_job::Column::JobId
                            .is_in(sink_objs.iter().map(|(sink, _)| sink.sink_id)),
                    ),
            )
            .into_tuple::<SinkId>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        Ok(sink_objs
            .into_iter()
            .map(|(sink, obj)| {
                let is_creating = creating_sinks.contains(&sink.sink_id);
                let mut pb_sink: PbSink = ObjectModel(sink, obj.unwrap()).into();
                pb_sink.stream_job_status = if is_creating {
                    PbStreamJobStatus::Creating.into()
                } else {
                    PbStreamJobStatus::Created.into()
                };
                pb_sink
            })
            .collect())
    }

    /// `list_subscriptions` returns all `CREATED` subscriptions.
    async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
        let subscription_objs = Subscription::find()
            .find_also_related(Object)
            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
            .all(&self.db)
            .await?;

        Ok(subscription_objs
            .into_iter()
            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
            .collect())
    }

    async fn list_views(&self) -> MetaResult<Vec<PbView>> {
        let view_objs = View::find().find_also_related(Object).all(&self.db).await?;

        Ok(view_objs
            .into_iter()
            .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
            .collect())
    }

    /// `list_indexes` returns all `CREATED` and `BACKGROUND` indexes.
    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
        let index_objs = Index::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .all(&self.db)
            .await?;

        let creating_indexes: HashSet<_> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Creating)
                    .and(
                        streaming_job::Column::JobId
                            .is_in(index_objs.iter().map(|(index, _)| index.index_id)),
                    ),
            )
            .into_tuple::<IndexId>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        Ok(index_objs
            .into_iter()
            .map(|(index, obj)| {
                let is_creating = creating_indexes.contains(&index.index_id);
                let mut pb_index: PbIndex = ObjectModel(index, obj.unwrap()).into();
                pb_index.stream_job_status = if is_creating {
                    PbStreamJobStatus::Creating.into()
                } else {
                    PbStreamJobStatus::Created.into()
                };
                pb_index
            })
            .collect())
    }

    async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
        let conn_objs = Connection::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(conn_objs
            .into_iter()
            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
            .collect())
    }

    pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
        let secret_objs = Secret::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        Ok(secret_objs
            .into_iter()
            .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
            .collect())
    }

    async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
        let func_objs = Function::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(func_objs
            .into_iter()
            .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
            .collect())
    }

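    /// Registers a oneshot sender to be notified when the given creating job
    /// finishes. On failure, pending senders receive the error through
    /// `notify_finish_failed` instead.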
    pub(crate) fn register_finish_notifier(
        &mut self,
        database_id: DatabaseId,
        id: JobId,
        sender: Sender<Result<NotificationVersion, String>>,
    ) {
        self.creating_table_finish_notifier
            .entry(database_id)
            .or_default()
            .entry(id)
            .or_default()
            .push(sender);
    }

    pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
        let status = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobStatus)
            .filter(streaming_job::Column::JobId.eq(id))
            .into_tuple::<JobStatus>()
            .one(&self.db)
            .await?;

        status
            .map(|status| status == JobStatus::Created)
            .ok_or_else(|| {
                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
            })
    }

    pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
        if let Some(database_id) = database_id {
            if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
            {
                for tx in creating_tables.into_values().flatten() {
                    let _ = tx.send(Err(err.clone()));
                }
            }
        } else {
            for tx in take(&mut self.creating_table_finish_notifier)
                .into_values()
                .flatten()
                .flat_map(|(_, txs)| txs.into_iter())
            {
                let _ = tx.send(Err(err.clone()));
            }
        }
    }

    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
        let table_ids: Vec<TableId> = Table::find()
            .select_only()
            .filter(table::Column::TableType.is_in(vec![
                TableType::Table,
                TableType::MaterializedView,
                TableType::Index,
            ]))
            .column(table::Column::TableId)
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(table_ids)
    }

    /// Since the tables have been dropped from both the meta store and the streaming jobs, this method removes the cached copies of those tables.
    /// Returns the removed table copies.
    pub(crate) fn complete_dropped_tables(
        &mut self,
        table_ids: impl IntoIterator<Item = TableId>,
    ) -> Vec<PbTable> {
        table_ids
            .into_iter()
            .filter_map(|table_id| {
                self.dropped_tables.remove(&table_id).map_or_else(
                    || {
                        tracing::warn!(%table_id, "table not found");
                        None
                    },
                    Some,
                )
            })
            .collect()
    }
}