risingwave_meta/controller/catalog/mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{
31    DEFAULT_SCHEMA_NAME, FragmentTypeFlag, FragmentTypeMask, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::TableType;
42use risingwave_meta_model::{
43    ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array, IndexId,
44    JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45    StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, TableIdArray,
46    UserId, ViewId, connection, database, fragment, function, index, object, object_dependency,
47    pending_sink_state, schema, secret, sink, source, streaming_job, subscription, table,
48    user_privilege, view,
49};
50use risingwave_pb::catalog::connection::Info as ConnectionInfo;
51use risingwave_pb::catalog::subscription::SubscriptionState;
52use risingwave_pb::catalog::table::PbTableType;
53use risingwave_pb::catalog::{
54    PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
55    PbSubscription, PbTable, PbView,
56};
57use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
58use risingwave_pb::meta::object::PbObjectInfo;
59use risingwave_pb::meta::subscribe_response::{
60    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
61};
62use risingwave_pb::meta::{PbObject, PbObjectGroup};
63use risingwave_pb::stream_plan::stream_node::NodeBody;
64use risingwave_pb::telemetry::PbTelemetryEventStage;
65use risingwave_pb::user::PbUserInfo;
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69    ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
70    IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
71    SelectColumns, TransactionTrait, Value,
72};
73use tokio::sync::oneshot::Sender;
74use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
75use tracing::info;
76
77use super::utils::{
78    check_subscription_name_duplicate, get_internal_tables_by_id, load_streaming_jobs_by_ids,
79    rename_relation, rename_relation_refer,
80};
81use crate::controller::ObjectModel;
82use crate::controller::catalog::util::update_internal_tables;
83use crate::controller::fragment::FragmentTypeMaskExt;
84use crate::controller::utils::*;
85use crate::manager::{
86    IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
87    get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
88};
89use crate::rpc::ddl_controller::DropMode;
90use crate::telemetry::{MetaTelemetryJobDesc, report_event};
91use crate::{MetaError, MetaResult};
92
93pub type Catalog = (
94    Vec<PbDatabase>,
95    Vec<PbSchema>,
96    Vec<PbTable>,
97    Vec<PbSource>,
98    Vec<PbSink>,
99    Vec<PbSubscription>,
100    Vec<PbIndex>,
101    Vec<PbView>,
102    Vec<PbFunction>,
103    Vec<PbConnection>,
104    Vec<PbSecret>,
105);
106
107pub type CatalogControllerRef = Arc<CatalogController>;
108
109/// `CatalogController` is the controller for catalog related operations, including database, schema, table, view, etc.
110pub struct CatalogController {
111    pub(crate) env: MetaSrvEnv,
112    pub(crate) inner: RwLock<CatalogControllerInner>,
113}
114
115#[derive(Clone, Default, Debug)]
116pub struct DropTableConnectorContext {
117    // we only apply one drop connector action for one table each time, so no need to vector here
118    pub(crate) to_change_streaming_job_id: JobId,
119    pub(crate) to_remove_state_table_id: TableId,
120    pub(crate) to_remove_source_id: SourceId,
121}
122
123#[derive(Clone, Default, Debug)]
124pub struct ReleaseContext {
125    pub(crate) database_id: DatabaseId,
126    pub(crate) removed_streaming_job_ids: Vec<JobId>,
127    /// Dropped state table list, need to unregister from hummock.
128    pub(crate) removed_state_table_ids: Vec<TableId>,
129
130    /// Dropped secrets, need to remove from secret manager.
131    pub(crate) removed_secret_ids: Vec<SecretId>,
132    /// Dropped sources (when `DROP SOURCE`), need to unregister from source manager.
133    pub(crate) removed_source_ids: Vec<SourceId>,
134    /// Dropped Source fragments (when `DROP MATERIALIZED VIEW` referencing sources),
135    /// need to unregister from source manager.
136    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
137
138    pub(crate) removed_fragments: HashSet<FragmentId>,
139
140    /// Removed sink fragment by target fragment.
141    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
142
143    /// Dropped iceberg table sinks
144    pub(crate) removed_iceberg_table_sinks: Vec<PbSink>,
145}
146
147#[derive(Default)]
148pub(crate) struct CleanedDirtyStreamingJobs {
149    pub(crate) streaming_job_ids: Vec<JobId>,
150    /// Only populated for per-database recovery.
151    pub(crate) dropped_table_ids: Vec<TableId>,
152    pub(crate) source_ids: Vec<SourceId>,
153}
154
155impl CatalogController {
156    pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
157        let meta_store = env.meta_store();
158        let catalog_controller = Self {
159            env,
160            inner: RwLock::new(CatalogControllerInner {
161                db: meta_store.conn,
162                creating_table_finish_notifier: HashMap::new(),
163                dropped_tables: HashMap::new(),
164            }),
165        };
166
167        catalog_controller.init().await?;
168        Ok(catalog_controller)
169    }
170
171    /// Used in `NotificationService::subscribe`.
172    /// Need to pay attention to the order of acquiring locks to prevent deadlock problems.
173    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
174        self.inner.read().await
175    }
176
177    pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
178        self.inner.write().await
179    }
180}
181
182pub struct CatalogControllerInner {
183    pub(crate) db: DatabaseConnection,
184    /// Registered finish notifiers for creating tables.
185    ///
186    /// `DdlController` will update this map, and pass the `tx` side to `CatalogController`.
187    /// On notifying, we can remove the entry from this map.
188    #[expect(clippy::type_complexity)]
189    pub creating_table_finish_notifier:
190        HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
191    /// Tables have been dropped from the meta store, but the corresponding barrier remains unfinished.
192    pub dropped_tables: HashMap<TableId, PbTable>,
193}
194
195impl CatalogController {
196    pub(crate) async fn notify_frontend(
197        &self,
198        operation: NotificationOperation,
199        info: NotificationInfo,
200    ) -> NotificationVersion {
201        self.env
202            .notification_manager()
203            .notify_frontend(operation, info)
204            .await
205    }
206
207    pub(crate) async fn notify_frontend_relation_info(
208        &self,
209        operation: NotificationOperation,
210        relation_info: PbObjectInfo,
211    ) -> NotificationVersion {
212        self.env
213            .notification_manager()
214            .notify_frontend_object_info(operation, relation_info)
215            .await
216    }
217
218    /// Trivially advance the notification version and notify to frontend,
219    /// return the notification version for frontend to wait for.
220    ///
221    /// Cannot simply return the current version, because the current version may not be sent
222    /// to frontend, and the frontend may endlessly wait for this version, until a frontend
223    /// related notification is sent.
224    pub(crate) async fn notify_frontend_trivial(&self) -> NotificationVersion {
225        self.env
226            .notification_manager()
227            .notify_frontend(
228                NotificationOperation::Update,
229                NotificationInfo::ObjectGroup(PbObjectGroup {
230                    objects: vec![],
231                    dependencies: vec![],
232                }),
233            )
234            .await
235    }
236}
237
238impl CatalogController {
239    pub async fn finish_create_subscription_catalog(
240        &self,
241        subscription_id: SubscriptionId,
242    ) -> MetaResult<()> {
243        let inner = self.inner.write().await;
244        let txn = inner.db.begin().await?;
245
246        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
247        let res = Object::update_many()
248            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
249            .col_expr(
250                object::Column::CreatedAtClusterVersion,
251                current_cluster_version().into(),
252            )
253            .filter(object::Column::Oid.eq(subscription_id))
254            .exec(&txn)
255            .await?;
256        if res.rows_affected == 0 {
257            return Err(MetaError::catalog_id_not_found(
258                "subscription",
259                subscription_id,
260            ));
261        }
262
263        // mark the target subscription as `Create`.
264        let job = subscription::ActiveModel {
265            subscription_id: Set(subscription_id),
266            subscription_state: Set(SubscriptionState::Created.into()),
267            ..Default::default()
268        };
269        Subscription::update(job).exec(&txn).await?;
270
271        let _ = grant_default_privileges_automatically(&txn, subscription_id).await?;
272
273        txn.commit().await?;
274
275        Ok(())
276    }
277
278    pub async fn notify_create_subscription(
279        &self,
280        subscription_id: SubscriptionId,
281    ) -> MetaResult<NotificationVersion> {
282        let inner = self.inner.read().await;
283        let txn = inner.db.begin().await?;
284        let (subscription, obj) = Subscription::find_by_id(subscription_id)
285            .find_also_related(Object)
286            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
287            .one(&txn)
288            .await?
289            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
290
291        let dependencies =
292            list_object_dependencies_by_object_id(&txn, subscription_id.into()).await?;
293        txn.commit().await?;
294
295        let mut version = self
296            .notify_frontend(
297                NotificationOperation::Add,
298                NotificationInfo::ObjectGroup(PbObjectGroup {
299                    objects: vec![PbObject {
300                        object_info: PbObjectInfo::Subscription(
301                            ObjectModel(subscription, obj.unwrap(), None).into(),
302                        )
303                        .into(),
304                    }],
305                    dependencies,
306                }),
307            )
308            .await;
309
310        // notify default privileges about the new subscription
311        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
312            .select_only()
313            .distinct()
314            .column(user_privilege::Column::UserId)
315            .filter(user_privilege::Column::Oid.eq(subscription_id.as_object_id()))
316            .into_tuple()
317            .all(&inner.db)
318            .await?;
319
320        if !updated_user_ids.is_empty() {
321            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
322            version = self.notify_users_update(updated_user_infos).await;
323        }
324
325        Ok(version)
326    }
327
328    // for telemetry
329    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
330        // get connector usage by source/sink
331        // the expect format is like:
332        // {
333        //     "source": [{
334        //         "$source_id": {
335        //             "connector": "kafka",
336        //             "format": "plain",
337        //             "encode": "json"
338        //         },
339        //     }],
340        //     "sink": [{
341        //         "$sink_id": {
342        //             "connector": "pulsar",
343        //             "format": "upsert",
344        //             "encode": "avro"
345        //         },
346        //     }],
347        // }
348
349        let inner = self.inner.read().await;
350        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
351            .select_only()
352            .column(source::Column::SourceId)
353            .column(source::Column::WithProperties)
354            .column(source::Column::SourceInfo)
355            .into_tuple()
356            .all(&inner.db)
357            .await?;
358        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
359            .select_only()
360            .column(sink::Column::SinkId)
361            .column(sink::Column::Properties)
362            .column(sink::Column::SinkFormatDesc)
363            .into_tuple()
364            .all(&inner.db)
365            .await?;
366        drop(inner);
367
368        let get_connector_from_property = |property: &Property| -> String {
369            property
370                .0
371                .get(UPSTREAM_SOURCE_KEY)
372                .cloned()
373                .unwrap_or_default()
374        };
375
376        let source_report: Vec<jsonbb::Value> = source_props_and_info
377            .iter()
378            .map(|(oid, property, info)| {
379                let connector_name = get_connector_from_property(property);
380                let mut format = None;
381                let mut encode = None;
382                if let Some(info) = info {
383                    let pb_info = info.to_protobuf();
384                    format = Some(pb_info.format().as_str_name());
385                    encode = Some(pb_info.row_encode().as_str_name());
386                }
387                jsonbb::json!({
388                    oid.to_string(): {
389                        "connector": connector_name,
390                        "format": format,
391                        "encode": encode,
392                    },
393                })
394            })
395            .collect_vec();
396
397        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
398            .iter()
399            .map(|(oid, property, info)| {
400                let connector_name = get_connector_from_property(property);
401                let mut format = None;
402                let mut encode = None;
403                if let Some(info) = info {
404                    let pb_info = info.to_protobuf();
405                    format = Some(pb_info.format().as_str_name());
406                    encode = Some(pb_info.encode().as_str_name());
407                }
408                jsonbb::json!({
409                    oid.to_string(): {
410                        "connector": connector_name,
411                        "format": format,
412                        "encode": encode,
413                    },
414                })
415            })
416            .collect_vec();
417
418        Ok(jsonbb::json!({
419                "source": source_report,
420                "sink": sink_report,
421        }))
422    }
423
424    pub async fn clean_dirty_subscription(
425        &self,
426        database_id: Option<DatabaseId>,
427    ) -> MetaResult<()> {
428        let inner = self.inner.write().await;
429        let txn = inner.db.begin().await?;
430        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
431            object::Column::Oid.not_in_subquery(
432                Query::select()
433                    .column(subscription::Column::SubscriptionId)
434                    .from(Subscription)
435                    .and_where(
436                        subscription::Column::SubscriptionState
437                            .eq(SubscriptionState::Created as i32),
438                    )
439                    .take(),
440            ),
441        );
442
443        let filter_condition = if let Some(database_id) = database_id {
444            filter_condition.and(object::Column::DatabaseId.eq(database_id))
445        } else {
446            filter_condition
447        };
448        Object::delete_many()
449            .filter(filter_condition)
450            .exec(&txn)
451            .await?;
452        txn.commit().await?;
453        // We don't need to notify the frontend, because the Init subscription is not send to frontend.
454        Ok(())
455    }
456
457    /// `clean_dirty_creating_jobs` cleans up creating jobs that are creating in Foreground mode or in Initial status.
458    pub(crate) async fn clean_dirty_creating_jobs(
459        &self,
460        database_id: Option<DatabaseId>,
461    ) -> MetaResult<CleanedDirtyStreamingJobs> {
462        let mut inner = self.inner.write().await;
463        let txn = inner.db.begin().await?;
464
465        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
466            streaming_job::Column::JobStatus
467                .eq(JobStatus::Creating)
468                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
469        );
470
471        let filter_condition = if let Some(database_id) = database_id {
472            filter_condition.and(object::Column::DatabaseId.eq(database_id))
473        } else {
474            filter_condition
475        };
476
477        let mut dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
478            .select_only()
479            .columns([
480                object::Column::Oid,
481                object::Column::ObjType,
482                object::Column::SchemaId,
483                object::Column::DatabaseId,
484            ])
485            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
486            .filter(filter_condition)
487            .into_partial_model()
488            .all(&txn)
489            .await?;
490
491        // Check if there are any pending iceberg table jobs.
492        let dirty_iceberg_jobs = find_dirty_iceberg_table_jobs(&txn, database_id).await?;
493        if !dirty_iceberg_jobs.is_empty() {
494            dirty_job_objs.extend(dirty_iceberg_jobs);
495        }
496
497        Self::clean_dirty_sink_downstreams(&txn).await?;
498
499        if dirty_job_objs.is_empty() {
500            return Ok(CleanedDirtyStreamingJobs::default());
501        }
502
503        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
504
505        let dirty_job_ids = dirty_job_objs
506            .iter()
507            .map(|obj| obj.oid.as_job_id())
508            .collect_vec();
509        let dirty_job_table_ids = dirty_job_ids
510            .iter()
511            .map(|job_id| job_id.as_mv_table_id())
512            .collect_vec();
513
514        // Filter out dummy objs for replacement.
515        // todo: we'd better introduce a new dummy object type for replacement.
516        let all_dirty_table_ids = dirty_job_objs
517            .iter()
518            .filter(|obj| obj.obj_type == ObjectType::Table)
519            .map(|obj| obj.oid)
520            .collect_vec();
521        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
522            .select_only()
523            .column(table::Column::TableId)
524            .column(table::Column::TableType)
525            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
526            .into_tuple::<(ObjectId, TableType)>()
527            .all(&txn)
528            .await?
529            .into_iter()
530            .collect();
531
532        let dirty_background_jobs: HashSet<JobId> = streaming_job::Entity::find()
533            .select_only()
534            .column(streaming_job::Column::JobId)
535            .filter(
536                streaming_job::Column::JobId
537                    .is_in(dirty_job_ids.iter().copied())
538                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
539            )
540            .into_tuple()
541            .all(&txn)
542            .await?
543            .into_iter()
544            .collect();
545
546        // notify delete for failed materialized views and background jobs.
547        let to_notify_objs = dirty_job_objs
548            .iter()
549            .filter(|obj| {
550                matches!(
551                    dirty_table_type_map.get(&obj.oid),
552                    Some(TableType::MaterializedView)
553                ) || dirty_background_jobs.contains(&obj.oid.as_job_id())
554            })
555            .cloned()
556            .collect_vec();
557
558        // The source ids for dirty tables with connector.
559        // FIXME: we should also clean dirty sources.
560        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
561            .select_only()
562            .column(table::Column::OptionalAssociatedSourceId)
563            .filter(
564                table::Column::TableId
565                    .is_in(dirty_job_table_ids)
566                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
567            )
568            .into_tuple()
569            .all(&txn)
570            .await?;
571
572        let dirty_internal_state_table_ids: Vec<TableId> = Table::find()
573            .select_only()
574            .column(table::Column::TableId)
575            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.iter().copied()))
576            .into_tuple()
577            .all(&txn)
578            .await?;
579        let dirty_state_table_ids = dirty_job_ids
580            .iter()
581            .map(|job_id| job_id.as_mv_table_id())
582            .chain(dirty_internal_state_table_ids.iter().copied())
583            .collect_vec();
584
585        let dirty_internal_table_objs = Object::find()
586            .select_only()
587            .columns([
588                object::Column::Oid,
589                object::Column::ObjType,
590                object::Column::SchemaId,
591                object::Column::DatabaseId,
592            ])
593            .join(JoinType::InnerJoin, object::Relation::Table.def())
594            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
595            .into_partial_model()
596            .all(&txn)
597            .await?;
598
599        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
600            .iter()
601            .map(|job_id| job_id.as_object_id())
602            .chain(
603                dirty_state_table_ids
604                    .iter()
605                    .copied()
606                    .map(|table_id| table_id.as_object_id()),
607            )
608            .chain(
609                dirty_associated_source_ids
610                    .iter()
611                    .map(|source_id| source_id.as_object_id()),
612            )
613            .collect();
614
615        // Per-database recovery does not run the global Hummock purge. Keep table catalogs so the
616        // caller can reuse the normal dropped-table cleanup path after the catalog rows are deleted.
617        let dropped_tables = if database_id.is_some() {
618            Table::find()
619                .find_also_related(Object)
620                .filter(table::Column::TableId.is_in(dirty_state_table_ids.iter().copied()))
621                .all(&txn)
622                .await?
623                .into_iter()
624                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)))
625                .collect_vec()
626        } else {
627            vec![]
628        };
629        let dropped_table_ids = dropped_tables.iter().map(|table| table.id).collect_vec();
630
631        let res = Object::delete_many()
632            .filter(object::Column::Oid.is_in(to_delete_objs))
633            .exec(&txn)
634            .await?;
635        assert!(res.rows_affected > 0);
636
637        txn.commit().await?;
638        inner
639            .dropped_tables
640            .extend(dropped_tables.into_iter().map(|t| (t.id, t)));
641
642        let object_group = build_object_group_for_delete(
643            to_notify_objs
644                .into_iter()
645                .chain(dirty_internal_table_objs.into_iter())
646                .collect_vec(),
647        );
648
649        let _version = self
650            .notify_frontend(NotificationOperation::Delete, object_group)
651            .await;
652
653        Ok(CleanedDirtyStreamingJobs {
654            streaming_job_ids: dirty_job_ids,
655            dropped_table_ids,
656            source_ids: dirty_associated_source_ids,
657        })
658    }
659
660    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
661        let inner = self.inner.write().await;
662        let txn = inner.db.begin().await?;
663        ensure_object_id(ObjectType::Database, comment.database_id, &txn).await?;
664        ensure_object_id(ObjectType::Schema, comment.schema_id, &txn).await?;
665        let (table_obj, streaming_job) = Object::find_by_id(comment.table_id)
666            .find_also_related(StreamingJob)
667            .one(&txn)
668            .await?
669            .ok_or_else(|| MetaError::catalog_id_not_found("object", comment.table_id))?;
670
671        let table = if let Some(col_idx) = comment.column_index {
672            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id)
673                .select_only()
674                .column(table::Column::Columns)
675                .into_tuple()
676                .one(&txn)
677                .await?
678                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
679            let mut pb_columns = columns.to_protobuf();
680
681            let column = pb_columns
682                .get_mut(col_idx as usize)
683                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
684            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
685                anyhow!(
686                    "column desc at index {} for table id {} not found",
687                    col_idx,
688                    comment.table_id
689                )
690            })?;
691            column_desc.description = comment.description;
692            table::ActiveModel {
693                table_id: Set(comment.table_id),
694                columns: Set(pb_columns.into()),
695                ..Default::default()
696            }
697            .update(&txn)
698            .await?
699        } else {
700            table::ActiveModel {
701                table_id: Set(comment.table_id),
702                description: Set(comment.description),
703                ..Default::default()
704            }
705            .update(&txn)
706            .await?
707        };
708        txn.commit().await?;
709
710        let version = self
711            .notify_frontend_relation_info(
712                NotificationOperation::Update,
713                PbObjectInfo::Table(ObjectModel(table, table_obj, streaming_job).into()),
714            )
715            .await;
716
717        Ok(version)
718    }
719
720    async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
721        if tables.is_empty() {
722            return;
723        }
724        let objects = tables
725            .into_iter()
726            .map(|t| PbObject {
727                object_info: Some(PbObjectInfo::Table(t)),
728            })
729            .collect();
730        let group = NotificationInfo::ObjectGroup(PbObjectGroup {
731            objects,
732            dependencies: vec![],
733        });
734        self.env
735            .notification_manager()
736            .notify_hummock(NotificationOperation::Delete, group.clone())
737            .await;
738        self.env
739            .notification_manager()
740            .notify_compactor(NotificationOperation::Delete, group)
741            .await;
742    }
743
744    pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
745        let mut inner = self.inner.write().await;
746        let tables = inner.complete_dropped_tables(table_ids);
747        self.notify_hummock_dropped_tables(tables).await;
748    }
749
750    pub async fn cleanup_dropped_tables(&self) {
751        let mut inner = self.inner.write().await;
752        let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
753        self.notify_hummock_dropped_tables(tables).await;
754    }
755
756    pub async fn stats(&self) -> MetaResult<CatalogStats> {
757        let inner = self.inner.read().await;
758
759        let mut table_num_map: HashMap<_, _> = Table::find()
760            .select_only()
761            .column(table::Column::TableType)
762            .column_as(table::Column::TableId.count(), "num")
763            .group_by(table::Column::TableType)
764            .having(table::Column::TableType.ne(TableType::Internal))
765            .into_tuple::<(TableType, i64)>()
766            .all(&inner.db)
767            .await?
768            .into_iter()
769            .map(|(table_type, num)| (table_type, num as u64))
770            .collect();
771
772        let source_num = Source::find().count(&inner.db).await?;
773        let sink_num = Sink::find().count(&inner.db).await?;
774        let function_num = Function::find().count(&inner.db).await?;
775        let streaming_job_num = StreamingJob::find().count(&inner.db).await?;
776
777        let actor_num = {
778            let guard = self.env.shared_actor_info.read_guard();
779            guard
780                .iter_over_fragments()
781                .map(|(_, fragment)| fragment.actors.len() as u64)
782                .sum::<u64>()
783        };
784        let database_num = Database::find().count(&inner.db).await?;
785
786        Ok(CatalogStats {
787            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
788            mview_num: table_num_map
789                .remove(&TableType::MaterializedView)
790                .unwrap_or(0),
791            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
792            source_num,
793            sink_num,
794            function_num,
795            streaming_job_num,
796            actor_num,
797            database_num,
798        })
799    }
800
801    pub async fn fetch_sink_with_state_table_ids(
802        &self,
803        sink_ids: HashSet<SinkId>,
804    ) -> MetaResult<HashMap<SinkId, Vec<TableId>>> {
805        let inner = self.inner.read().await;
806
807        let query = Fragment::find()
808            .select_only()
809            .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
810            .filter(
811                fragment::Column::JobId
812                    .is_in(sink_ids)
813                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
814            );
815
816        let rows: Vec<(JobId, TableIdArray)> = query.into_tuple().all(&inner.db).await?;
817
818        debug_assert!(rows.iter().map(|(job_id, _)| job_id).all_unique());
819
820        let result = rows
821            .into_iter()
822            .map(|(job_id, table_id_array)| (job_id.as_sink_id(), table_id_array.0))
823            .collect::<HashMap<_, _>>();
824
825        Ok(result)
826    }
827
828    pub async fn list_all_pending_sinks(
829        &self,
830        database_id: Option<DatabaseId>,
831    ) -> MetaResult<HashSet<SinkId>> {
832        let inner = self.inner.read().await;
833
834        let mut query = pending_sink_state::Entity::find()
835            .select_only()
836            .columns([pending_sink_state::Column::SinkId])
837            .filter(
838                pending_sink_state::Column::SinkState.eq(pending_sink_state::SinkState::Pending),
839            )
840            .distinct();
841
842        if let Some(db_id) = database_id {
843            query = query
844                .join(
845                    JoinType::InnerJoin,
846                    pending_sink_state::Relation::Object.def(),
847                )
848                .filter(object::Column::DatabaseId.eq(db_id));
849        }
850
851        let result: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;
852
853        Ok(result.into_iter().collect())
854    }
855
856    pub async fn abort_pending_sink_epochs(
857        &self,
858        sink_committed_epoch: HashMap<SinkId, u64>,
859    ) -> MetaResult<()> {
860        let inner = self.inner.write().await;
861        let txn = inner.db.begin().await?;
862
863        for (sink_id, committed_epoch) in sink_committed_epoch {
864            pending_sink_state::Entity::update_many()
865                .col_expr(
866                    pending_sink_state::Column::SinkState,
867                    Expr::value(pending_sink_state::SinkState::Aborted),
868                )
869                .filter(
870                    pending_sink_state::Column::SinkId
871                        .eq(sink_id)
872                        .and(pending_sink_state::Column::Epoch.gt(committed_epoch as i64)),
873                )
874                .exec(&txn)
875                .await?;
876        }
877
878        txn.commit().await?;
879        Ok(())
880    }
881}
882
/// Aggregated counters describing the size of the catalog.
///
/// Populated by `CatalogController::stats`. All fields are plain counts, so the type is cheap to
/// copy, comparable, and has an all-zero `Default`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CatalogStats {
    /// Number of tables whose type is `TableType::Table`.
    pub table_num: u64,
    /// Number of tables whose type is `TableType::MaterializedView`.
    pub mview_num: u64,
    /// Number of tables whose type is `TableType::Index`.
    pub index_num: u64,
    /// Number of rows in the source catalog.
    pub source_num: u64,
    /// Number of rows in the sink catalog.
    pub sink_num: u64,
    /// Number of rows in the function catalog.
    pub function_num: u64,
    /// Number of rows in the streaming job catalog.
    pub streaming_job_num: u64,
    /// Total number of actors summed over all fragments.
    pub actor_num: u64,
    /// Number of rows in the database catalog.
    pub database_num: u64,
}
895
896impl CatalogControllerInner {
897    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
898        let databases = self.list_databases().await?;
899        let schemas = self.list_schemas().await?;
900        let tables = self.list_tables().await?;
901        let sources = self.list_sources().await?;
902        let sinks = self.list_sinks().await?;
903        let subscriptions = self.list_subscriptions().await?;
904        let indexes = self.list_indexes().await?;
905        let views = self.list_views().await?;
906        let functions = self.list_functions().await?;
907        let connections = self.list_connections().await?;
908        let secrets = self.list_secrets().await?;
909
910        let users = self.list_users().await?;
911
912        Ok((
913            (
914                databases,
915                schemas,
916                tables,
917                sources,
918                sinks,
919                subscriptions,
920                indexes,
921                views,
922                functions,
923                connections,
924                secrets,
925            ),
926            users,
927        ))
928    }
929
930    async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
931        let db_objs = Database::find()
932            .find_also_related(Object)
933            .all(&self.db)
934            .await?;
935        Ok(db_objs
936            .into_iter()
937            .map(|(db, obj)| ObjectModel(db, obj.unwrap(), None).into())
938            .collect())
939    }
940
941    async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
942        let schema_objs = Schema::find()
943            .find_also_related(Object)
944            .all(&self.db)
945            .await?;
946
947        Ok(schema_objs
948            .into_iter()
949            .map(|(schema, obj)| ObjectModel(schema, obj.unwrap(), None).into())
950            .collect())
951    }
952
953    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
954        let mut user_infos: Vec<PbUserInfo> = User::find()
955            .all(&self.db)
956            .await?
957            .into_iter()
958            .map(Into::into)
959            .collect();
960
961        for user_info in &mut user_infos {
962            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
963        }
964        Ok(user_infos)
965    }
966
967    /// `list_all_tables` return all tables and internal tables.
968    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
969        let table_objs = Table::find()
970            .find_also_related(Object)
971            .all(&self.db)
972            .await?;
973        let streaming_jobs = load_streaming_jobs_by_ids(
974            &self.db,
975            table_objs.iter().map(|(table, _)| table.job_id()),
976        )
977        .await?;
978
979        Ok(table_objs
980            .into_iter()
981            .map(|(table, obj)| {
982                let job_id = table.job_id();
983                let streaming_job = streaming_jobs.get(&job_id).cloned();
984                ObjectModel(table, obj.unwrap(), streaming_job).into()
985            })
986            .collect())
987    }
988
989    /// `list_tables` return all `CREATED` tables, `CREATING` materialized views/ `BACKGROUND` jobs and internal tables that belong to them and sinks.
990    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
991        let mut table_objs = Table::find()
992            .find_also_related(Object)
993            .all(&self.db)
994            .await?;
995
996        let all_streaming_jobs: HashMap<JobId, streaming_job::Model> = StreamingJob::find()
997            .all(&self.db)
998            .await?
999            .into_iter()
1000            .map(|job| (job.job_id, job))
1001            .collect();
1002
1003        let sink_ids: HashSet<SinkId> = Sink::find()
1004            .select_only()
1005            .column(sink::Column::SinkId)
1006            .into_tuple::<SinkId>()
1007            .all(&self.db)
1008            .await?
1009            .into_iter()
1010            .collect();
1011
1012        let mview_job_ids: HashSet<JobId> = table_objs
1013            .iter()
1014            .filter_map(|(table, _)| {
1015                if table.table_type == TableType::MaterializedView {
1016                    Some(table.table_id.as_job_id())
1017                } else {
1018                    None
1019                }
1020            })
1021            .collect();
1022
1023        table_objs.retain(|(table, _)| {
1024            let job_id = table.job_id();
1025
1026            if sink_ids.contains(&job_id.as_sink_id()) || mview_job_ids.contains(&job_id) {
1027                return true;
1028            }
1029            if let Some(streaming_job) = all_streaming_jobs.get(&job_id) {
1030                return streaming_job.job_status == JobStatus::Created
1031                    || (streaming_job.create_type == CreateType::Background);
1032            }
1033            false
1034        });
1035
1036        let mut tables = Vec::with_capacity(table_objs.len());
1037        for (table, obj) in table_objs {
1038            let job_id = table.job_id();
1039            let streaming_job = all_streaming_jobs.get(&job_id).cloned();
1040            let pb_table: PbTable = ObjectModel(table, obj.unwrap(), streaming_job).into();
1041            tables.push(pb_table);
1042        }
1043        Ok(tables)
1044    }
1045
1046    /// `list_sources` return all sources and `CREATED` ones if contains any streaming jobs.
1047    async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
1048        let mut source_objs = Source::find()
1049            .find_also_related(Object)
1050            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1051            .filter(
1052                streaming_job::Column::JobStatus
1053                    .is_null()
1054                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1055            )
1056            .all(&self.db)
1057            .await?;
1058
1059        // filter out inner connector sources that are still under creating.
1060        let created_table_ids: HashSet<TableId> = Table::find()
1061            .select_only()
1062            .column(table::Column::TableId)
1063            .join(JoinType::InnerJoin, table::Relation::Object1.def())
1064            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1065            .filter(
1066                table::Column::OptionalAssociatedSourceId
1067                    .is_not_null()
1068                    .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1069            )
1070            .into_tuple()
1071            .all(&self.db)
1072            .await?
1073            .into_iter()
1074            .collect();
1075        source_objs.retain_mut(|(source, _)| {
1076            source.optional_associated_table_id.is_none()
1077                || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
1078        });
1079
1080        Ok(source_objs
1081            .into_iter()
1082            .map(|(source, obj)| ObjectModel(source, obj.unwrap(), None).into())
1083            .collect())
1084    }
1085
1086    /// `list_sinks` return all sinks.
1087    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
1088        let sink_objs = Sink::find()
1089            .find_also_related(Object)
1090            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1091            .all(&self.db)
1092            .await?;
1093        let streaming_jobs = load_streaming_jobs_by_ids(
1094            &self.db,
1095            sink_objs.iter().map(|(sink, _)| sink.sink_id.as_job_id()),
1096        )
1097        .await?;
1098
1099        Ok(sink_objs
1100            .into_iter()
1101            .map(|(sink, obj)| {
1102                let streaming_job = streaming_jobs.get(&sink.sink_id.as_job_id()).cloned();
1103                ObjectModel(sink, obj.unwrap(), streaming_job).into()
1104            })
1105            .collect())
1106    }
1107
1108    /// `list_subscriptions` return all `CREATED` subscriptions.
1109    async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
1110        let subscription_objs = Subscription::find()
1111            .find_also_related(Object)
1112            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
1113            .all(&self.db)
1114            .await?;
1115
1116        Ok(subscription_objs
1117            .into_iter()
1118            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap(), None).into())
1119            .collect())
1120    }
1121
1122    async fn list_views(&self) -> MetaResult<Vec<PbView>> {
1123        let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
1124
1125        Ok(view_objs
1126            .into_iter()
1127            .map(|(view, obj)| ObjectModel(view, obj.unwrap(), None).into())
1128            .collect())
1129    }
1130
1131    /// `list_indexes` return all `CREATED` and `BACKGROUND` indexes.
1132    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
1133        let index_objs = Index::find()
1134            .find_also_related(Object)
1135            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1136            .filter(
1137                streaming_job::Column::JobStatus
1138                    .eq(JobStatus::Created)
1139                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
1140            )
1141            .all(&self.db)
1142            .await?;
1143        let streaming_jobs = load_streaming_jobs_by_ids(
1144            &self.db,
1145            index_objs
1146                .iter()
1147                .map(|(index, _)| index.index_id.as_job_id()),
1148        )
1149        .await?;
1150
1151        Ok(index_objs
1152            .into_iter()
1153            .map(|(index, obj)| {
1154                let streaming_job = streaming_jobs.get(&index.index_id.as_job_id()).cloned();
1155                ObjectModel(index, obj.unwrap(), streaming_job).into()
1156            })
1157            .collect())
1158    }
1159
1160    async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1161        let conn_objs = Connection::find()
1162            .find_also_related(Object)
1163            .all(&self.db)
1164            .await?;
1165
1166        Ok(conn_objs
1167            .into_iter()
1168            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap(), None).into())
1169            .collect())
1170    }
1171
1172    pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1173        let secret_objs = Secret::find()
1174            .find_also_related(Object)
1175            .all(&self.db)
1176            .await?;
1177        Ok(secret_objs
1178            .into_iter()
1179            .map(|(secret, obj)| ObjectModel(secret, obj.unwrap(), None).into())
1180            .collect())
1181    }
1182
1183    async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1184        let func_objs = Function::find()
1185            .find_also_related(Object)
1186            .all(&self.db)
1187            .await?;
1188
1189        Ok(func_objs
1190            .into_iter()
1191            .map(|(func, obj)| ObjectModel(func, obj.unwrap(), None).into())
1192            .collect())
1193    }
1194
1195    pub(crate) fn register_finish_notifier(
1196        &mut self,
1197        database_id: DatabaseId,
1198        id: JobId,
1199        sender: Sender<Result<NotificationVersion, String>>,
1200    ) {
1201        self.creating_table_finish_notifier
1202            .entry(database_id)
1203            .or_default()
1204            .entry(id)
1205            .or_default()
1206            .push(sender);
1207    }
1208
1209    pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
1210        let status = StreamingJob::find()
1211            .select_only()
1212            .column(streaming_job::Column::JobStatus)
1213            .filter(streaming_job::Column::JobId.eq(id))
1214            .into_tuple::<JobStatus>()
1215            .one(&self.db)
1216            .await?;
1217
1218        status
1219            .map(|status| status == JobStatus::Created)
1220            .ok_or_else(|| {
1221                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
1222            })
1223    }
1224
1225    pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1226        if let Some(database_id) = database_id {
1227            if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1228            {
1229                for tx in creating_tables.into_values().flatten() {
1230                    let _ = tx.send(Err(err.clone()));
1231                }
1232            }
1233        } else {
1234            for tx in take(&mut self.creating_table_finish_notifier)
1235                .into_values()
1236                .flatten()
1237                .flat_map(|(_, txs)| txs.into_iter())
1238            {
1239                let _ = tx.send(Err(err.clone()));
1240            }
1241        }
1242    }
1243
1244    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1245        let table_ids: Vec<TableId> = Table::find()
1246            .select_only()
1247            .filter(table::Column::TableType.is_in(vec![
1248                TableType::Table,
1249                TableType::MaterializedView,
1250                TableType::Index,
1251            ]))
1252            .column(table::Column::TableId)
1253            .into_tuple()
1254            .all(&self.db)
1255            .await?;
1256        Ok(table_ids)
1257    }
1258
1259    /// Since the tables have been dropped from both meta store and streaming jobs, this method removes those table copies.
1260    /// Returns the removed table copies.
1261    pub(crate) fn complete_dropped_tables(
1262        &mut self,
1263        table_ids: impl IntoIterator<Item = TableId>,
1264    ) -> Vec<PbTable> {
1265        table_ids
1266            .into_iter()
1267            .filter_map(|table_id| {
1268                self.dropped_tables.remove(&table_id).map_or_else(
1269                    || {
1270                        tracing::warn!(%table_id, "table not found");
1271                        None
1272                    },
1273                    Some,
1274                )
1275            })
1276            .collect()
1277    }
1278}