risingwave_meta/controller/catalog/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::{Context, anyhow};
29use itertools::Itertools;
30use risingwave_common::catalog::{
31    DEFAULT_SCHEMA_NAME, FragmentTypeFlag, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::secret::LocalSecretManager;
35use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
36use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
37use risingwave_connector::source::cdc::build_cdc_table_id;
38use risingwave_meta_model::object::ObjectType;
39use risingwave_meta_model::prelude::*;
40use risingwave_meta_model::table::{RefreshState, TableType};
41use risingwave_meta_model::{
42    ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
43    IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
44    StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
45    connection, database, fragment, function, index, object, object_dependency, schema, secret,
46    sink, source, streaming_job, subscription, table, user_privilege, view,
47};
48use risingwave_pb::catalog::connection::Info as ConnectionInfo;
49use risingwave_pb::catalog::subscription::SubscriptionState;
50use risingwave_pb::catalog::table::PbTableType;
51use risingwave_pb::catalog::{
52    PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
53    PbStreamJobStatus, PbSubscription, PbTable, PbView,
54};
55use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
56use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
57use risingwave_pb::meta::object::PbObjectInfo;
58use risingwave_pb::meta::subscribe_response::{
59    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
60};
61use risingwave_pb::meta::{PbObject, PbObjectGroup};
62use risingwave_pb::stream_plan::stream_node::NodeBody;
63use risingwave_pb::telemetry::PbTelemetryEventStage;
64use risingwave_pb::user::PbUserInfo;
65use sea_orm::ActiveValue::Set;
66use sea_orm::sea_query::{Expr, Query, SimpleExpr};
67use sea_orm::{
68    ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
69    IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
70    SelectColumns, TransactionTrait, Value,
71};
72use tokio::sync::oneshot::Sender;
73use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
74use tracing::info;
75
76use super::utils::{
77    check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
78    rename_relation_refer,
79};
80use crate::controller::ObjectModel;
81use crate::controller::catalog::util::update_internal_tables;
82use crate::controller::utils::*;
83use crate::manager::{
84    IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
85    get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
86};
87use crate::rpc::ddl_controller::DropMode;
88use crate::telemetry::{MetaTelemetryJobDesc, report_event};
89use crate::{MetaError, MetaResult};
90
/// A full snapshot of the catalog, one vector per object kind, as assembled by
/// `CatalogControllerInner::snapshot`.
///
/// Element order (positional, so it must not change): databases, schemas, tables, sources,
/// sinks, subscriptions, indexes, views, functions, connections, secrets.
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);

/// Shared, reference-counted handle to a [`CatalogController`].
pub type CatalogControllerRef = Arc<CatalogController>;
106
/// `CatalogController` is the controller for catalog related operations, including database, schema, table, view, etc.
///
/// All meta-store access goes through `inner`, which is guarded by an async `RwLock`;
/// writers take the write lock for the duration of their transaction.
pub struct CatalogController {
    /// Meta service environment (meta store, notification manager, ...).
    pub(crate) env: MetaSrvEnv,
    /// Lock-protected state: the meta-store connection plus in-memory bookkeeping.
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
112
/// Identifiers touched by a single "drop connector" action on a table.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    // We only apply one drop-connector action to one table at a time, so a single id per
    // field is enough; there is no need for a vector here.
    /// Streaming job whose definition is changed by the action.
    pub(crate) to_change_streaming_job_id: ObjectId,
    /// State table to remove as part of the action.
    pub(crate) to_remove_state_table_id: TableId,
    /// Source to remove as part of the action.
    pub(crate) to_remove_source_id: SourceId,
}
120
/// Everything removed from the catalog by a drop, so that other components (hummock, the
/// secret manager, the source manager, ...) can release the corresponding resources.
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    /// Database the released objects belong to (presumably; confirm against the drop path).
    pub(crate) database_id: DatabaseId,
    /// Streaming jobs removed by the drop.
    pub(crate) removed_streaming_job_ids: Vec<ObjectId>,
    /// Dropped state tables, which need to be unregistered from hummock.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    /// Dropped secrets, which need to be removed from the secret manager.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    /// Dropped sources (on `DROP SOURCE`), which need to be unregistered from the source manager.
    pub(crate) removed_source_ids: Vec<SourceId>,
    /// Dropped source fragments (on `DROP MATERIALIZED VIEW` of a view that reads from sources),
    /// which need to be unregistered from the source manager.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    /// Actors removed by the drop.
    pub(crate) removed_actors: HashSet<ActorId>,
    /// Fragments removed by the drop.
    pub(crate) removed_fragments: HashSet<FragmentId>,

    /// Removed sink fragments, keyed by the id of their target fragment.
    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
}
142
143impl CatalogController {
144    pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
145        let meta_store = env.meta_store();
146        let catalog_controller = Self {
147            env,
148            inner: RwLock::new(CatalogControllerInner {
149                db: meta_store.conn,
150                creating_table_finish_notifier: HashMap::new(),
151                dropped_tables: HashMap::new(),
152            }),
153        };
154
155        catalog_controller.init().await?;
156        Ok(catalog_controller)
157    }
158
159    /// Used in `NotificationService::subscribe`.
160    /// Need to pay attention to the order of acquiring locks to prevent deadlock problems.
161    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
162        self.inner.read().await
163    }
164
165    pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
166        self.inner.write().await
167    }
168}
169
/// State guarded by `CatalogController::inner`.
pub struct CatalogControllerInner {
    /// Connection to the meta store (taken from `MetaSrvEnv::meta_store()` at construction).
    pub(crate) db: DatabaseConnection,
    /// Registered finish notifiers for creating tables, keyed by database and then by job id.
    ///
    /// `DdlController` will update this map, and pass the `tx` side to `CatalogController`.
    /// On notifying, we can remove the entry from this map.
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<ObjectId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    /// Tables that have been dropped from the meta store but whose corresponding barrier has
    /// not finished yet.
    pub dropped_tables: HashMap<TableId, PbTable>,
}
182
183impl CatalogController {
184    pub(crate) async fn notify_frontend(
185        &self,
186        operation: NotificationOperation,
187        info: NotificationInfo,
188    ) -> NotificationVersion {
189        self.env
190            .notification_manager()
191            .notify_frontend(operation, info)
192            .await
193    }
194
195    pub(crate) async fn notify_frontend_relation_info(
196        &self,
197        operation: NotificationOperation,
198        relation_info: PbObjectInfo,
199    ) -> NotificationVersion {
200        self.env
201            .notification_manager()
202            .notify_frontend_object_info(operation, relation_info)
203            .await
204    }
205
206    pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
207        self.env.notification_manager().current_version().await
208    }
209}
210
211impl CatalogController {
212    pub async fn finish_create_subscription_catalog(&self, subscription_id: u32) -> MetaResult<()> {
213        let inner = self.inner.write().await;
214        let txn = inner.db.begin().await?;
215        let job_id = subscription_id as i32;
216
217        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
218        let res = Object::update_many()
219            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
220            .col_expr(
221                object::Column::CreatedAtClusterVersion,
222                current_cluster_version().into(),
223            )
224            .filter(object::Column::Oid.eq(job_id))
225            .exec(&txn)
226            .await?;
227        if res.rows_affected == 0 {
228            return Err(MetaError::catalog_id_not_found("subscription", job_id));
229        }
230
231        // mark the target subscription as `Create`.
232        let job = subscription::ActiveModel {
233            subscription_id: Set(job_id),
234            subscription_state: Set(SubscriptionState::Created.into()),
235            ..Default::default()
236        };
237        job.update(&txn).await?;
238
239        let _ = grant_default_privileges_automatically(&txn, job_id).await?;
240
241        txn.commit().await?;
242
243        Ok(())
244    }
245
246    pub async fn notify_create_subscription(
247        &self,
248        subscription_id: u32,
249    ) -> MetaResult<NotificationVersion> {
250        let inner = self.inner.read().await;
251        let job_id = subscription_id as i32;
252        let (subscription, obj) = Subscription::find_by_id(job_id)
253            .find_also_related(Object)
254            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
255            .one(&inner.db)
256            .await?
257            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?;
258
259        let mut version = self
260            .notify_frontend(
261                NotificationOperation::Add,
262                NotificationInfo::ObjectGroup(PbObjectGroup {
263                    objects: vec![PbObject {
264                        object_info: PbObjectInfo::Subscription(
265                            ObjectModel(subscription, obj.unwrap()).into(),
266                        )
267                        .into(),
268                    }],
269                }),
270            )
271            .await;
272
273        // notify default privileges about the new subscription
274        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
275            .select_only()
276            .distinct()
277            .column(user_privilege::Column::UserId)
278            .filter(user_privilege::Column::Oid.eq(subscription_id as ObjectId))
279            .into_tuple()
280            .all(&inner.db)
281            .await?;
282
283        if !updated_user_ids.is_empty() {
284            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
285            version = self.notify_users_update(updated_user_infos).await;
286        }
287
288        Ok(version)
289    }
290
291    // for telemetry
292    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
293        // get connector usage by source/sink
294        // the expect format is like:
295        // {
296        //     "source": [{
297        //         "$source_id": {
298        //             "connector": "kafka",
299        //             "format": "plain",
300        //             "encode": "json"
301        //         },
302        //     }],
303        //     "sink": [{
304        //         "$sink_id": {
305        //             "connector": "pulsar",
306        //             "format": "upsert",
307        //             "encode": "avro"
308        //         },
309        //     }],
310        // }
311
312        let inner = self.inner.read().await;
313        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
314            .select_only()
315            .column(source::Column::SourceId)
316            .column(source::Column::WithProperties)
317            .column(source::Column::SourceInfo)
318            .into_tuple()
319            .all(&inner.db)
320            .await?;
321        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
322            .select_only()
323            .column(sink::Column::SinkId)
324            .column(sink::Column::Properties)
325            .column(sink::Column::SinkFormatDesc)
326            .into_tuple()
327            .all(&inner.db)
328            .await?;
329        drop(inner);
330
331        let get_connector_from_property = |property: &Property| -> String {
332            property
333                .0
334                .get(UPSTREAM_SOURCE_KEY)
335                .cloned()
336                .unwrap_or_default()
337        };
338
339        let source_report: Vec<jsonbb::Value> = source_props_and_info
340            .iter()
341            .map(|(oid, property, info)| {
342                let connector_name = get_connector_from_property(property);
343                let mut format = None;
344                let mut encode = None;
345                if let Some(info) = info {
346                    let pb_info = info.to_protobuf();
347                    format = Some(pb_info.format().as_str_name());
348                    encode = Some(pb_info.row_encode().as_str_name());
349                }
350                jsonbb::json!({
351                    oid.to_string(): {
352                        "connector": connector_name,
353                        "format": format,
354                        "encode": encode,
355                    },
356                })
357            })
358            .collect_vec();
359
360        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
361            .iter()
362            .map(|(oid, property, info)| {
363                let connector_name = get_connector_from_property(property);
364                let mut format = None;
365                let mut encode = None;
366                if let Some(info) = info {
367                    let pb_info = info.to_protobuf();
368                    format = Some(pb_info.format().as_str_name());
369                    encode = Some(pb_info.encode().as_str_name());
370                }
371                jsonbb::json!({
372                    oid.to_string(): {
373                        "connector": connector_name,
374                        "format": format,
375                        "encode": encode,
376                    },
377                })
378            })
379            .collect_vec();
380
381        Ok(jsonbb::json!({
382                "source": source_report,
383                "sink": sink_report,
384        }))
385    }
386
387    pub async fn clean_dirty_subscription(
388        &self,
389        database_id: Option<DatabaseId>,
390    ) -> MetaResult<()> {
391        let inner = self.inner.write().await;
392        let txn = inner.db.begin().await?;
393        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
394            object::Column::Oid.not_in_subquery(
395                Query::select()
396                    .column(subscription::Column::SubscriptionId)
397                    .from(Subscription)
398                    .and_where(
399                        subscription::Column::SubscriptionState
400                            .eq(SubscriptionState::Created as i32),
401                    )
402                    .take(),
403            ),
404        );
405
406        let filter_condition = if let Some(database_id) = database_id {
407            filter_condition.and(object::Column::DatabaseId.eq(database_id))
408        } else {
409            filter_condition
410        };
411        Object::delete_many()
412            .filter(filter_condition)
413            .exec(&txn)
414            .await?;
415        txn.commit().await?;
416        // We don't need to notify the frontend, because the Init subscription is not send to frontend.
417        Ok(())
418    }
419
420    /// `clean_dirty_creating_jobs` cleans up creating jobs that are creating in Foreground mode or in Initial status.
421    pub async fn clean_dirty_creating_jobs(
422        &self,
423        database_id: Option<DatabaseId>,
424    ) -> MetaResult<Vec<SourceId>> {
425        let inner = self.inner.write().await;
426        let txn = inner.db.begin().await?;
427
428        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
429            streaming_job::Column::JobStatus
430                .eq(JobStatus::Creating)
431                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
432        );
433
434        let filter_condition = if let Some(database_id) = database_id {
435            filter_condition.and(object::Column::DatabaseId.eq(database_id))
436        } else {
437            filter_condition
438        };
439
440        let dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
441            .select_only()
442            .column(streaming_job::Column::JobId)
443            .columns([
444                object::Column::Oid,
445                object::Column::ObjType,
446                object::Column::SchemaId,
447                object::Column::DatabaseId,
448            ])
449            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
450            .filter(filter_condition)
451            .into_partial_model()
452            .all(&txn)
453            .await?;
454
455        Self::clean_dirty_sink_downstreams(&txn).await?;
456
457        if dirty_job_objs.is_empty() {
458            return Ok(vec![]);
459        }
460
461        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
462
463        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();
464
465        // Filter out dummy objs for replacement.
466        // todo: we'd better introduce a new dummy object type for replacement.
467        let all_dirty_table_ids = dirty_job_objs
468            .iter()
469            .filter(|obj| obj.obj_type == ObjectType::Table)
470            .map(|obj| obj.oid)
471            .collect_vec();
472        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
473            .select_only()
474            .column(table::Column::TableId)
475            .column(table::Column::TableType)
476            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
477            .into_tuple::<(ObjectId, TableType)>()
478            .all(&txn)
479            .await?
480            .into_iter()
481            .collect();
482
483        let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
484            .select_only()
485            .column(streaming_job::Column::JobId)
486            .filter(
487                streaming_job::Column::JobId
488                    .is_in(dirty_job_ids.clone())
489                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
490            )
491            .into_tuple()
492            .all(&txn)
493            .await?
494            .into_iter()
495            .collect();
496
497        // notify delete for failed materialized views and background jobs.
498        let to_notify_objs = dirty_job_objs
499            .into_iter()
500            .filter(|obj| {
501                matches!(
502                    dirty_table_type_map.get(&obj.oid),
503                    Some(TableType::MaterializedView)
504                ) || dirty_background_jobs.contains(&obj.oid)
505            })
506            .collect_vec();
507
508        // The source ids for dirty tables with connector.
509        // FIXME: we should also clean dirty sources.
510        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
511            .select_only()
512            .column(table::Column::OptionalAssociatedSourceId)
513            .filter(
514                table::Column::TableId
515                    .is_in(dirty_job_ids.clone())
516                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
517            )
518            .into_tuple()
519            .all(&txn)
520            .await?;
521
522        let dirty_state_table_ids: Vec<TableId> = Table::find()
523            .select_only()
524            .column(table::Column::TableId)
525            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
526            .into_tuple()
527            .all(&txn)
528            .await?;
529
530        let dirty_internal_table_objs = Object::find()
531            .select_only()
532            .columns([
533                object::Column::Oid,
534                object::Column::ObjType,
535                object::Column::SchemaId,
536                object::Column::DatabaseId,
537            ])
538            .join(JoinType::InnerJoin, object::Relation::Table.def())
539            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
540            .into_partial_model()
541            .all(&txn)
542            .await?;
543
544        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
545            .clone()
546            .into_iter()
547            .chain(dirty_state_table_ids.into_iter())
548            .chain(dirty_associated_source_ids.clone().into_iter())
549            .collect();
550
551        let res = Object::delete_many()
552            .filter(object::Column::Oid.is_in(to_delete_objs))
553            .exec(&txn)
554            .await?;
555        assert!(res.rows_affected > 0);
556
557        txn.commit().await?;
558
559        let object_group = build_object_group_for_delete(
560            to_notify_objs
561                .into_iter()
562                .chain(dirty_internal_table_objs.into_iter())
563                .collect_vec(),
564        );
565
566        let _version = self
567            .notify_frontend(NotificationOperation::Delete, object_group)
568            .await;
569
570        Ok(dirty_associated_source_ids)
571    }
572
573    /// On recovery, reset refreshable table's `refresh_state` to a reasonable state.
574    pub async fn reset_refreshing_tables(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
575        let inner = self.inner.write().await;
576        let txn = inner.db.begin().await?;
577
578        // IDLE: no change
579        // REFRESHING: reset to IDLE (give up refresh for allowing re-trigger)
580        // FINISHING: no change (materialize executor will recover from state table and continue)
581        let filter_condition = table::Column::RefreshState.eq(RefreshState::Refreshing);
582
583        let filter_condition = if let Some(database_id) = database_id {
584            filter_condition.and(object::Column::DatabaseId.eq(database_id))
585        } else {
586            filter_condition
587        };
588
589        let table_ids: Vec<TableId> = Table::find()
590            .find_also_related(Object)
591            .filter(filter_condition)
592            .all(&txn)
593            .await
594            .context("reset_refreshing_tables: finding table ids")?
595            .into_iter()
596            .map(|(t, _)| t.table_id)
597            .collect();
598
599        let res = Table::update_many()
600            .col_expr(table::Column::RefreshState, Expr::value(RefreshState::Idle))
601            .filter(table::Column::TableId.is_in(table_ids))
602            .exec(&txn)
603            .await
604            .context("reset_refreshing_tables: update refresh state")?;
605
606        txn.commit().await?;
607
608        tracing::debug!(
609            "reset refreshing tables: {} tables updated",
610            res.rows_affected
611        );
612
613        Ok(())
614    }
615
616    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
617        let inner = self.inner.write().await;
618        let txn = inner.db.begin().await?;
619        ensure_object_id(ObjectType::Database, comment.database_id as _, &txn).await?;
620        ensure_object_id(ObjectType::Schema, comment.schema_id as _, &txn).await?;
621        let table_obj = Object::find_by_id(comment.table_id as ObjectId)
622            .one(&txn)
623            .await?
624            .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
625
626        let table = if let Some(col_idx) = comment.column_index {
627            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id as TableId)
628                .select_only()
629                .column(table::Column::Columns)
630                .into_tuple()
631                .one(&txn)
632                .await?
633                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
634            let mut pb_columns = columns.to_protobuf();
635
636            let column = pb_columns
637                .get_mut(col_idx as usize)
638                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
639            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
640                anyhow!(
641                    "column desc at index {} for table id {} not found",
642                    col_idx,
643                    comment.table_id
644                )
645            })?;
646            column_desc.description = comment.description;
647            table::ActiveModel {
648                table_id: Set(comment.table_id as _),
649                columns: Set(pb_columns.into()),
650                ..Default::default()
651            }
652            .update(&txn)
653            .await?
654        } else {
655            table::ActiveModel {
656                table_id: Set(comment.table_id as _),
657                description: Set(comment.description),
658                ..Default::default()
659            }
660            .update(&txn)
661            .await?
662        };
663        txn.commit().await?;
664
665        let version = self
666            .notify_frontend_relation_info(
667                NotificationOperation::Update,
668                PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
669            )
670            .await;
671
672        Ok(version)
673    }
674
675    async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
676        if tables.is_empty() {
677            return;
678        }
679        let objects = tables
680            .into_iter()
681            .map(|t| PbObject {
682                object_info: Some(PbObjectInfo::Table(t)),
683            })
684            .collect();
685        let group = NotificationInfo::ObjectGroup(PbObjectGroup { objects });
686        self.env
687            .notification_manager()
688            .notify_hummock(NotificationOperation::Delete, group.clone())
689            .await;
690        self.env
691            .notification_manager()
692            .notify_compactor(NotificationOperation::Delete, group)
693            .await;
694    }
695
696    pub async fn complete_dropped_tables(&self, table_ids: impl Iterator<Item = TableId>) {
697        let mut inner = self.inner.write().await;
698        let tables = inner.complete_dropped_tables(table_ids);
699        self.notify_hummock_dropped_tables(tables).await;
700    }
701
702    pub async fn cleanup_dropped_tables(&self) {
703        let mut inner = self.inner.write().await;
704        let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
705        self.notify_hummock_dropped_tables(tables).await;
706    }
707}
708
/// `CatalogStats` is a struct to store the statistics of all catalogs.
///
/// Filled in by `CatalogControllerInner::stats`; every field is a row count.
pub struct CatalogStats {
    /// Number of tables whose type is `TableType::Table`.
    pub table_num: u64,
    /// Number of materialized views.
    pub mview_num: u64,
    /// Number of indexes.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs.
    pub streaming_job_num: u64,
    /// Number of actors.
    pub actor_num: u64,
}
720
721impl CatalogControllerInner {
722    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
723        let databases = self.list_databases().await?;
724        let schemas = self.list_schemas().await?;
725        let tables = self.list_tables().await?;
726        let sources = self.list_sources().await?;
727        let sinks = self.list_sinks().await?;
728        let subscriptions = self.list_subscriptions().await?;
729        let indexes = self.list_indexes().await?;
730        let views = self.list_views().await?;
731        let functions = self.list_functions().await?;
732        let connections = self.list_connections().await?;
733        let secrets = self.list_secrets().await?;
734
735        let users = self.list_users().await?;
736
737        Ok((
738            (
739                databases,
740                schemas,
741                tables,
742                sources,
743                sinks,
744                subscriptions,
745                indexes,
746                views,
747                functions,
748                connections,
749                secrets,
750            ),
751            users,
752        ))
753    }
754
755    pub async fn stats(&self) -> MetaResult<CatalogStats> {
756        let mut table_num_map: HashMap<_, _> = Table::find()
757            .select_only()
758            .column(table::Column::TableType)
759            .column_as(table::Column::TableId.count(), "num")
760            .group_by(table::Column::TableType)
761            .having(table::Column::TableType.ne(TableType::Internal))
762            .into_tuple::<(TableType, i64)>()
763            .all(&self.db)
764            .await?
765            .into_iter()
766            .map(|(table_type, num)| (table_type, num as u64))
767            .collect();
768
769        let source_num = Source::find().count(&self.db).await?;
770        let sink_num = Sink::find().count(&self.db).await?;
771        let function_num = Function::find().count(&self.db).await?;
772        let streaming_job_num = StreamingJob::find().count(&self.db).await?;
773        let actor_num = Actor::find().count(&self.db).await?;
774
775        Ok(CatalogStats {
776            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
777            mview_num: table_num_map
778                .remove(&TableType::MaterializedView)
779                .unwrap_or(0),
780            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
781            source_num,
782            sink_num,
783            function_num,
784            streaming_job_num,
785            actor_num,
786        })
787    }
788
789    async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
790        let db_objs = Database::find()
791            .find_also_related(Object)
792            .all(&self.db)
793            .await?;
794        Ok(db_objs
795            .into_iter()
796            .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
797            .collect())
798    }
799
800    async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
801        let schema_objs = Schema::find()
802            .find_also_related(Object)
803            .all(&self.db)
804            .await?;
805
806        Ok(schema_objs
807            .into_iter()
808            .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
809            .collect())
810    }
811
812    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
813        let mut user_infos: Vec<PbUserInfo> = User::find()
814            .all(&self.db)
815            .await?
816            .into_iter()
817            .map(Into::into)
818            .collect();
819
820        for user_info in &mut user_infos {
821            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
822        }
823        Ok(user_infos)
824    }
825
826    /// `list_all_tables` return all tables and internal tables.
827    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
828        let table_objs = Table::find()
829            .find_also_related(Object)
830            .all(&self.db)
831            .await?;
832
833        Ok(table_objs
834            .into_iter()
835            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
836            .collect())
837    }
838
839    /// `list_tables` return all `CREATED` tables, `CREATING` materialized views/ `BACKGROUND` jobs and internal tables that belong to them.
840    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
841        let table_objs = Table::find()
842            .find_also_related(Object)
843            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
844            .filter(
845                streaming_job::Column::JobStatus.eq(JobStatus::Created).or(
846                    table::Column::TableType
847                        .eq(TableType::MaterializedView)
848                        .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
849                ),
850            )
851            .all(&self.db)
852            .await?;
853
854        let job_statuses: HashMap<ObjectId, JobStatus> = StreamingJob::find()
855            .select_only()
856            .column(streaming_job::Column::JobId)
857            .column(streaming_job::Column::JobStatus)
858            .filter(
859                streaming_job::Column::JobStatus
860                    .eq(JobStatus::Created)
861                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
862            )
863            .into_tuple::<(ObjectId, JobStatus)>()
864            .all(&self.db)
865            .await?
866            .into_iter()
867            .collect();
868
869        let job_ids: HashSet<ObjectId> = table_objs
870            .iter()
871            .map(|(t, _)| t.table_id)
872            .chain(job_statuses.keys().cloned())
873            .collect();
874
875        let internal_table_objs = Table::find()
876            .find_also_related(Object)
877            .filter(
878                table::Column::TableType
879                    .eq(TableType::Internal)
880                    .and(table::Column::BelongsToJobId.is_in(job_ids)),
881            )
882            .all(&self.db)
883            .await?;
884
885        Ok(table_objs
886            .into_iter()
887            .chain(internal_table_objs.into_iter())
888            .map(|(table, obj)| {
889                // Correctly set the stream job status for creating materialized views and internal tables.
890                // If the table is not contained in `job_statuses`, it means the job is a creating mv.
891                let status: PbStreamJobStatus = if table.table_type == TableType::Internal {
892                    (*job_statuses
893                        .get(&table.belongs_to_job_id.unwrap())
894                        .unwrap_or(&JobStatus::Creating))
895                    .into()
896                } else {
897                    (*job_statuses
898                        .get(&table.table_id)
899                        .unwrap_or(&JobStatus::Creating))
900                    .into()
901                };
902                let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
903                pb_table.stream_job_status = status.into();
904                pb_table
905            })
906            .collect())
907    }
908
909    /// `list_sources` return all sources and `CREATED` ones if contains any streaming jobs.
910    async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
911        let mut source_objs = Source::find()
912            .find_also_related(Object)
913            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
914            .filter(
915                streaming_job::Column::JobStatus
916                    .is_null()
917                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
918            )
919            .all(&self.db)
920            .await?;
921
922        // filter out inner connector sources that are still under creating.
923        let created_table_ids: HashSet<TableId> = Table::find()
924            .select_only()
925            .column(table::Column::TableId)
926            .join(JoinType::InnerJoin, table::Relation::Object1.def())
927            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
928            .filter(
929                table::Column::OptionalAssociatedSourceId
930                    .is_not_null()
931                    .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
932            )
933            .into_tuple()
934            .all(&self.db)
935            .await?
936            .into_iter()
937            .collect();
938        source_objs.retain_mut(|(source, _)| {
939            source.optional_associated_table_id.is_none()
940                || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
941        });
942
943        Ok(source_objs
944            .into_iter()
945            .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
946            .collect())
947    }
948
949    /// `list_sinks` return all `CREATED` and `BACKGROUND` sinks.
950    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
951        let sink_objs = Sink::find()
952            .find_also_related(Object)
953            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
954            .filter(
955                streaming_job::Column::JobStatus
956                    .eq(JobStatus::Created)
957                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
958            )
959            .all(&self.db)
960            .await?;
961
962        let creating_sinks: HashSet<_> = StreamingJob::find()
963            .select_only()
964            .column(streaming_job::Column::JobId)
965            .filter(
966                streaming_job::Column::JobStatus
967                    .eq(JobStatus::Creating)
968                    .and(
969                        streaming_job::Column::JobId
970                            .is_in(sink_objs.iter().map(|(sink, _)| sink.sink_id)),
971                    ),
972            )
973            .into_tuple::<SinkId>()
974            .all(&self.db)
975            .await?
976            .into_iter()
977            .collect();
978
979        Ok(sink_objs
980            .into_iter()
981            .map(|(sink, obj)| {
982                let is_creating = creating_sinks.contains(&sink.sink_id);
983                let mut pb_sink: PbSink = ObjectModel(sink, obj.unwrap()).into();
984                pb_sink.stream_job_status = if is_creating {
985                    PbStreamJobStatus::Creating.into()
986                } else {
987                    PbStreamJobStatus::Created.into()
988                };
989                pb_sink
990            })
991            .collect())
992    }
993
994    /// `list_subscriptions` return all `CREATED` subscriptions.
995    async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
996        let subscription_objs = Subscription::find()
997            .find_also_related(Object)
998            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
999            .all(&self.db)
1000            .await?;
1001
1002        Ok(subscription_objs
1003            .into_iter()
1004            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
1005            .collect())
1006    }
1007
1008    async fn list_views(&self) -> MetaResult<Vec<PbView>> {
1009        let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
1010
1011        Ok(view_objs
1012            .into_iter()
1013            .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
1014            .collect())
1015    }
1016
1017    /// `list_indexes` return all `CREATED` and `BACKGROUND` indexes.
1018    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
1019        let index_objs = Index::find()
1020            .find_also_related(Object)
1021            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1022            .filter(
1023                streaming_job::Column::JobStatus
1024                    .eq(JobStatus::Created)
1025                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
1026            )
1027            .all(&self.db)
1028            .await?;
1029
1030        let creating_indexes: HashSet<_> = StreamingJob::find()
1031            .select_only()
1032            .column(streaming_job::Column::JobId)
1033            .filter(
1034                streaming_job::Column::JobStatus
1035                    .eq(JobStatus::Creating)
1036                    .and(
1037                        streaming_job::Column::JobId
1038                            .is_in(index_objs.iter().map(|(index, _)| index.index_id)),
1039                    ),
1040            )
1041            .into_tuple::<IndexId>()
1042            .all(&self.db)
1043            .await?
1044            .into_iter()
1045            .collect();
1046
1047        Ok(index_objs
1048            .into_iter()
1049            .map(|(index, obj)| {
1050                let is_creating = creating_indexes.contains(&index.index_id);
1051                let mut pb_index: PbIndex = ObjectModel(index, obj.unwrap()).into();
1052                pb_index.stream_job_status = if is_creating {
1053                    PbStreamJobStatus::Creating.into()
1054                } else {
1055                    PbStreamJobStatus::Created.into()
1056                };
1057                pb_index
1058            })
1059            .collect())
1060    }
1061
1062    async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1063        let conn_objs = Connection::find()
1064            .find_also_related(Object)
1065            .all(&self.db)
1066            .await?;
1067
1068        Ok(conn_objs
1069            .into_iter()
1070            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
1071            .collect())
1072    }
1073
1074    pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1075        let secret_objs = Secret::find()
1076            .find_also_related(Object)
1077            .all(&self.db)
1078            .await?;
1079        Ok(secret_objs
1080            .into_iter()
1081            .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
1082            .collect())
1083    }
1084
1085    async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1086        let func_objs = Function::find()
1087            .find_also_related(Object)
1088            .all(&self.db)
1089            .await?;
1090
1091        Ok(func_objs
1092            .into_iter()
1093            .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
1094            .collect())
1095    }
1096
1097    pub(crate) fn register_finish_notifier(
1098        &mut self,
1099        database_id: DatabaseId,
1100        id: ObjectId,
1101        sender: Sender<Result<NotificationVersion, String>>,
1102    ) {
1103        self.creating_table_finish_notifier
1104            .entry(database_id)
1105            .or_default()
1106            .entry(id)
1107            .or_default()
1108            .push(sender);
1109    }
1110
1111    pub(crate) async fn streaming_job_is_finished(&mut self, id: i32) -> MetaResult<bool> {
1112        let status = StreamingJob::find()
1113            .select_only()
1114            .column(streaming_job::Column::JobStatus)
1115            .filter(streaming_job::Column::JobId.eq(id))
1116            .into_tuple::<JobStatus>()
1117            .one(&self.db)
1118            .await?;
1119
1120        status
1121            .map(|status| status == JobStatus::Created)
1122            .ok_or_else(|| {
1123                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
1124            })
1125    }
1126
1127    pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1128        if let Some(database_id) = database_id {
1129            if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1130            {
1131                for tx in creating_tables.into_values().flatten() {
1132                    let _ = tx.send(Err(err.clone()));
1133                }
1134            }
1135        } else {
1136            for tx in take(&mut self.creating_table_finish_notifier)
1137                .into_values()
1138                .flatten()
1139                .flat_map(|(_, txs)| txs.into_iter())
1140            {
1141                let _ = tx.send(Err(err.clone()));
1142            }
1143        }
1144    }
1145
1146    pub(crate) fn notify_cancelled(&mut self, database_id: DatabaseId, id: ObjectId) {
1147        if let Some(creating_tables) = self.creating_table_finish_notifier.get_mut(&database_id)
1148            && let Some(tx_list) = creating_tables.remove(&id)
1149        {
1150            for tx in tx_list {
1151                let _ = tx.send(Err("Cancelled".to_owned()));
1152            }
1153        }
1154    }
1155
1156    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1157        let table_ids: Vec<TableId> = Table::find()
1158            .select_only()
1159            .filter(table::Column::TableType.is_in(vec![
1160                TableType::Table,
1161                TableType::MaterializedView,
1162                TableType::Index,
1163            ]))
1164            .column(table::Column::TableId)
1165            .into_tuple()
1166            .all(&self.db)
1167            .await?;
1168        Ok(table_ids)
1169    }
1170
1171    /// Since the tables have been dropped from both meta store and streaming jobs, this method removes those table copies.
1172    /// Returns the removed table copies.
1173    pub(crate) fn complete_dropped_tables(
1174        &mut self,
1175        table_ids: impl Iterator<Item = TableId>,
1176    ) -> Vec<PbTable> {
1177        table_ids
1178            .filter_map(|table_id| {
1179                self.dropped_tables.remove(&table_id).map_or_else(
1180                    || {
1181                        tracing::warn!(table_id, "table not found");
1182                        None
1183                    },
1184                    Some,
1185                )
1186            })
1187            .collect()
1188    }
1189}