risingwave_meta/controller/catalog/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{
31    DEFAULT_SCHEMA_NAME, FragmentTypeFlag, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::secret::LocalSecretManager;
35use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
36use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
37use risingwave_connector::source::cdc::build_cdc_table_id;
38use risingwave_meta_model::object::ObjectType;
39use risingwave_meta_model::prelude::*;
40use risingwave_meta_model::table::TableType;
41use risingwave_meta_model::{
42    ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
43    IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
44    StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
45    connection, database, fragment, function, index, object, object_dependency, schema, secret,
46    sink, source, streaming_job, subscription, table, user_privilege, view,
47};
48use risingwave_pb::catalog::connection::Info as ConnectionInfo;
49use risingwave_pb::catalog::subscription::SubscriptionState;
50use risingwave_pb::catalog::table::PbTableType;
51use risingwave_pb::catalog::{
52    PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
53    PbStreamJobStatus, PbSubscription, PbTable, PbView,
54};
55use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
56use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
57use risingwave_pb::meta::object::PbObjectInfo;
58use risingwave_pb::meta::subscribe_response::{
59    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
60};
61use risingwave_pb::meta::{PbObject, PbObjectGroup};
62use risingwave_pb::stream_plan::stream_node::NodeBody;
63use risingwave_pb::telemetry::PbTelemetryEventStage;
64use risingwave_pb::user::PbUserInfo;
65use sea_orm::ActiveValue::Set;
66use sea_orm::sea_query::{Expr, Query, SimpleExpr};
67use sea_orm::{
68    ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
69    IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
70    SelectColumns, TransactionTrait, Value,
71};
72use tokio::sync::oneshot::Sender;
73use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
74use tracing::info;
75
76use super::utils::{
77    check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
78    rename_relation_refer,
79};
80use crate::controller::ObjectModel;
81use crate::controller::catalog::util::update_internal_tables;
82use crate::controller::utils::*;
83use crate::manager::{
84    IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
85    get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
86};
87use crate::rpc::ddl_controller::DropMode;
88use crate::telemetry::{MetaTelemetryJobDesc, report_event};
89use crate::{MetaError, MetaResult};
90
/// A full catalog snapshot, expressed as one tuple of protobuf collections.
///
/// The element order is fixed: databases, schemas, tables, sources, sinks, subscriptions,
/// indexes, views, functions, connections, secrets. `CatalogControllerInner::snapshot` fills
/// the tuple in exactly this order.
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);
104
/// Shared, reference-counted handle to a `CatalogController`.
pub type CatalogControllerRef = Arc<CatalogController>;
106
/// `CatalogController` is the controller for catalog related operations, including database, schema, table, view, etc.
pub struct CatalogController {
    /// Meta service environment. In this file it provides the meta store connection and the
    /// notification manager.
    pub(crate) env: MetaSrvEnv,
    /// Catalog state (database connection plus in-memory bookkeeping) behind an async
    /// read-write lock.
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
112
/// IDs involved when the connector of a single table is dropped.
///
/// NOTE(review): the field descriptions below are inferred from the field names and types;
/// confirm them against the code that builds and consumes this context.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    // Only one drop-connector action is applied to one table at a time, so each field holds a
    // single ID rather than a vector.
    /// Streaming job that has to be changed.
    pub(crate) to_change_streaming_job_id: ObjectId,
    /// State table that has to be removed.
    pub(crate) to_remove_state_table_id: TableId,
    /// Source that has to be removed.
    pub(crate) to_remove_source_id: SourceId,
}
120
/// Collects the IDs of everything removed by a drop operation, so that the corresponding
/// resources can be released afterwards.
///
/// NOTE(review): descriptions of `database_id`, `removed_streaming_job_ids`, `removed_actors`
/// and `removed_fragments` are inferred from their names; confirm against the producers.
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    /// Database the removed objects belong to.
    pub(crate) database_id: DatabaseId,
    /// Streaming jobs that were removed.
    pub(crate) removed_streaming_job_ids: Vec<ObjectId>,
    /// Dropped state table list, need to unregister from hummock.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    /// Dropped secrets, need to remove from secret manager.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    /// Dropped sources (when `DROP SOURCE`), need to unregister from source manager.
    pub(crate) removed_source_ids: Vec<SourceId>,
    /// Dropped Source fragments (when `DROP MATERIALIZED VIEW` referencing sources),
    /// need to unregister from source manager.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    /// Actors that were removed.
    pub(crate) removed_actors: HashSet<ActorId>,
    /// Fragments that were removed.
    pub(crate) removed_fragments: HashSet<FragmentId>,
}
139
140impl CatalogController {
141    pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
142        let meta_store = env.meta_store();
143        let catalog_controller = Self {
144            env,
145            inner: RwLock::new(CatalogControllerInner {
146                db: meta_store.conn,
147                creating_table_finish_notifier: HashMap::new(),
148                dropped_tables: HashMap::new(),
149            }),
150        };
151
152        catalog_controller.init().await?;
153        Ok(catalog_controller)
154    }
155
156    /// Used in `NotificationService::subscribe`.
157    /// Need to pay attention to the order of acquiring locks to prevent deadlock problems.
158    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
159        self.inner.read().await
160    }
161
162    pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
163        self.inner.write().await
164    }
165}
166
/// State guarded by the lock in `CatalogController::inner`: the meta store connection plus
/// in-memory bookkeeping maps.
pub struct CatalogControllerInner {
    /// Meta store database connection (taken from `env.meta_store().conn` in
    /// `CatalogController::new`).
    pub(crate) db: DatabaseConnection,
    /// Registered finish notifiers for creating tables.
    ///
    /// `DdlController` will update this map, and pass the `tx` side to `CatalogController`.
    /// On notifying, we can remove the entry from this map.
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<ObjectId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    /// Tables have been dropped from the meta store, but the corresponding barrier remains unfinished.
    pub dropped_tables: HashMap<TableId, PbTable>,
}
179
180impl CatalogController {
181    pub(crate) async fn notify_frontend(
182        &self,
183        operation: NotificationOperation,
184        info: NotificationInfo,
185    ) -> NotificationVersion {
186        self.env
187            .notification_manager()
188            .notify_frontend(operation, info)
189            .await
190    }
191
192    pub(crate) async fn notify_frontend_relation_info(
193        &self,
194        operation: NotificationOperation,
195        relation_info: PbObjectInfo,
196    ) -> NotificationVersion {
197        self.env
198            .notification_manager()
199            .notify_frontend_object_info(operation, relation_info)
200            .await
201    }
202
203    pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
204        self.env.notification_manager().current_version().await
205    }
206}
207
208impl CatalogController {
209    pub async fn finish_create_subscription_catalog(&self, subscription_id: u32) -> MetaResult<()> {
210        let inner = self.inner.write().await;
211        let txn = inner.db.begin().await?;
212        let job_id = subscription_id as i32;
213
214        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
215        let res = Object::update_many()
216            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
217            .col_expr(
218                object::Column::CreatedAtClusterVersion,
219                current_cluster_version().into(),
220            )
221            .filter(object::Column::Oid.eq(job_id))
222            .exec(&txn)
223            .await?;
224        if res.rows_affected == 0 {
225            return Err(MetaError::catalog_id_not_found("subscription", job_id));
226        }
227
228        // mark the target subscription as `Create`.
229        let job = subscription::ActiveModel {
230            subscription_id: Set(job_id),
231            subscription_state: Set(SubscriptionState::Created.into()),
232            ..Default::default()
233        };
234        job.update(&txn).await?;
235
236        let _ = grant_default_privileges_automatically(&txn, job_id).await?;
237
238        txn.commit().await?;
239
240        Ok(())
241    }
242
243    pub async fn notify_create_subscription(
244        &self,
245        subscription_id: u32,
246    ) -> MetaResult<NotificationVersion> {
247        let inner = self.inner.read().await;
248        let job_id = subscription_id as i32;
249        let (subscription, obj) = Subscription::find_by_id(job_id)
250            .find_also_related(Object)
251            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
252            .one(&inner.db)
253            .await?
254            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?;
255
256        let mut version = self
257            .notify_frontend(
258                NotificationOperation::Add,
259                NotificationInfo::ObjectGroup(PbObjectGroup {
260                    objects: vec![PbObject {
261                        object_info: PbObjectInfo::Subscription(
262                            ObjectModel(subscription, obj.unwrap()).into(),
263                        )
264                        .into(),
265                    }],
266                }),
267            )
268            .await;
269
270        // notify default privileges about the new subscription
271        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
272            .select_only()
273            .distinct()
274            .column(user_privilege::Column::UserId)
275            .filter(user_privilege::Column::Oid.eq(subscription_id as ObjectId))
276            .into_tuple()
277            .all(&inner.db)
278            .await?;
279
280        if !updated_user_ids.is_empty() {
281            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
282            version = self.notify_users_update(updated_user_infos).await;
283        }
284
285        Ok(version)
286    }
287
288    // for telemetry
289    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
290        // get connector usage by source/sink
291        // the expect format is like:
292        // {
293        //     "source": [{
294        //         "$source_id": {
295        //             "connector": "kafka",
296        //             "format": "plain",
297        //             "encode": "json"
298        //         },
299        //     }],
300        //     "sink": [{
301        //         "$sink_id": {
302        //             "connector": "pulsar",
303        //             "format": "upsert",
304        //             "encode": "avro"
305        //         },
306        //     }],
307        // }
308
309        let inner = self.inner.read().await;
310        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
311            .select_only()
312            .column(source::Column::SourceId)
313            .column(source::Column::WithProperties)
314            .column(source::Column::SourceInfo)
315            .into_tuple()
316            .all(&inner.db)
317            .await?;
318        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
319            .select_only()
320            .column(sink::Column::SinkId)
321            .column(sink::Column::Properties)
322            .column(sink::Column::SinkFormatDesc)
323            .into_tuple()
324            .all(&inner.db)
325            .await?;
326        drop(inner);
327
328        let get_connector_from_property = |property: &Property| -> String {
329            property
330                .0
331                .get(UPSTREAM_SOURCE_KEY)
332                .cloned()
333                .unwrap_or_default()
334        };
335
336        let source_report: Vec<jsonbb::Value> = source_props_and_info
337            .iter()
338            .map(|(oid, property, info)| {
339                let connector_name = get_connector_from_property(property);
340                let mut format = None;
341                let mut encode = None;
342                if let Some(info) = info {
343                    let pb_info = info.to_protobuf();
344                    format = Some(pb_info.format().as_str_name());
345                    encode = Some(pb_info.row_encode().as_str_name());
346                }
347                jsonbb::json!({
348                    oid.to_string(): {
349                        "connector": connector_name,
350                        "format": format,
351                        "encode": encode,
352                    },
353                })
354            })
355            .collect_vec();
356
357        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
358            .iter()
359            .map(|(oid, property, info)| {
360                let connector_name = get_connector_from_property(property);
361                let mut format = None;
362                let mut encode = None;
363                if let Some(info) = info {
364                    let pb_info = info.to_protobuf();
365                    format = Some(pb_info.format().as_str_name());
366                    encode = Some(pb_info.encode().as_str_name());
367                }
368                jsonbb::json!({
369                    oid.to_string(): {
370                        "connector": connector_name,
371                        "format": format,
372                        "encode": encode,
373                    },
374                })
375            })
376            .collect_vec();
377
378        Ok(jsonbb::json!({
379                "source": source_report,
380                "sink": sink_report,
381        }))
382    }
383
384    pub async fn clean_dirty_subscription(
385        &self,
386        database_id: Option<DatabaseId>,
387    ) -> MetaResult<()> {
388        let inner = self.inner.write().await;
389        let txn = inner.db.begin().await?;
390        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
391            object::Column::Oid.not_in_subquery(
392                Query::select()
393                    .column(subscription::Column::SubscriptionId)
394                    .from(Subscription)
395                    .and_where(
396                        subscription::Column::SubscriptionState
397                            .eq(SubscriptionState::Created as i32),
398                    )
399                    .take(),
400            ),
401        );
402
403        let filter_condition = if let Some(database_id) = database_id {
404            filter_condition.and(object::Column::DatabaseId.eq(database_id))
405        } else {
406            filter_condition
407        };
408        Object::delete_many()
409            .filter(filter_condition)
410            .exec(&txn)
411            .await?;
412        txn.commit().await?;
413        // We don't need to notify the frontend, because the Init subscription is not send to frontend.
414        Ok(())
415    }
416
    /// `clean_dirty_creating_jobs` cleans up creating jobs that are creating in Foreground mode or in Initial status.
    ///
    /// When `database_id` is `Some`, only jobs whose object belongs to that database are
    /// selected; otherwise jobs of every database are selected.
    ///
    /// In one transaction, the objects of the dirty jobs, of every table that belongs to them
    /// and of the sources associated with dirty tables are deleted. After the commit, frontends
    /// receive a `Delete` notification for dirty materialized views and background jobs
    /// (including their internal tables), followed by an `Update` notification for the tables
    /// returned by `clean_dirty_sink_downstreams`, if any.
    ///
    /// Returns the IDs of the sources associated with the dirty tables; the list is empty when
    /// no dirty job was found.
    ///
    /// # Panics
    ///
    /// Panics if the final delete affects no rows, or if a table reported by
    /// `clean_dirty_sink_downstreams` has no related `Object` row.
    pub async fn clean_dirty_creating_jobs(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<Vec<SourceId>> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // A job is dirty when it is still `Initial`, or when it is `Creating` in `Foreground`
        // mode.
        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
            streaming_job::Column::JobStatus
                .eq(JobStatus::Creating)
                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };

        // Object rows of the dirty jobs (streaming job joined with its object).
        let dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .filter(filter_condition)
            .into_partial_model()
            .all(&txn)
            .await?;

        // `clean_dirty_sink_downstreams` runs in the same transaction and returns table IDs;
        // load the full rows of those tables for the `Update` notification sent later.
        let updated_table_ids = Self::clean_dirty_sink_downstreams(&txn).await?;
        let updated_table_objs = if !updated_table_ids.is_empty() {
            Table::find()
                .find_also_related(Object)
                .filter(table::Column::TableId.is_in(updated_table_ids))
                .all(&txn)
                .await?
        } else {
            vec![]
        };

        // No dirty jobs: commit only when there are updated tables to publish. Otherwise the
        // transaction is dropped without being committed.
        if dirty_job_objs.is_empty() {
            if !updated_table_objs.is_empty() {
                txn.commit().await?;

                // Notify the frontend to update the table info.
                self.notify_frontend(
                    NotificationOperation::Update,
                    NotificationInfo::ObjectGroup(PbObjectGroup {
                        objects: updated_table_objs
                            .into_iter()
                            .map(|(t, obj)| PbObject {
                                object_info: PbObjectInfo::Table(
                                    ObjectModel(t, obj.unwrap()).into(),
                                )
                                .into(),
                            })
                            .collect(),
                    }),
                )
                .await;
            }

            return Ok(vec![]);
        }

        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;

        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();

        // Filter out dummy objs for replacement.
        // todo: we'd better introduce a new dummy object type for replacement.
        let all_dirty_table_ids = dirty_job_objs
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table)
            .map(|obj| obj.oid)
            .collect_vec();
        // Table type of every dirty job that has a row in the table catalog.
        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .column(table::Column::TableType)
            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
            .into_tuple::<(ObjectId, TableType)>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        // Dirty jobs created in `Background` mode. Given the selection filter above, these can
        // only have been selected through the `Initial` status.
        let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobId
                    .is_in(dirty_job_ids.clone())
                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .into_tuple()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        // notify delete for failed materialized views and background jobs.
        let to_notify_objs = dirty_job_objs
            .into_iter()
            .filter(|obj| {
                matches!(
                    dirty_table_type_map.get(&obj.oid),
                    Some(TableType::MaterializedView)
                ) || dirty_background_jobs.contains(&obj.oid)
            })
            .collect_vec();

        // The source ids for dirty tables with connector.
        // FIXME: we should also clean dirty sources.
        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(dirty_job_ids.clone())
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        // Every table that belongs to one of the dirty jobs.
        let dirty_state_table_ids: Vec<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;

        // Objects of the tables that belong to the jobs being notified; their deletion is
        // included in the same `Delete` notification.
        let dirty_internal_table_objs = Object::find()
            .select_only()
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, object::Relation::Table.def())
            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
            .into_partial_model()
            .all(&txn)
            .await?;

        // Delete the jobs, the tables that belong to them, and the associated sources.
        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
            .clone()
            .into_iter()
            .chain(dirty_state_table_ids.into_iter())
            .chain(dirty_associated_source_ids.clone().into_iter())
            .collect();

        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(to_delete_objs))
            .exec(&txn)
            .await?;
        // `dirty_job_objs` was non-empty, so the delete is expected to remove rows.
        assert!(res.rows_affected > 0);

        txn.commit().await?;

        let object_group = build_object_group_for_delete(
            to_notify_objs
                .into_iter()
                .chain(dirty_internal_table_objs.into_iter())
                .collect_vec(),
        );

        let _version = self
            .notify_frontend(NotificationOperation::Delete, object_group)
            .await;

        // Notify the frontend to update the table info.
        if !updated_table_objs.is_empty() {
            self.notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: updated_table_objs
                        .into_iter()
                        .map(|(t, obj)| PbObject {
                            object_info: PbObjectInfo::Table(ObjectModel(t, obj.unwrap()).into())
                                .into(),
                        })
                        .collect(),
                }),
            )
            .await;
        }

        Ok(dirty_associated_source_ids)
    }
616
617    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
618        let inner = self.inner.write().await;
619        let txn = inner.db.begin().await?;
620        ensure_object_id(ObjectType::Database, comment.database_id as _, &txn).await?;
621        ensure_object_id(ObjectType::Schema, comment.schema_id as _, &txn).await?;
622        let table_obj = Object::find_by_id(comment.table_id as ObjectId)
623            .one(&txn)
624            .await?
625            .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
626
627        let table = if let Some(col_idx) = comment.column_index {
628            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id as TableId)
629                .select_only()
630                .column(table::Column::Columns)
631                .into_tuple()
632                .one(&txn)
633                .await?
634                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
635            let mut pb_columns = columns.to_protobuf();
636
637            let column = pb_columns
638                .get_mut(col_idx as usize)
639                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
640            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
641                anyhow!(
642                    "column desc at index {} for table id {} not found",
643                    col_idx,
644                    comment.table_id
645                )
646            })?;
647            column_desc.description = comment.description;
648            table::ActiveModel {
649                table_id: Set(comment.table_id as _),
650                columns: Set(pb_columns.into()),
651                ..Default::default()
652            }
653            .update(&txn)
654            .await?
655        } else {
656            table::ActiveModel {
657                table_id: Set(comment.table_id as _),
658                description: Set(comment.description),
659                ..Default::default()
660            }
661            .update(&txn)
662            .await?
663        };
664        txn.commit().await?;
665
666        let version = self
667            .notify_frontend_relation_info(
668                NotificationOperation::Update,
669                PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
670            )
671            .await;
672
673        Ok(version)
674    }
675
676    pub async fn complete_dropped_tables(
677        &self,
678        table_ids: impl Iterator<Item = TableId>,
679    ) -> Vec<PbTable> {
680        let mut inner = self.inner.write().await;
681        inner.complete_dropped_tables(table_ids)
682    }
683}
684
/// `CatalogStats` is a struct to store the statistics of all catalogs.
///
/// All counters are plain row counts. The common data traits are derived so the stats can be
/// logged, compared, copied around and default-initialized to all zeros.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CatalogStats {
    /// Number of tables of type `Table`.
    pub table_num: u64,
    /// Number of tables of type `MaterializedView`.
    pub mview_num: u64,
    /// Number of tables of type `Index`.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs.
    pub streaming_job_num: u64,
    /// Number of actors.
    pub actor_num: u64,
}
696
697impl CatalogControllerInner {
698    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
699        let databases = self.list_databases().await?;
700        let schemas = self.list_schemas().await?;
701        let tables = self.list_tables().await?;
702        let sources = self.list_sources().await?;
703        let sinks = self.list_sinks().await?;
704        let subscriptions = self.list_subscriptions().await?;
705        let indexes = self.list_indexes().await?;
706        let views = self.list_views().await?;
707        let functions = self.list_functions().await?;
708        let connections = self.list_connections().await?;
709        let secrets = self.list_secrets().await?;
710
711        let users = self.list_users().await?;
712
713        Ok((
714            (
715                databases,
716                schemas,
717                tables,
718                sources,
719                sinks,
720                subscriptions,
721                indexes,
722                views,
723                functions,
724                connections,
725                secrets,
726            ),
727            users,
728        ))
729    }
730
731    pub async fn stats(&self) -> MetaResult<CatalogStats> {
732        let mut table_num_map: HashMap<_, _> = Table::find()
733            .select_only()
734            .column(table::Column::TableType)
735            .column_as(table::Column::TableId.count(), "num")
736            .group_by(table::Column::TableType)
737            .having(table::Column::TableType.ne(TableType::Internal))
738            .into_tuple::<(TableType, i64)>()
739            .all(&self.db)
740            .await?
741            .into_iter()
742            .map(|(table_type, num)| (table_type, num as u64))
743            .collect();
744
745        let source_num = Source::find().count(&self.db).await?;
746        let sink_num = Sink::find().count(&self.db).await?;
747        let function_num = Function::find().count(&self.db).await?;
748        let streaming_job_num = StreamingJob::find().count(&self.db).await?;
749        let actor_num = Actor::find().count(&self.db).await?;
750
751        Ok(CatalogStats {
752            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
753            mview_num: table_num_map
754                .remove(&TableType::MaterializedView)
755                .unwrap_or(0),
756            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
757            source_num,
758            sink_num,
759            function_num,
760            streaming_job_num,
761            actor_num,
762        })
763    }
764
765    async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
766        let db_objs = Database::find()
767            .find_also_related(Object)
768            .all(&self.db)
769            .await?;
770        Ok(db_objs
771            .into_iter()
772            .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
773            .collect())
774    }
775
776    async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
777        let schema_objs = Schema::find()
778            .find_also_related(Object)
779            .all(&self.db)
780            .await?;
781
782        Ok(schema_objs
783            .into_iter()
784            .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
785            .collect())
786    }
787
788    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
789        let mut user_infos: Vec<PbUserInfo> = User::find()
790            .all(&self.db)
791            .await?
792            .into_iter()
793            .map(Into::into)
794            .collect();
795
796        for user_info in &mut user_infos {
797            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
798        }
799        Ok(user_infos)
800    }
801
802    /// `list_all_tables` return all tables and internal tables.
803    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
804        let table_objs = Table::find()
805            .find_also_related(Object)
806            .all(&self.db)
807            .await?;
808
809        Ok(table_objs
810            .into_iter()
811            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
812            .collect())
813    }
814
815    /// `list_tables` return all `CREATED` tables, `CREATING` materialized views/ `BACKGROUND` jobs and internal tables that belong to them.
816    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
817        let table_objs = Table::find()
818            .find_also_related(Object)
819            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
820            .filter(
821                streaming_job::Column::JobStatus.eq(JobStatus::Created).or(
822                    table::Column::TableType
823                        .eq(TableType::MaterializedView)
824                        .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
825                ),
826            )
827            .all(&self.db)
828            .await?;
829
830        let job_statuses: HashMap<ObjectId, JobStatus> = StreamingJob::find()
831            .select_only()
832            .column(streaming_job::Column::JobId)
833            .column(streaming_job::Column::JobStatus)
834            .filter(
835                streaming_job::Column::JobStatus
836                    .eq(JobStatus::Created)
837                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
838            )
839            .into_tuple::<(ObjectId, JobStatus)>()
840            .all(&self.db)
841            .await?
842            .into_iter()
843            .collect();
844
845        let job_ids: HashSet<ObjectId> = table_objs
846            .iter()
847            .map(|(t, _)| t.table_id)
848            .chain(job_statuses.keys().cloned())
849            .collect();
850
851        let internal_table_objs = Table::find()
852            .find_also_related(Object)
853            .filter(
854                table::Column::TableType
855                    .eq(TableType::Internal)
856                    .and(table::Column::BelongsToJobId.is_in(job_ids)),
857            )
858            .all(&self.db)
859            .await?;
860
861        Ok(table_objs
862            .into_iter()
863            .chain(internal_table_objs.into_iter())
864            .map(|(table, obj)| {
865                // Correctly set the stream job status for creating materialized views and internal tables.
866                // If the table is not contained in `job_statuses`, it means the job is a creating mv.
867                let status: PbStreamJobStatus = if table.table_type == TableType::Internal {
868                    (*job_statuses
869                        .get(&table.belongs_to_job_id.unwrap())
870                        .unwrap_or(&JobStatus::Creating))
871                    .into()
872                } else {
873                    (*job_statuses
874                        .get(&table.table_id)
875                        .unwrap_or(&JobStatus::Creating))
876                    .into()
877                };
878                let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
879                pb_table.stream_job_status = status.into();
880                pb_table
881            })
882            .collect())
883    }
884
885    /// `list_sources` return all sources and `CREATED` ones if contains any streaming jobs.
886    async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
887        let mut source_objs = Source::find()
888            .find_also_related(Object)
889            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
890            .filter(
891                streaming_job::Column::JobStatus
892                    .is_null()
893                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
894            )
895            .all(&self.db)
896            .await?;
897
898        // filter out inner connector sources that are still under creating.
899        let created_table_ids: HashSet<TableId> = Table::find()
900            .select_only()
901            .column(table::Column::TableId)
902            .join(JoinType::InnerJoin, table::Relation::Object1.def())
903            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
904            .filter(
905                table::Column::OptionalAssociatedSourceId
906                    .is_not_null()
907                    .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
908            )
909            .into_tuple()
910            .all(&self.db)
911            .await?
912            .into_iter()
913            .collect();
914        source_objs.retain_mut(|(source, _)| {
915            source.optional_associated_table_id.is_none()
916                || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
917        });
918
919        Ok(source_objs
920            .into_iter()
921            .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
922            .collect())
923    }
924
925    /// `list_sinks` return all `CREATED` and `BACKGROUND` sinks.
926    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
927        let sink_objs = Sink::find()
928            .find_also_related(Object)
929            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
930            .filter(
931                streaming_job::Column::JobStatus
932                    .eq(JobStatus::Created)
933                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
934            )
935            .all(&self.db)
936            .await?;
937
938        let creating_sinks: HashSet<_> = StreamingJob::find()
939            .select_only()
940            .column(streaming_job::Column::JobId)
941            .filter(
942                streaming_job::Column::JobStatus
943                    .eq(JobStatus::Creating)
944                    .and(
945                        streaming_job::Column::JobId
946                            .is_in(sink_objs.iter().map(|(sink, _)| sink.sink_id)),
947                    ),
948            )
949            .into_tuple::<SinkId>()
950            .all(&self.db)
951            .await?
952            .into_iter()
953            .collect();
954
955        Ok(sink_objs
956            .into_iter()
957            .map(|(sink, obj)| {
958                let is_creating = creating_sinks.contains(&sink.sink_id);
959                let mut pb_sink: PbSink = ObjectModel(sink, obj.unwrap()).into();
960                pb_sink.stream_job_status = if is_creating {
961                    PbStreamJobStatus::Creating.into()
962                } else {
963                    PbStreamJobStatus::Created.into()
964                };
965                pb_sink
966            })
967            .collect())
968    }
969
970    /// `list_subscriptions` return all `CREATED` subscriptions.
971    async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
972        let subscription_objs = Subscription::find()
973            .find_also_related(Object)
974            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
975            .all(&self.db)
976            .await?;
977
978        Ok(subscription_objs
979            .into_iter()
980            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
981            .collect())
982    }
983
984    async fn list_views(&self) -> MetaResult<Vec<PbView>> {
985        let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
986
987        Ok(view_objs
988            .into_iter()
989            .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
990            .collect())
991    }
992
993    /// `list_indexes` return all `CREATED` indexes.
994    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
995        let index_objs = Index::find()
996            .find_also_related(Object)
997            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
998            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
999            .all(&self.db)
1000            .await?;
1001
1002        Ok(index_objs
1003            .into_iter()
1004            .map(|(index, obj)| ObjectModel(index, obj.unwrap()).into())
1005            .collect())
1006    }
1007
1008    async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1009        let conn_objs = Connection::find()
1010            .find_also_related(Object)
1011            .all(&self.db)
1012            .await?;
1013
1014        Ok(conn_objs
1015            .into_iter()
1016            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
1017            .collect())
1018    }
1019
1020    pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1021        let secret_objs = Secret::find()
1022            .find_also_related(Object)
1023            .all(&self.db)
1024            .await?;
1025        Ok(secret_objs
1026            .into_iter()
1027            .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
1028            .collect())
1029    }
1030
1031    async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1032        let func_objs = Function::find()
1033            .find_also_related(Object)
1034            .all(&self.db)
1035            .await?;
1036
1037        Ok(func_objs
1038            .into_iter()
1039            .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
1040            .collect())
1041    }
1042
1043    pub(crate) fn register_finish_notifier(
1044        &mut self,
1045        database_id: DatabaseId,
1046        id: ObjectId,
1047        sender: Sender<Result<NotificationVersion, String>>,
1048    ) {
1049        self.creating_table_finish_notifier
1050            .entry(database_id)
1051            .or_default()
1052            .entry(id)
1053            .or_default()
1054            .push(sender);
1055    }
1056
1057    pub(crate) async fn streaming_job_is_finished(&mut self, id: i32) -> MetaResult<bool> {
1058        let status = StreamingJob::find()
1059            .select_only()
1060            .column(streaming_job::Column::JobStatus)
1061            .filter(streaming_job::Column::JobId.eq(id))
1062            .into_tuple::<JobStatus>()
1063            .one(&self.db)
1064            .await?;
1065
1066        status
1067            .map(|status| status == JobStatus::Created)
1068            .ok_or_else(|| {
1069                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
1070            })
1071    }
1072
1073    pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1074        if let Some(database_id) = database_id {
1075            if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1076            {
1077                for tx in creating_tables.into_values().flatten() {
1078                    let _ = tx.send(Err(err.clone()));
1079                }
1080            }
1081        } else {
1082            for tx in take(&mut self.creating_table_finish_notifier)
1083                .into_values()
1084                .flatten()
1085                .flat_map(|(_, txs)| txs.into_iter())
1086            {
1087                let _ = tx.send(Err(err.clone()));
1088            }
1089        }
1090    }
1091
1092    pub(crate) fn notify_cancelled(&mut self, database_id: DatabaseId, id: ObjectId) {
1093        if let Some(creating_tables) = self.creating_table_finish_notifier.get_mut(&database_id)
1094            && let Some(tx_list) = creating_tables.remove(&id)
1095        {
1096            for tx in tx_list {
1097                let _ = tx.send(Err("Cancelled".to_owned()));
1098            }
1099        }
1100    }
1101
1102    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1103        let table_ids: Vec<TableId> = Table::find()
1104            .select_only()
1105            .filter(table::Column::TableType.is_in(vec![
1106                TableType::Table,
1107                TableType::MaterializedView,
1108                TableType::Index,
1109            ]))
1110            .column(table::Column::TableId)
1111            .into_tuple()
1112            .all(&self.db)
1113            .await?;
1114        Ok(table_ids)
1115    }
1116
1117    /// Since the tables have been dropped from both meta store and streaming jobs, this method removes those table copies.
1118    /// Returns the removed table copies.
1119    pub(crate) fn complete_dropped_tables(
1120        &mut self,
1121        table_ids: impl Iterator<Item = TableId>,
1122    ) -> Vec<PbTable> {
1123        table_ids
1124            .filter_map(|table_id| {
1125                self.dropped_tables.remove(&table_id).map_or_else(
1126                    || {
1127                        tracing::warn!(table_id, "table not found");
1128                        None
1129                    },
1130                    Some,
1131                )
1132            })
1133            .collect()
1134    }
1135}