risingwave_meta/controller/catalog/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::{Context, anyhow};
29use itertools::Itertools;
30use risingwave_common::catalog::{
31    DEFAULT_SCHEMA_NAME, FragmentTypeFlag, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::{RefreshState, TableType};
42use risingwave_meta_model::{
43    ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
44    IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45    StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
46    connection, database, fragment, function, index, object, object_dependency, schema, secret,
47    sink, source, streaming_job, subscription, table, user_privilege, view,
48};
49use risingwave_pb::catalog::connection::Info as ConnectionInfo;
50use risingwave_pb::catalog::subscription::SubscriptionState;
51use risingwave_pb::catalog::table::PbTableType;
52use risingwave_pb::catalog::{
53    PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
54    PbStreamJobStatus, PbSubscription, PbTable, PbView,
55};
56use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
57use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
58use risingwave_pb::meta::object::PbObjectInfo;
59use risingwave_pb::meta::subscribe_response::{
60    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
61};
62use risingwave_pb::meta::{PbObject, PbObjectGroup};
63use risingwave_pb::stream_plan::stream_node::NodeBody;
64use risingwave_pb::telemetry::PbTelemetryEventStage;
65use risingwave_pb::user::PbUserInfo;
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69    ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
70    IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
71    SelectColumns, TransactionTrait, Value,
72};
73use tokio::sync::oneshot::Sender;
74use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
75use tracing::info;
76
77use super::utils::{
78    check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
79    rename_relation_refer,
80};
81use crate::controller::ObjectModel;
82use crate::controller::catalog::util::update_internal_tables;
83use crate::controller::utils::*;
84use crate::manager::{
85    IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
86    get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
87};
88use crate::rpc::ddl_controller::DropMode;
89use crate::telemetry::{MetaTelemetryJobDesc, report_event};
90use crate::{MetaError, MetaResult};
91
92pub type Catalog = (
93    Vec<PbDatabase>,
94    Vec<PbSchema>,
95    Vec<PbTable>,
96    Vec<PbSource>,
97    Vec<PbSink>,
98    Vec<PbSubscription>,
99    Vec<PbIndex>,
100    Vec<PbView>,
101    Vec<PbFunction>,
102    Vec<PbConnection>,
103    Vec<PbSecret>,
104);
105
106pub type CatalogControllerRef = Arc<CatalogController>;
107
108/// `CatalogController` is the controller for catalog related operations, including database, schema, table, view, etc.
109pub struct CatalogController {
110    pub(crate) env: MetaSrvEnv,
111    pub(crate) inner: RwLock<CatalogControllerInner>,
112}
113
114#[derive(Clone, Default, Debug)]
115pub struct DropTableConnectorContext {
116    // we only apply one drop connector action for one table each time, so no need to vector here
117    pub(crate) to_change_streaming_job_id: JobId,
118    pub(crate) to_remove_state_table_id: TableId,
119    pub(crate) to_remove_source_id: SourceId,
120}
121
122#[derive(Clone, Default, Debug)]
123pub struct ReleaseContext {
124    pub(crate) database_id: DatabaseId,
125    pub(crate) removed_streaming_job_ids: Vec<JobId>,
126    /// Dropped state table list, need to unregister from hummock.
127    pub(crate) removed_state_table_ids: Vec<TableId>,
128
129    /// Dropped secrets, need to remove from secret manager.
130    pub(crate) removed_secret_ids: Vec<SecretId>,
131    /// Dropped sources (when `DROP SOURCE`), need to unregister from source manager.
132    pub(crate) removed_source_ids: Vec<SourceId>,
133    /// Dropped Source fragments (when `DROP MATERIALIZED VIEW` referencing sources),
134    /// need to unregister from source manager.
135    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
136
137    pub(crate) removed_actors: HashSet<ActorId>,
138    pub(crate) removed_fragments: HashSet<FragmentId>,
139
140    /// Removed sink fragment by target fragment.
141    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
142}
143
144impl CatalogController {
145    pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
146        let meta_store = env.meta_store();
147        let catalog_controller = Self {
148            env,
149            inner: RwLock::new(CatalogControllerInner {
150                db: meta_store.conn,
151                creating_table_finish_notifier: HashMap::new(),
152                dropped_tables: HashMap::new(),
153            }),
154        };
155
156        catalog_controller.init().await?;
157        Ok(catalog_controller)
158    }
159
160    /// Used in `NotificationService::subscribe`.
161    /// Need to pay attention to the order of acquiring locks to prevent deadlock problems.
162    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
163        self.inner.read().await
164    }
165
166    pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
167        self.inner.write().await
168    }
169}
170
171pub struct CatalogControllerInner {
172    pub(crate) db: DatabaseConnection,
173    /// Registered finish notifiers for creating tables.
174    ///
175    /// `DdlController` will update this map, and pass the `tx` side to `CatalogController`.
176    /// On notifying, we can remove the entry from this map.
177    #[expect(clippy::type_complexity)]
178    pub creating_table_finish_notifier:
179        HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
180    /// Tables have been dropped from the meta store, but the corresponding barrier remains unfinished.
181    pub dropped_tables: HashMap<TableId, PbTable>,
182}
183
184impl CatalogController {
185    pub(crate) async fn notify_frontend(
186        &self,
187        operation: NotificationOperation,
188        info: NotificationInfo,
189    ) -> NotificationVersion {
190        self.env
191            .notification_manager()
192            .notify_frontend(operation, info)
193            .await
194    }
195
196    pub(crate) async fn notify_frontend_relation_info(
197        &self,
198        operation: NotificationOperation,
199        relation_info: PbObjectInfo,
200    ) -> NotificationVersion {
201        self.env
202            .notification_manager()
203            .notify_frontend_object_info(operation, relation_info)
204            .await
205    }
206
207    pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
208        self.env.notification_manager().current_version().await
209    }
210}
211
212impl CatalogController {
213    pub async fn finish_create_subscription_catalog(&self, subscription_id: u32) -> MetaResult<()> {
214        let inner = self.inner.write().await;
215        let txn = inner.db.begin().await?;
216        let job_id = subscription_id as i32;
217
218        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
219        let res = Object::update_many()
220            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
221            .col_expr(
222                object::Column::CreatedAtClusterVersion,
223                current_cluster_version().into(),
224            )
225            .filter(object::Column::Oid.eq(job_id))
226            .exec(&txn)
227            .await?;
228        if res.rows_affected == 0 {
229            return Err(MetaError::catalog_id_not_found("subscription", job_id));
230        }
231
232        // mark the target subscription as `Create`.
233        let job = subscription::ActiveModel {
234            subscription_id: Set(job_id),
235            subscription_state: Set(SubscriptionState::Created.into()),
236            ..Default::default()
237        };
238        job.update(&txn).await?;
239
240        let _ = grant_default_privileges_automatically(&txn, job_id).await?;
241
242        txn.commit().await?;
243
244        Ok(())
245    }
246
247    pub async fn notify_create_subscription(
248        &self,
249        subscription_id: u32,
250    ) -> MetaResult<NotificationVersion> {
251        let inner = self.inner.read().await;
252        let job_id = subscription_id as i32;
253        let (subscription, obj) = Subscription::find_by_id(job_id)
254            .find_also_related(Object)
255            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
256            .one(&inner.db)
257            .await?
258            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?;
259
260        let mut version = self
261            .notify_frontend(
262                NotificationOperation::Add,
263                NotificationInfo::ObjectGroup(PbObjectGroup {
264                    objects: vec![PbObject {
265                        object_info: PbObjectInfo::Subscription(
266                            ObjectModel(subscription, obj.unwrap()).into(),
267                        )
268                        .into(),
269                    }],
270                }),
271            )
272            .await;
273
274        // notify default privileges about the new subscription
275        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
276            .select_only()
277            .distinct()
278            .column(user_privilege::Column::UserId)
279            .filter(user_privilege::Column::Oid.eq(subscription_id as ObjectId))
280            .into_tuple()
281            .all(&inner.db)
282            .await?;
283
284        if !updated_user_ids.is_empty() {
285            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
286            version = self.notify_users_update(updated_user_infos).await;
287        }
288
289        Ok(version)
290    }
291
292    // for telemetry
293    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
294        // get connector usage by source/sink
295        // the expect format is like:
296        // {
297        //     "source": [{
298        //         "$source_id": {
299        //             "connector": "kafka",
300        //             "format": "plain",
301        //             "encode": "json"
302        //         },
303        //     }],
304        //     "sink": [{
305        //         "$sink_id": {
306        //             "connector": "pulsar",
307        //             "format": "upsert",
308        //             "encode": "avro"
309        //         },
310        //     }],
311        // }
312
313        let inner = self.inner.read().await;
314        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
315            .select_only()
316            .column(source::Column::SourceId)
317            .column(source::Column::WithProperties)
318            .column(source::Column::SourceInfo)
319            .into_tuple()
320            .all(&inner.db)
321            .await?;
322        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
323            .select_only()
324            .column(sink::Column::SinkId)
325            .column(sink::Column::Properties)
326            .column(sink::Column::SinkFormatDesc)
327            .into_tuple()
328            .all(&inner.db)
329            .await?;
330        drop(inner);
331
332        let get_connector_from_property = |property: &Property| -> String {
333            property
334                .0
335                .get(UPSTREAM_SOURCE_KEY)
336                .cloned()
337                .unwrap_or_default()
338        };
339
340        let source_report: Vec<jsonbb::Value> = source_props_and_info
341            .iter()
342            .map(|(oid, property, info)| {
343                let connector_name = get_connector_from_property(property);
344                let mut format = None;
345                let mut encode = None;
346                if let Some(info) = info {
347                    let pb_info = info.to_protobuf();
348                    format = Some(pb_info.format().as_str_name());
349                    encode = Some(pb_info.row_encode().as_str_name());
350                }
351                jsonbb::json!({
352                    oid.to_string(): {
353                        "connector": connector_name,
354                        "format": format,
355                        "encode": encode,
356                    },
357                })
358            })
359            .collect_vec();
360
361        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
362            .iter()
363            .map(|(oid, property, info)| {
364                let connector_name = get_connector_from_property(property);
365                let mut format = None;
366                let mut encode = None;
367                if let Some(info) = info {
368                    let pb_info = info.to_protobuf();
369                    format = Some(pb_info.format().as_str_name());
370                    encode = Some(pb_info.encode().as_str_name());
371                }
372                jsonbb::json!({
373                    oid.to_string(): {
374                        "connector": connector_name,
375                        "format": format,
376                        "encode": encode,
377                    },
378                })
379            })
380            .collect_vec();
381
382        Ok(jsonbb::json!({
383                "source": source_report,
384                "sink": sink_report,
385        }))
386    }
387
388    pub async fn clean_dirty_subscription(
389        &self,
390        database_id: Option<DatabaseId>,
391    ) -> MetaResult<()> {
392        let inner = self.inner.write().await;
393        let txn = inner.db.begin().await?;
394        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
395            object::Column::Oid.not_in_subquery(
396                Query::select()
397                    .column(subscription::Column::SubscriptionId)
398                    .from(Subscription)
399                    .and_where(
400                        subscription::Column::SubscriptionState
401                            .eq(SubscriptionState::Created as i32),
402                    )
403                    .take(),
404            ),
405        );
406
407        let filter_condition = if let Some(database_id) = database_id {
408            filter_condition.and(object::Column::DatabaseId.eq(database_id))
409        } else {
410            filter_condition
411        };
412        Object::delete_many()
413            .filter(filter_condition)
414            .exec(&txn)
415            .await?;
416        txn.commit().await?;
417        // We don't need to notify the frontend, because the Init subscription is not send to frontend.
418        Ok(())
419    }
420
421    /// `clean_dirty_creating_jobs` cleans up creating jobs that are creating in Foreground mode or in Initial status.
422    pub async fn clean_dirty_creating_jobs(
423        &self,
424        database_id: Option<DatabaseId>,
425    ) -> MetaResult<Vec<SourceId>> {
426        let inner = self.inner.write().await;
427        let txn = inner.db.begin().await?;
428
429        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
430            streaming_job::Column::JobStatus
431                .eq(JobStatus::Creating)
432                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
433        );
434
435        let filter_condition = if let Some(database_id) = database_id {
436            filter_condition.and(object::Column::DatabaseId.eq(database_id))
437        } else {
438            filter_condition
439        };
440
441        let dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
442            .select_only()
443            .column(streaming_job::Column::JobId)
444            .columns([
445                object::Column::Oid,
446                object::Column::ObjType,
447                object::Column::SchemaId,
448                object::Column::DatabaseId,
449            ])
450            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
451            .filter(filter_condition)
452            .into_partial_model()
453            .all(&txn)
454            .await?;
455
456        Self::clean_dirty_sink_downstreams(&txn).await?;
457
458        if dirty_job_objs.is_empty() {
459            return Ok(vec![]);
460        }
461
462        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
463
464        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();
465
466        // Filter out dummy objs for replacement.
467        // todo: we'd better introduce a new dummy object type for replacement.
468        let all_dirty_table_ids = dirty_job_objs
469            .iter()
470            .filter(|obj| obj.obj_type == ObjectType::Table)
471            .map(|obj| obj.oid)
472            .collect_vec();
473        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
474            .select_only()
475            .column(table::Column::TableId)
476            .column(table::Column::TableType)
477            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
478            .into_tuple::<(ObjectId, TableType)>()
479            .all(&txn)
480            .await?
481            .into_iter()
482            .collect();
483
484        let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
485            .select_only()
486            .column(streaming_job::Column::JobId)
487            .filter(
488                streaming_job::Column::JobId
489                    .is_in(dirty_job_ids.clone())
490                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
491            )
492            .into_tuple()
493            .all(&txn)
494            .await?
495            .into_iter()
496            .collect();
497
498        // notify delete for failed materialized views and background jobs.
499        let to_notify_objs = dirty_job_objs
500            .into_iter()
501            .filter(|obj| {
502                matches!(
503                    dirty_table_type_map.get(&obj.oid),
504                    Some(TableType::MaterializedView)
505                ) || dirty_background_jobs.contains(&obj.oid)
506            })
507            .collect_vec();
508
509        // The source ids for dirty tables with connector.
510        // FIXME: we should also clean dirty sources.
511        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
512            .select_only()
513            .column(table::Column::OptionalAssociatedSourceId)
514            .filter(
515                table::Column::TableId
516                    .is_in(dirty_job_ids.clone())
517                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
518            )
519            .into_tuple()
520            .all(&txn)
521            .await?;
522
523        let dirty_state_table_ids: Vec<TableId> = Table::find()
524            .select_only()
525            .column(table::Column::TableId)
526            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
527            .into_tuple()
528            .all(&txn)
529            .await?;
530
531        let dirty_internal_table_objs = Object::find()
532            .select_only()
533            .columns([
534                object::Column::Oid,
535                object::Column::ObjType,
536                object::Column::SchemaId,
537                object::Column::DatabaseId,
538            ])
539            .join(JoinType::InnerJoin, object::Relation::Table.def())
540            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
541            .into_partial_model()
542            .all(&txn)
543            .await?;
544
545        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
546            .clone()
547            .into_iter()
548            .chain(
549                dirty_state_table_ids
550                    .into_iter()
551                    .map(|table_id| table_id.as_raw_id() as _),
552            )
553            .chain(dirty_associated_source_ids.clone().into_iter())
554            .collect();
555
556        let res = Object::delete_many()
557            .filter(object::Column::Oid.is_in(to_delete_objs))
558            .exec(&txn)
559            .await?;
560        assert!(res.rows_affected > 0);
561
562        txn.commit().await?;
563
564        let object_group = build_object_group_for_delete(
565            to_notify_objs
566                .into_iter()
567                .chain(dirty_internal_table_objs.into_iter())
568                .collect_vec(),
569        );
570
571        let _version = self
572            .notify_frontend(NotificationOperation::Delete, object_group)
573            .await;
574
575        Ok(dirty_associated_source_ids)
576    }
577
578    /// On recovery, reset refreshable table's `refresh_state` to a reasonable state.
579    pub async fn reset_refreshing_tables(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
580        let inner = self.inner.write().await;
581        let txn = inner.db.begin().await?;
582
583        // IDLE: no change
584        // REFRESHING: reset to IDLE (give up refresh for allowing re-trigger)
585        // FINISHING: no change (materialize executor will recover from state table and continue)
586        let filter_condition = table::Column::RefreshState.eq(RefreshState::Refreshing);
587
588        let filter_condition = if let Some(database_id) = database_id {
589            filter_condition.and(object::Column::DatabaseId.eq(database_id))
590        } else {
591            filter_condition
592        };
593
594        let table_ids: Vec<TableId> = Table::find()
595            .find_also_related(Object)
596            .filter(filter_condition)
597            .all(&txn)
598            .await
599            .context("reset_refreshing_tables: finding table ids")?
600            .into_iter()
601            .map(|(t, _)| t.table_id)
602            .collect();
603
604        let res = Table::update_many()
605            .col_expr(table::Column::RefreshState, Expr::value(RefreshState::Idle))
606            .filter(table::Column::TableId.is_in(table_ids))
607            .exec(&txn)
608            .await
609            .context("reset_refreshing_tables: update refresh state")?;
610
611        txn.commit().await?;
612
613        tracing::debug!(
614            "reset refreshing tables: {} tables updated",
615            res.rows_affected
616        );
617
618        Ok(())
619    }
620
621    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
622        let inner = self.inner.write().await;
623        let txn = inner.db.begin().await?;
624        ensure_object_id(ObjectType::Database, comment.database_id as _, &txn).await?;
625        ensure_object_id(ObjectType::Schema, comment.schema_id as _, &txn).await?;
626        let table_obj = Object::find_by_id(comment.table_id as ObjectId)
627            .one(&txn)
628            .await?
629            .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
630
631        let table = if let Some(col_idx) = comment.column_index {
632            let columns: ColumnCatalogArray = Table::find_by_id(TableId::new(comment.table_id))
633                .select_only()
634                .column(table::Column::Columns)
635                .into_tuple()
636                .one(&txn)
637                .await?
638                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
639            let mut pb_columns = columns.to_protobuf();
640
641            let column = pb_columns
642                .get_mut(col_idx as usize)
643                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
644            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
645                anyhow!(
646                    "column desc at index {} for table id {} not found",
647                    col_idx,
648                    comment.table_id
649                )
650            })?;
651            column_desc.description = comment.description;
652            table::ActiveModel {
653                table_id: Set(comment.table_id.into()),
654                columns: Set(pb_columns.into()),
655                ..Default::default()
656            }
657            .update(&txn)
658            .await?
659        } else {
660            table::ActiveModel {
661                table_id: Set(comment.table_id.into()),
662                description: Set(comment.description),
663                ..Default::default()
664            }
665            .update(&txn)
666            .await?
667        };
668        txn.commit().await?;
669
670        let version = self
671            .notify_frontend_relation_info(
672                NotificationOperation::Update,
673                PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
674            )
675            .await;
676
677        Ok(version)
678    }
679
680    async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
681        if tables.is_empty() {
682            return;
683        }
684        let objects = tables
685            .into_iter()
686            .map(|t| PbObject {
687                object_info: Some(PbObjectInfo::Table(t)),
688            })
689            .collect();
690        let group = NotificationInfo::ObjectGroup(PbObjectGroup { objects });
691        self.env
692            .notification_manager()
693            .notify_hummock(NotificationOperation::Delete, group.clone())
694            .await;
695        self.env
696            .notification_manager()
697            .notify_compactor(NotificationOperation::Delete, group)
698            .await;
699    }
700
701    pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
702        let mut inner = self.inner.write().await;
703        let tables = inner.complete_dropped_tables(table_ids);
704        self.notify_hummock_dropped_tables(tables).await;
705    }
706
707    pub async fn cleanup_dropped_tables(&self) {
708        let mut inner = self.inner.write().await;
709        let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
710        self.notify_hummock_dropped_tables(tables).await;
711    }
712
713    pub async fn stats(&self) -> MetaResult<CatalogStats> {
714        let inner = self.inner.read().await;
715
716        let mut table_num_map: HashMap<_, _> = Table::find()
717            .select_only()
718            .column(table::Column::TableType)
719            .column_as(table::Column::TableId.count(), "num")
720            .group_by(table::Column::TableType)
721            .having(table::Column::TableType.ne(TableType::Internal))
722            .into_tuple::<(TableType, i64)>()
723            .all(&inner.db)
724            .await?
725            .into_iter()
726            .map(|(table_type, num)| (table_type, num as u64))
727            .collect();
728
729        let source_num = Source::find().count(&inner.db).await?;
730        let sink_num = Sink::find().count(&inner.db).await?;
731        let function_num = Function::find().count(&inner.db).await?;
732        let streaming_job_num = StreamingJob::find().count(&inner.db).await?;
733
734        let actor_num = {
735            let guard = self.env.shared_actor_info.read_guard();
736            guard
737                .iter_over_fragments()
738                .map(|(_, fragment)| fragment.actors.len() as u64)
739                .sum::<u64>()
740        };
741
742        Ok(CatalogStats {
743            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
744            mview_num: table_num_map
745                .remove(&TableType::MaterializedView)
746                .unwrap_or(0),
747            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
748            source_num,
749            sink_num,
750            function_num,
751            streaming_job_num,
752            actor_num,
753        })
754    }
755}
756
/// `CatalogStats` is a struct to store the statistics of all catalogs.
///
/// Returned by `CatalogController::stats`.
#[derive(Debug, Clone, Default)]
pub struct CatalogStats {
    /// Number of tables whose type is `Table`.
    pub table_num: u64,
    /// Number of materialized views.
    pub mview_num: u64,
    /// Number of indexes.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs.
    pub streaming_job_num: u64,
    /// Number of actors across all fragments.
    pub actor_num: u64,
}
768
769impl CatalogControllerInner {
770    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
771        let databases = self.list_databases().await?;
772        let schemas = self.list_schemas().await?;
773        let tables = self.list_tables().await?;
774        let sources = self.list_sources().await?;
775        let sinks = self.list_sinks().await?;
776        let subscriptions = self.list_subscriptions().await?;
777        let indexes = self.list_indexes().await?;
778        let views = self.list_views().await?;
779        let functions = self.list_functions().await?;
780        let connections = self.list_connections().await?;
781        let secrets = self.list_secrets().await?;
782
783        let users = self.list_users().await?;
784
785        Ok((
786            (
787                databases,
788                schemas,
789                tables,
790                sources,
791                sinks,
792                subscriptions,
793                indexes,
794                views,
795                functions,
796                connections,
797                secrets,
798            ),
799            users,
800        ))
801    }
802
803    async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
804        let db_objs = Database::find()
805            .find_also_related(Object)
806            .all(&self.db)
807            .await?;
808        Ok(db_objs
809            .into_iter()
810            .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
811            .collect())
812    }
813
814    async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
815        let schema_objs = Schema::find()
816            .find_also_related(Object)
817            .all(&self.db)
818            .await?;
819
820        Ok(schema_objs
821            .into_iter()
822            .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
823            .collect())
824    }
825
826    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
827        let mut user_infos: Vec<PbUserInfo> = User::find()
828            .all(&self.db)
829            .await?
830            .into_iter()
831            .map(Into::into)
832            .collect();
833
834        for user_info in &mut user_infos {
835            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
836        }
837        Ok(user_infos)
838    }
839
840    /// `list_all_tables` return all tables and internal tables.
841    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
842        let table_objs = Table::find()
843            .find_also_related(Object)
844            .all(&self.db)
845            .await?;
846
847        Ok(table_objs
848            .into_iter()
849            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
850            .collect())
851    }
852
853    /// `list_tables` return all `CREATED` tables, `CREATING` materialized views/ `BACKGROUND` jobs and internal tables that belong to them.
854    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
855        let table_objs = Table::find()
856            .find_also_related(Object)
857            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
858            .filter(
859                streaming_job::Column::JobStatus.eq(JobStatus::Created).or(
860                    table::Column::TableType
861                        .eq(TableType::MaterializedView)
862                        .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
863                ),
864            )
865            .all(&self.db)
866            .await?;
867
868        let job_statuses: HashMap<JobId, JobStatus> = StreamingJob::find()
869            .select_only()
870            .column(streaming_job::Column::JobId)
871            .column(streaming_job::Column::JobStatus)
872            .filter(
873                streaming_job::Column::JobStatus
874                    .eq(JobStatus::Created)
875                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
876            )
877            .into_tuple::<(JobId, JobStatus)>()
878            .all(&self.db)
879            .await?
880            .into_iter()
881            .collect();
882
883        let job_ids: HashSet<JobId> = table_objs
884            .iter()
885            .map(|(t, _)| t.table_id.as_job_id())
886            .chain(job_statuses.keys().cloned())
887            .collect();
888
889        let internal_table_objs = Table::find()
890            .find_also_related(Object)
891            .filter(
892                table::Column::TableType
893                    .eq(TableType::Internal)
894                    .and(table::Column::BelongsToJobId.is_in(job_ids)),
895            )
896            .all(&self.db)
897            .await?;
898
899        Ok(table_objs
900            .into_iter()
901            .chain(internal_table_objs.into_iter())
902            .map(|(table, obj)| {
903                // Correctly set the stream job status for creating materialized views and internal tables.
904                // If the table is not contained in `job_statuses`, it means the job is a creating mv.
905                let status: PbStreamJobStatus = if table.table_type == TableType::Internal {
906                    (*job_statuses
907                        .get(&table.belongs_to_job_id.unwrap())
908                        .unwrap_or(&JobStatus::Creating))
909                    .into()
910                } else {
911                    (*job_statuses
912                        .get(&table.table_id.as_job_id())
913                        .unwrap_or(&JobStatus::Creating))
914                    .into()
915                };
916                let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
917                pb_table.stream_job_status = status.into();
918                pb_table
919            })
920            .collect())
921    }
922
923    /// `list_sources` return all sources and `CREATED` ones if contains any streaming jobs.
924    async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
925        let mut source_objs = Source::find()
926            .find_also_related(Object)
927            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
928            .filter(
929                streaming_job::Column::JobStatus
930                    .is_null()
931                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
932            )
933            .all(&self.db)
934            .await?;
935
936        // filter out inner connector sources that are still under creating.
937        let created_table_ids: HashSet<TableId> = Table::find()
938            .select_only()
939            .column(table::Column::TableId)
940            .join(JoinType::InnerJoin, table::Relation::Object1.def())
941            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
942            .filter(
943                table::Column::OptionalAssociatedSourceId
944                    .is_not_null()
945                    .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
946            )
947            .into_tuple()
948            .all(&self.db)
949            .await?
950            .into_iter()
951            .collect();
952        source_objs.retain_mut(|(source, _)| {
953            source.optional_associated_table_id.is_none()
954                || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
955        });
956
957        Ok(source_objs
958            .into_iter()
959            .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
960            .collect())
961    }
962
963    /// `list_sinks` return all `CREATED` and `BACKGROUND` sinks.
964    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
965        let sink_objs = Sink::find()
966            .find_also_related(Object)
967            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
968            .filter(
969                streaming_job::Column::JobStatus
970                    .eq(JobStatus::Created)
971                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
972            )
973            .all(&self.db)
974            .await?;
975
976        let creating_sinks: HashSet<_> = StreamingJob::find()
977            .select_only()
978            .column(streaming_job::Column::JobId)
979            .filter(
980                streaming_job::Column::JobStatus
981                    .eq(JobStatus::Creating)
982                    .and(
983                        streaming_job::Column::JobId
984                            .is_in(sink_objs.iter().map(|(sink, _)| sink.sink_id)),
985                    ),
986            )
987            .into_tuple::<SinkId>()
988            .all(&self.db)
989            .await?
990            .into_iter()
991            .collect();
992
993        Ok(sink_objs
994            .into_iter()
995            .map(|(sink, obj)| {
996                let is_creating = creating_sinks.contains(&sink.sink_id);
997                let mut pb_sink: PbSink = ObjectModel(sink, obj.unwrap()).into();
998                pb_sink.stream_job_status = if is_creating {
999                    PbStreamJobStatus::Creating.into()
1000                } else {
1001                    PbStreamJobStatus::Created.into()
1002                };
1003                pb_sink
1004            })
1005            .collect())
1006    }
1007
1008    /// `list_subscriptions` return all `CREATED` subscriptions.
1009    async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
1010        let subscription_objs = Subscription::find()
1011            .find_also_related(Object)
1012            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
1013            .all(&self.db)
1014            .await?;
1015
1016        Ok(subscription_objs
1017            .into_iter()
1018            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
1019            .collect())
1020    }
1021
1022    async fn list_views(&self) -> MetaResult<Vec<PbView>> {
1023        let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
1024
1025        Ok(view_objs
1026            .into_iter()
1027            .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
1028            .collect())
1029    }
1030
1031    /// `list_indexes` return all `CREATED` and `BACKGROUND` indexes.
1032    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
1033        let index_objs = Index::find()
1034            .find_also_related(Object)
1035            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1036            .filter(
1037                streaming_job::Column::JobStatus
1038                    .eq(JobStatus::Created)
1039                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
1040            )
1041            .all(&self.db)
1042            .await?;
1043
1044        let creating_indexes: HashSet<_> = StreamingJob::find()
1045            .select_only()
1046            .column(streaming_job::Column::JobId)
1047            .filter(
1048                streaming_job::Column::JobStatus
1049                    .eq(JobStatus::Creating)
1050                    .and(
1051                        streaming_job::Column::JobId
1052                            .is_in(index_objs.iter().map(|(index, _)| index.index_id)),
1053                    ),
1054            )
1055            .into_tuple::<IndexId>()
1056            .all(&self.db)
1057            .await?
1058            .into_iter()
1059            .collect();
1060
1061        Ok(index_objs
1062            .into_iter()
1063            .map(|(index, obj)| {
1064                let is_creating = creating_indexes.contains(&index.index_id);
1065                let mut pb_index: PbIndex = ObjectModel(index, obj.unwrap()).into();
1066                pb_index.stream_job_status = if is_creating {
1067                    PbStreamJobStatus::Creating.into()
1068                } else {
1069                    PbStreamJobStatus::Created.into()
1070                };
1071                pb_index
1072            })
1073            .collect())
1074    }
1075
1076    async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1077        let conn_objs = Connection::find()
1078            .find_also_related(Object)
1079            .all(&self.db)
1080            .await?;
1081
1082        Ok(conn_objs
1083            .into_iter()
1084            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
1085            .collect())
1086    }
1087
1088    pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1089        let secret_objs = Secret::find()
1090            .find_also_related(Object)
1091            .all(&self.db)
1092            .await?;
1093        Ok(secret_objs
1094            .into_iter()
1095            .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
1096            .collect())
1097    }
1098
1099    async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1100        let func_objs = Function::find()
1101            .find_also_related(Object)
1102            .all(&self.db)
1103            .await?;
1104
1105        Ok(func_objs
1106            .into_iter()
1107            .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
1108            .collect())
1109    }
1110
1111    pub(crate) fn register_finish_notifier(
1112        &mut self,
1113        database_id: DatabaseId,
1114        id: JobId,
1115        sender: Sender<Result<NotificationVersion, String>>,
1116    ) {
1117        self.creating_table_finish_notifier
1118            .entry(database_id)
1119            .or_default()
1120            .entry(id)
1121            .or_default()
1122            .push(sender);
1123    }
1124
1125    pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
1126        let status = StreamingJob::find()
1127            .select_only()
1128            .column(streaming_job::Column::JobStatus)
1129            .filter(streaming_job::Column::JobId.eq(id))
1130            .into_tuple::<JobStatus>()
1131            .one(&self.db)
1132            .await?;
1133
1134        status
1135            .map(|status| status == JobStatus::Created)
1136            .ok_or_else(|| {
1137                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
1138            })
1139    }
1140
1141    pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1142        if let Some(database_id) = database_id {
1143            if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1144            {
1145                for tx in creating_tables.into_values().flatten() {
1146                    let _ = tx.send(Err(err.clone()));
1147                }
1148            }
1149        } else {
1150            for tx in take(&mut self.creating_table_finish_notifier)
1151                .into_values()
1152                .flatten()
1153                .flat_map(|(_, txs)| txs.into_iter())
1154            {
1155                let _ = tx.send(Err(err.clone()));
1156            }
1157        }
1158    }
1159
1160    pub(crate) fn notify_cancelled(&mut self, database_id: DatabaseId, job_id: JobId) {
1161        if let Some(creating_tables) = self.creating_table_finish_notifier.get_mut(&database_id)
1162            && let Some(tx_list) = creating_tables.remove(&job_id)
1163        {
1164            for tx in tx_list {
1165                let _ = tx.send(Err("Cancelled".to_owned()));
1166            }
1167        }
1168    }
1169
1170    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1171        let table_ids: Vec<TableId> = Table::find()
1172            .select_only()
1173            .filter(table::Column::TableType.is_in(vec![
1174                TableType::Table,
1175                TableType::MaterializedView,
1176                TableType::Index,
1177            ]))
1178            .column(table::Column::TableId)
1179            .into_tuple()
1180            .all(&self.db)
1181            .await?;
1182        Ok(table_ids)
1183    }
1184
1185    /// Since the tables have been dropped from both meta store and streaming jobs, this method removes those table copies.
1186    /// Returns the removed table copies.
1187    pub(crate) fn complete_dropped_tables(
1188        &mut self,
1189        table_ids: impl IntoIterator<Item = TableId>,
1190    ) -> Vec<PbTable> {
1191        table_ids
1192            .into_iter()
1193            .filter_map(|table_id| {
1194                self.dropped_tables.remove(&table_id).map_or_else(
1195                    || {
1196                        tracing::warn!(%table_id, "table not found");
1197                        None
1198                    },
1199                    Some,
1200                )
1201            })
1202            .collect()
1203    }
1204}