risingwave_meta/controller/catalog/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{
31    DEFAULT_SCHEMA_NAME, FragmentTypeFlag, FragmentTypeMask, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::TableType;
42use risingwave_meta_model::{
43    ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array, IndexId,
44    JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45    StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, TableIdArray,
46    UserId, ViewId, connection, database, fragment, function, index, object, object_dependency,
47    pending_sink_state, schema, secret, sink, source, streaming_job, subscription, table,
48    user_privilege, view,
49};
50use risingwave_pb::catalog::connection::Info as ConnectionInfo;
51use risingwave_pb::catalog::subscription::SubscriptionState;
52use risingwave_pb::catalog::table::PbTableType;
53use risingwave_pb::catalog::{
54    PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
55    PbSubscription, PbTable, PbView,
56};
57use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
58use risingwave_pb::meta::object::PbObjectInfo;
59use risingwave_pb::meta::subscribe_response::{
60    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
61};
62use risingwave_pb::meta::{PbObject, PbObjectGroup};
63use risingwave_pb::stream_plan::stream_node::NodeBody;
64use risingwave_pb::telemetry::PbTelemetryEventStage;
65use risingwave_pb::user::PbUserInfo;
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69    ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
70    IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
71    SelectColumns, TransactionTrait, Value,
72};
73use tokio::sync::oneshot::Sender;
74use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
75use tracing::info;
76
77use super::utils::{
78    check_subscription_name_duplicate, get_internal_tables_by_id, load_streaming_jobs_by_ids,
79    rename_relation, rename_relation_refer,
80};
81use crate::controller::ObjectModel;
82use crate::controller::catalog::util::update_internal_tables;
83use crate::controller::fragment::FragmentTypeMaskExt;
84use crate::controller::utils::*;
85use crate::manager::{
86    IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
87    get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
88};
89use crate::rpc::ddl_controller::DropMode;
90use crate::telemetry::{MetaTelemetryJobDesc, report_event};
91use crate::{MetaError, MetaResult};
92
/// A full snapshot of all user-visible catalog objects, in the same order they
/// are collected by `CatalogControllerInner::snapshot`.
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);
106
/// Shared, reference-counted handle to the [`CatalogController`].
pub type CatalogControllerRef = Arc<CatalogController>;
108
/// `CatalogController` is the controller for catalog related operations, including database, schema, table, view, etc.
pub struct CatalogController {
    /// Shared meta service environment (notification manager, meta store, ...).
    pub(crate) env: MetaSrvEnv,
    /// Mutable controller state behind an async `RwLock`; mind the lock
    /// acquisition order documented on `get_inner_read_guard`.
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
114
/// Context for dropping the connector (associated source) from a table.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    // we only apply one drop connector action for one table each time, so no need to vector here
    /// The streaming job being altered.
    pub(crate) to_change_streaming_job_id: JobId,
    /// The state table to remove along with the connector.
    pub(crate) to_remove_state_table_id: TableId,
    /// The associated source object to remove.
    pub(crate) to_remove_source_id: SourceId,
}
122
/// Resources released by a drop/cleanup operation, to be handed over to other
/// managers (hummock, source manager, secret manager, ...) after the catalog
/// change is committed.
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    /// The database the dropped objects belonged to.
    pub(crate) database_id: DatabaseId,
    /// Dropped streaming jobs.
    pub(crate) removed_streaming_job_ids: Vec<JobId>,
    /// Dropped state table list, need to unregister from hummock.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    /// Dropped secrets, need to remove from secret manager.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    /// Dropped sources (when `DROP SOURCE`), need to unregister from source manager.
    pub(crate) removed_source_ids: Vec<SourceId>,
    /// Dropped Source fragments (when `DROP MATERIALIZED VIEW` referencing sources),
    /// need to unregister from source manager.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    /// All fragments removed by the operation.
    pub(crate) removed_fragments: HashSet<FragmentId>,

    /// Removed sink fragment by target fragment.
    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,

    /// Dropped iceberg table sinks
    pub(crate) removed_iceberg_table_sinks: Vec<PbSink>,
}
146
147impl CatalogController {
148    pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
149        let meta_store = env.meta_store();
150        let catalog_controller = Self {
151            env,
152            inner: RwLock::new(CatalogControllerInner {
153                db: meta_store.conn,
154                creating_table_finish_notifier: HashMap::new(),
155                dropped_tables: HashMap::new(),
156            }),
157        };
158
159        catalog_controller.init().await?;
160        Ok(catalog_controller)
161    }
162
163    /// Used in `NotificationService::subscribe`.
164    /// Need to pay attention to the order of acquiring locks to prevent deadlock problems.
165    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
166        self.inner.read().await
167    }
168
169    pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
170        self.inner.write().await
171    }
172}
173
/// Inner mutable state of [`CatalogController`], protected by the outer `RwLock`.
pub struct CatalogControllerInner {
    /// Connection to the SQL meta store backend.
    pub(crate) db: DatabaseConnection,
    /// Registered finish notifiers for creating tables.
    ///
    /// `DdlController` will update this map, and pass the `tx` side to `CatalogController`.
    /// On notifying, we can remove the entry from this map.
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    /// Tables have been dropped from the meta store, but the corresponding barrier remains unfinished.
    pub dropped_tables: HashMap<TableId, PbTable>,
}
186
187impl CatalogController {
188    pub(crate) async fn notify_frontend(
189        &self,
190        operation: NotificationOperation,
191        info: NotificationInfo,
192    ) -> NotificationVersion {
193        self.env
194            .notification_manager()
195            .notify_frontend(operation, info)
196            .await
197    }
198
199    pub(crate) async fn notify_frontend_relation_info(
200        &self,
201        operation: NotificationOperation,
202        relation_info: PbObjectInfo,
203    ) -> NotificationVersion {
204        self.env
205            .notification_manager()
206            .notify_frontend_object_info(operation, relation_info)
207            .await
208    }
209
210    /// Trivially advance the notification version and notify to frontend,
211    /// return the notification version for frontend to wait for.
212    ///
213    /// Cannot simply return the current version, because the current version may not be sent
214    /// to frontend, and the frontend may endlessly wait for this version, until a frontend
215    /// related notification is sent.
216    pub(crate) async fn notify_frontend_trivial(&self) -> NotificationVersion {
217        self.env
218            .notification_manager()
219            .notify_frontend(
220                NotificationOperation::Update,
221                NotificationInfo::ObjectGroup(PbObjectGroup {
222                    objects: vec![],
223                    dependencies: vec![],
224                }),
225            )
226            .await
227    }
228}
229
230impl CatalogController {
    /// Finalize a subscription's creation in the catalog: stamp its creation
    /// time/cluster version, flip its state to `Created`, and grant default
    /// privileges — all within one transaction.
    pub async fn finish_create_subscription_catalog(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(subscription_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            // No matching object row: the subscription does not exist.
            return Err(MetaError::catalog_id_not_found(
                "subscription",
                subscription_id,
            ));
        }

        // mark the target subscription as `Create`.
        let job = subscription::ActiveModel {
            subscription_id: Set(subscription_id),
            subscription_state: Set(SubscriptionState::Created.into()),
            ..Default::default()
        };
        Subscription::update(job).exec(&txn).await?;

        // The granted-privilege result is intentionally ignored here.
        let _ = grant_default_privileges_automatically(&txn, subscription_id).await?;

        txn.commit().await?;

        Ok(())
    }
269
    /// Notify frontends about a subscription that has reached the `Created`
    /// state, plus any users whose privileges reference it.
    pub async fn notify_create_subscription(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        // Only subscriptions already in `Created` state qualify for notification.
        let (subscription, obj) = Subscription::find_by_id(subscription_id)
            .find_also_related(Object)
            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;

        let dependencies =
            list_object_dependencies_by_object_id(&txn, subscription_id.into()).await?;
        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Add,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: PbObjectInfo::Subscription(
                            ObjectModel(subscription, obj.unwrap(), None).into(),
                        )
                        .into(),
                    }],
                    dependencies,
                }),
            )
            .await;

        // notify default privileges about the new subscription
        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.eq(subscription_id.as_object_id()))
            .into_tuple()
            .all(&inner.db)
            .await?;

        if !updated_user_ids.is_empty() {
            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
            version = self.notify_users_update(updated_user_infos).await;
        }

        Ok(version)
    }
319
    // for telemetry
    /// Build a JSON report of connector usage (connector name, format, encode)
    /// for every source and sink in the catalog.
    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
        // get connector usage by source/sink
        // the expected format is like:
        // {
        //     "source": [{
        //         "$source_id": {
        //             "connector": "kafka",
        //             "format": "plain",
        //             "encode": "json"
        //         },
        //     }],
        //     "sink": [{
        //         "$sink_id": {
        //             "connector": "pulsar",
        //             "format": "upsert",
        //             "encode": "avro"
        //         },
        //     }],
        // }

        let inner = self.inner.read().await;
        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
            .select_only()
            .column(source::Column::SourceId)
            .column(source::Column::WithProperties)
            .column(source::Column::SourceInfo)
            .into_tuple()
            .all(&inner.db)
            .await?;
        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .column(sink::Column::Properties)
            .column(sink::Column::SinkFormatDesc)
            .into_tuple()
            .all(&inner.db)
            .await?;
        // Release the catalog lock before the pure report-building work below.
        drop(inner);

        // The connector name is stored under the `connector` key in WITH properties.
        let get_connector_from_property = |property: &Property| -> String {
            property
                .0
                .get(UPSTREAM_SOURCE_KEY)
                .cloned()
                .unwrap_or_default()
        };

        let source_report: Vec<jsonbb::Value> = source_props_and_info
            .iter()
            .map(|(oid, property, info)| {
                let connector_name = get_connector_from_property(property);
                let mut format = None;
                let mut encode = None;
                if let Some(info) = info {
                    let pb_info = info.to_protobuf();
                    format = Some(pb_info.format().as_str_name());
                    encode = Some(pb_info.row_encode().as_str_name());
                }
                jsonbb::json!({
                    oid.to_string(): {
                        "connector": connector_name,
                        "format": format,
                        "encode": encode,
                    },
                })
            })
            .collect_vec();

        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
            .iter()
            .map(|(oid, property, info)| {
                let connector_name = get_connector_from_property(property);
                let mut format = None;
                let mut encode = None;
                if let Some(info) = info {
                    let pb_info = info.to_protobuf();
                    format = Some(pb_info.format().as_str_name());
                    encode = Some(pb_info.encode().as_str_name());
                }
                jsonbb::json!({
                    oid.to_string(): {
                        "connector": connector_name,
                        "format": format,
                        "encode": encode,
                    },
                })
            })
            .collect_vec();

        Ok(jsonbb::json!({
                "source": source_report,
                "sink": sink_report,
        }))
    }
415
    /// Delete all subscription objects that never reached the `Created` state
    /// (dirty leftovers), optionally limited to a single database.
    pub async fn clean_dirty_subscription(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Subscription objects whose id is NOT among the `Created` subscriptions.
        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
            object::Column::Oid.not_in_subquery(
                Query::select()
                    .column(subscription::Column::SubscriptionId)
                    .from(Subscription)
                    .and_where(
                        subscription::Column::SubscriptionState
                            .eq(SubscriptionState::Created as i32),
                    )
                    .take(),
            ),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        Object::delete_many()
            .filter(filter_condition)
            .exec(&txn)
            .await?;
        txn.commit().await?;
        // We don't need to notify the frontend, because the Init subscription is not send to frontend.
        Ok(())
    }
448
    /// `clean_dirty_creating_jobs` cleans up creating jobs that are creating in Foreground mode or in Initial status.
    ///
    /// Returns the associated source ids of the cleaned dirty tables, so the
    /// caller can unregister them from the source manager.
    pub async fn clean_dirty_creating_jobs(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<Vec<SourceId>> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Dirty = `Initial` status, or `Creating` with `Foreground` create type.
        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
            streaming_job::Column::JobStatus
                .eq(JobStatus::Creating)
                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };

        let mut dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .filter(filter_condition)
            .into_partial_model()
            .all(&txn)
            .await?;

        // Check if there are any pending iceberg table jobs.
        let dirty_iceberg_jobs = find_dirty_iceberg_table_jobs(&txn, database_id).await?;
        if !dirty_iceberg_jobs.is_empty() {
            dirty_job_objs.extend(dirty_iceberg_jobs);
        }

        Self::clean_dirty_sink_downstreams(&txn).await?;

        if dirty_job_objs.is_empty() {
            return Ok(vec![]);
        }

        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;

        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();

        // Filter out dummy objs for replacement.
        // todo: we'd better introduce a new dummy object type for replacement.
        let all_dirty_table_ids = dirty_job_objs
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table)
            .map(|obj| obj.oid)
            .collect_vec();
        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .column(table::Column::TableType)
            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
            .into_tuple::<(ObjectId, TableType)>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        // Dirty jobs that were created in background mode.
        let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobId
                    .is_in(dirty_job_ids.clone())
                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .into_tuple()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        // notify delete for failed materialized views and background jobs.
        let to_notify_objs = dirty_job_objs
            .into_iter()
            .filter(|obj| {
                matches!(
                    dirty_table_type_map.get(&obj.oid),
                    Some(TableType::MaterializedView)
                ) || dirty_background_jobs.contains(&obj.oid)
            })
            .collect_vec();

        // The source ids for dirty tables with connector.
        // FIXME: we should also clean dirty sources.
        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(dirty_job_ids.clone())
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        // State tables (e.g. internal tables) belonging to the dirty jobs.
        let dirty_state_table_ids: Vec<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;

        // Internal table objects of the jobs we will notify about, so their
        // deletion can be broadcast too.
        let dirty_internal_table_objs = Object::find()
            .select_only()
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, object::Relation::Table.def())
            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
            .into_partial_model()
            .all(&txn)
            .await?;

        // Delete the jobs themselves, their state tables, and their associated sources.
        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
            .clone()
            .into_iter()
            .chain(
                dirty_state_table_ids
                    .into_iter()
                    .map(|table_id| table_id.as_object_id()),
            )
            .chain(
                dirty_associated_source_ids
                    .iter()
                    .map(|source_id| source_id.as_object_id()),
            )
            .collect();

        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(to_delete_objs))
            .exec(&txn)
            .await?;
        assert!(res.rows_affected > 0);

        txn.commit().await?;

        let object_group = build_object_group_for_delete(
            to_notify_objs
                .into_iter()
                .chain(dirty_internal_table_objs.into_iter())
                .collect_vec(),
        );

        let _version = self
            .notify_frontend(NotificationOperation::Delete, object_group)
            .await;

        Ok(dirty_associated_source_ids)
    }
615
    /// Set or clear a comment (description) on a table or on one of its
    /// columns, then notify frontends with the updated table info.
    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Validate that the referenced database and schema objects exist.
        ensure_object_id(ObjectType::Database, comment.database_id, &txn).await?;
        ensure_object_id(ObjectType::Schema, comment.schema_id, &txn).await?;
        let (table_obj, streaming_job) = Object::find_by_id(comment.table_id)
            .find_also_related(StreamingJob)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("object", comment.table_id))?;

        let table = if let Some(col_idx) = comment.column_index {
            // Column comment: rewrite the description of one column inside the
            // serialized column catalog and store the whole array back.
            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id)
                .select_only()
                .column(table::Column::Columns)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
            let mut pb_columns = columns.to_protobuf();

            let column = pb_columns
                .get_mut(col_idx as usize)
                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
                anyhow!(
                    "column desc at index {} for table id {} not found",
                    col_idx,
                    comment.table_id
                )
            })?;
            column_desc.description = comment.description;
            table::ActiveModel {
                table_id: Set(comment.table_id),
                columns: Set(pb_columns.into()),
                ..Default::default()
            }
            .update(&txn)
            .await?
        } else {
            // Table-level comment: update the table's own description field.
            table::ActiveModel {
                table_id: Set(comment.table_id),
                description: Set(comment.description),
                ..Default::default()
            }
            .update(&txn)
            .await?
        };
        txn.commit().await?;

        let version = self
            .notify_frontend_relation_info(
                NotificationOperation::Update,
                PbObjectInfo::Table(ObjectModel(table, table_obj, streaming_job).into()),
            )
            .await;

        Ok(version)
    }
675
676    async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
677        if tables.is_empty() {
678            return;
679        }
680        let objects = tables
681            .into_iter()
682            .map(|t| PbObject {
683                object_info: Some(PbObjectInfo::Table(t)),
684            })
685            .collect();
686        let group = NotificationInfo::ObjectGroup(PbObjectGroup {
687            objects,
688            dependencies: vec![],
689        });
690        self.env
691            .notification_manager()
692            .notify_hummock(NotificationOperation::Delete, group.clone())
693            .await;
694        self.env
695            .notification_manager()
696            .notify_compactor(NotificationOperation::Delete, group)
697            .await;
698    }
699
700    pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
701        let mut inner = self.inner.write().await;
702        let tables = inner.complete_dropped_tables(table_ids);
703        self.notify_hummock_dropped_tables(tables).await;
704    }
705
706    pub async fn cleanup_dropped_tables(&self) {
707        let mut inner = self.inner.write().await;
708        let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
709        self.notify_hummock_dropped_tables(tables).await;
710    }
711
    /// Collect catalog statistics: per-type table counts, source / sink /
    /// function / streaming-job / database counts, and the total actor count.
    pub async fn stats(&self) -> MetaResult<CatalogStats> {
        let inner = self.inner.read().await;

        // Count tables grouped by type, excluding internal state tables.
        let mut table_num_map: HashMap<_, _> = Table::find()
            .select_only()
            .column(table::Column::TableType)
            .column_as(table::Column::TableId.count(), "num")
            .group_by(table::Column::TableType)
            .having(table::Column::TableType.ne(TableType::Internal))
            .into_tuple::<(TableType, i64)>()
            .all(&inner.db)
            .await?
            .into_iter()
            .map(|(table_type, num)| (table_type, num as u64))
            .collect();

        let source_num = Source::find().count(&inner.db).await?;
        let sink_num = Sink::find().count(&inner.db).await?;
        let function_num = Function::find().count(&inner.db).await?;
        let streaming_job_num = StreamingJob::find().count(&inner.db).await?;

        // Actor count comes from the in-memory shared actor info, not the DB.
        let actor_num = {
            let guard = self.env.shared_actor_info.read_guard();
            guard
                .iter_over_fragments()
                .map(|(_, fragment)| fragment.actors.len() as u64)
                .sum::<u64>()
        };
        let database_num = Database::find().count(&inner.db).await?;

        Ok(CatalogStats {
            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
            mview_num: table_num_map
                .remove(&TableType::MaterializedView)
                .unwrap_or(0),
            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
            source_num,
            sink_num,
            function_num,
            streaming_job_num,
            actor_num,
            database_num,
        })
    }
756
    /// For each sink in `sink_ids`, fetch the state table ids of its sink
    /// fragment.
    pub async fn fetch_sink_with_state_table_ids(
        &self,
        sink_ids: HashSet<SinkId>,
    ) -> MetaResult<HashMap<SinkId, Vec<TableId>>> {
        let inner = self.inner.read().await;

        let query = Fragment::find()
            .select_only()
            .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
            .filter(
                fragment::Column::JobId
                    .is_in(sink_ids)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
            );

        let rows: Vec<(JobId, TableIdArray)> = query.into_tuple().all(&inner.db).await?;

        // Assumes each sink job has at most one sink fragment; duplicates would
        // silently overwrite each other in the map below.
        debug_assert!(rows.iter().map(|(job_id, _)| job_id).all_unique());

        let result = rows
            .into_iter()
            .map(|(job_id, table_id_array)| (job_id.as_sink_id(), table_id_array.0))
            .collect::<HashMap<_, _>>();

        Ok(result)
    }
783
    /// List the distinct ids of sinks that currently have `Pending` sink
    /// states, optionally restricted to a single database.
    pub async fn list_all_pending_sinks(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<SinkId>> {
        let inner = self.inner.read().await;

        let mut query = pending_sink_state::Entity::find()
            .select_only()
            .columns([pending_sink_state::Column::SinkId])
            .filter(
                pending_sink_state::Column::SinkState.eq(pending_sink_state::SinkState::Pending),
            )
            .distinct();

        if let Some(db_id) = database_id {
            // Join with `object` so we can filter by the owning database.
            query = query
                .join(
                    JoinType::InnerJoin,
                    pending_sink_state::Relation::Object.def(),
                )
                .filter(object::Column::DatabaseId.eq(db_id));
        }

        let result: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;

        Ok(result.into_iter().collect())
    }
811
812    pub async fn abort_pending_sink_epochs(
813        &self,
814        sink_committed_epoch: HashMap<SinkId, u64>,
815    ) -> MetaResult<()> {
816        let inner = self.inner.write().await;
817        let txn = inner.db.begin().await?;
818
819        for (sink_id, committed_epoch) in sink_committed_epoch {
820            pending_sink_state::Entity::update_many()
821                .col_expr(
822                    pending_sink_state::Column::SinkState,
823                    Expr::value(pending_sink_state::SinkState::Aborted),
824                )
825                .filter(
826                    pending_sink_state::Column::SinkId
827                        .eq(sink_id)
828                        .and(pending_sink_state::Column::Epoch.gt(committed_epoch as i64)),
829                )
830                .exec(&txn)
831                .await?;
832        }
833
834        txn.commit().await?;
835        Ok(())
836    }
837}
838
/// `CatalogStats` is a struct to store the statistics of all catalogs.
pub struct CatalogStats {
    /// Number of tables.
    pub table_num: u64,
    /// Number of materialized views.
    pub mview_num: u64,
    /// Number of indexes.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs.
    pub streaming_job_num: u64,
    /// Number of actors.
    pub actor_num: u64,
    /// Number of databases.
    pub database_num: u64,
}
851
impl CatalogControllerInner {
    /// Builds a full catalog snapshot: every catalog object kind (in the tuple
    /// layout expected by the `Catalog` alias) together with all user infos.
    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
        let databases = self.list_databases().await?;
        let schemas = self.list_schemas().await?;
        let tables = self.list_tables().await?;
        let sources = self.list_sources().await?;
        let sinks = self.list_sinks().await?;
        let subscriptions = self.list_subscriptions().await?;
        let indexes = self.list_indexes().await?;
        let views = self.list_views().await?;
        let functions = self.list_functions().await?;
        let connections = self.list_connections().await?;
        let secrets = self.list_secrets().await?;

        let users = self.list_users().await?;

        Ok((
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
        ))
    }

    /// Lists all databases as protobuf catalog entries.
    async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
        let db_objs = Database::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        Ok(db_objs
            .into_iter()
            // Each catalog row is expected to have its paired `object` row; `unwrap` relies on that.
            .map(|(db, obj)| ObjectModel(db, obj.unwrap(), None).into())
            .collect())
    }

    /// Lists all schemas as protobuf catalog entries.
    async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
        let schema_objs = Schema::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(schema_objs
            .into_iter()
            .map(|(schema, obj)| ObjectModel(schema, obj.unwrap(), None).into())
            .collect())
    }

    /// Lists all users, with each user's grant privileges resolved.
    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
        let mut user_infos: Vec<PbUserInfo> = User::find()
            .all(&self.db)
            .await?
            .into_iter()
            .map(Into::into)
            .collect();

        // Privileges live in a separate table; fill them in per user.
        for user_info in &mut user_infos {
            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
        }
        Ok(user_infos)
    }

    /// `list_all_state_tables` returns all tables, including internal state tables,
    /// regardless of the status of their owning streaming job.
    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
        let table_objs = Table::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        let streaming_jobs = load_streaming_jobs_by_ids(
            &self.db,
            table_objs.iter().map(|(table, _)| table.job_id()),
        )
        .await?;

        Ok(table_objs
            .into_iter()
            .map(|(table, obj)| {
                let job_id = table.job_id();
                // A table may have no streaming-job row (e.g. not job-backed); `None` is fine here.
                let streaming_job = streaming_jobs.get(&job_id).cloned();
                ObjectModel(table, obj.unwrap(), streaming_job).into()
            })
            .collect())
    }

    /// `list_tables` returns all `CREATED` tables, `CREATING` materialized views /
    /// `BACKGROUND` jobs, and the internal tables that belong to them or to sinks.
    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
        let mut table_objs = Table::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        let all_streaming_jobs: HashMap<JobId, streaming_job::Model> = StreamingJob::find()
            .all(&self.db)
            .await?
            .into_iter()
            .map(|job| (job.job_id, job))
            .collect();

        // All sink ids: internal tables owned by a sink job are always kept.
        let sink_ids: HashSet<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .into_tuple::<SinkId>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        // Job ids of materialized views: mviews are kept regardless of job status.
        let mview_job_ids: HashSet<JobId> = table_objs
            .iter()
            .filter_map(|(table, _)| {
                if table.table_type == TableType::MaterializedView {
                    Some(table.table_id.as_job_id())
                } else {
                    None
                }
            })
            .collect();

        table_objs.retain(|(table, _)| {
            let job_id = table.job_id();

            if sink_ids.contains(&job_id.as_sink_id()) || mview_job_ids.contains(&job_id) {
                return true;
            }
            // Otherwise keep only tables whose job is fully created or created in background.
            if let Some(streaming_job) = all_streaming_jobs.get(&job_id) {
                return streaming_job.job_status == JobStatus::Created
                    || (streaming_job.create_type == CreateType::Background);
            }
            false
        });

        let mut tables = Vec::with_capacity(table_objs.len());
        for (table, obj) in table_objs {
            let job_id = table.job_id();
            let streaming_job = all_streaming_jobs.get(&job_id).cloned();
            let pb_table: PbTable = ObjectModel(table, obj.unwrap(), streaming_job).into();
            tables.push(pb_table);
        }
        Ok(tables)
    }

    /// `list_sources` returns all sources; a source backed by a streaming job is
    /// included only when that job is `CREATED`.
    async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        let mut source_objs = Source::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                // `is_null` covers sources with no streaming-job row at all.
                streaming_job::Column::JobStatus
                    .is_null()
                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
            )
            .all(&self.db)
            .await?;

        // filter out inner connector sources that are still under creating.
        let created_table_ids: HashSet<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                table::Column::OptionalAssociatedSourceId
                    .is_not_null()
                    .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
            )
            .into_tuple()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();
        source_objs.retain_mut(|(source, _)| {
            // Keep standalone sources, or table-associated ones whose table is created.
            source.optional_associated_table_id.is_none()
                || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
        });

        Ok(source_objs
            .into_iter()
            .map(|(source, obj)| ObjectModel(source, obj.unwrap(), None).into())
            .collect())
    }

    /// `list_sinks` return all sinks.
    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
        let sink_objs = Sink::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .all(&self.db)
            .await?;
        let streaming_jobs = load_streaming_jobs_by_ids(
            &self.db,
            sink_objs.iter().map(|(sink, _)| sink.sink_id.as_job_id()),
        )
        .await?;

        Ok(sink_objs
            .into_iter()
            .map(|(sink, obj)| {
                let streaming_job = streaming_jobs.get(&sink.sink_id.as_job_id()).cloned();
                ObjectModel(sink, obj.unwrap(), streaming_job).into()
            })
            .collect())
    }

    /// `list_subscriptions` return all `CREATED` subscriptions.
    async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
        let subscription_objs = Subscription::find()
            .find_also_related(Object)
            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
            .all(&self.db)
            .await?;

        Ok(subscription_objs
            .into_iter()
            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap(), None).into())
            .collect())
    }

    /// Lists all views as protobuf catalog entries.
    async fn list_views(&self) -> MetaResult<Vec<PbView>> {
        let view_objs = View::find().find_also_related(Object).all(&self.db).await?;

        Ok(view_objs
            .into_iter()
            .map(|(view, obj)| ObjectModel(view, obj.unwrap(), None).into())
            .collect())
    }

    /// `list_indexes` return all `CREATED` and `BACKGROUND` indexes.
    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
        let index_objs = Index::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .all(&self.db)
            .await?;
        let streaming_jobs = load_streaming_jobs_by_ids(
            &self.db,
            index_objs
                .iter()
                .map(|(index, _)| index.index_id.as_job_id()),
        )
        .await?;

        Ok(index_objs
            .into_iter()
            .map(|(index, obj)| {
                let streaming_job = streaming_jobs.get(&index.index_id.as_job_id()).cloned();
                ObjectModel(index, obj.unwrap(), streaming_job).into()
            })
            .collect())
    }

    /// Lists all connections as protobuf catalog entries.
    async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
        let conn_objs = Connection::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(conn_objs
            .into_iter()
            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap(), None).into())
            .collect())
    }

    /// Lists all secrets as protobuf catalog entries.
    pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
        let secret_objs = Secret::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        Ok(secret_objs
            .into_iter()
            .map(|(secret, obj)| ObjectModel(secret, obj.unwrap(), None).into())
            .collect())
    }

    /// Lists all functions as protobuf catalog entries.
    async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
        let func_objs = Function::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(func_objs
            .into_iter()
            .map(|(func, obj)| ObjectModel(func, obj.unwrap(), None).into())
            .collect())
    }

    /// Registers a sender to be notified when the creating job `id` in `database_id`
    /// finishes (or fails — see `notify_finish_failed`).
    pub(crate) fn register_finish_notifier(
        &mut self,
        database_id: DatabaseId,
        id: JobId,
        sender: Sender<Result<NotificationVersion, String>>,
    ) {
        self.creating_table_finish_notifier
            .entry(database_id)
            .or_default()
            .entry(id)
            .or_default()
            .push(sender);
    }

    /// Returns whether the streaming job `id` has reached the `Created` status.
    ///
    /// # Errors
    ///
    /// Returns a catalog-id-not-found error when the job row no longer exists
    /// (e.g. it was cancelled or dropped).
    pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
        let status = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobStatus)
            .filter(streaming_job::Column::JobId.eq(id))
            .into_tuple::<JobStatus>()
            .one(&self.db)
            .await?;

        status
            .map(|status| status == JobStatus::Created)
            .ok_or_else(|| {
                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
            })
    }

    /// Sends `Err(err)` to every registered finish notifier, then drops the notifiers.
    ///
    /// With `Some(database_id)` only that database's notifiers are drained;
    /// with `None` all notifiers across all databases are drained.
    pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
        if let Some(database_id) = database_id {
            if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
            {
                for tx in creating_tables.into_values().flatten() {
                    // Receivers may already be gone; a failed send is fine to ignore.
                    let _ = tx.send(Err(err.clone()));
                }
            }
        } else {
            // `take` swaps in an empty map so all pending notifiers are consumed.
            for tx in take(&mut self.creating_table_finish_notifier)
                .into_values()
                .flatten()
                .flat_map(|(_, txs)| txs.into_iter())
            {
                let _ = tx.send(Err(err.clone()));
            }
        }
    }

    /// Returns the ids of all tables that participate in time travel:
    /// user tables, materialized views and indexes (internal tables excluded).
    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
        let table_ids: Vec<TableId> = Table::find()
            .select_only()
            .filter(table::Column::TableType.is_in(vec![
                TableType::Table,
                TableType::MaterializedView,
                TableType::Index,
            ]))
            .column(table::Column::TableId)
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(table_ids)
    }

    /// Since the tables have been dropped from both meta store and streaming jobs, this method removes those table copies.
    /// Returns the removed table copies.
    pub(crate) fn complete_dropped_tables(
        &mut self,
        table_ids: impl IntoIterator<Item = TableId>,
    ) -> Vec<PbTable> {
        table_ids
            .into_iter()
            .filter_map(|table_id| {
                self.dropped_tables.remove(&table_id).map_or_else(
                    || {
                        // Missing entries are logged but not fatal: the caller may
                        // pass ids that were never staged in `dropped_tables`.
                        tracing::warn!(%table_id, "table not found");
                        None
                    },
                    Some,
                )
            })
            .collect()
    }
}