risingwave_meta/controller/catalog/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{
31    DEFAULT_SCHEMA_NAME, FragmentTypeFlag, FragmentTypeMask, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::TableType;
42use risingwave_meta_model::{
43    ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
44    IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45    StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, TableIdArray,
46    UserId, ViewId, connection, database, fragment, function, index, object, object_dependency,
47    pending_sink_state, schema, secret, sink, source, streaming_job, subscription, table,
48    user_privilege, view,
49};
50use risingwave_pb::catalog::connection::Info as ConnectionInfo;
51use risingwave_pb::catalog::subscription::SubscriptionState;
52use risingwave_pb::catalog::table::PbTableType;
53use risingwave_pb::catalog::{
54    PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
55    PbSubscription, PbTable, PbView,
56};
57use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
58use risingwave_pb::meta::object::PbObjectInfo;
59use risingwave_pb::meta::subscribe_response::{
60    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
61};
62use risingwave_pb::meta::{PbObject, PbObjectGroup};
63use risingwave_pb::stream_plan::stream_node::NodeBody;
64use risingwave_pb::telemetry::PbTelemetryEventStage;
65use risingwave_pb::user::PbUserInfo;
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69    ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
70    IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
71    SelectColumns, TransactionTrait, Value,
72};
73use tokio::sync::oneshot::Sender;
74use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
75use tracing::info;
76
77use super::utils::{
78    check_subscription_name_duplicate, get_internal_tables_by_id, load_streaming_jobs_by_ids,
79    rename_relation, rename_relation_refer,
80};
81use crate::controller::ObjectModel;
82use crate::controller::catalog::util::update_internal_tables;
83use crate::controller::fragment::FragmentTypeMaskExt;
84use crate::controller::utils::*;
85use crate::manager::{
86    IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
87    get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
88};
89use crate::rpc::ddl_controller::DropMode;
90use crate::telemetry::{MetaTelemetryJobDesc, report_event};
91use crate::{MetaError, MetaResult};
92
/// A full catalog snapshot, as delivered to a subscribing frontend.
///
/// The element order of this tuple is part of the contract with its consumers
/// (see `CatalogControllerInner::snapshot`); do not reorder.
pub type Catalog = (
    Vec<PbDatabase>,     // databases
    Vec<PbSchema>,       // schemas
    Vec<PbTable>,        // tables (incl. materialized views / indexes' tables)
    Vec<PbSource>,       // sources
    Vec<PbSink>,         // sinks
    Vec<PbSubscription>, // subscriptions
    Vec<PbIndex>,        // indexes
    Vec<PbView>,         // (non-materialized) views
    Vec<PbFunction>,     // UDFs
    Vec<PbConnection>,   // connections
    Vec<PbSecret>,       // secrets
);

/// Shared, reference-counted handle to the [`CatalogController`].
pub type CatalogControllerRef = Arc<CatalogController>;
108
/// `CatalogController` is the controller for catalog related operations, including database, schema, table, view, etc.
pub struct CatalogController {
    /// Meta service environment, used for notifications, telemetry and shared state.
    pub(crate) env: MetaSrvEnv,
    /// Mutable controller state (DB connection, notifiers, dropped-table buffer),
    /// guarded by an async `RwLock`.
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
114
/// Context describing the removal of a table's associated connector:
/// which streaming job changes and which state table / source get dropped.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    // we only apply one drop connector action for one table each time, so no need to vector here
    // The streaming job whose definition changes when its connector is dropped.
    pub(crate) to_change_streaming_job_id: JobId,
    // The connector's state table to remove.
    pub(crate) to_remove_state_table_id: TableId,
    // The associated source object to remove.
    pub(crate) to_remove_source_id: SourceId,
}
122
/// Resources released by a drop operation, handed to other managers
/// (hummock, source manager, secret manager, barrier/stream managers) for cleanup.
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    /// Database the dropped objects belonged to.
    pub(crate) database_id: DatabaseId,
    /// Streaming jobs removed by this drop.
    pub(crate) removed_streaming_job_ids: Vec<JobId>,
    /// Dropped state table list, need to unregister from hummock.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    /// Dropped secrets, need to remove from secret manager.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    /// Dropped sources (when `DROP SOURCE`), need to unregister from source manager.
    pub(crate) removed_source_ids: Vec<SourceId>,
    /// Dropped Source fragments (when `DROP MATERIALIZED VIEW` referencing sources),
    /// need to unregister from source manager.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    /// Actors removed by this drop.
    pub(crate) removed_actors: HashSet<ActorId>,
    /// Fragments removed by this drop.
    pub(crate) removed_fragments: HashSet<FragmentId>,

    /// Removed sink fragment by target fragment.
    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,

    /// Dropped iceberg table sinks
    pub(crate) removed_iceberg_table_sinks: Vec<PbSink>,
}
147
148impl CatalogController {
149    pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
150        let meta_store = env.meta_store();
151        let catalog_controller = Self {
152            env,
153            inner: RwLock::new(CatalogControllerInner {
154                db: meta_store.conn,
155                creating_table_finish_notifier: HashMap::new(),
156                dropped_tables: HashMap::new(),
157            }),
158        };
159
160        catalog_controller.init().await?;
161        Ok(catalog_controller)
162    }
163
164    /// Used in `NotificationService::subscribe`.
165    /// Need to pay attention to the order of acquiring locks to prevent deadlock problems.
166    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
167        self.inner.read().await
168    }
169
170    pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
171        self.inner.write().await
172    }
173}
174
/// Inner mutable state of [`CatalogController`], accessed behind its `RwLock`.
pub struct CatalogControllerInner {
    /// Connection to the SQL meta store backing all catalog operations.
    pub(crate) db: DatabaseConnection,
    /// Registered finish notifiers for creating tables.
    ///
    /// `DdlController` will update this map, and pass the `tx` side to `CatalogController`.
    /// On notifying, we can remove the entry from this map.
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    /// Tables have been dropped from the meta store, but the corresponding barrier remains unfinished.
    pub dropped_tables: HashMap<TableId, PbTable>,
}
187
188impl CatalogController {
189    pub(crate) async fn notify_frontend(
190        &self,
191        operation: NotificationOperation,
192        info: NotificationInfo,
193    ) -> NotificationVersion {
194        self.env
195            .notification_manager()
196            .notify_frontend(operation, info)
197            .await
198    }
199
200    pub(crate) async fn notify_frontend_relation_info(
201        &self,
202        operation: NotificationOperation,
203        relation_info: PbObjectInfo,
204    ) -> NotificationVersion {
205        self.env
206            .notification_manager()
207            .notify_frontend_object_info(operation, relation_info)
208            .await
209    }
210
211    /// Trivially advance the notification version and notify to frontend,
212    /// return the notification version for frontend to wait for.
213    ///
214    /// Cannot simply return the current version, because the current version may not be sent
215    /// to frontend, and the frontend may endlessly wait for this version, until a frontend
216    /// related notification is sent.
217    pub(crate) async fn notify_frontend_trivial(&self) -> NotificationVersion {
218        self.env
219            .notification_manager()
220            .notify_frontend(
221                NotificationOperation::Update,
222                NotificationInfo::ObjectGroup(PbObjectGroup {
223                    objects: vec![],
224                    dependencies: vec![],
225                }),
226            )
227            .await
228    }
229}
230
impl CatalogController {
    /// Finalizes a subscription's catalog entry after its creation completes:
    /// refreshes the object's creation metadata, marks the subscription `Created`,
    /// and grants default privileges — all within a single transaction.
    pub async fn finish_create_subscription_catalog(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(subscription_id))
            .exec(&txn)
            .await?;
        // No row updated means the subscription object does not exist.
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found(
                "subscription",
                subscription_id,
            ));
        }

        // mark the target subscription as `Create`.
        let job = subscription::ActiveModel {
            subscription_id: Set(subscription_id),
            subscription_state: Set(SubscriptionState::Created.into()),
            ..Default::default()
        };
        Subscription::update(job).exec(&txn).await?;

        // Affected user ids are intentionally discarded here; `notify_create_subscription`
        // re-queries the privileges when notifying the frontend.
        let _ = grant_default_privileges_automatically(&txn, subscription_id).await?;

        txn.commit().await?;

        Ok(())
    }

    /// Notifies frontends about a subscription that has reached the `Created` state,
    /// including its dependencies and any users whose privileges were affected.
    /// Returns the notification version to wait for.
    pub async fn notify_create_subscription(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        // Only a subscription already in `Created` state is eligible for notification.
        let (subscription, obj) = Subscription::find_by_id(subscription_id)
            .find_also_related(Object)
            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;

        let dependencies =
            list_object_dependencies_by_object_id(&txn, subscription_id.into()).await?;
        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Add,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        // NOTE(review): the related object row is assumed to always exist
                        // for a subscription, hence the unwrap — confirm against the schema.
                        object_info: PbObjectInfo::Subscription(
                            ObjectModel(subscription, obj.unwrap(), None).into(),
                        )
                        .into(),
                    }],
                    dependencies,
                }),
            )
            .await;

        // notify default privileges about the new subscription
        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.eq(subscription_id.as_object_id()))
            .into_tuple()
            .all(&inner.db)
            .await?;

        if !updated_user_ids.is_empty() {
            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
            version = self.notify_users_update(updated_user_infos).await;
        }

        Ok(version)
    }

    // for telemetry
    /// Collects connector usage (connector name, format, encode) for all sources
    /// and sinks as a JSON value, for telemetry reporting.
    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
        // get connector usage by source/sink
        // the expected format is like:
        // {
        //     "source": [{
        //         "$source_id": {
        //             "connector": "kafka",
        //             "format": "plain",
        //             "encode": "json"
        //         },
        //     }],
        //     "sink": [{
        //         "$sink_id": {
        //             "connector": "pulsar",
        //             "format": "upsert",
        //             "encode": "avro"
        //         },
        //     }],
        // }

        let inner = self.inner.read().await;
        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
            .select_only()
            .column(source::Column::SourceId)
            .column(source::Column::WithProperties)
            .column(source::Column::SourceInfo)
            .into_tuple()
            .all(&inner.db)
            .await?;
        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .column(sink::Column::Properties)
            .column(sink::Column::SinkFormatDesc)
            .into_tuple()
            .all(&inner.db)
            .await?;
        // Release the read lock before the (pure) report construction below.
        drop(inner);

        // The connector name is stored under the `connector` key in the WITH properties;
        // missing key yields an empty string.
        let get_connector_from_property = |property: &Property| -> String {
            property
                .0
                .get(UPSTREAM_SOURCE_KEY)
                .cloned()
                .unwrap_or_default()
        };

        let source_report: Vec<jsonbb::Value> = source_props_and_info
            .iter()
            .map(|(oid, property, info)| {
                let connector_name = get_connector_from_property(property);
                let mut format = None;
                let mut encode = None;
                if let Some(info) = info {
                    let pb_info = info.to_protobuf();
                    format = Some(pb_info.format().as_str_name());
                    encode = Some(pb_info.row_encode().as_str_name());
                }
                jsonbb::json!({
                    oid.to_string(): {
                        "connector": connector_name,
                        "format": format,
                        "encode": encode,
                    },
                })
            })
            .collect_vec();

        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
            .iter()
            .map(|(oid, property, info)| {
                let connector_name = get_connector_from_property(property);
                let mut format = None;
                let mut encode = None;
                if let Some(info) = info {
                    let pb_info = info.to_protobuf();
                    format = Some(pb_info.format().as_str_name());
                    encode = Some(pb_info.encode().as_str_name());
                }
                jsonbb::json!({
                    oid.to_string(): {
                        "connector": connector_name,
                        "format": format,
                        "encode": encode,
                    },
                })
            })
            .collect_vec();

        Ok(jsonbb::json!({
                "source": source_report,
                "sink": sink_report,
        }))
    }

    /// Deletes subscription objects that never reached the `Created` state,
    /// optionally restricted to a single database. Used during recovery cleanup.
    pub async fn clean_dirty_subscription(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Select subscription objects whose id is NOT among the `Created` subscriptions.
        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
            object::Column::Oid.not_in_subquery(
                Query::select()
                    .column(subscription::Column::SubscriptionId)
                    .from(Subscription)
                    .and_where(
                        subscription::Column::SubscriptionState
                            .eq(SubscriptionState::Created as i32),
                    )
                    .take(),
            ),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        Object::delete_many()
            .filter(filter_condition)
            .exec(&txn)
            .await?;
        txn.commit().await?;
        // We don't need to notify the frontend, because the Init subscription is not sent to the frontend.
        Ok(())
    }

    /// `clean_dirty_creating_jobs` cleans up creating jobs that are creating in Foreground mode or in Initial status.
    ///
    /// Deletes the dirty job objects (plus their internal state tables and associated
    /// sources) from the meta store, notifies frontends of deleted materialized views
    /// and background jobs, and returns the associated source ids of dirty tables
    /// so the caller can unregister them from the source manager.
    pub async fn clean_dirty_creating_jobs(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<Vec<SourceId>> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Dirty = still `Initial`, or `Creating` in Foreground mode (no recovery for those).
        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
            streaming_job::Column::JobStatus
                .eq(JobStatus::Creating)
                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };

        let mut dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .filter(filter_condition)
            .into_partial_model()
            .all(&txn)
            .await?;

        // Check if there are any pending iceberg table jobs.
        let dirty_iceberg_jobs = find_dirty_iceberg_table_jobs(&txn, database_id).await?;
        if !dirty_iceberg_jobs.is_empty() {
            dirty_job_objs.extend(dirty_iceberg_jobs);
        }

        Self::clean_dirty_sink_downstreams(&txn).await?;

        if dirty_job_objs.is_empty() {
            return Ok(vec![]);
        }

        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;

        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();

        // Filter out dummy objs for replacement.
        // todo: we'd better introduce a new dummy object type for replacement.
        let all_dirty_table_ids = dirty_job_objs
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table)
            .map(|obj| obj.oid)
            .collect_vec();
        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .column(table::Column::TableType)
            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
            .into_tuple::<(ObjectId, TableType)>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        // Background-created jobs among the dirty set; these are also notified below.
        let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobId
                    .is_in(dirty_job_ids.clone())
                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .into_tuple()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        // notify delete for failed materialized views and background jobs.
        let to_notify_objs = dirty_job_objs
            .into_iter()
            .filter(|obj| {
                matches!(
                    dirty_table_type_map.get(&obj.oid),
                    Some(TableType::MaterializedView)
                ) || dirty_background_jobs.contains(&obj.oid)
            })
            .collect_vec();

        // The source ids for dirty tables with connector.
        // FIXME: we should also clean dirty sources.
        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(dirty_job_ids.clone())
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        // Internal state tables belonging to the dirty jobs.
        let dirty_state_table_ids: Vec<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;

        // Internal table objects of the to-be-notified jobs; their deletion must be
        // broadcast to frontends as well.
        let dirty_internal_table_objs = Object::find()
            .select_only()
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, object::Relation::Table.def())
            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
            .into_partial_model()
            .all(&txn)
            .await?;

        // Delete jobs, their state tables and their associated sources in one sweep.
        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
            .clone()
            .into_iter()
            .chain(
                dirty_state_table_ids
                    .into_iter()
                    .map(|table_id| table_id.as_object_id()),
            )
            .chain(
                dirty_associated_source_ids
                    .iter()
                    .map(|source_id| source_id.as_object_id()),
            )
            .collect();

        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(to_delete_objs))
            .exec(&txn)
            .await?;
        // We know the dirty set is non-empty at this point, so something must be deleted.
        assert!(res.rows_affected > 0);

        txn.commit().await?;

        let object_group = build_object_group_for_delete(
            to_notify_objs
                .into_iter()
                .chain(dirty_internal_table_objs.into_iter())
                .collect_vec(),
        );

        let _version = self
            .notify_frontend(NotificationOperation::Delete, object_group)
            .await;

        Ok(dirty_associated_source_ids)
    }

    /// Applies a `COMMENT ON` statement: updates the description of a table
    /// (when `column_index` is `None`) or one of its columns, then notifies
    /// frontends of the updated table. Returns the notification version.
    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Validate that the referenced database / schema still exist.
        ensure_object_id(ObjectType::Database, comment.database_id, &txn).await?;
        ensure_object_id(ObjectType::Schema, comment.schema_id, &txn).await?;
        let (table_obj, streaming_job) = Object::find_by_id(comment.table_id)
            .find_also_related(StreamingJob)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("object", comment.table_id))?;

        let table = if let Some(col_idx) = comment.column_index {
            // Column comment: rewrite the description of the targeted column in place.
            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id)
                .select_only()
                .column(table::Column::Columns)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
            let mut pb_columns = columns.to_protobuf();

            let column = pb_columns
                .get_mut(col_idx as usize)
                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
                anyhow!(
                    "column desc at index {} for table id {} not found",
                    col_idx,
                    comment.table_id
                )
            })?;
            column_desc.description = comment.description;
            table::ActiveModel {
                table_id: Set(comment.table_id),
                columns: Set(pb_columns.into()),
                ..Default::default()
            }
            .update(&txn)
            .await?
        } else {
            // Table comment: update the table-level description column.
            table::ActiveModel {
                table_id: Set(comment.table_id),
                description: Set(comment.description),
                ..Default::default()
            }
            .update(&txn)
            .await?
        };
        txn.commit().await?;

        let version = self
            .notify_frontend_relation_info(
                NotificationOperation::Update,
                PbObjectInfo::Table(ObjectModel(table, table_obj, streaming_job).into()),
            )
            .await;

        Ok(version)
    }

    /// Notifies hummock and compactor nodes that the given tables have been dropped,
    /// so they can release the corresponding state. No-op for an empty list.
    async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
        if tables.is_empty() {
            return;
        }
        let objects = tables
            .into_iter()
            .map(|t| PbObject {
                object_info: Some(PbObjectInfo::Table(t)),
            })
            .collect();
        let group = NotificationInfo::ObjectGroup(PbObjectGroup {
            objects,
            dependencies: vec![],
        });
        // Both hummock and compactor nodes need the Delete notification.
        self.env
            .notification_manager()
            .notify_hummock(NotificationOperation::Delete, group.clone())
            .await;
        self.env
            .notification_manager()
            .notify_compactor(NotificationOperation::Delete, group)
            .await;
    }

    /// Finalizes the drop of the given tables: removes them from the
    /// `dropped_tables` buffer and notifies hummock/compactor.
    pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
        let mut inner = self.inner.write().await;
        let tables = inner.complete_dropped_tables(table_ids);
        self.notify_hummock_dropped_tables(tables).await;
    }

    /// Flushes ALL buffered dropped tables and notifies hummock/compactor.
    pub async fn cleanup_dropped_tables(&self) {
        let mut inner = self.inner.write().await;
        let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
        self.notify_hummock_dropped_tables(tables).await;
    }

    /// Aggregates catalog statistics (object counts and actor count) for reporting.
    pub async fn stats(&self) -> MetaResult<CatalogStats> {
        let inner = self.inner.read().await;

        // Count tables per table type, excluding internal state tables.
        let mut table_num_map: HashMap<_, _> = Table::find()
            .select_only()
            .column(table::Column::TableType)
            .column_as(table::Column::TableId.count(), "num")
            .group_by(table::Column::TableType)
            .having(table::Column::TableType.ne(TableType::Internal))
            .into_tuple::<(TableType, i64)>()
            .all(&inner.db)
            .await?
            .into_iter()
            .map(|(table_type, num)| (table_type, num as u64))
            .collect();

        let source_num = Source::find().count(&inner.db).await?;
        let sink_num = Sink::find().count(&inner.db).await?;
        let function_num = Function::find().count(&inner.db).await?;
        let streaming_job_num = StreamingJob::find().count(&inner.db).await?;

        // Actor count comes from the in-memory shared actor info, not the meta store.
        let actor_num = {
            let guard = self.env.shared_actor_info.read_guard();
            guard
                .iter_over_fragments()
                .map(|(_, fragment)| fragment.actors.len() as u64)
                .sum::<u64>()
        };
        let database_num = Database::find().count(&inner.db).await?;

        Ok(CatalogStats {
            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
            mview_num: table_num_map
                .remove(&TableType::MaterializedView)
                .unwrap_or(0),
            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
            source_num,
            sink_num,
            function_num,
            streaming_job_num,
            actor_num,
            database_num,
        })
    }

    /// For each requested sink id, returns the state table ids of its sink fragment.
    pub async fn fetch_sink_with_state_table_ids(
        &self,
        sink_ids: HashSet<SinkId>,
    ) -> MetaResult<HashMap<SinkId, Vec<TableId>>> {
        let inner = self.inner.read().await;

        // Only fragments whose type mask contains the Sink flag are relevant.
        let query = Fragment::find()
            .select_only()
            .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
            .filter(
                fragment::Column::JobId
                    .is_in(sink_ids)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
            );

        let rows: Vec<(JobId, TableIdArray)> = query.into_tuple().all(&inner.db).await?;

        // A sink job is expected to have exactly one sink fragment.
        debug_assert!(rows.iter().map(|(job_id, _)| job_id).all_unique());

        let result = rows
            .into_iter()
            .map(|(job_id, table_id_array)| (job_id.as_sink_id(), table_id_array.0))
            .collect::<HashMap<_, _>>();

        Ok(result)
    }

    /// Lists the ids of all sinks that have at least one pending epoch state,
    /// optionally restricted to a single database.
    pub async fn list_all_pending_sinks(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<SinkId>> {
        let inner = self.inner.read().await;

        let mut query = pending_sink_state::Entity::find()
            .select_only()
            .columns([pending_sink_state::Column::SinkId])
            .filter(
                pending_sink_state::Column::SinkState.eq(pending_sink_state::SinkState::Pending),
            )
            .distinct();

        if let Some(db_id) = database_id {
            // Join to the object table only when we need to filter by database.
            query = query
                .join(
                    JoinType::InnerJoin,
                    pending_sink_state::Relation::Object.def(),
                )
                .filter(object::Column::DatabaseId.eq(db_id));
        }

        let result: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;

        Ok(result.into_iter().collect())
    }

    /// Marks as `Aborted` every pending epoch of each given sink that is strictly
    /// newer than the sink's committed epoch.
    pub async fn abort_pending_sink_epochs(
        &self,
        sink_committed_epoch: HashMap<SinkId, u64>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        for (sink_id, committed_epoch) in sink_committed_epoch {
            pending_sink_state::Entity::update_many()
                .col_expr(
                    pending_sink_state::Column::SinkState,
                    Expr::value(pending_sink_state::SinkState::Aborted),
                )
                .filter(
                    pending_sink_state::Column::SinkId
                        .eq(sink_id)
                        // Epochs are stored as i64; the cast mirrors that representation.
                        .and(pending_sink_state::Column::Epoch.gt(committed_epoch as i64)),
                )
                .exec(&txn)
                .await?;
        }

        txn.commit().await?;
        Ok(())
    }
}
839
/// `CatalogStats` is a struct to store the statistics of all catalogs.
pub struct CatalogStats {
    /// Number of tables.
    pub table_num: u64,
    /// Number of materialized views.
    pub mview_num: u64,
    /// Number of indexes.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs.
    pub streaming_job_num: u64,
    /// Number of actors.
    pub actor_num: u64,
    /// Number of databases.
    pub database_num: u64,
}
852
impl CatalogControllerInner {
    /// Collects a snapshot of every catalog object list, together with all user
    /// infos (including their grant privileges).
    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
        let databases = self.list_databases().await?;
        let schemas = self.list_schemas().await?;
        let tables = self.list_tables().await?;
        let sources = self.list_sources().await?;
        let sinks = self.list_sinks().await?;
        let subscriptions = self.list_subscriptions().await?;
        let indexes = self.list_indexes().await?;
        let views = self.list_views().await?;
        let functions = self.list_functions().await?;
        let connections = self.list_connections().await?;
        let secrets = self.list_secrets().await?;

        let users = self.list_users().await?;

        Ok((
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
        ))
    }

    /// Returns all databases.
    async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
        let db_objs = Database::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        Ok(db_objs
            .into_iter()
            // The related `Object` row is expected to exist for every catalog
            // entry, hence the `unwrap` (same below).
            .map(|(db, obj)| ObjectModel(db, obj.unwrap(), None).into())
            .collect())
    }

    /// Returns all schemas.
    async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
        let schema_objs = Schema::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(schema_objs
            .into_iter()
            .map(|(schema, obj)| ObjectModel(schema, obj.unwrap(), None).into())
            .collect())
    }

    /// Returns all users, with their grant privileges populated.
    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
        let mut user_infos: Vec<PbUserInfo> = User::find()
            .all(&self.db)
            .await?
            .into_iter()
            .map(Into::into)
            .collect();

        // Privileges are stored separately; fetch and attach them per user.
        for user_info in &mut user_infos {
            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
        }
        Ok(user_infos)
    }

    /// `list_all_state_tables` returns all tables, including internal state tables.
    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
        let table_objs = Table::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        // Batch-load the owning streaming job of every table, so each table can
        // be converted together with its job info.
        let streaming_jobs = load_streaming_jobs_by_ids(
            &self.db,
            table_objs.iter().map(|(table, _)| table.job_id()),
        )
        .await?;

        Ok(table_objs
            .into_iter()
            .map(|(table, obj)| {
                let job_id = table.job_id();
                let streaming_job = streaming_jobs.get(&job_id).cloned();
                ObjectModel(table, obj.unwrap(), streaming_job).into()
            })
            .collect())
    }

    /// `list_tables` returns the tables whose owning job is visible: internal
    /// tables of sinks, tables of materialized-view jobs (regardless of status),
    /// and tables of any job that is `CREATED` or has `BACKGROUND` create type.
    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
        let mut table_objs = Table::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        let all_streaming_jobs: HashMap<JobId, streaming_job::Model> = StreamingJob::find()
            .all(&self.db)
            .await?
            .into_iter()
            .map(|job| (job.job_id, job))
            .collect();

        // Ids of all sinks, used to recognize internal tables owned by a sink job.
        let sink_ids: HashSet<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .into_tuple::<SinkId>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        // Jobs of materialized views; their tables stay visible even while creating.
        let mview_job_ids: HashSet<JobId> = table_objs
            .iter()
            .filter_map(|(table, _)| {
                if table.table_type == TableType::MaterializedView {
                    Some(table.table_id.as_job_id())
                } else {
                    None
                }
            })
            .collect();

        table_objs.retain(|(table, _)| {
            let job_id = table.job_id();

            // Sink-owned internal tables and mview tables are always kept.
            if sink_ids.contains(&job_id.as_sink_id()) || mview_job_ids.contains(&job_id) {
                return true;
            }
            // Otherwise keep only tables of created or background-creating jobs.
            if let Some(streaming_job) = all_streaming_jobs.get(&job_id) {
                return streaming_job.job_status == JobStatus::Created
                    || (streaming_job.create_type == CreateType::Background);
            }
            false
        });

        let mut tables = Vec::with_capacity(table_objs.len());
        for (table, obj) in table_objs {
            let job_id = table.job_id();
            let streaming_job = all_streaming_jobs.get(&job_id).cloned();
            let pb_table: PbTable = ObjectModel(table, obj.unwrap(), streaming_job).into();
            tables.push(pb_table);
        }
        Ok(tables)
    }

    /// `list_sources` returns sources that either have no streaming job or whose
    /// job is `CREATED`, excluding inner connector sources whose associated
    /// table is still being created.
    async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        let mut source_objs = Source::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .is_null()
                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
            )
            .all(&self.db)
            .await?;

        // Filter out inner connector sources whose associated table is still
        // under creation: collect the ids of created tables that have an
        // associated source, then keep only sources pointing at those tables.
        let created_table_ids: HashSet<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                table::Column::OptionalAssociatedSourceId
                    .is_not_null()
                    .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
            )
            .into_tuple()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();
        source_objs.retain_mut(|(source, _)| {
            source.optional_associated_table_id.is_none()
                || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
        });

        Ok(source_objs
            .into_iter()
            .map(|(source, obj)| ObjectModel(source, obj.unwrap(), None).into())
            .collect())
    }

    /// `list_sinks` returns all sinks.
    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
        let sink_objs = Sink::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .all(&self.db)
            .await?;
        let streaming_jobs = load_streaming_jobs_by_ids(
            &self.db,
            sink_objs.iter().map(|(sink, _)| sink.sink_id.as_job_id()),
        )
        .await?;

        Ok(sink_objs
            .into_iter()
            .map(|(sink, obj)| {
                let streaming_job = streaming_jobs.get(&sink.sink_id.as_job_id()).cloned();
                ObjectModel(sink, obj.unwrap(), streaming_job).into()
            })
            .collect())
    }

    /// `list_subscriptions` returns all `CREATED` subscriptions.
    async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
        let subscription_objs = Subscription::find()
            .find_also_related(Object)
            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
            .all(&self.db)
            .await?;

        Ok(subscription_objs
            .into_iter()
            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap(), None).into())
            .collect())
    }

    /// Returns all views.
    async fn list_views(&self) -> MetaResult<Vec<PbView>> {
        let view_objs = View::find().find_also_related(Object).all(&self.db).await?;

        Ok(view_objs
            .into_iter()
            .map(|(view, obj)| ObjectModel(view, obj.unwrap(), None).into())
            .collect())
    }

    /// `list_indexes` returns all `CREATED` and `BACKGROUND` indexes.
    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
        let index_objs = Index::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .all(&self.db)
            .await?;
        let streaming_jobs = load_streaming_jobs_by_ids(
            &self.db,
            index_objs
                .iter()
                .map(|(index, _)| index.index_id.as_job_id()),
        )
        .await?;

        Ok(index_objs
            .into_iter()
            .map(|(index, obj)| {
                let streaming_job = streaming_jobs.get(&index.index_id.as_job_id()).cloned();
                ObjectModel(index, obj.unwrap(), streaming_job).into()
            })
            .collect())
    }

    /// Returns all connections.
    async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
        let conn_objs = Connection::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(conn_objs
            .into_iter()
            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap(), None).into())
            .collect())
    }

    /// Returns all secrets.
    pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
        let secret_objs = Secret::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        Ok(secret_objs
            .into_iter()
            .map(|(secret, obj)| ObjectModel(secret, obj.unwrap(), None).into())
            .collect())
    }

    /// Returns all functions.
    async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
        let func_objs = Function::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(func_objs
            .into_iter()
            .map(|(func, obj)| ObjectModel(func, obj.unwrap(), None).into())
            .collect())
    }

    /// Registers `sender` to be notified when the creating streaming job `id`
    /// under `database_id` finishes or fails.
    pub(crate) fn register_finish_notifier(
        &mut self,
        database_id: DatabaseId,
        id: JobId,
        sender: Sender<Result<NotificationVersion, String>>,
    ) {
        self.creating_table_finish_notifier
            .entry(database_id)
            .or_default()
            .entry(id)
            .or_default()
            .push(sender);
    }

    /// Returns whether streaming job `id` has reached the `Created` status.
    /// Errors if the job no longer exists (e.g. it was cancelled or dropped).
    pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
        let status = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobStatus)
            .filter(streaming_job::Column::JobId.eq(id))
            .into_tuple::<JobStatus>()
            .one(&self.db)
            .await?;

        status
            .map(|status| status == JobStatus::Created)
            .ok_or_else(|| {
                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
            })
    }

    /// Sends `err` to every registered finish-waiter: only those under
    /// `database_id` when it is `Some`, otherwise to all waiters. Notified
    /// entries are removed from the notifier map.
    pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
        if let Some(database_id) = database_id {
            if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
            {
                for tx in creating_tables.into_values().flatten() {
                    // Receiver may already be gone; ignore send failures.
                    let _ = tx.send(Err(err.clone()));
                }
            }
        } else {
            for tx in take(&mut self.creating_table_finish_notifier)
                .into_values()
                .flatten()
                .flat_map(|(_, txs)| txs.into_iter())
            {
                let _ = tx.send(Err(err.clone()));
            }
        }
    }

    /// Returns the ids of all `Table`, `MaterializedView` and `Index` typed
    /// tables, i.e. the table types tracked for time travel.
    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
        let table_ids: Vec<TableId> = Table::find()
            .select_only()
            .filter(table::Column::TableType.is_in(vec![
                TableType::Table,
                TableType::MaterializedView,
                TableType::Index,
            ]))
            .column(table::Column::TableId)
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(table_ids)
    }

    /// Since the tables have been dropped from both meta store and streaming jobs, this method removes those table copies.
    /// Returns the removed table copies.
    pub(crate) fn complete_dropped_tables(
        &mut self,
        table_ids: impl IntoIterator<Item = TableId>,
    ) -> Vec<PbTable> {
        table_ids
            .into_iter()
            .filter_map(|table_id| {
                // A missing entry is unexpected but tolerated: warn and skip it.
                self.dropped_tables.remove(&table_id).map_or_else(
                    || {
                        tracing::warn!(%table_id, "table not found");
                        None
                    },
                    Some,
                )
            })
            .collect()
    }
}