risingwave_meta/controller/catalog/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::{Context, anyhow};
29use itertools::Itertools;
30use risingwave_common::catalog::{
31    DEFAULT_SCHEMA_NAME, FragmentTypeFlag, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::{RefreshState, TableType};
42use risingwave_meta_model::{
43    ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
44    IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45    StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
46    connection, database, fragment, function, index, object, object_dependency, schema, secret,
47    sink, source, streaming_job, subscription, table, user_privilege, view,
48};
49use risingwave_pb::catalog::connection::Info as ConnectionInfo;
50use risingwave_pb::catalog::subscription::SubscriptionState;
51use risingwave_pb::catalog::table::PbTableType;
52use risingwave_pb::catalog::{
53    PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
54    PbStreamJobStatus, PbSubscription, PbTable, PbView,
55};
56use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
57use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
58use risingwave_pb::meta::object::PbObjectInfo;
59use risingwave_pb::meta::subscribe_response::{
60    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
61};
62use risingwave_pb::meta::{PbObject, PbObjectGroup};
63use risingwave_pb::stream_plan::stream_node::NodeBody;
64use risingwave_pb::telemetry::PbTelemetryEventStage;
65use risingwave_pb::user::PbUserInfo;
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69    ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
70    IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
71    SelectColumns, TransactionTrait, Value,
72};
73use tokio::sync::oneshot::Sender;
74use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
75use tracing::info;
76
77use super::utils::{
78    check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
79    rename_relation_refer,
80};
81use crate::controller::ObjectModel;
82use crate::controller::catalog::util::update_internal_tables;
83use crate::controller::utils::*;
84use crate::manager::{
85    IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
86    get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
87};
88use crate::rpc::ddl_controller::DropMode;
89use crate::telemetry::{MetaTelemetryJobDesc, report_event};
90use crate::{MetaError, MetaResult};
91
92pub type Catalog = (
93    Vec<PbDatabase>,
94    Vec<PbSchema>,
95    Vec<PbTable>,
96    Vec<PbSource>,
97    Vec<PbSink>,
98    Vec<PbSubscription>,
99    Vec<PbIndex>,
100    Vec<PbView>,
101    Vec<PbFunction>,
102    Vec<PbConnection>,
103    Vec<PbSecret>,
104);
105
106pub type CatalogControllerRef = Arc<CatalogController>;
107
108/// `CatalogController` is the controller for catalog related operations, including database, schema, table, view, etc.
109pub struct CatalogController {
110    pub(crate) env: MetaSrvEnv,
111    pub(crate) inner: RwLock<CatalogControllerInner>,
112}
113
114#[derive(Clone, Default, Debug)]
115pub struct DropTableConnectorContext {
116    // we only apply one drop connector action for one table each time, so no need to vector here
117    pub(crate) to_change_streaming_job_id: JobId,
118    pub(crate) to_remove_state_table_id: TableId,
119    pub(crate) to_remove_source_id: SourceId,
120}
121
122#[derive(Clone, Default, Debug)]
123pub struct ReleaseContext {
124    pub(crate) database_id: DatabaseId,
125    pub(crate) removed_streaming_job_ids: Vec<JobId>,
126    /// Dropped state table list, need to unregister from hummock.
127    pub(crate) removed_state_table_ids: Vec<TableId>,
128
129    /// Dropped secrets, need to remove from secret manager.
130    pub(crate) removed_secret_ids: Vec<SecretId>,
131    /// Dropped sources (when `DROP SOURCE`), need to unregister from source manager.
132    pub(crate) removed_source_ids: Vec<SourceId>,
133    /// Dropped Source fragments (when `DROP MATERIALIZED VIEW` referencing sources),
134    /// need to unregister from source manager.
135    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
136
137    pub(crate) removed_actors: HashSet<ActorId>,
138    pub(crate) removed_fragments: HashSet<FragmentId>,
139
140    /// Removed sink fragment by target fragment.
141    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
142
143    /// Dropped iceberg table sinks
144    pub(crate) removed_iceberg_table_sinks: Vec<PbSink>,
145}
146
147impl CatalogController {
148    pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
149        let meta_store = env.meta_store();
150        let catalog_controller = Self {
151            env,
152            inner: RwLock::new(CatalogControllerInner {
153                db: meta_store.conn,
154                creating_table_finish_notifier: HashMap::new(),
155                dropped_tables: HashMap::new(),
156            }),
157        };
158
159        catalog_controller.init().await?;
160        Ok(catalog_controller)
161    }
162
163    /// Used in `NotificationService::subscribe`.
164    /// Need to pay attention to the order of acquiring locks to prevent deadlock problems.
165    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
166        self.inner.read().await
167    }
168
169    pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
170        self.inner.write().await
171    }
172}
173
174pub struct CatalogControllerInner {
175    pub(crate) db: DatabaseConnection,
176    /// Registered finish notifiers for creating tables.
177    ///
178    /// `DdlController` will update this map, and pass the `tx` side to `CatalogController`.
179    /// On notifying, we can remove the entry from this map.
180    #[expect(clippy::type_complexity)]
181    pub creating_table_finish_notifier:
182        HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
183    /// Tables have been dropped from the meta store, but the corresponding barrier remains unfinished.
184    pub dropped_tables: HashMap<TableId, PbTable>,
185}
186
187impl CatalogController {
188    pub(crate) async fn notify_frontend(
189        &self,
190        operation: NotificationOperation,
191        info: NotificationInfo,
192    ) -> NotificationVersion {
193        self.env
194            .notification_manager()
195            .notify_frontend(operation, info)
196            .await
197    }
198
199    pub(crate) async fn notify_frontend_relation_info(
200        &self,
201        operation: NotificationOperation,
202        relation_info: PbObjectInfo,
203    ) -> NotificationVersion {
204        self.env
205            .notification_manager()
206            .notify_frontend_object_info(operation, relation_info)
207            .await
208    }
209
210    pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
211        self.env.notification_manager().current_version().await
212    }
213}
214
215impl CatalogController {
216    pub async fn finish_create_subscription_catalog(
217        &self,
218        subscription_id: SubscriptionId,
219    ) -> MetaResult<()> {
220        let inner = self.inner.write().await;
221        let txn = inner.db.begin().await?;
222
223        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
224        let res = Object::update_many()
225            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
226            .col_expr(
227                object::Column::CreatedAtClusterVersion,
228                current_cluster_version().into(),
229            )
230            .filter(object::Column::Oid.eq(subscription_id))
231            .exec(&txn)
232            .await?;
233        if res.rows_affected == 0 {
234            return Err(MetaError::catalog_id_not_found(
235                "subscription",
236                subscription_id,
237            ));
238        }
239
240        // mark the target subscription as `Create`.
241        let job = subscription::ActiveModel {
242            subscription_id: Set(subscription_id),
243            subscription_state: Set(SubscriptionState::Created.into()),
244            ..Default::default()
245        };
246        job.update(&txn).await?;
247
248        let _ = grant_default_privileges_automatically(&txn, subscription_id).await?;
249
250        txn.commit().await?;
251
252        Ok(())
253    }
254
255    pub async fn notify_create_subscription(
256        &self,
257        subscription_id: SubscriptionId,
258    ) -> MetaResult<NotificationVersion> {
259        let inner = self.inner.read().await;
260        let (subscription, obj) = Subscription::find_by_id(subscription_id)
261            .find_also_related(Object)
262            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
263            .one(&inner.db)
264            .await?
265            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
266
267        let mut version = self
268            .notify_frontend(
269                NotificationOperation::Add,
270                NotificationInfo::ObjectGroup(PbObjectGroup {
271                    objects: vec![PbObject {
272                        object_info: PbObjectInfo::Subscription(
273                            ObjectModel(subscription, obj.unwrap()).into(),
274                        )
275                        .into(),
276                    }],
277                }),
278            )
279            .await;
280
281        // notify default privileges about the new subscription
282        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
283            .select_only()
284            .distinct()
285            .column(user_privilege::Column::UserId)
286            .filter(user_privilege::Column::Oid.eq(subscription_id.as_object_id()))
287            .into_tuple()
288            .all(&inner.db)
289            .await?;
290
291        if !updated_user_ids.is_empty() {
292            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
293            version = self.notify_users_update(updated_user_infos).await;
294        }
295
296        Ok(version)
297    }
298
299    // for telemetry
300    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
301        // get connector usage by source/sink
302        // the expect format is like:
303        // {
304        //     "source": [{
305        //         "$source_id": {
306        //             "connector": "kafka",
307        //             "format": "plain",
308        //             "encode": "json"
309        //         },
310        //     }],
311        //     "sink": [{
312        //         "$sink_id": {
313        //             "connector": "pulsar",
314        //             "format": "upsert",
315        //             "encode": "avro"
316        //         },
317        //     }],
318        // }
319
320        let inner = self.inner.read().await;
321        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
322            .select_only()
323            .column(source::Column::SourceId)
324            .column(source::Column::WithProperties)
325            .column(source::Column::SourceInfo)
326            .into_tuple()
327            .all(&inner.db)
328            .await?;
329        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
330            .select_only()
331            .column(sink::Column::SinkId)
332            .column(sink::Column::Properties)
333            .column(sink::Column::SinkFormatDesc)
334            .into_tuple()
335            .all(&inner.db)
336            .await?;
337        drop(inner);
338
339        let get_connector_from_property = |property: &Property| -> String {
340            property
341                .0
342                .get(UPSTREAM_SOURCE_KEY)
343                .cloned()
344                .unwrap_or_default()
345        };
346
347        let source_report: Vec<jsonbb::Value> = source_props_and_info
348            .iter()
349            .map(|(oid, property, info)| {
350                let connector_name = get_connector_from_property(property);
351                let mut format = None;
352                let mut encode = None;
353                if let Some(info) = info {
354                    let pb_info = info.to_protobuf();
355                    format = Some(pb_info.format().as_str_name());
356                    encode = Some(pb_info.row_encode().as_str_name());
357                }
358                jsonbb::json!({
359                    oid.to_string(): {
360                        "connector": connector_name,
361                        "format": format,
362                        "encode": encode,
363                    },
364                })
365            })
366            .collect_vec();
367
368        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
369            .iter()
370            .map(|(oid, property, info)| {
371                let connector_name = get_connector_from_property(property);
372                let mut format = None;
373                let mut encode = None;
374                if let Some(info) = info {
375                    let pb_info = info.to_protobuf();
376                    format = Some(pb_info.format().as_str_name());
377                    encode = Some(pb_info.encode().as_str_name());
378                }
379                jsonbb::json!({
380                    oid.to_string(): {
381                        "connector": connector_name,
382                        "format": format,
383                        "encode": encode,
384                    },
385                })
386            })
387            .collect_vec();
388
389        Ok(jsonbb::json!({
390                "source": source_report,
391                "sink": sink_report,
392        }))
393    }
394
395    pub async fn clean_dirty_subscription(
396        &self,
397        database_id: Option<DatabaseId>,
398    ) -> MetaResult<()> {
399        let inner = self.inner.write().await;
400        let txn = inner.db.begin().await?;
401        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
402            object::Column::Oid.not_in_subquery(
403                Query::select()
404                    .column(subscription::Column::SubscriptionId)
405                    .from(Subscription)
406                    .and_where(
407                        subscription::Column::SubscriptionState
408                            .eq(SubscriptionState::Created as i32),
409                    )
410                    .take(),
411            ),
412        );
413
414        let filter_condition = if let Some(database_id) = database_id {
415            filter_condition.and(object::Column::DatabaseId.eq(database_id))
416        } else {
417            filter_condition
418        };
419        Object::delete_many()
420            .filter(filter_condition)
421            .exec(&txn)
422            .await?;
423        txn.commit().await?;
424        // We don't need to notify the frontend, because the Init subscription is not send to frontend.
425        Ok(())
426    }
427
428    /// `clean_dirty_creating_jobs` cleans up creating jobs that are creating in Foreground mode or in Initial status.
429    pub async fn clean_dirty_creating_jobs(
430        &self,
431        database_id: Option<DatabaseId>,
432    ) -> MetaResult<Vec<SourceId>> {
433        let inner = self.inner.write().await;
434        let txn = inner.db.begin().await?;
435
436        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
437            streaming_job::Column::JobStatus
438                .eq(JobStatus::Creating)
439                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
440        );
441
442        let filter_condition = if let Some(database_id) = database_id {
443            filter_condition.and(object::Column::DatabaseId.eq(database_id))
444        } else {
445            filter_condition
446        };
447
448        let mut dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
449            .select_only()
450            .column(streaming_job::Column::JobId)
451            .columns([
452                object::Column::Oid,
453                object::Column::ObjType,
454                object::Column::SchemaId,
455                object::Column::DatabaseId,
456            ])
457            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
458            .filter(filter_condition)
459            .into_partial_model()
460            .all(&txn)
461            .await?;
462
463        // Check if there are any pending iceberg table jobs.
464        let dirty_iceberg_jobs = find_dirty_iceberg_table_jobs(&txn, database_id).await?;
465        if !dirty_iceberg_jobs.is_empty() {
466            dirty_job_objs.extend(dirty_iceberg_jobs);
467        }
468
469        Self::clean_dirty_sink_downstreams(&txn).await?;
470
471        if dirty_job_objs.is_empty() {
472            return Ok(vec![]);
473        }
474
475        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
476
477        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();
478
479        // Filter out dummy objs for replacement.
480        // todo: we'd better introduce a new dummy object type for replacement.
481        let all_dirty_table_ids = dirty_job_objs
482            .iter()
483            .filter(|obj| obj.obj_type == ObjectType::Table)
484            .map(|obj| obj.oid)
485            .collect_vec();
486        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
487            .select_only()
488            .column(table::Column::TableId)
489            .column(table::Column::TableType)
490            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
491            .into_tuple::<(ObjectId, TableType)>()
492            .all(&txn)
493            .await?
494            .into_iter()
495            .collect();
496
497        let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
498            .select_only()
499            .column(streaming_job::Column::JobId)
500            .filter(
501                streaming_job::Column::JobId
502                    .is_in(dirty_job_ids.clone())
503                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
504            )
505            .into_tuple()
506            .all(&txn)
507            .await?
508            .into_iter()
509            .collect();
510
511        // notify delete for failed materialized views and background jobs.
512        let to_notify_objs = dirty_job_objs
513            .into_iter()
514            .filter(|obj| {
515                matches!(
516                    dirty_table_type_map.get(&obj.oid),
517                    Some(TableType::MaterializedView)
518                ) || dirty_background_jobs.contains(&obj.oid)
519            })
520            .collect_vec();
521
522        // The source ids for dirty tables with connector.
523        // FIXME: we should also clean dirty sources.
524        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
525            .select_only()
526            .column(table::Column::OptionalAssociatedSourceId)
527            .filter(
528                table::Column::TableId
529                    .is_in(dirty_job_ids.clone())
530                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
531            )
532            .into_tuple()
533            .all(&txn)
534            .await?;
535
536        let dirty_state_table_ids: Vec<TableId> = Table::find()
537            .select_only()
538            .column(table::Column::TableId)
539            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
540            .into_tuple()
541            .all(&txn)
542            .await?;
543
544        let dirty_internal_table_objs = Object::find()
545            .select_only()
546            .columns([
547                object::Column::Oid,
548                object::Column::ObjType,
549                object::Column::SchemaId,
550                object::Column::DatabaseId,
551            ])
552            .join(JoinType::InnerJoin, object::Relation::Table.def())
553            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
554            .into_partial_model()
555            .all(&txn)
556            .await?;
557
558        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
559            .clone()
560            .into_iter()
561            .chain(
562                dirty_state_table_ids
563                    .into_iter()
564                    .map(|table_id| table_id.as_object_id()),
565            )
566            .chain(
567                dirty_associated_source_ids
568                    .iter()
569                    .map(|source_id| source_id.as_object_id()),
570            )
571            .collect();
572
573        let res = Object::delete_many()
574            .filter(object::Column::Oid.is_in(to_delete_objs))
575            .exec(&txn)
576            .await?;
577        assert!(res.rows_affected > 0);
578
579        txn.commit().await?;
580
581        let object_group = build_object_group_for_delete(
582            to_notify_objs
583                .into_iter()
584                .chain(dirty_internal_table_objs.into_iter())
585                .collect_vec(),
586        );
587
588        let _version = self
589            .notify_frontend(NotificationOperation::Delete, object_group)
590            .await;
591
592        Ok(dirty_associated_source_ids)
593    }
594
595    /// On recovery, reset refreshable table's `refresh_state` to a reasonable state.
596    pub async fn reset_refreshing_tables(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
597        let inner = self.inner.write().await;
598        let txn = inner.db.begin().await?;
599
600        // IDLE: no change
601        // REFRESHING: reset to IDLE (give up refresh for allowing re-trigger)
602        // FINISHING: no change (materialize executor will recover from state table and continue)
603        let filter_condition = table::Column::RefreshState.eq(RefreshState::Refreshing);
604
605        let filter_condition = if let Some(database_id) = database_id {
606            filter_condition.and(object::Column::DatabaseId.eq(database_id))
607        } else {
608            filter_condition
609        };
610
611        let table_ids: Vec<TableId> = Table::find()
612            .find_also_related(Object)
613            .filter(filter_condition)
614            .all(&txn)
615            .await
616            .context("reset_refreshing_tables: finding table ids")?
617            .into_iter()
618            .map(|(t, _)| t.table_id)
619            .collect();
620
621        let res = Table::update_many()
622            .col_expr(table::Column::RefreshState, Expr::value(RefreshState::Idle))
623            .filter(table::Column::TableId.is_in(table_ids))
624            .exec(&txn)
625            .await
626            .context("reset_refreshing_tables: update refresh state")?;
627
628        txn.commit().await?;
629
630        tracing::debug!(
631            "reset refreshing tables: {} tables updated",
632            res.rows_affected
633        );
634
635        Ok(())
636    }
637
638    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
639        let inner = self.inner.write().await;
640        let txn = inner.db.begin().await?;
641        ensure_object_id(ObjectType::Database, comment.database_id, &txn).await?;
642        ensure_object_id(ObjectType::Schema, comment.schema_id, &txn).await?;
643        let table_obj = Object::find_by_id(comment.table_id)
644            .one(&txn)
645            .await?
646            .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
647
648        let table = if let Some(col_idx) = comment.column_index {
649            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id)
650                .select_only()
651                .column(table::Column::Columns)
652                .into_tuple()
653                .one(&txn)
654                .await?
655                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
656            let mut pb_columns = columns.to_protobuf();
657
658            let column = pb_columns
659                .get_mut(col_idx as usize)
660                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
661            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
662                anyhow!(
663                    "column desc at index {} for table id {} not found",
664                    col_idx,
665                    comment.table_id
666                )
667            })?;
668            column_desc.description = comment.description;
669            table::ActiveModel {
670                table_id: Set(comment.table_id),
671                columns: Set(pb_columns.into()),
672                ..Default::default()
673            }
674            .update(&txn)
675            .await?
676        } else {
677            table::ActiveModel {
678                table_id: Set(comment.table_id),
679                description: Set(comment.description),
680                ..Default::default()
681            }
682            .update(&txn)
683            .await?
684        };
685        txn.commit().await?;
686
687        let version = self
688            .notify_frontend_relation_info(
689                NotificationOperation::Update,
690                PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
691            )
692            .await;
693
694        Ok(version)
695    }
696
697    async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
698        if tables.is_empty() {
699            return;
700        }
701        let objects = tables
702            .into_iter()
703            .map(|t| PbObject {
704                object_info: Some(PbObjectInfo::Table(t)),
705            })
706            .collect();
707        let group = NotificationInfo::ObjectGroup(PbObjectGroup { objects });
708        self.env
709            .notification_manager()
710            .notify_hummock(NotificationOperation::Delete, group.clone())
711            .await;
712        self.env
713            .notification_manager()
714            .notify_compactor(NotificationOperation::Delete, group)
715            .await;
716    }
717
718    pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
719        let mut inner = self.inner.write().await;
720        let tables = inner.complete_dropped_tables(table_ids);
721        self.notify_hummock_dropped_tables(tables).await;
722    }
723
724    pub async fn cleanup_dropped_tables(&self) {
725        let mut inner = self.inner.write().await;
726        let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
727        self.notify_hummock_dropped_tables(tables).await;
728    }
729
730    pub async fn stats(&self) -> MetaResult<CatalogStats> {
731        let inner = self.inner.read().await;
732
733        let mut table_num_map: HashMap<_, _> = Table::find()
734            .select_only()
735            .column(table::Column::TableType)
736            .column_as(table::Column::TableId.count(), "num")
737            .group_by(table::Column::TableType)
738            .having(table::Column::TableType.ne(TableType::Internal))
739            .into_tuple::<(TableType, i64)>()
740            .all(&inner.db)
741            .await?
742            .into_iter()
743            .map(|(table_type, num)| (table_type, num as u64))
744            .collect();
745
746        let source_num = Source::find().count(&inner.db).await?;
747        let sink_num = Sink::find().count(&inner.db).await?;
748        let function_num = Function::find().count(&inner.db).await?;
749        let streaming_job_num = StreamingJob::find().count(&inner.db).await?;
750
751        let actor_num = {
752            let guard = self.env.shared_actor_info.read_guard();
753            guard
754                .iter_over_fragments()
755                .map(|(_, fragment)| fragment.actors.len() as u64)
756                .sum::<u64>()
757        };
758
759        Ok(CatalogStats {
760            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
761            mview_num: table_num_map
762                .remove(&TableType::MaterializedView)
763                .unwrap_or(0),
764            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
765            source_num,
766            sink_num,
767            function_num,
768            streaming_job_num,
769            actor_num,
770        })
771    }
772}
773
/// `CatalogStats` is a struct to store the statistics of all catalogs.
pub struct CatalogStats {
    /// Number of tables of type `Table`.
    pub table_num: u64,
    /// Number of materialized views.
    pub mview_num: u64,
    /// Number of indexes.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs.
    pub streaming_job_num: u64,
    /// Number of actors across all fragments.
    pub actor_num: u64,
}
785
786impl CatalogControllerInner {
787    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
788        let databases = self.list_databases().await?;
789        let schemas = self.list_schemas().await?;
790        let tables = self.list_tables().await?;
791        let sources = self.list_sources().await?;
792        let sinks = self.list_sinks().await?;
793        let subscriptions = self.list_subscriptions().await?;
794        let indexes = self.list_indexes().await?;
795        let views = self.list_views().await?;
796        let functions = self.list_functions().await?;
797        let connections = self.list_connections().await?;
798        let secrets = self.list_secrets().await?;
799
800        let users = self.list_users().await?;
801
802        Ok((
803            (
804                databases,
805                schemas,
806                tables,
807                sources,
808                sinks,
809                subscriptions,
810                indexes,
811                views,
812                functions,
813                connections,
814                secrets,
815            ),
816            users,
817        ))
818    }
819
820    async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
821        let db_objs = Database::find()
822            .find_also_related(Object)
823            .all(&self.db)
824            .await?;
825        Ok(db_objs
826            .into_iter()
827            .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
828            .collect())
829    }
830
831    async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
832        let schema_objs = Schema::find()
833            .find_also_related(Object)
834            .all(&self.db)
835            .await?;
836
837        Ok(schema_objs
838            .into_iter()
839            .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
840            .collect())
841    }
842
843    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
844        let mut user_infos: Vec<PbUserInfo> = User::find()
845            .all(&self.db)
846            .await?
847            .into_iter()
848            .map(Into::into)
849            .collect();
850
851        for user_info in &mut user_infos {
852            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
853        }
854        Ok(user_infos)
855    }
856
857    /// `list_all_tables` return all tables and internal tables.
858    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
859        let table_objs = Table::find()
860            .find_also_related(Object)
861            .all(&self.db)
862            .await?;
863
864        Ok(table_objs
865            .into_iter()
866            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
867            .collect())
868    }
869
870    /// `list_tables` return all `CREATED` tables, `CREATING` materialized views/ `BACKGROUND` jobs and internal tables that belong to them.
871    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
872        let table_objs = Table::find()
873            .find_also_related(Object)
874            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
875            .filter(
876                streaming_job::Column::JobStatus.eq(JobStatus::Created).or(
877                    table::Column::TableType
878                        .eq(TableType::MaterializedView)
879                        .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
880                ),
881            )
882            .all(&self.db)
883            .await?;
884
885        let job_statuses: HashMap<JobId, JobStatus> = StreamingJob::find()
886            .select_only()
887            .column(streaming_job::Column::JobId)
888            .column(streaming_job::Column::JobStatus)
889            .filter(
890                streaming_job::Column::JobStatus
891                    .eq(JobStatus::Created)
892                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
893            )
894            .into_tuple::<(JobId, JobStatus)>()
895            .all(&self.db)
896            .await?
897            .into_iter()
898            .collect();
899
900        let job_ids: HashSet<JobId> = table_objs
901            .iter()
902            .map(|(t, _)| t.table_id.as_job_id())
903            .chain(job_statuses.keys().cloned())
904            .collect();
905
906        let internal_table_objs = Table::find()
907            .find_also_related(Object)
908            .filter(
909                table::Column::TableType
910                    .eq(TableType::Internal)
911                    .and(table::Column::BelongsToJobId.is_in(job_ids)),
912            )
913            .all(&self.db)
914            .await?;
915
916        Ok(table_objs
917            .into_iter()
918            .chain(internal_table_objs.into_iter())
919            .map(|(table, obj)| {
920                // Correctly set the stream job status for creating materialized views and internal tables.
921                // If the table is not contained in `job_statuses`, it means the job is a creating mv.
922                let status: PbStreamJobStatus = if table.table_type == TableType::Internal {
923                    (*job_statuses
924                        .get(&table.belongs_to_job_id.unwrap())
925                        .unwrap_or(&JobStatus::Creating))
926                    .into()
927                } else {
928                    (*job_statuses
929                        .get(&table.table_id.as_job_id())
930                        .unwrap_or(&JobStatus::Creating))
931                    .into()
932                };
933                let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
934                pb_table.stream_job_status = status.into();
935                pb_table
936            })
937            .collect())
938    }
939
940    /// `list_sources` return all sources and `CREATED` ones if contains any streaming jobs.
941    async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
942        let mut source_objs = Source::find()
943            .find_also_related(Object)
944            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
945            .filter(
946                streaming_job::Column::JobStatus
947                    .is_null()
948                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
949            )
950            .all(&self.db)
951            .await?;
952
953        // filter out inner connector sources that are still under creating.
954        let created_table_ids: HashSet<TableId> = Table::find()
955            .select_only()
956            .column(table::Column::TableId)
957            .join(JoinType::InnerJoin, table::Relation::Object1.def())
958            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
959            .filter(
960                table::Column::OptionalAssociatedSourceId
961                    .is_not_null()
962                    .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
963            )
964            .into_tuple()
965            .all(&self.db)
966            .await?
967            .into_iter()
968            .collect();
969        source_objs.retain_mut(|(source, _)| {
970            source.optional_associated_table_id.is_none()
971                || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
972        });
973
974        Ok(source_objs
975            .into_iter()
976            .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
977            .collect())
978    }
979
980    /// `list_sinks` return all `CREATED` and `BACKGROUND` sinks.
981    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
982        let sink_objs = Sink::find()
983            .find_also_related(Object)
984            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
985            .filter(
986                streaming_job::Column::JobStatus
987                    .eq(JobStatus::Created)
988                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
989            )
990            .all(&self.db)
991            .await?;
992
993        let creating_sinks: HashSet<_> = StreamingJob::find()
994            .select_only()
995            .column(streaming_job::Column::JobId)
996            .filter(
997                streaming_job::Column::JobStatus
998                    .eq(JobStatus::Creating)
999                    .and(
1000                        streaming_job::Column::JobId
1001                            .is_in(sink_objs.iter().map(|(sink, _)| sink.sink_id)),
1002                    ),
1003            )
1004            .into_tuple::<SinkId>()
1005            .all(&self.db)
1006            .await?
1007            .into_iter()
1008            .collect();
1009
1010        Ok(sink_objs
1011            .into_iter()
1012            .map(|(sink, obj)| {
1013                let is_creating = creating_sinks.contains(&sink.sink_id);
1014                let mut pb_sink: PbSink = ObjectModel(sink, obj.unwrap()).into();
1015                pb_sink.stream_job_status = if is_creating {
1016                    PbStreamJobStatus::Creating.into()
1017                } else {
1018                    PbStreamJobStatus::Created.into()
1019                };
1020                pb_sink
1021            })
1022            .collect())
1023    }
1024
1025    /// `list_subscriptions` return all `CREATED` subscriptions.
1026    async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
1027        let subscription_objs = Subscription::find()
1028            .find_also_related(Object)
1029            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
1030            .all(&self.db)
1031            .await?;
1032
1033        Ok(subscription_objs
1034            .into_iter()
1035            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
1036            .collect())
1037    }
1038
1039    async fn list_views(&self) -> MetaResult<Vec<PbView>> {
1040        let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
1041
1042        Ok(view_objs
1043            .into_iter()
1044            .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
1045            .collect())
1046    }
1047
1048    /// `list_indexes` return all `CREATED` and `BACKGROUND` indexes.
1049    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
1050        let index_objs = Index::find()
1051            .find_also_related(Object)
1052            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1053            .filter(
1054                streaming_job::Column::JobStatus
1055                    .eq(JobStatus::Created)
1056                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
1057            )
1058            .all(&self.db)
1059            .await?;
1060
1061        let creating_indexes: HashSet<_> = StreamingJob::find()
1062            .select_only()
1063            .column(streaming_job::Column::JobId)
1064            .filter(
1065                streaming_job::Column::JobStatus
1066                    .eq(JobStatus::Creating)
1067                    .and(
1068                        streaming_job::Column::JobId
1069                            .is_in(index_objs.iter().map(|(index, _)| index.index_id)),
1070                    ),
1071            )
1072            .into_tuple::<IndexId>()
1073            .all(&self.db)
1074            .await?
1075            .into_iter()
1076            .collect();
1077
1078        Ok(index_objs
1079            .into_iter()
1080            .map(|(index, obj)| {
1081                let is_creating = creating_indexes.contains(&index.index_id);
1082                let mut pb_index: PbIndex = ObjectModel(index, obj.unwrap()).into();
1083                pb_index.stream_job_status = if is_creating {
1084                    PbStreamJobStatus::Creating.into()
1085                } else {
1086                    PbStreamJobStatus::Created.into()
1087                };
1088                pb_index
1089            })
1090            .collect())
1091    }
1092
1093    async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1094        let conn_objs = Connection::find()
1095            .find_also_related(Object)
1096            .all(&self.db)
1097            .await?;
1098
1099        Ok(conn_objs
1100            .into_iter()
1101            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
1102            .collect())
1103    }
1104
1105    pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1106        let secret_objs = Secret::find()
1107            .find_also_related(Object)
1108            .all(&self.db)
1109            .await?;
1110        Ok(secret_objs
1111            .into_iter()
1112            .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
1113            .collect())
1114    }
1115
1116    async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1117        let func_objs = Function::find()
1118            .find_also_related(Object)
1119            .all(&self.db)
1120            .await?;
1121
1122        Ok(func_objs
1123            .into_iter()
1124            .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
1125            .collect())
1126    }
1127
1128    pub(crate) fn register_finish_notifier(
1129        &mut self,
1130        database_id: DatabaseId,
1131        id: JobId,
1132        sender: Sender<Result<NotificationVersion, String>>,
1133    ) {
1134        self.creating_table_finish_notifier
1135            .entry(database_id)
1136            .or_default()
1137            .entry(id)
1138            .or_default()
1139            .push(sender);
1140    }
1141
1142    pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
1143        let status = StreamingJob::find()
1144            .select_only()
1145            .column(streaming_job::Column::JobStatus)
1146            .filter(streaming_job::Column::JobId.eq(id))
1147            .into_tuple::<JobStatus>()
1148            .one(&self.db)
1149            .await?;
1150
1151        status
1152            .map(|status| status == JobStatus::Created)
1153            .ok_or_else(|| {
1154                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
1155            })
1156    }
1157
1158    pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1159        if let Some(database_id) = database_id {
1160            if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1161            {
1162                for tx in creating_tables.into_values().flatten() {
1163                    let _ = tx.send(Err(err.clone()));
1164                }
1165            }
1166        } else {
1167            for tx in take(&mut self.creating_table_finish_notifier)
1168                .into_values()
1169                .flatten()
1170                .flat_map(|(_, txs)| txs.into_iter())
1171            {
1172                let _ = tx.send(Err(err.clone()));
1173            }
1174        }
1175    }
1176
1177    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1178        let table_ids: Vec<TableId> = Table::find()
1179            .select_only()
1180            .filter(table::Column::TableType.is_in(vec![
1181                TableType::Table,
1182                TableType::MaterializedView,
1183                TableType::Index,
1184            ]))
1185            .column(table::Column::TableId)
1186            .into_tuple()
1187            .all(&self.db)
1188            .await?;
1189        Ok(table_ids)
1190    }
1191
1192    /// Since the tables have been dropped from both meta store and streaming jobs, this method removes those table copies.
1193    /// Returns the removed table copies.
1194    pub(crate) fn complete_dropped_tables(
1195        &mut self,
1196        table_ids: impl IntoIterator<Item = TableId>,
1197    ) -> Vec<PbTable> {
1198        table_ids
1199            .into_iter()
1200            .filter_map(|table_id| {
1201                self.dropped_tables.remove(&table_id).map_or_else(
1202                    || {
1203                        tracing::warn!(%table_id, "table not found");
1204                        None
1205                    },
1206                    Some,
1207                )
1208            })
1209            .collect()
1210    }
1211}