Skip to main content

risingwave_meta/controller/catalog/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{
31    DEFAULT_SCHEMA_NAME, FragmentTypeFlag, FragmentTypeMask, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::TableType;
42use risingwave_meta_model::{
43    ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array, IndexId,
44    JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45    StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, TableIdArray,
46    UserId, ViewId, connection, database, fragment, function, index, object, object_dependency,
47    pending_sink_state, schema, secret, sink, source, streaming_job, subscription, table,
48    user_privilege, view,
49};
50use risingwave_pb::catalog::connection::Info as ConnectionInfo;
51use risingwave_pb::catalog::subscription::SubscriptionState;
52use risingwave_pb::catalog::table::PbTableType;
53use risingwave_pb::catalog::{
54    PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
55    PbSubscription, PbTable, PbView,
56};
57use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
58use risingwave_pb::meta::object::PbObjectInfo;
59use risingwave_pb::meta::subscribe_response::{
60    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
61};
62use risingwave_pb::meta::{PbObject, PbObjectGroup};
63use risingwave_pb::stream_plan::stream_node::NodeBody;
64use risingwave_pb::telemetry::PbTelemetryEventStage;
65use risingwave_pb::user::PbUserInfo;
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69    ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
70    IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
71    SelectColumns, TransactionTrait, Value,
72};
73use tokio::sync::oneshot::Sender;
74use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
75use tracing::info;
76
77use super::utils::{
78    check_subscription_name_duplicate, get_internal_tables_by_id, load_streaming_jobs_by_ids,
79    rename_relation, rename_relation_refer,
80};
81use crate::controller::ObjectModel;
82use crate::controller::catalog::util::update_internal_tables;
83use crate::controller::fragment::FragmentTypeMaskExt;
84use crate::controller::utils::*;
85use crate::manager::{
86    IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
87    get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
88};
89use crate::rpc::ddl_controller::DropMode;
90use crate::telemetry::{MetaTelemetryJobDesc, report_event};
91use crate::{MetaError, MetaResult};
92
/// Catalog objects grouped by kind, one vector of protobuf messages per kind.
///
/// The tuple elements are, in order: databases, schemas, tables, sources, sinks,
/// subscriptions, indexes, views, functions, connections and secrets.
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);
106
/// Shared, reference-counted handle to a [`CatalogController`].
pub type CatalogControllerRef = Arc<CatalogController>;
108
/// `CatalogController` is the controller for catalog related operations, including database, schema, table, view, etc.
pub struct CatalogController {
    /// Meta service environment. In this file it provides the meta store connection, the
    /// notification manager and the shared actor info.
    pub(crate) env: MetaSrvEnv,
    /// Database connection and in-memory bookkeeping, guarded by an async read-write lock.
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
114
/// Ids involved in dropping the connector of a table.
///
/// NOTE(review): the producers and consumers of this struct are outside this part of the
/// file; the field descriptions below are inferred from the field names — confirm.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    // Only one drop-connector action is applied to a table at a time, so single ids are
    // enough here; no vectors are needed.
    /// Presumably the streaming job that has to be changed.
    pub(crate) to_change_streaming_job_id: JobId,
    /// Presumably the state table that has to be removed.
    pub(crate) to_remove_state_table_id: TableId,
    /// Presumably the source that has to be removed.
    pub(crate) to_remove_source_id: SourceId,
}
122
/// Everything removed from the catalog by a drop operation that other components still
/// have to release (hummock, secret manager, source manager, ...), as described per field.
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    /// Presumably the database the removed objects belonged to — confirm against the producer.
    pub(crate) database_id: DatabaseId,
    /// Ids of the streaming jobs that were removed.
    pub(crate) removed_streaming_job_ids: Vec<JobId>,
    /// Dropped state table list, need to unregister from hummock.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    /// Dropped secrets, need to remove from secret manager.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    /// Dropped sources (when `DROP SOURCE`), need to unregister from source manager.
    pub(crate) removed_source_ids: Vec<SourceId>,
    /// Dropped Source fragments (when `DROP MATERIALIZED VIEW` referencing sources),
    /// need to unregister from source manager.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    /// Ids of the fragments that were removed.
    pub(crate) removed_fragments: HashSet<FragmentId>,

    /// Removed sink fragment by target fragment.
    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,

    /// Dropped iceberg table sinks
    pub(crate) removed_iceberg_table_sinks: Vec<PbSink>,

    /// Dropped Iceberg V3 sink ids. Used to unregister per-sink commit workers
    /// owned by `IcebergV3SinkManager`. Filtered via `is_iceberg_v3_sink` on
    /// the sink properties so user-created V3 sinks (any name) are included.
    pub(crate) removed_iceberg_v3_sink_ids: Vec<SinkId>,
}
151
/// Result of `CatalogController::clean_dirty_creating_jobs`: what was removed from the
/// catalog while cleaning up dirty streaming jobs.
#[derive(Default)]
pub(crate) struct CleanedDirtyStreamingJobs {
    /// Ids of the dirty streaming jobs that were deleted.
    pub(crate) streaming_job_ids: Vec<JobId>,
    /// Only populated for per-database recovery.
    pub(crate) dropped_table_ids: Vec<TableId>,
    /// Ids of the sources associated with the dirty tables (tables with a connector).
    pub(crate) source_ids: Vec<SourceId>,
}
159
160impl CatalogController {
161    pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
162        let meta_store = env.meta_store();
163        let catalog_controller = Self {
164            env,
165            inner: RwLock::new(CatalogControllerInner {
166                db: meta_store.conn,
167                creating_table_finish_notifier: HashMap::new(),
168                dropped_tables: HashMap::new(),
169            }),
170        };
171
172        catalog_controller.init().await?;
173        Ok(catalog_controller)
174    }
175
176    /// Used in `NotificationService::subscribe`.
177    /// Need to pay attention to the order of acquiring locks to prevent deadlock problems.
178    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
179        self.inner.read().await
180    }
181
182    pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
183        self.inner.write().await
184    }
185}
186
/// State of [`CatalogController`] that lives behind its read-write lock.
pub struct CatalogControllerInner {
    /// Connection to the meta store database; all catalog queries in this file run on it.
    pub(crate) db: DatabaseConnection,
    /// Registered finish notifiers for creating tables.
    ///
    /// `DdlController` will update this map, and pass the `tx` side to `CatalogController`.
    /// On notifying, we can remove the entry from this map.
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    /// Tables that have been dropped from the meta store while the corresponding barrier is
    /// still unfinished.
    pub dropped_tables: HashMap<TableId, PbTable>,
}
199
200impl CatalogController {
201    pub(crate) async fn notify_frontend(
202        &self,
203        operation: NotificationOperation,
204        info: NotificationInfo,
205    ) -> NotificationVersion {
206        self.env
207            .notification_manager()
208            .notify_frontend(operation, info)
209            .await
210    }
211
212    pub(crate) async fn notify_frontend_relation_info(
213        &self,
214        operation: NotificationOperation,
215        relation_info: PbObjectInfo,
216    ) -> NotificationVersion {
217        self.env
218            .notification_manager()
219            .notify_frontend_object_info(operation, relation_info)
220            .await
221    }
222
223    /// Trivially advance the notification version and notify to frontend,
224    /// return the notification version for frontend to wait for.
225    ///
226    /// Cannot simply return the current version, because the current version may not be sent
227    /// to frontend, and the frontend may endlessly wait for this version, until a frontend
228    /// related notification is sent.
229    pub(crate) async fn notify_frontend_trivial(&self) -> NotificationVersion {
230        self.env
231            .notification_manager()
232            .notify_frontend(
233                NotificationOperation::Update,
234                NotificationInfo::ObjectGroup(PbObjectGroup {
235                    objects: vec![],
236                    dependencies: vec![],
237                }),
238            )
239            .await
240    }
241}
242
243impl CatalogController {
244    pub async fn finish_create_subscription_catalog(
245        &self,
246        subscription_id: SubscriptionId,
247    ) -> MetaResult<()> {
248        let inner = self.inner.write().await;
249        let txn = inner.db.begin().await?;
250
251        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
252        let res = Object::update_many()
253            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
254            .col_expr(
255                object::Column::CreatedAtClusterVersion,
256                current_cluster_version().into(),
257            )
258            .filter(object::Column::Oid.eq(subscription_id))
259            .exec(&txn)
260            .await?;
261        if res.rows_affected == 0 {
262            return Err(MetaError::catalog_id_not_found(
263                "subscription",
264                subscription_id,
265            ));
266        }
267
268        // mark the target subscription as `Create`.
269        let job = subscription::ActiveModel {
270            subscription_id: Set(subscription_id),
271            subscription_state: Set(SubscriptionState::Created.into()),
272            ..Default::default()
273        };
274        Subscription::update(job).exec(&txn).await?;
275
276        let _ = grant_default_privileges_automatically(&txn, subscription_id).await?;
277
278        txn.commit().await?;
279
280        Ok(())
281    }
282
283    pub async fn notify_create_subscription(
284        &self,
285        subscription_id: SubscriptionId,
286    ) -> MetaResult<NotificationVersion> {
287        let inner = self.inner.read().await;
288        let txn = inner.db.begin().await?;
289        let (subscription, obj) = Subscription::find_by_id(subscription_id)
290            .find_also_related(Object)
291            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
292            .one(&txn)
293            .await?
294            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
295
296        let dependencies =
297            list_object_dependencies_by_object_id(&txn, subscription_id.into()).await?;
298        txn.commit().await?;
299
300        let mut version = self
301            .notify_frontend(
302                NotificationOperation::Add,
303                NotificationInfo::ObjectGroup(PbObjectGroup {
304                    objects: vec![PbObject {
305                        object_info: PbObjectInfo::Subscription(
306                            ObjectModel(subscription, obj.unwrap(), None).into(),
307                        )
308                        .into(),
309                    }],
310                    dependencies,
311                }),
312            )
313            .await;
314
315        // notify default privileges about the new subscription
316        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
317            .select_only()
318            .distinct()
319            .column(user_privilege::Column::UserId)
320            .filter(user_privilege::Column::Oid.eq(subscription_id.as_object_id()))
321            .into_tuple()
322            .all(&inner.db)
323            .await?;
324
325        if !updated_user_ids.is_empty() {
326            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
327            version = self.notify_users_update(updated_user_infos).await;
328        }
329
330        Ok(version)
331    }
332
333    // for telemetry
334    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
335        // get connector usage by source/sink
336        // the expect format is like:
337        // {
338        //     "source": [{
339        //         "$source_id": {
340        //             "connector": "kafka",
341        //             "format": "plain",
342        //             "encode": "json"
343        //         },
344        //     }],
345        //     "sink": [{
346        //         "$sink_id": {
347        //             "connector": "pulsar",
348        //             "format": "upsert",
349        //             "encode": "avro"
350        //         },
351        //     }],
352        // }
353
354        let inner = self.inner.read().await;
355        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
356            .select_only()
357            .column(source::Column::SourceId)
358            .column(source::Column::WithProperties)
359            .column(source::Column::SourceInfo)
360            .into_tuple()
361            .all(&inner.db)
362            .await?;
363        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
364            .select_only()
365            .column(sink::Column::SinkId)
366            .column(sink::Column::Properties)
367            .column(sink::Column::SinkFormatDesc)
368            .into_tuple()
369            .all(&inner.db)
370            .await?;
371        drop(inner);
372
373        let get_connector_from_property = |property: &Property| -> String {
374            property
375                .0
376                .get(UPSTREAM_SOURCE_KEY)
377                .cloned()
378                .unwrap_or_default()
379        };
380
381        let source_report: Vec<jsonbb::Value> = source_props_and_info
382            .iter()
383            .map(|(oid, property, info)| {
384                let connector_name = get_connector_from_property(property);
385                let mut format = None;
386                let mut encode = None;
387                if let Some(info) = info {
388                    let pb_info = info.to_protobuf();
389                    format = Some(pb_info.format().as_str_name());
390                    encode = Some(pb_info.row_encode().as_str_name());
391                }
392                jsonbb::json!({
393                    oid.to_string(): {
394                        "connector": connector_name,
395                        "format": format,
396                        "encode": encode,
397                    },
398                })
399            })
400            .collect_vec();
401
402        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
403            .iter()
404            .map(|(oid, property, info)| {
405                let connector_name = get_connector_from_property(property);
406                let mut format = None;
407                let mut encode = None;
408                if let Some(info) = info {
409                    let pb_info = info.to_protobuf();
410                    format = Some(pb_info.format().as_str_name());
411                    encode = Some(pb_info.encode().as_str_name());
412                }
413                jsonbb::json!({
414                    oid.to_string(): {
415                        "connector": connector_name,
416                        "format": format,
417                        "encode": encode,
418                    },
419                })
420            })
421            .collect_vec();
422
423        Ok(jsonbb::json!({
424                "source": source_report,
425                "sink": sink_report,
426        }))
427    }
428
429    pub async fn clean_dirty_subscription(
430        &self,
431        database_id: Option<DatabaseId>,
432    ) -> MetaResult<()> {
433        let inner = self.inner.write().await;
434        let txn = inner.db.begin().await?;
435        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
436            object::Column::Oid.not_in_subquery(
437                Query::select()
438                    .column(subscription::Column::SubscriptionId)
439                    .from(Subscription)
440                    .and_where(
441                        subscription::Column::SubscriptionState
442                            .eq(SubscriptionState::Created as i32),
443                    )
444                    .take(),
445            ),
446        );
447
448        let filter_condition = if let Some(database_id) = database_id {
449            filter_condition.and(object::Column::DatabaseId.eq(database_id))
450        } else {
451            filter_condition
452        };
453        Object::delete_many()
454            .filter(filter_condition)
455            .exec(&txn)
456            .await?;
457        txn.commit().await?;
458        // We don't need to notify the frontend, because the Init subscription is not send to frontend.
459        Ok(())
460    }
461
462    /// `clean_dirty_creating_jobs` cleans up creating jobs that are creating in Foreground mode or in Initial status.
463    pub(crate) async fn clean_dirty_creating_jobs(
464        &self,
465        database_id: Option<DatabaseId>,
466    ) -> MetaResult<CleanedDirtyStreamingJobs> {
467        let mut inner = self.inner.write().await;
468        let txn = inner.db.begin().await?;
469
470        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
471            streaming_job::Column::JobStatus
472                .eq(JobStatus::Creating)
473                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
474        );
475
476        let filter_condition = if let Some(database_id) = database_id {
477            filter_condition.and(object::Column::DatabaseId.eq(database_id))
478        } else {
479            filter_condition
480        };
481
482        let mut dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
483            .select_only()
484            .columns([
485                object::Column::Oid,
486                object::Column::ObjType,
487                object::Column::SchemaId,
488                object::Column::DatabaseId,
489            ])
490            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
491            .filter(filter_condition)
492            .into_partial_model()
493            .all(&txn)
494            .await?;
495
496        // Check if there are any pending iceberg table jobs.
497        let dirty_iceberg_jobs = find_dirty_iceberg_table_jobs(&txn, database_id).await?;
498        if !dirty_iceberg_jobs.is_empty() {
499            dirty_job_objs.extend(dirty_iceberg_jobs);
500        }
501
502        Self::clean_dirty_sink_downstreams(&txn).await?;
503
504        if dirty_job_objs.is_empty() {
505            return Ok(CleanedDirtyStreamingJobs::default());
506        }
507
508        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
509
510        let dirty_job_ids = dirty_job_objs
511            .iter()
512            .map(|obj| obj.oid.as_job_id())
513            .collect_vec();
514        let dirty_job_table_ids = dirty_job_ids
515            .iter()
516            .map(|job_id| job_id.as_mv_table_id())
517            .collect_vec();
518
519        // Filter out dummy objs for replacement.
520        // todo: we'd better introduce a new dummy object type for replacement.
521        let all_dirty_table_ids = dirty_job_objs
522            .iter()
523            .filter(|obj| obj.obj_type == ObjectType::Table)
524            .map(|obj| obj.oid)
525            .collect_vec();
526        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
527            .select_only()
528            .column(table::Column::TableId)
529            .column(table::Column::TableType)
530            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
531            .into_tuple::<(ObjectId, TableType)>()
532            .all(&txn)
533            .await?
534            .into_iter()
535            .collect();
536
537        let dirty_background_jobs: HashSet<JobId> = streaming_job::Entity::find()
538            .select_only()
539            .column(streaming_job::Column::JobId)
540            .filter(
541                streaming_job::Column::JobId
542                    .is_in(dirty_job_ids.iter().copied())
543                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
544            )
545            .into_tuple()
546            .all(&txn)
547            .await?
548            .into_iter()
549            .collect();
550
551        // notify delete for failed materialized views and background jobs.
552        let to_notify_objs = dirty_job_objs
553            .iter()
554            .filter(|obj| {
555                matches!(
556                    dirty_table_type_map.get(&obj.oid),
557                    Some(TableType::MaterializedView)
558                ) || dirty_background_jobs.contains(&obj.oid.as_job_id())
559            })
560            .cloned()
561            .collect_vec();
562
563        // The source ids for dirty tables with connector.
564        // FIXME: we should also clean dirty sources.
565        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
566            .select_only()
567            .column(table::Column::OptionalAssociatedSourceId)
568            .filter(
569                table::Column::TableId
570                    .is_in(dirty_job_table_ids)
571                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
572            )
573            .into_tuple()
574            .all(&txn)
575            .await?;
576
577        let dirty_internal_state_table_ids: Vec<TableId> = Table::find()
578            .select_only()
579            .column(table::Column::TableId)
580            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.iter().copied()))
581            .into_tuple()
582            .all(&txn)
583            .await?;
584        let dirty_state_table_ids = dirty_job_ids
585            .iter()
586            .map(|job_id| job_id.as_mv_table_id())
587            .chain(dirty_internal_state_table_ids.iter().copied())
588            .collect_vec();
589
590        let dirty_internal_table_objs = Object::find()
591            .select_only()
592            .columns([
593                object::Column::Oid,
594                object::Column::ObjType,
595                object::Column::SchemaId,
596                object::Column::DatabaseId,
597            ])
598            .join(JoinType::InnerJoin, object::Relation::Table.def())
599            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
600            .into_partial_model()
601            .all(&txn)
602            .await?;
603
604        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
605            .iter()
606            .map(|job_id| job_id.as_object_id())
607            .chain(
608                dirty_state_table_ids
609                    .iter()
610                    .copied()
611                    .map(|table_id| table_id.as_object_id()),
612            )
613            .chain(
614                dirty_associated_source_ids
615                    .iter()
616                    .map(|source_id| source_id.as_object_id()),
617            )
618            .collect();
619
620        // Per-database recovery does not run the global Hummock purge. Keep table catalogs so the
621        // caller can reuse the normal dropped-table cleanup path after the catalog rows are deleted.
622        let dropped_tables = if database_id.is_some() {
623            Table::find()
624                .find_also_related(Object)
625                .filter(table::Column::TableId.is_in(dirty_state_table_ids.iter().copied()))
626                .all(&txn)
627                .await?
628                .into_iter()
629                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)))
630                .collect_vec()
631        } else {
632            vec![]
633        };
634        let dropped_table_ids = dropped_tables.iter().map(|table| table.id).collect_vec();
635
636        let res = Object::delete_many()
637            .filter(object::Column::Oid.is_in(to_delete_objs))
638            .exec(&txn)
639            .await?;
640        assert!(res.rows_affected > 0);
641
642        txn.commit().await?;
643        inner
644            .dropped_tables
645            .extend(dropped_tables.into_iter().map(|t| (t.id, t)));
646
647        let object_group = build_object_group_for_delete(
648            to_notify_objs
649                .into_iter()
650                .chain(dirty_internal_table_objs)
651                .collect_vec(),
652        );
653
654        let _version = self
655            .notify_frontend(NotificationOperation::Delete, object_group)
656            .await;
657
658        Ok(CleanedDirtyStreamingJobs {
659            streaming_job_ids: dirty_job_ids,
660            dropped_table_ids,
661            source_ids: dirty_associated_source_ids,
662        })
663    }
664
665    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
666        let inner = self.inner.write().await;
667        let txn = inner.db.begin().await?;
668        ensure_object_id(ObjectType::Database, comment.database_id, &txn).await?;
669        ensure_object_id(ObjectType::Schema, comment.schema_id, &txn).await?;
670        let (table_obj, streaming_job) = Object::find_by_id(comment.table_id)
671            .find_also_related(StreamingJob)
672            .one(&txn)
673            .await?
674            .ok_or_else(|| MetaError::catalog_id_not_found("object", comment.table_id))?;
675
676        let table = if let Some(col_idx) = comment.column_index {
677            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id)
678                .select_only()
679                .column(table::Column::Columns)
680                .into_tuple()
681                .one(&txn)
682                .await?
683                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
684            let mut pb_columns = columns.to_protobuf();
685
686            let column = pb_columns
687                .get_mut(col_idx as usize)
688                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
689            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
690                anyhow!(
691                    "column desc at index {} for table id {} not found",
692                    col_idx,
693                    comment.table_id
694                )
695            })?;
696            column_desc.description = comment.description;
697            table::ActiveModel {
698                table_id: Set(comment.table_id),
699                columns: Set(pb_columns.into()),
700                ..Default::default()
701            }
702            .update(&txn)
703            .await?
704        } else {
705            table::ActiveModel {
706                table_id: Set(comment.table_id),
707                description: Set(comment.description),
708                ..Default::default()
709            }
710            .update(&txn)
711            .await?
712        };
713        txn.commit().await?;
714
715        let version = self
716            .notify_frontend_relation_info(
717                NotificationOperation::Update,
718                PbObjectInfo::Table(ObjectModel(table, table_obj, streaming_job).into()),
719            )
720            .await;
721
722        Ok(version)
723    }
724
725    async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
726        if tables.is_empty() {
727            return;
728        }
729        let objects = tables
730            .into_iter()
731            .map(|t| PbObject {
732                object_info: Some(PbObjectInfo::Table(t)),
733            })
734            .collect();
735        let group = NotificationInfo::ObjectGroup(PbObjectGroup {
736            objects,
737            dependencies: vec![],
738        });
739        self.env
740            .notification_manager()
741            .notify_hummock(NotificationOperation::Delete, group.clone())
742            .await;
743        self.env
744            .notification_manager()
745            .notify_compactor(NotificationOperation::Delete, group)
746            .await;
747    }
748
749    pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
750        let mut inner = self.inner.write().await;
751        let tables = inner.complete_dropped_tables(table_ids);
752        self.notify_hummock_dropped_tables(tables).await;
753    }
754
755    pub async fn cleanup_dropped_tables(&self) {
756        let mut inner = self.inner.write().await;
757        let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
758        self.notify_hummock_dropped_tables(tables).await;
759    }
760
761    pub async fn stats(&self) -> MetaResult<CatalogStats> {
762        let inner = self.inner.read().await;
763
764        let mut table_num_map: HashMap<_, _> = Table::find()
765            .select_only()
766            .column(table::Column::TableType)
767            .column_as(table::Column::TableId.count(), "num")
768            .group_by(table::Column::TableType)
769            .having(table::Column::TableType.ne(TableType::Internal))
770            .into_tuple::<(TableType, i64)>()
771            .all(&inner.db)
772            .await?
773            .into_iter()
774            .map(|(table_type, num)| (table_type, num as u64))
775            .collect();
776
777        let source_num = Source::find().count(&inner.db).await?;
778        let sink_num = Sink::find().count(&inner.db).await?;
779        let function_num = Function::find().count(&inner.db).await?;
780        let streaming_job_num = StreamingJob::find().count(&inner.db).await?;
781
782        let actor_num = {
783            let guard = self.env.shared_actor_info.read_guard();
784            guard
785                .iter_over_fragments()
786                .map(|(_, fragment)| fragment.actors.len() as u64)
787                .sum::<u64>()
788        };
789        let database_num = Database::find().count(&inner.db).await?;
790
791        Ok(CatalogStats {
792            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
793            mview_num: table_num_map
794                .remove(&TableType::MaterializedView)
795                .unwrap_or(0),
796            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
797            source_num,
798            sink_num,
799            function_num,
800            streaming_job_num,
801            actor_num,
802            database_num,
803        })
804    }
805
806    pub async fn fetch_sink_with_state_table_ids(
807        &self,
808        sink_ids: HashSet<SinkId>,
809    ) -> MetaResult<HashMap<SinkId, Vec<TableId>>> {
810        let inner = self.inner.read().await;
811
812        let query = Fragment::find()
813            .select_only()
814            .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
815            .filter(
816                fragment::Column::JobId
817                    .is_in(sink_ids)
818                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
819            );
820
821        let rows: Vec<(JobId, TableIdArray)> = query.into_tuple().all(&inner.db).await?;
822
823        debug_assert!(rows.iter().map(|(job_id, _)| job_id).all_unique());
824
825        let result = rows
826            .into_iter()
827            .map(|(job_id, table_id_array)| (job_id.as_sink_id(), table_id_array.0))
828            .collect::<HashMap<_, _>>();
829
830        Ok(result)
831    }
832
833    pub async fn list_all_pending_sinks(
834        &self,
835        database_id: Option<DatabaseId>,
836    ) -> MetaResult<HashSet<SinkId>> {
837        let inner = self.inner.read().await;
838
839        let mut query = pending_sink_state::Entity::find()
840            .select_only()
841            .columns([pending_sink_state::Column::SinkId])
842            .filter(
843                pending_sink_state::Column::SinkState.eq(pending_sink_state::SinkState::Pending),
844            )
845            .distinct();
846
847        if let Some(db_id) = database_id {
848            query = query
849                .join(
850                    JoinType::InnerJoin,
851                    pending_sink_state::Relation::Object.def(),
852                )
853                .filter(object::Column::DatabaseId.eq(db_id));
854        }
855
856        let result: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;
857
858        Ok(result.into_iter().collect())
859    }
860
861    pub async fn abort_pending_sink_epochs(
862        &self,
863        sink_committed_epoch: HashMap<SinkId, u64>,
864    ) -> MetaResult<()> {
865        let inner = self.inner.write().await;
866        let txn = inner.db.begin().await?;
867
868        for (sink_id, committed_epoch) in sink_committed_epoch {
869            pending_sink_state::Entity::update_many()
870                .col_expr(
871                    pending_sink_state::Column::SinkState,
872                    Expr::value(pending_sink_state::SinkState::Aborted),
873                )
874                .filter(
875                    pending_sink_state::Column::SinkId
876                        .eq(sink_id)
877                        .and(pending_sink_state::Column::Epoch.gt(committed_epoch as i64)),
878                )
879                .exec(&txn)
880                .await?;
881        }
882
883        txn.commit().await?;
884        Ok(())
885    }
886}
887
/// `CatalogStats` is a struct to store the statistics of all catalogs.
///
/// Every field is a plain counter, so the common value-type traits are derived to make the
/// struct printable, comparable and default-constructible (all zeros).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CatalogStats {
    /// Number of catalog tables of type `Table`.
    pub table_num: u64,
    /// Number of catalog tables of type `MaterializedView`.
    pub mview_num: u64,
    /// Number of catalog tables of type `Index`.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs.
    pub streaming_job_num: u64,
    /// Total number of actors across all fragments.
    pub actor_num: u64,
    /// Number of databases.
    pub database_num: u64,
}
900
901impl CatalogControllerInner {
902    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
903        let databases = self.list_databases().await?;
904        let schemas = self.list_schemas().await?;
905        let tables = self.list_tables().await?;
906        let sources = self.list_sources().await?;
907        let sinks = self.list_sinks().await?;
908        let subscriptions = self.list_subscriptions().await?;
909        let indexes = self.list_indexes().await?;
910        let views = self.list_views().await?;
911        let functions = self.list_functions().await?;
912        let connections = self.list_connections().await?;
913        let secrets = self.list_secrets().await?;
914
915        let users = self.list_users().await?;
916
917        Ok((
918            (
919                databases,
920                schemas,
921                tables,
922                sources,
923                sinks,
924                subscriptions,
925                indexes,
926                views,
927                functions,
928                connections,
929                secrets,
930            ),
931            users,
932        ))
933    }
934
935    async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
936        let db_objs = Database::find()
937            .find_also_related(Object)
938            .all(&self.db)
939            .await?;
940        Ok(db_objs
941            .into_iter()
942            .map(|(db, obj)| ObjectModel(db, obj.unwrap(), None).into())
943            .collect())
944    }
945
946    async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
947        let schema_objs = Schema::find()
948            .find_also_related(Object)
949            .all(&self.db)
950            .await?;
951
952        Ok(schema_objs
953            .into_iter()
954            .map(|(schema, obj)| ObjectModel(schema, obj.unwrap(), None).into())
955            .collect())
956    }
957
958    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
959        let mut user_infos: Vec<PbUserInfo> = User::find()
960            .all(&self.db)
961            .await?
962            .into_iter()
963            .map(Into::into)
964            .collect();
965
966        for user_info in &mut user_infos {
967            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
968        }
969        Ok(user_infos)
970    }
971
972    /// `list_all_tables` return all tables and internal tables.
973    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
974        let table_objs = Table::find()
975            .find_also_related(Object)
976            .all(&self.db)
977            .await?;
978        let streaming_jobs = load_streaming_jobs_by_ids(
979            &self.db,
980            table_objs.iter().map(|(table, _)| table.job_id()),
981        )
982        .await?;
983
984        Ok(table_objs
985            .into_iter()
986            .map(|(table, obj)| {
987                let job_id = table.job_id();
988                let streaming_job = streaming_jobs.get(&job_id).cloned();
989                ObjectModel(table, obj.unwrap(), streaming_job).into()
990            })
991            .collect())
992    }
993
994    /// `list_tables` return all `CREATED` tables, `CREATING` materialized views/ `BACKGROUND` jobs and internal tables that belong to them and sinks.
995    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
996        let mut table_objs = Table::find()
997            .find_also_related(Object)
998            .all(&self.db)
999            .await?;
1000
1001        let all_streaming_jobs: HashMap<JobId, streaming_job::Model> = StreamingJob::find()
1002            .all(&self.db)
1003            .await?
1004            .into_iter()
1005            .map(|job| (job.job_id, job))
1006            .collect();
1007
1008        let sink_ids: HashSet<SinkId> = Sink::find()
1009            .select_only()
1010            .column(sink::Column::SinkId)
1011            .into_tuple::<SinkId>()
1012            .all(&self.db)
1013            .await?
1014            .into_iter()
1015            .collect();
1016
1017        let mview_job_ids: HashSet<JobId> = table_objs
1018            .iter()
1019            .filter_map(|(table, _)| {
1020                if table.table_type == TableType::MaterializedView {
1021                    Some(table.table_id.as_job_id())
1022                } else {
1023                    None
1024                }
1025            })
1026            .collect();
1027
1028        table_objs.retain(|(table, _)| {
1029            let job_id = table.job_id();
1030
1031            if sink_ids.contains(&job_id.as_sink_id()) || mview_job_ids.contains(&job_id) {
1032                return true;
1033            }
1034            if let Some(streaming_job) = all_streaming_jobs.get(&job_id) {
1035                return streaming_job.job_status == JobStatus::Created
1036                    || (streaming_job.create_type == CreateType::Background);
1037            }
1038            false
1039        });
1040
1041        let mut tables = Vec::with_capacity(table_objs.len());
1042        for (table, obj) in table_objs {
1043            let job_id = table.job_id();
1044            let streaming_job = all_streaming_jobs.get(&job_id).cloned();
1045            let pb_table: PbTable = ObjectModel(table, obj.unwrap(), streaming_job).into();
1046            tables.push(pb_table);
1047        }
1048        Ok(tables)
1049    }
1050
1051    /// `list_sources` return all sources and `CREATED` ones if contains any streaming jobs.
1052    async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
1053        let mut source_objs = Source::find()
1054            .find_also_related(Object)
1055            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1056            .filter(
1057                streaming_job::Column::JobStatus
1058                    .is_null()
1059                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1060            )
1061            .all(&self.db)
1062            .await?;
1063
1064        // filter out inner connector sources that are still under creating.
1065        let created_table_ids: HashSet<TableId> = Table::find()
1066            .select_only()
1067            .column(table::Column::TableId)
1068            .join(JoinType::InnerJoin, table::Relation::Object1.def())
1069            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1070            .filter(
1071                table::Column::OptionalAssociatedSourceId
1072                    .is_not_null()
1073                    .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1074            )
1075            .into_tuple()
1076            .all(&self.db)
1077            .await?
1078            .into_iter()
1079            .collect();
1080        source_objs.retain_mut(|(source, _)| {
1081            source.optional_associated_table_id.is_none()
1082                || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
1083        });
1084
1085        Ok(source_objs
1086            .into_iter()
1087            .map(|(source, obj)| ObjectModel(source, obj.unwrap(), None).into())
1088            .collect())
1089    }
1090
1091    /// `list_sinks` return all sinks.
1092    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
1093        let sink_objs = Sink::find()
1094            .find_also_related(Object)
1095            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1096            .all(&self.db)
1097            .await?;
1098        let streaming_jobs = load_streaming_jobs_by_ids(
1099            &self.db,
1100            sink_objs.iter().map(|(sink, _)| sink.sink_id.as_job_id()),
1101        )
1102        .await?;
1103
1104        Ok(sink_objs
1105            .into_iter()
1106            .map(|(sink, obj)| {
1107                let streaming_job = streaming_jobs.get(&sink.sink_id.as_job_id()).cloned();
1108                ObjectModel(sink, obj.unwrap(), streaming_job).into()
1109            })
1110            .collect())
1111    }
1112
1113    /// `list_subscriptions` return all `CREATED` subscriptions.
1114    async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
1115        let subscription_objs = Subscription::find()
1116            .find_also_related(Object)
1117            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
1118            .all(&self.db)
1119            .await?;
1120
1121        Ok(subscription_objs
1122            .into_iter()
1123            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap(), None).into())
1124            .collect())
1125    }
1126
1127    async fn list_views(&self) -> MetaResult<Vec<PbView>> {
1128        let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
1129
1130        Ok(view_objs
1131            .into_iter()
1132            .map(|(view, obj)| ObjectModel(view, obj.unwrap(), None).into())
1133            .collect())
1134    }
1135
1136    /// `list_indexes` return all `CREATED` and `BACKGROUND` indexes.
1137    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
1138        let index_objs = Index::find()
1139            .find_also_related(Object)
1140            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1141            .filter(
1142                streaming_job::Column::JobStatus
1143                    .eq(JobStatus::Created)
1144                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
1145            )
1146            .all(&self.db)
1147            .await?;
1148        let streaming_jobs = load_streaming_jobs_by_ids(
1149            &self.db,
1150            index_objs
1151                .iter()
1152                .map(|(index, _)| index.index_id.as_job_id()),
1153        )
1154        .await?;
1155
1156        Ok(index_objs
1157            .into_iter()
1158            .map(|(index, obj)| {
1159                let streaming_job = streaming_jobs.get(&index.index_id.as_job_id()).cloned();
1160                ObjectModel(index, obj.unwrap(), streaming_job).into()
1161            })
1162            .collect())
1163    }
1164
1165    async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1166        let conn_objs = Connection::find()
1167            .find_also_related(Object)
1168            .all(&self.db)
1169            .await?;
1170
1171        Ok(conn_objs
1172            .into_iter()
1173            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap(), None).into())
1174            .collect())
1175    }
1176
1177    pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1178        let secret_objs = Secret::find()
1179            .find_also_related(Object)
1180            .all(&self.db)
1181            .await?;
1182        Ok(secret_objs
1183            .into_iter()
1184            .map(|(secret, obj)| ObjectModel(secret, obj.unwrap(), None).into())
1185            .collect())
1186    }
1187
1188    async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1189        let func_objs = Function::find()
1190            .find_also_related(Object)
1191            .all(&self.db)
1192            .await?;
1193
1194        Ok(func_objs
1195            .into_iter()
1196            .map(|(func, obj)| ObjectModel(func, obj.unwrap(), None).into())
1197            .collect())
1198    }
1199
1200    pub(crate) fn register_finish_notifier(
1201        &mut self,
1202        database_id: DatabaseId,
1203        id: JobId,
1204        sender: Sender<Result<NotificationVersion, String>>,
1205    ) {
1206        self.creating_table_finish_notifier
1207            .entry(database_id)
1208            .or_default()
1209            .entry(id)
1210            .or_default()
1211            .push(sender);
1212    }
1213
1214    pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
1215        let status = StreamingJob::find()
1216            .select_only()
1217            .column(streaming_job::Column::JobStatus)
1218            .filter(streaming_job::Column::JobId.eq(id))
1219            .into_tuple::<JobStatus>()
1220            .one(&self.db)
1221            .await?;
1222
1223        status
1224            .map(|status| status == JobStatus::Created)
1225            .ok_or_else(|| {
1226                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
1227            })
1228    }
1229
1230    pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1231        if let Some(database_id) = database_id {
1232            if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1233            {
1234                for tx in creating_tables.into_values().flatten() {
1235                    let _ = tx.send(Err(err.clone()));
1236                }
1237            }
1238        } else {
1239            for tx in take(&mut self.creating_table_finish_notifier)
1240                .into_values()
1241                .flatten()
1242                .flat_map(|(_, txs)| txs.into_iter())
1243            {
1244                let _ = tx.send(Err(err.clone()));
1245            }
1246        }
1247    }
1248
1249    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1250        let table_ids: Vec<TableId> = Table::find()
1251            .select_only()
1252            .filter(table::Column::TableType.is_in(vec![
1253                TableType::Table,
1254                TableType::MaterializedView,
1255                TableType::Index,
1256            ]))
1257            .column(table::Column::TableId)
1258            .into_tuple()
1259            .all(&self.db)
1260            .await?;
1261        Ok(table_ids)
1262    }
1263
1264    /// Since the tables have been dropped from both meta store and streaming jobs, this method removes those table copies.
1265    /// Returns the removed table copies.
1266    pub(crate) fn complete_dropped_tables(
1267        &mut self,
1268        table_ids: impl IntoIterator<Item = TableId>,
1269    ) -> Vec<PbTable> {
1270        table_ids
1271            .into_iter()
1272            .filter_map(|table_id| {
1273                self.dropped_tables.remove(&table_id).map_or_else(
1274                    || {
1275                        tracing::warn!(%table_id, "table not found");
1276                        None
1277                    },
1278                    Some,
1279                )
1280            })
1281            .collect()
1282    }
1283}