risingwave_meta/controller/catalog/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS, TableOption};
31use risingwave_common::current_cluster_version;
32use risingwave_common::secret::LocalSecretManager;
33use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
34use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
35use risingwave_connector::source::cdc::build_cdc_table_id;
36use risingwave_meta_model::object::ObjectType;
37use risingwave_meta_model::prelude::*;
38use risingwave_meta_model::table::TableType;
39use risingwave_meta_model::{
40    ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
41    IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
42    StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
43    connection, database, fragment, function, index, object, object_dependency, schema, secret,
44    sink, source, streaming_job, subscription, table, user_privilege, view,
45};
46use risingwave_pb::catalog::connection::Info as ConnectionInfo;
47use risingwave_pb::catalog::subscription::SubscriptionState;
48use risingwave_pb::catalog::table::PbTableType;
49use risingwave_pb::catalog::{
50    PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
51    PbStreamJobStatus, PbSubscription, PbTable, PbView,
52};
53use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
54use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
55use risingwave_pb::meta::object::PbObjectInfo;
56use risingwave_pb::meta::subscribe_response::{
57    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
58};
59use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
60use risingwave_pb::stream_plan::FragmentTypeFlag;
61use risingwave_pb::stream_plan::stream_node::NodeBody;
62use risingwave_pb::telemetry::PbTelemetryEventStage;
63use risingwave_pb::user::PbUserInfo;
64use sea_orm::ActiveValue::Set;
65use sea_orm::sea_query::{Expr, Query, SimpleExpr};
66use sea_orm::{
67    ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
68    IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
69    SelectColumns, TransactionTrait, Value,
70};
71use tokio::sync::oneshot::Sender;
72use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
73use tracing::info;
74
75use super::utils::{
76    check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
77    rename_relation_refer,
78};
79use crate::controller::ObjectModel;
80use crate::controller::catalog::util::update_internal_tables;
81use crate::controller::utils::*;
82use crate::manager::{
83    IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
84    get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
85};
86use crate::rpc::ddl_controller::DropMode;
87use crate::telemetry::{MetaTelemetryJobDesc, report_event};
88use crate::{MetaError, MetaResult};
89
/// A full catalog snapshot: one `Vec` per object kind.
///
/// The element order (databases, schemas, tables, sources, sinks, subscriptions, indexes,
/// views, functions, connections, secrets) matches the tuple built by
/// `CatalogControllerInner::snapshot`.
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);
103
/// Shared, reference-counted handle to a [`CatalogController`].
pub type CatalogControllerRef = Arc<CatalogController>;
105
/// `CatalogController` is the controller for catalog related operations, including database, schema, table, view, etc.
pub struct CatalogController {
    /// Meta service environment; used to reach the meta store and the notification manager.
    pub(crate) env: MetaSrvEnv,
    /// Controller state (meta store connection plus in-memory bookkeeping) behind a tokio `RwLock`.
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
111
/// Ids touched by a single "drop connector" action on a table.
///
/// The field names indicate the streaming job to change and the state table and source to
/// remove — NOTE(review): exact handling lives in the callers, confirm there.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    // We only apply one drop-connector action per table at a time, so there is no need for a `Vec` here.
    pub(crate) to_change_streaming_job_id: ObjectId,
    pub(crate) to_remove_state_table_id: TableId,
    pub(crate) to_remove_source_id: SourceId,
}
119
/// Ids of everything removed by a drop, which callers still need to release in other
/// subsystems (see the per-field notes).
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    /// Database the drop was performed in — presumably; confirm against the producers.
    pub(crate) database_id: DatabaseId,
    /// Dropped streaming jobs.
    pub(crate) removed_streaming_job_ids: Vec<ObjectId>,
    /// Dropped state table list, need to unregister from hummock.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    /// Dropped secrets, need to remove from secret manager.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    /// Dropped sources (when `DROP SOURCE`), need to unregister from source manager.
    pub(crate) removed_source_ids: Vec<SourceId>,
    /// Dropped Source fragments (when `DROP MATERIALIZED VIEW` referencing sources),
    /// need to unregister from source manager.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    /// Removed actor ids.
    pub(crate) removed_actors: HashSet<ActorId>,
    /// Removed fragment ids.
    pub(crate) removed_fragments: HashSet<FragmentId>,
}
138
139impl CatalogController {
140    pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
141        let meta_store = env.meta_store();
142        let catalog_controller = Self {
143            env,
144            inner: RwLock::new(CatalogControllerInner {
145                db: meta_store.conn,
146                creating_table_finish_notifier: HashMap::new(),
147                dropped_tables: HashMap::new(),
148            }),
149        };
150
151        catalog_controller.init().await?;
152        Ok(catalog_controller)
153    }
154
155    /// Used in `NotificationService::subscribe`.
156    /// Need to pay attention to the order of acquiring locks to prevent deadlock problems.
157    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
158        self.inner.read().await
159    }
160
161    pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
162        self.inner.write().await
163    }
164}
165
/// State held behind `CatalogController::inner`: the meta store connection plus in-memory
/// bookkeeping for in-flight creates and drops.
pub struct CatalogControllerInner {
    /// Connection to the meta store.
    pub(crate) db: DatabaseConnection,
    /// Registered finish notifiers for creating tables, keyed by database id and then job id.
    ///
    /// `DdlController` will update this map, and pass the `tx` side to `CatalogController`.
    /// On notifying, we can remove the entry from this map.
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<ObjectId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    /// Tables have been dropped from the meta store, but the corresponding barrier remains unfinished.
    pub dropped_tables: HashMap<TableId, PbTable>,
}
178
179impl CatalogController {
180    pub(crate) async fn notify_frontend(
181        &self,
182        operation: NotificationOperation,
183        info: NotificationInfo,
184    ) -> NotificationVersion {
185        self.env
186            .notification_manager()
187            .notify_frontend(operation, info)
188            .await
189    }
190
191    pub(crate) async fn notify_frontend_relation_info(
192        &self,
193        operation: NotificationOperation,
194        relation_info: PbObjectInfo,
195    ) -> NotificationVersion {
196        self.env
197            .notification_manager()
198            .notify_frontend_object_info(operation, relation_info)
199            .await
200    }
201
202    pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
203        self.env.notification_manager().current_version().await
204    }
205}
206
207impl CatalogController {
208    pub async fn finish_create_subscription_catalog(&self, subscription_id: u32) -> MetaResult<()> {
209        let inner = self.inner.write().await;
210        let txn = inner.db.begin().await?;
211        let job_id = subscription_id as i32;
212
213        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
214        let res = Object::update_many()
215            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
216            .col_expr(
217                object::Column::CreatedAtClusterVersion,
218                current_cluster_version().into(),
219            )
220            .filter(object::Column::Oid.eq(job_id))
221            .exec(&txn)
222            .await?;
223        if res.rows_affected == 0 {
224            return Err(MetaError::catalog_id_not_found("subscription", job_id));
225        }
226
227        // mark the target subscription as `Create`.
228        let job = subscription::ActiveModel {
229            subscription_id: Set(job_id),
230            subscription_state: Set(SubscriptionState::Created.into()),
231            ..Default::default()
232        };
233        job.update(&txn).await?;
234        txn.commit().await?;
235
236        Ok(())
237    }
238
239    pub async fn notify_create_subscription(
240        &self,
241        subscription_id: u32,
242    ) -> MetaResult<NotificationVersion> {
243        let inner = self.inner.read().await;
244        let job_id = subscription_id as i32;
245        let (subscription, obj) = Subscription::find_by_id(job_id)
246            .find_also_related(Object)
247            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
248            .one(&inner.db)
249            .await?
250            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?;
251
252        let version = self
253            .notify_frontend(
254                NotificationOperation::Add,
255                NotificationInfo::ObjectGroup(PbObjectGroup {
256                    objects: vec![PbObject {
257                        object_info: PbObjectInfo::Subscription(
258                            ObjectModel(subscription, obj.unwrap()).into(),
259                        )
260                        .into(),
261                    }],
262                }),
263            )
264            .await;
265        Ok(version)
266    }
267
268    // for telemetry
269    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
270        // get connector usage by source/sink
271        // the expect format is like:
272        // {
273        //     "source": [{
274        //         "$source_id": {
275        //             "connector": "kafka",
276        //             "format": "plain",
277        //             "encode": "json"
278        //         },
279        //     }],
280        //     "sink": [{
281        //         "$sink_id": {
282        //             "connector": "pulsar",
283        //             "format": "upsert",
284        //             "encode": "avro"
285        //         },
286        //     }],
287        // }
288
289        let inner = self.inner.read().await;
290        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
291            .select_only()
292            .column(source::Column::SourceId)
293            .column(source::Column::WithProperties)
294            .column(source::Column::SourceInfo)
295            .into_tuple()
296            .all(&inner.db)
297            .await?;
298        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
299            .select_only()
300            .column(sink::Column::SinkId)
301            .column(sink::Column::Properties)
302            .column(sink::Column::SinkFormatDesc)
303            .into_tuple()
304            .all(&inner.db)
305            .await?;
306        drop(inner);
307
308        let get_connector_from_property = |property: &Property| -> String {
309            property
310                .0
311                .get(UPSTREAM_SOURCE_KEY)
312                .map(|v| v.to_string())
313                .unwrap_or_default()
314        };
315
316        let source_report: Vec<jsonbb::Value> = source_props_and_info
317            .iter()
318            .map(|(oid, property, info)| {
319                let connector_name = get_connector_from_property(property);
320                let mut format = None;
321                let mut encode = None;
322                if let Some(info) = info {
323                    let pb_info = info.to_protobuf();
324                    format = Some(pb_info.format().as_str_name());
325                    encode = Some(pb_info.row_encode().as_str_name());
326                }
327                jsonbb::json!({
328                    oid.to_string(): {
329                        "connector": connector_name,
330                        "format": format,
331                        "encode": encode,
332                    },
333                })
334            })
335            .collect_vec();
336
337        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
338            .iter()
339            .map(|(oid, property, info)| {
340                let connector_name = get_connector_from_property(property);
341                let mut format = None;
342                let mut encode = None;
343                if let Some(info) = info {
344                    let pb_info = info.to_protobuf();
345                    format = Some(pb_info.format().as_str_name());
346                    encode = Some(pb_info.encode().as_str_name());
347                }
348                jsonbb::json!({
349                    oid.to_string(): {
350                        "connector": connector_name,
351                        "format": format,
352                        "encode": encode,
353                    },
354                })
355            })
356            .collect_vec();
357
358        Ok(jsonbb::json!({
359                "source": source_report,
360                "sink": sink_report,
361        }))
362    }
363
364    pub async fn clean_dirty_subscription(
365        &self,
366        database_id: Option<DatabaseId>,
367    ) -> MetaResult<()> {
368        let inner = self.inner.write().await;
369        let txn = inner.db.begin().await?;
370        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
371            object::Column::Oid.not_in_subquery(
372                Query::select()
373                    .column(subscription::Column::SubscriptionId)
374                    .from(Subscription)
375                    .and_where(
376                        subscription::Column::SubscriptionState
377                            .eq(SubscriptionState::Created as i32),
378                    )
379                    .take(),
380            ),
381        );
382
383        let filter_condition = if let Some(database_id) = database_id {
384            filter_condition.and(object::Column::DatabaseId.eq(database_id))
385        } else {
386            filter_condition
387        };
388        Object::delete_many()
389            .filter(filter_condition)
390            .exec(&txn)
391            .await?;
392        txn.commit().await?;
393        // We don't need to notify the frontend, because the Init subscription is not send to frontend.
394        Ok(())
395    }
396
397    /// `clean_dirty_creating_jobs` cleans up creating jobs that are creating in Foreground mode or in Initial status.
398    pub async fn clean_dirty_creating_jobs(
399        &self,
400        database_id: Option<DatabaseId>,
401    ) -> MetaResult<Vec<SourceId>> {
402        let inner = self.inner.write().await;
403        let txn = inner.db.begin().await?;
404
405        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
406            streaming_job::Column::JobStatus
407                .eq(JobStatus::Creating)
408                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
409        );
410
411        let filter_condition = if let Some(database_id) = database_id {
412            filter_condition.and(object::Column::DatabaseId.eq(database_id))
413        } else {
414            filter_condition
415        };
416
417        let dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
418            .select_only()
419            .column(streaming_job::Column::JobId)
420            .columns([
421                object::Column::Oid,
422                object::Column::ObjType,
423                object::Column::SchemaId,
424                object::Column::DatabaseId,
425            ])
426            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
427            .filter(filter_condition)
428            .into_partial_model()
429            .all(&txn)
430            .await?;
431
432        let updated_table_ids = Self::clean_dirty_sink_downstreams(&txn).await?;
433        let updated_table_objs = if !updated_table_ids.is_empty() {
434            Table::find()
435                .find_also_related(Object)
436                .filter(table::Column::TableId.is_in(updated_table_ids))
437                .all(&txn)
438                .await?
439        } else {
440            vec![]
441        };
442
443        if dirty_job_objs.is_empty() {
444            if !updated_table_objs.is_empty() {
445                txn.commit().await?;
446
447                // Notify the frontend to update the table info.
448                self.notify_frontend(
449                    NotificationOperation::Update,
450                    NotificationInfo::ObjectGroup(PbObjectGroup {
451                        objects: updated_table_objs
452                            .into_iter()
453                            .map(|(t, obj)| PbObject {
454                                object_info: PbObjectInfo::Table(
455                                    ObjectModel(t, obj.unwrap()).into(),
456                                )
457                                .into(),
458                            })
459                            .collect(),
460                    }),
461                )
462                .await;
463            }
464
465            return Ok(vec![]);
466        }
467
468        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
469
470        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();
471
472        // Filter out dummy objs for replacement.
473        // todo: we'd better introduce a new dummy object type for replacement.
474        let all_dirty_table_ids = dirty_job_objs
475            .iter()
476            .filter(|obj| obj.obj_type == ObjectType::Table)
477            .map(|obj| obj.oid)
478            .collect_vec();
479        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
480            .select_only()
481            .column(table::Column::TableId)
482            .column(table::Column::TableType)
483            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
484            .into_tuple::<(ObjectId, TableType)>()
485            .all(&txn)
486            .await?
487            .into_iter()
488            .collect();
489
490        // Only notify delete for failed materialized views.
491        let dirty_mview_objs = dirty_job_objs
492            .into_iter()
493            .filter(|obj| {
494                matches!(
495                    dirty_table_type_map.get(&obj.oid),
496                    Some(TableType::MaterializedView)
497                )
498            })
499            .collect_vec();
500
501        // The source ids for dirty tables with connector.
502        // FIXME: we should also clean dirty sources.
503        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
504            .select_only()
505            .column(table::Column::OptionalAssociatedSourceId)
506            .filter(
507                table::Column::TableId
508                    .is_in(dirty_job_ids.clone())
509                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
510            )
511            .into_tuple()
512            .all(&txn)
513            .await?;
514
515        let dirty_state_table_ids: Vec<TableId> = Table::find()
516            .select_only()
517            .column(table::Column::TableId)
518            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
519            .into_tuple()
520            .all(&txn)
521            .await?;
522
523        let dirty_mview_internal_table_objs = Object::find()
524            .select_only()
525            .columns([
526                object::Column::Oid,
527                object::Column::ObjType,
528                object::Column::SchemaId,
529                object::Column::DatabaseId,
530            ])
531            .join(JoinType::InnerJoin, object::Relation::Table.def())
532            .filter(table::Column::BelongsToJobId.is_in(dirty_mview_objs.iter().map(|obj| obj.oid)))
533            .into_partial_model()
534            .all(&txn)
535            .await?;
536
537        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
538            .clone()
539            .into_iter()
540            .chain(dirty_state_table_ids.into_iter())
541            .chain(dirty_associated_source_ids.clone().into_iter())
542            .collect();
543
544        let res = Object::delete_many()
545            .filter(object::Column::Oid.is_in(to_delete_objs))
546            .exec(&txn)
547            .await?;
548        assert!(res.rows_affected > 0);
549
550        txn.commit().await?;
551
552        let object_group = build_object_group_for_delete(
553            dirty_mview_objs
554                .into_iter()
555                .chain(dirty_mview_internal_table_objs.into_iter())
556                .collect_vec(),
557        );
558
559        let _version = self
560            .notify_frontend(NotificationOperation::Delete, object_group)
561            .await;
562
563        // Notify the frontend to update the table info.
564        if !updated_table_objs.is_empty() {
565            self.notify_frontend(
566                NotificationOperation::Update,
567                NotificationInfo::ObjectGroup(PbObjectGroup {
568                    objects: updated_table_objs
569                        .into_iter()
570                        .map(|(t, obj)| PbObject {
571                            object_info: PbObjectInfo::Table(ObjectModel(t, obj.unwrap()).into())
572                                .into(),
573                        })
574                        .collect(),
575                }),
576            )
577            .await;
578        }
579
580        Ok(dirty_associated_source_ids)
581    }
582
583    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
584        let inner = self.inner.write().await;
585        let txn = inner.db.begin().await?;
586        ensure_object_id(ObjectType::Database, comment.database_id as _, &txn).await?;
587        ensure_object_id(ObjectType::Schema, comment.schema_id as _, &txn).await?;
588        let table_obj = Object::find_by_id(comment.table_id as ObjectId)
589            .one(&txn)
590            .await?
591            .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
592
593        let table = if let Some(col_idx) = comment.column_index {
594            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id as TableId)
595                .select_only()
596                .column(table::Column::Columns)
597                .into_tuple()
598                .one(&txn)
599                .await?
600                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
601            let mut pb_columns = columns.to_protobuf();
602
603            let column = pb_columns
604                .get_mut(col_idx as usize)
605                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
606            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
607                anyhow!(
608                    "column desc at index {} for table id {} not found",
609                    col_idx,
610                    comment.table_id
611                )
612            })?;
613            column_desc.description = comment.description;
614            table::ActiveModel {
615                table_id: Set(comment.table_id as _),
616                columns: Set(pb_columns.into()),
617                ..Default::default()
618            }
619            .update(&txn)
620            .await?
621        } else {
622            table::ActiveModel {
623                table_id: Set(comment.table_id as _),
624                description: Set(comment.description),
625                ..Default::default()
626            }
627            .update(&txn)
628            .await?
629        };
630        txn.commit().await?;
631
632        let version = self
633            .notify_frontend_relation_info(
634                NotificationOperation::Update,
635                PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
636            )
637            .await;
638
639        Ok(version)
640    }
641
642    pub async fn complete_dropped_tables(
643        &self,
644        table_ids: impl Iterator<Item = TableId>,
645    ) -> Vec<PbTable> {
646        let mut inner = self.inner.write().await;
647        inner.complete_dropped_tables(table_ids)
648    }
649}
650
/// `CatalogStats` is a struct to store the statistics of all catalogs.
///
/// Produced by `CatalogControllerInner::stats`.
pub struct CatalogStats {
    /// Number of tables whose type is `Table`.
    pub table_num: u64,
    /// Number of tables whose type is `MaterializedView`.
    pub mview_num: u64,
    /// Number of tables whose type is `Index`.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs, regardless of status.
    pub streaming_job_num: u64,
    /// Number of actors.
    pub actor_num: u64,
}
662
663impl CatalogControllerInner {
664    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
665        let databases = self.list_databases().await?;
666        let schemas = self.list_schemas().await?;
667        let tables = self.list_tables().await?;
668        let sources = self.list_sources().await?;
669        let sinks = self.list_sinks().await?;
670        let subscriptions = self.list_subscriptions().await?;
671        let indexes = self.list_indexes().await?;
672        let views = self.list_views().await?;
673        let functions = self.list_functions().await?;
674        let connections = self.list_connections().await?;
675        let secrets = self.list_secrets().await?;
676
677        let users = self.list_users().await?;
678
679        Ok((
680            (
681                databases,
682                schemas,
683                tables,
684                sources,
685                sinks,
686                subscriptions,
687                indexes,
688                views,
689                functions,
690                connections,
691                secrets,
692            ),
693            users,
694        ))
695    }
696
697    pub async fn stats(&self) -> MetaResult<CatalogStats> {
698        let mut table_num_map: HashMap<_, _> = Table::find()
699            .select_only()
700            .column(table::Column::TableType)
701            .column_as(table::Column::TableId.count(), "num")
702            .group_by(table::Column::TableType)
703            .having(table::Column::TableType.ne(TableType::Internal))
704            .into_tuple::<(TableType, i64)>()
705            .all(&self.db)
706            .await?
707            .into_iter()
708            .map(|(table_type, num)| (table_type, num as u64))
709            .collect();
710
711        let source_num = Source::find().count(&self.db).await?;
712        let sink_num = Sink::find().count(&self.db).await?;
713        let function_num = Function::find().count(&self.db).await?;
714        let streaming_job_num = StreamingJob::find().count(&self.db).await?;
715        let actor_num = Actor::find().count(&self.db).await?;
716
717        Ok(CatalogStats {
718            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
719            mview_num: table_num_map
720                .remove(&TableType::MaterializedView)
721                .unwrap_or(0),
722            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
723            source_num,
724            sink_num,
725            function_num,
726            streaming_job_num,
727            actor_num,
728        })
729    }
730
731    async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
732        let db_objs = Database::find()
733            .find_also_related(Object)
734            .all(&self.db)
735            .await?;
736        Ok(db_objs
737            .into_iter()
738            .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
739            .collect())
740    }
741
742    async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
743        let schema_objs = Schema::find()
744            .find_also_related(Object)
745            .all(&self.db)
746            .await?;
747
748        Ok(schema_objs
749            .into_iter()
750            .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
751            .collect())
752    }
753
754    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
755        let mut user_infos: Vec<PbUserInfo> = User::find()
756            .all(&self.db)
757            .await?
758            .into_iter()
759            .map(Into::into)
760            .collect();
761
762        for user_info in &mut user_infos {
763            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
764        }
765        Ok(user_infos)
766    }
767
768    /// `list_all_tables` return all tables and internal tables.
769    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
770        let table_objs = Table::find()
771            .find_also_related(Object)
772            .all(&self.db)
773            .await?;
774
775        Ok(table_objs
776            .into_iter()
777            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
778            .collect())
779    }
780
781    /// `list_tables` return all `CREATED` tables, `CREATING` materialized views and internal tables that belong to them.
782    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
783        let table_objs = Table::find()
784            .find_also_related(Object)
785            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
786            .filter(
787                streaming_job::Column::JobStatus
788                    .eq(JobStatus::Created)
789                    .or(table::Column::TableType.eq(TableType::MaterializedView)),
790            )
791            .all(&self.db)
792            .await?;
793
794        let created_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
795            .select_only()
796            .column(streaming_job::Column::JobId)
797            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
798            .into_tuple()
799            .all(&self.db)
800            .await?;
801
802        let job_ids: HashSet<ObjectId> = table_objs
803            .iter()
804            .map(|(t, _)| t.table_id)
805            .chain(created_streaming_job_ids.iter().cloned())
806            .collect();
807
808        let internal_table_objs = Table::find()
809            .find_also_related(Object)
810            .filter(
811                table::Column::TableType
812                    .eq(TableType::Internal)
813                    .and(table::Column::BelongsToJobId.is_in(job_ids)),
814            )
815            .all(&self.db)
816            .await?;
817
818        Ok(table_objs
819            .into_iter()
820            .chain(internal_table_objs.into_iter())
821            .map(|(table, obj)| {
822                // Correctly set the stream job status for creating materialized views and internal tables.
823                let is_created = created_streaming_job_ids.contains(&table.table_id)
824                    || (table.table_type == TableType::Internal
825                        && created_streaming_job_ids.contains(&table.belongs_to_job_id.unwrap()));
826                let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
827                pb_table.stream_job_status = if is_created {
828                    PbStreamJobStatus::Created.into()
829                } else {
830                    PbStreamJobStatus::Creating.into()
831                };
832                pb_table
833            })
834            .collect())
835    }
836
837    /// `list_sources` return all sources and `CREATED` ones if contains any streaming jobs.
838    async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
839        let mut source_objs = Source::find()
840            .find_also_related(Object)
841            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
842            .filter(
843                streaming_job::Column::JobStatus
844                    .is_null()
845                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
846            )
847            .all(&self.db)
848            .await?;
849
850        // filter out inner connector sources that are still under creating.
851        let created_table_ids: HashSet<TableId> = Table::find()
852            .select_only()
853            .column(table::Column::TableId)
854            .join(JoinType::InnerJoin, table::Relation::Object1.def())
855            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
856            .filter(
857                table::Column::OptionalAssociatedSourceId
858                    .is_not_null()
859                    .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
860            )
861            .into_tuple()
862            .all(&self.db)
863            .await?
864            .into_iter()
865            .collect();
866        source_objs.retain_mut(|(source, _)| {
867            source.optional_associated_table_id.is_none()
868                || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
869        });
870
871        Ok(source_objs
872            .into_iter()
873            .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
874            .collect())
875    }
876
877    /// `list_sinks` return all `CREATED` sinks.
878    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
879        let sink_objs = Sink::find()
880            .find_also_related(Object)
881            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
882            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
883            .all(&self.db)
884            .await?;
885
886        Ok(sink_objs
887            .into_iter()
888            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
889            .collect())
890    }
891
892    /// `list_subscriptions` return all `CREATED` subscriptions.
893    async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
894        let subscription_objs = Subscription::find()
895            .find_also_related(Object)
896            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
897            .all(&self.db)
898            .await?;
899
900        Ok(subscription_objs
901            .into_iter()
902            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
903            .collect())
904    }
905
906    async fn list_views(&self) -> MetaResult<Vec<PbView>> {
907        let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
908
909        Ok(view_objs
910            .into_iter()
911            .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
912            .collect())
913    }
914
915    /// `list_indexes` return all `CREATED` indexes.
916    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
917        let index_objs = Index::find()
918            .find_also_related(Object)
919            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
920            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
921            .all(&self.db)
922            .await?;
923
924        Ok(index_objs
925            .into_iter()
926            .map(|(index, obj)| ObjectModel(index, obj.unwrap()).into())
927            .collect())
928    }
929
930    async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
931        let conn_objs = Connection::find()
932            .find_also_related(Object)
933            .all(&self.db)
934            .await?;
935
936        Ok(conn_objs
937            .into_iter()
938            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
939            .collect())
940    }
941
942    pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
943        let secret_objs = Secret::find()
944            .find_also_related(Object)
945            .all(&self.db)
946            .await?;
947        Ok(secret_objs
948            .into_iter()
949            .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
950            .collect())
951    }
952
953    async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
954        let func_objs = Function::find()
955            .find_also_related(Object)
956            .all(&self.db)
957            .await?;
958
959        Ok(func_objs
960            .into_iter()
961            .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
962            .collect())
963    }
964
965    pub(crate) fn register_finish_notifier(
966        &mut self,
967        database_id: DatabaseId,
968        id: ObjectId,
969        sender: Sender<Result<NotificationVersion, String>>,
970    ) {
971        self.creating_table_finish_notifier
972            .entry(database_id)
973            .or_default()
974            .entry(id)
975            .or_default()
976            .push(sender);
977    }
978
979    pub(crate) async fn streaming_job_is_finished(&mut self, id: i32) -> MetaResult<bool> {
980        let status = StreamingJob::find()
981            .select_only()
982            .column(streaming_job::Column::JobStatus)
983            .filter(streaming_job::Column::JobId.eq(id))
984            .into_tuple::<JobStatus>()
985            .one(&self.db)
986            .await?;
987
988        status
989            .map(|status| status == JobStatus::Created)
990            .ok_or_else(|| {
991                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
992            })
993    }
994
995    pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
996        if let Some(database_id) = database_id {
997            if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
998            {
999                for tx in creating_tables.into_values().flatten() {
1000                    let _ = tx.send(Err(err.clone()));
1001                }
1002            }
1003        } else {
1004            for tx in take(&mut self.creating_table_finish_notifier)
1005                .into_values()
1006                .flatten()
1007                .flat_map(|(_, txs)| txs.into_iter())
1008            {
1009                let _ = tx.send(Err(err.clone()));
1010            }
1011        }
1012    }
1013
1014    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1015        let table_ids: Vec<TableId> = Table::find()
1016            .select_only()
1017            .filter(table::Column::TableType.is_in(vec![
1018                TableType::Table,
1019                TableType::MaterializedView,
1020                TableType::Index,
1021            ]))
1022            .column(table::Column::TableId)
1023            .into_tuple()
1024            .all(&self.db)
1025            .await?;
1026        Ok(table_ids)
1027    }
1028
1029    /// Since the tables have been dropped from both meta store and streaming jobs, this method removes those table copies.
1030    /// Returns the removed table copies.
1031    pub(crate) fn complete_dropped_tables(
1032        &mut self,
1033        table_ids: impl Iterator<Item = TableId>,
1034    ) -> Vec<PbTable> {
1035        table_ids
1036            .filter_map(|table_id| {
1037                self.dropped_tables.remove(&table_id).map_or_else(
1038                    || {
1039                        tracing::warn!(table_id, "table not found");
1040                        None
1041                    },
1042                    Some,
1043                )
1044            })
1045            .collect()
1046    }
1047}