1mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{
31 DEFAULT_SCHEMA_NAME, FragmentTypeFlag, FragmentTypeMask, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::TableType;
42use risingwave_meta_model::{
43 ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array, IndexId,
44 JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45 StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, TableIdArray,
46 UserId, ViewId, connection, database, fragment, function, index, object, object_dependency,
47 pending_sink_state, schema, secret, sink, source, streaming_job, subscription, table,
48 user_privilege, view,
49};
50use risingwave_pb::catalog::connection::Info as ConnectionInfo;
51use risingwave_pb::catalog::subscription::SubscriptionState;
52use risingwave_pb::catalog::table::PbTableType;
53use risingwave_pb::catalog::{
54 PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
55 PbSubscription, PbTable, PbView,
56};
57use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
58use risingwave_pb::meta::object::PbObjectInfo;
59use risingwave_pb::meta::subscribe_response::{
60 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
61};
62use risingwave_pb::meta::{PbObject, PbObjectGroup};
63use risingwave_pb::stream_plan::stream_node::NodeBody;
64use risingwave_pb::telemetry::PbTelemetryEventStage;
65use risingwave_pb::user::PbUserInfo;
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69 ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
70 IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
71 SelectColumns, TransactionTrait, Value,
72};
73use tokio::sync::oneshot::Sender;
74use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
75use tracing::info;
76
77use super::utils::{
78 check_subscription_name_duplicate, get_internal_tables_by_id, load_streaming_jobs_by_ids,
79 rename_relation, rename_relation_refer,
80};
81use crate::controller::ObjectModel;
82use crate::controller::catalog::util::update_internal_tables;
83use crate::controller::fragment::FragmentTypeMaskExt;
84use crate::controller::utils::*;
85use crate::manager::{
86 IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
87 get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
88};
89use crate::rpc::ddl_controller::DropMode;
90use crate::telemetry::{MetaTelemetryJobDesc, report_event};
91use crate::{MetaError, MetaResult};
92
/// A complete catalog snapshot, one `Vec` per object kind.
///
/// Element order: databases, schemas, tables, sources, sinks, subscriptions, indexes, views,
/// functions, connections, secrets — the order in which `CatalogControllerInner::snapshot`
/// fills it.
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);
106
/// Shared, reference-counted handle to a [`CatalogController`].
pub type CatalogControllerRef = Arc<CatalogController>;
108
/// Catalog controller backed by the SQL meta store.
///
/// Catalog state is reached through [`CatalogControllerInner`], which sits behind an async
/// read-write lock: mutating operations in this file take the write lock, pure queries take
/// the read lock.
pub struct CatalogController {
    /// Meta service environment; this controller uses its meta store, notification manager
    /// and shared actor info.
    pub(crate) env: MetaSrvEnv,
    /// Database connection plus in-memory bookkeeping, guarded by an async `RwLock`.
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
114
/// Identifiers touched when a table's connector is dropped.
///
/// NOTE(review): this struct is built and consumed outside this file chunk; the field
/// descriptions below follow the field names and should be confirmed against the producer.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    /// Streaming job whose plan is changed by the drop.
    pub(crate) to_change_streaming_job_id: JobId,
    /// State table to remove.
    pub(crate) to_remove_state_table_id: TableId,
    /// Source to remove.
    pub(crate) to_remove_source_id: SourceId,
}
122
/// Summary of the catalog objects removed by a drop operation.
///
/// NOTE(review): populated and consumed outside this chunk — presumably handed to the stream
/// layer so it can release the runtime resources of what was dropped; confirm with callers.
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    /// Database the removed objects belonged to.
    pub(crate) database_id: DatabaseId,
    /// Streaming jobs that were removed.
    pub(crate) removed_streaming_job_ids: Vec<JobId>,
    /// State tables that were removed.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    /// Secrets that were removed.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    /// Sources that were removed.
    pub(crate) removed_source_ids: Vec<SourceId>,
    /// Fragments of each removed source, keyed by source id.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    /// Every fragment that was removed.
    pub(crate) removed_fragments: HashSet<FragmentId>,

    /// Removed sink fragments grouped by another fragment id; judging by the name the key is
    /// the fragment the sinks targeted — TODO confirm.
    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,

    /// Sink catalogs of removed iceberg tables.
    pub(crate) removed_iceberg_table_sinks: Vec<PbSink>,
}
146
/// Result of [`CatalogController::clean_dirty_creating_jobs`].
#[derive(Default)]
pub(crate) struct CleanedDirtyStreamingJobs {
    /// Ids of the dirty streaming jobs that were deleted.
    pub(crate) streaming_job_ids: Vec<JobId>,
    /// Ids of the dropped state tables recorded in `dropped_tables`; only filled when the
    /// cleanup was scoped to a single database.
    pub(crate) dropped_table_ids: Vec<TableId>,
    /// Sources associated with the deleted dirty tables, deleted along with them.
    pub(crate) source_ids: Vec<SourceId>,
}
154
155impl CatalogController {
156 pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
157 let meta_store = env.meta_store();
158 let catalog_controller = Self {
159 env,
160 inner: RwLock::new(CatalogControllerInner {
161 db: meta_store.conn,
162 creating_table_finish_notifier: HashMap::new(),
163 dropped_tables: HashMap::new(),
164 }),
165 };
166
167 catalog_controller.init().await?;
168 Ok(catalog_controller)
169 }
170
171 pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
174 self.inner.read().await
175 }
176
177 pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
178 self.inner.write().await
179 }
180}
181
/// Lock-protected state of [`CatalogController`].
pub struct CatalogControllerInner {
    /// Connection to the SQL meta store.
    pub(crate) db: DatabaseConnection,
    /// Senders waiting for a creating streaming job to finish, grouped by database and then by
    /// job. `notify_finish_failed` drains them with an error, either for one database or for
    /// all of them.
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    /// Tables already deleted from the catalog whose deletion has not yet been sent to hummock
    /// and compactor; drained by `complete_dropped_tables` / `cleanup_dropped_tables`.
    pub dropped_tables: HashMap<TableId, PbTable>,
}
194
195impl CatalogController {
196 pub(crate) async fn notify_frontend(
197 &self,
198 operation: NotificationOperation,
199 info: NotificationInfo,
200 ) -> NotificationVersion {
201 self.env
202 .notification_manager()
203 .notify_frontend(operation, info)
204 .await
205 }
206
207 pub(crate) async fn notify_frontend_relation_info(
208 &self,
209 operation: NotificationOperation,
210 relation_info: PbObjectInfo,
211 ) -> NotificationVersion {
212 self.env
213 .notification_manager()
214 .notify_frontend_object_info(operation, relation_info)
215 .await
216 }
217
218 pub(crate) async fn notify_frontend_trivial(&self) -> NotificationVersion {
225 self.env
226 .notification_manager()
227 .notify_frontend(
228 NotificationOperation::Update,
229 NotificationInfo::ObjectGroup(PbObjectGroup {
230 objects: vec![],
231 dependencies: vec![],
232 }),
233 )
234 .await
235 }
236}
237
238impl CatalogController {
239 pub async fn finish_create_subscription_catalog(
240 &self,
241 subscription_id: SubscriptionId,
242 ) -> MetaResult<()> {
243 let inner = self.inner.write().await;
244 let txn = inner.db.begin().await?;
245
246 let res = Object::update_many()
248 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
249 .col_expr(
250 object::Column::CreatedAtClusterVersion,
251 current_cluster_version().into(),
252 )
253 .filter(object::Column::Oid.eq(subscription_id))
254 .exec(&txn)
255 .await?;
256 if res.rows_affected == 0 {
257 return Err(MetaError::catalog_id_not_found(
258 "subscription",
259 subscription_id,
260 ));
261 }
262
263 let job = subscription::ActiveModel {
265 subscription_id: Set(subscription_id),
266 subscription_state: Set(SubscriptionState::Created.into()),
267 ..Default::default()
268 };
269 Subscription::update(job).exec(&txn).await?;
270
271 let _ = grant_default_privileges_automatically(&txn, subscription_id).await?;
272
273 txn.commit().await?;
274
275 Ok(())
276 }
277
278 pub async fn notify_create_subscription(
279 &self,
280 subscription_id: SubscriptionId,
281 ) -> MetaResult<NotificationVersion> {
282 let inner = self.inner.read().await;
283 let txn = inner.db.begin().await?;
284 let (subscription, obj) = Subscription::find_by_id(subscription_id)
285 .find_also_related(Object)
286 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
287 .one(&txn)
288 .await?
289 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
290
291 let dependencies =
292 list_object_dependencies_by_object_id(&txn, subscription_id.into()).await?;
293 txn.commit().await?;
294
295 let mut version = self
296 .notify_frontend(
297 NotificationOperation::Add,
298 NotificationInfo::ObjectGroup(PbObjectGroup {
299 objects: vec![PbObject {
300 object_info: PbObjectInfo::Subscription(
301 ObjectModel(subscription, obj.unwrap(), None).into(),
302 )
303 .into(),
304 }],
305 dependencies,
306 }),
307 )
308 .await;
309
310 let updated_user_ids: Vec<UserId> = UserPrivilege::find()
312 .select_only()
313 .distinct()
314 .column(user_privilege::Column::UserId)
315 .filter(user_privilege::Column::Oid.eq(subscription_id.as_object_id()))
316 .into_tuple()
317 .all(&inner.db)
318 .await?;
319
320 if !updated_user_ids.is_empty() {
321 let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
322 version = self.notify_users_update(updated_user_infos).await;
323 }
324
325 Ok(version)
326 }
327
328 pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
330 let inner = self.inner.read().await;
350 let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
351 .select_only()
352 .column(source::Column::SourceId)
353 .column(source::Column::WithProperties)
354 .column(source::Column::SourceInfo)
355 .into_tuple()
356 .all(&inner.db)
357 .await?;
358 let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
359 .select_only()
360 .column(sink::Column::SinkId)
361 .column(sink::Column::Properties)
362 .column(sink::Column::SinkFormatDesc)
363 .into_tuple()
364 .all(&inner.db)
365 .await?;
366 drop(inner);
367
368 let get_connector_from_property = |property: &Property| -> String {
369 property
370 .0
371 .get(UPSTREAM_SOURCE_KEY)
372 .cloned()
373 .unwrap_or_default()
374 };
375
376 let source_report: Vec<jsonbb::Value> = source_props_and_info
377 .iter()
378 .map(|(oid, property, info)| {
379 let connector_name = get_connector_from_property(property);
380 let mut format = None;
381 let mut encode = None;
382 if let Some(info) = info {
383 let pb_info = info.to_protobuf();
384 format = Some(pb_info.format().as_str_name());
385 encode = Some(pb_info.row_encode().as_str_name());
386 }
387 jsonbb::json!({
388 oid.to_string(): {
389 "connector": connector_name,
390 "format": format,
391 "encode": encode,
392 },
393 })
394 })
395 .collect_vec();
396
397 let sink_report: Vec<jsonbb::Value> = sink_props_and_info
398 .iter()
399 .map(|(oid, property, info)| {
400 let connector_name = get_connector_from_property(property);
401 let mut format = None;
402 let mut encode = None;
403 if let Some(info) = info {
404 let pb_info = info.to_protobuf();
405 format = Some(pb_info.format().as_str_name());
406 encode = Some(pb_info.encode().as_str_name());
407 }
408 jsonbb::json!({
409 oid.to_string(): {
410 "connector": connector_name,
411 "format": format,
412 "encode": encode,
413 },
414 })
415 })
416 .collect_vec();
417
418 Ok(jsonbb::json!({
419 "source": source_report,
420 "sink": sink_report,
421 }))
422 }
423
424 pub async fn clean_dirty_subscription(
425 &self,
426 database_id: Option<DatabaseId>,
427 ) -> MetaResult<()> {
428 let inner = self.inner.write().await;
429 let txn = inner.db.begin().await?;
430 let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
431 object::Column::Oid.not_in_subquery(
432 Query::select()
433 .column(subscription::Column::SubscriptionId)
434 .from(Subscription)
435 .and_where(
436 subscription::Column::SubscriptionState
437 .eq(SubscriptionState::Created as i32),
438 )
439 .take(),
440 ),
441 );
442
443 let filter_condition = if let Some(database_id) = database_id {
444 filter_condition.and(object::Column::DatabaseId.eq(database_id))
445 } else {
446 filter_condition
447 };
448 Object::delete_many()
449 .filter(filter_condition)
450 .exec(&txn)
451 .await?;
452 txn.commit().await?;
453 Ok(())
455 }
456
457 pub(crate) async fn clean_dirty_creating_jobs(
459 &self,
460 database_id: Option<DatabaseId>,
461 ) -> MetaResult<CleanedDirtyStreamingJobs> {
462 let mut inner = self.inner.write().await;
463 let txn = inner.db.begin().await?;
464
465 let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
466 streaming_job::Column::JobStatus
467 .eq(JobStatus::Creating)
468 .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
469 );
470
471 let filter_condition = if let Some(database_id) = database_id {
472 filter_condition.and(object::Column::DatabaseId.eq(database_id))
473 } else {
474 filter_condition
475 };
476
477 let mut dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
478 .select_only()
479 .columns([
480 object::Column::Oid,
481 object::Column::ObjType,
482 object::Column::SchemaId,
483 object::Column::DatabaseId,
484 ])
485 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
486 .filter(filter_condition)
487 .into_partial_model()
488 .all(&txn)
489 .await?;
490
491 let dirty_iceberg_jobs = find_dirty_iceberg_table_jobs(&txn, database_id).await?;
493 if !dirty_iceberg_jobs.is_empty() {
494 dirty_job_objs.extend(dirty_iceberg_jobs);
495 }
496
497 Self::clean_dirty_sink_downstreams(&txn).await?;
498
499 if dirty_job_objs.is_empty() {
500 return Ok(CleanedDirtyStreamingJobs::default());
501 }
502
503 self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
504
505 let dirty_job_ids = dirty_job_objs
506 .iter()
507 .map(|obj| obj.oid.as_job_id())
508 .collect_vec();
509 let dirty_job_table_ids = dirty_job_ids
510 .iter()
511 .map(|job_id| job_id.as_mv_table_id())
512 .collect_vec();
513
514 let all_dirty_table_ids = dirty_job_objs
517 .iter()
518 .filter(|obj| obj.obj_type == ObjectType::Table)
519 .map(|obj| obj.oid)
520 .collect_vec();
521 let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
522 .select_only()
523 .column(table::Column::TableId)
524 .column(table::Column::TableType)
525 .filter(table::Column::TableId.is_in(all_dirty_table_ids))
526 .into_tuple::<(ObjectId, TableType)>()
527 .all(&txn)
528 .await?
529 .into_iter()
530 .collect();
531
532 let dirty_background_jobs: HashSet<JobId> = streaming_job::Entity::find()
533 .select_only()
534 .column(streaming_job::Column::JobId)
535 .filter(
536 streaming_job::Column::JobId
537 .is_in(dirty_job_ids.iter().copied())
538 .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
539 )
540 .into_tuple()
541 .all(&txn)
542 .await?
543 .into_iter()
544 .collect();
545
546 let to_notify_objs = dirty_job_objs
548 .iter()
549 .filter(|obj| {
550 matches!(
551 dirty_table_type_map.get(&obj.oid),
552 Some(TableType::MaterializedView)
553 ) || dirty_background_jobs.contains(&obj.oid.as_job_id())
554 })
555 .cloned()
556 .collect_vec();
557
558 let dirty_associated_source_ids: Vec<SourceId> = Table::find()
561 .select_only()
562 .column(table::Column::OptionalAssociatedSourceId)
563 .filter(
564 table::Column::TableId
565 .is_in(dirty_job_table_ids)
566 .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
567 )
568 .into_tuple()
569 .all(&txn)
570 .await?;
571
572 let dirty_internal_state_table_ids: Vec<TableId> = Table::find()
573 .select_only()
574 .column(table::Column::TableId)
575 .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.iter().copied()))
576 .into_tuple()
577 .all(&txn)
578 .await?;
579 let dirty_state_table_ids = dirty_job_ids
580 .iter()
581 .map(|job_id| job_id.as_mv_table_id())
582 .chain(dirty_internal_state_table_ids.iter().copied())
583 .collect_vec();
584
585 let dirty_internal_table_objs = Object::find()
586 .select_only()
587 .columns([
588 object::Column::Oid,
589 object::Column::ObjType,
590 object::Column::SchemaId,
591 object::Column::DatabaseId,
592 ])
593 .join(JoinType::InnerJoin, object::Relation::Table.def())
594 .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
595 .into_partial_model()
596 .all(&txn)
597 .await?;
598
599 let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
600 .iter()
601 .map(|job_id| job_id.as_object_id())
602 .chain(
603 dirty_state_table_ids
604 .iter()
605 .copied()
606 .map(|table_id| table_id.as_object_id()),
607 )
608 .chain(
609 dirty_associated_source_ids
610 .iter()
611 .map(|source_id| source_id.as_object_id()),
612 )
613 .collect();
614
615 let dropped_tables = if database_id.is_some() {
618 Table::find()
619 .find_also_related(Object)
620 .filter(table::Column::TableId.is_in(dirty_state_table_ids.iter().copied()))
621 .all(&txn)
622 .await?
623 .into_iter()
624 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)))
625 .collect_vec()
626 } else {
627 vec![]
628 };
629 let dropped_table_ids = dropped_tables.iter().map(|table| table.id).collect_vec();
630
631 let res = Object::delete_many()
632 .filter(object::Column::Oid.is_in(to_delete_objs))
633 .exec(&txn)
634 .await?;
635 assert!(res.rows_affected > 0);
636
637 txn.commit().await?;
638 inner
639 .dropped_tables
640 .extend(dropped_tables.into_iter().map(|t| (t.id, t)));
641
642 let object_group = build_object_group_for_delete(
643 to_notify_objs
644 .into_iter()
645 .chain(dirty_internal_table_objs.into_iter())
646 .collect_vec(),
647 );
648
649 let _version = self
650 .notify_frontend(NotificationOperation::Delete, object_group)
651 .await;
652
653 Ok(CleanedDirtyStreamingJobs {
654 streaming_job_ids: dirty_job_ids,
655 dropped_table_ids,
656 source_ids: dirty_associated_source_ids,
657 })
658 }
659
660 pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
661 let inner = self.inner.write().await;
662 let txn = inner.db.begin().await?;
663 ensure_object_id(ObjectType::Database, comment.database_id, &txn).await?;
664 ensure_object_id(ObjectType::Schema, comment.schema_id, &txn).await?;
665 let (table_obj, streaming_job) = Object::find_by_id(comment.table_id)
666 .find_also_related(StreamingJob)
667 .one(&txn)
668 .await?
669 .ok_or_else(|| MetaError::catalog_id_not_found("object", comment.table_id))?;
670
671 let table = if let Some(col_idx) = comment.column_index {
672 let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id)
673 .select_only()
674 .column(table::Column::Columns)
675 .into_tuple()
676 .one(&txn)
677 .await?
678 .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
679 let mut pb_columns = columns.to_protobuf();
680
681 let column = pb_columns
682 .get_mut(col_idx as usize)
683 .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
684 let column_desc = column.column_desc.as_mut().ok_or_else(|| {
685 anyhow!(
686 "column desc at index {} for table id {} not found",
687 col_idx,
688 comment.table_id
689 )
690 })?;
691 column_desc.description = comment.description;
692 table::ActiveModel {
693 table_id: Set(comment.table_id),
694 columns: Set(pb_columns.into()),
695 ..Default::default()
696 }
697 .update(&txn)
698 .await?
699 } else {
700 table::ActiveModel {
701 table_id: Set(comment.table_id),
702 description: Set(comment.description),
703 ..Default::default()
704 }
705 .update(&txn)
706 .await?
707 };
708 txn.commit().await?;
709
710 let version = self
711 .notify_frontend_relation_info(
712 NotificationOperation::Update,
713 PbObjectInfo::Table(ObjectModel(table, table_obj, streaming_job).into()),
714 )
715 .await;
716
717 Ok(version)
718 }
719
720 async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
721 if tables.is_empty() {
722 return;
723 }
724 let objects = tables
725 .into_iter()
726 .map(|t| PbObject {
727 object_info: Some(PbObjectInfo::Table(t)),
728 })
729 .collect();
730 let group = NotificationInfo::ObjectGroup(PbObjectGroup {
731 objects,
732 dependencies: vec![],
733 });
734 self.env
735 .notification_manager()
736 .notify_hummock(NotificationOperation::Delete, group.clone())
737 .await;
738 self.env
739 .notification_manager()
740 .notify_compactor(NotificationOperation::Delete, group)
741 .await;
742 }
743
744 pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
745 let mut inner = self.inner.write().await;
746 let tables = inner.complete_dropped_tables(table_ids);
747 self.notify_hummock_dropped_tables(tables).await;
748 }
749
750 pub async fn cleanup_dropped_tables(&self) {
751 let mut inner = self.inner.write().await;
752 let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
753 self.notify_hummock_dropped_tables(tables).await;
754 }
755
756 pub async fn stats(&self) -> MetaResult<CatalogStats> {
757 let inner = self.inner.read().await;
758
759 let mut table_num_map: HashMap<_, _> = Table::find()
760 .select_only()
761 .column(table::Column::TableType)
762 .column_as(table::Column::TableId.count(), "num")
763 .group_by(table::Column::TableType)
764 .having(table::Column::TableType.ne(TableType::Internal))
765 .into_tuple::<(TableType, i64)>()
766 .all(&inner.db)
767 .await?
768 .into_iter()
769 .map(|(table_type, num)| (table_type, num as u64))
770 .collect();
771
772 let source_num = Source::find().count(&inner.db).await?;
773 let sink_num = Sink::find().count(&inner.db).await?;
774 let function_num = Function::find().count(&inner.db).await?;
775 let streaming_job_num = StreamingJob::find().count(&inner.db).await?;
776
777 let actor_num = {
778 let guard = self.env.shared_actor_info.read_guard();
779 guard
780 .iter_over_fragments()
781 .map(|(_, fragment)| fragment.actors.len() as u64)
782 .sum::<u64>()
783 };
784 let database_num = Database::find().count(&inner.db).await?;
785
786 Ok(CatalogStats {
787 table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
788 mview_num: table_num_map
789 .remove(&TableType::MaterializedView)
790 .unwrap_or(0),
791 index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
792 source_num,
793 sink_num,
794 function_num,
795 streaming_job_num,
796 actor_num,
797 database_num,
798 })
799 }
800
801 pub async fn fetch_sink_with_state_table_ids(
802 &self,
803 sink_ids: HashSet<SinkId>,
804 ) -> MetaResult<HashMap<SinkId, Vec<TableId>>> {
805 let inner = self.inner.read().await;
806
807 let query = Fragment::find()
808 .select_only()
809 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
810 .filter(
811 fragment::Column::JobId
812 .is_in(sink_ids)
813 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
814 );
815
816 let rows: Vec<(JobId, TableIdArray)> = query.into_tuple().all(&inner.db).await?;
817
818 debug_assert!(rows.iter().map(|(job_id, _)| job_id).all_unique());
819
820 let result = rows
821 .into_iter()
822 .map(|(job_id, table_id_array)| (job_id.as_sink_id(), table_id_array.0))
823 .collect::<HashMap<_, _>>();
824
825 Ok(result)
826 }
827
828 pub async fn list_all_pending_sinks(
829 &self,
830 database_id: Option<DatabaseId>,
831 ) -> MetaResult<HashSet<SinkId>> {
832 let inner = self.inner.read().await;
833
834 let mut query = pending_sink_state::Entity::find()
835 .select_only()
836 .columns([pending_sink_state::Column::SinkId])
837 .filter(
838 pending_sink_state::Column::SinkState.eq(pending_sink_state::SinkState::Pending),
839 )
840 .distinct();
841
842 if let Some(db_id) = database_id {
843 query = query
844 .join(
845 JoinType::InnerJoin,
846 pending_sink_state::Relation::Object.def(),
847 )
848 .filter(object::Column::DatabaseId.eq(db_id));
849 }
850
851 let result: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;
852
853 Ok(result.into_iter().collect())
854 }
855
856 pub async fn abort_pending_sink_epochs(
857 &self,
858 sink_committed_epoch: HashMap<SinkId, u64>,
859 ) -> MetaResult<()> {
860 let inner = self.inner.write().await;
861 let txn = inner.db.begin().await?;
862
863 for (sink_id, committed_epoch) in sink_committed_epoch {
864 pending_sink_state::Entity::update_many()
865 .col_expr(
866 pending_sink_state::Column::SinkState,
867 Expr::value(pending_sink_state::SinkState::Aborted),
868 )
869 .filter(
870 pending_sink_state::Column::SinkId
871 .eq(sink_id)
872 .and(pending_sink_state::Column::Epoch.gt(committed_epoch as i64)),
873 )
874 .exec(&txn)
875 .await?;
876 }
877
878 txn.commit().await?;
879 Ok(())
880 }
881}
882
/// Catalog counters computed by [`CatalogController::stats`].
pub struct CatalogStats {
    /// Number of tables whose type is `TableType::Table`.
    pub table_num: u64,
    /// Number of tables whose type is `TableType::MaterializedView`.
    pub mview_num: u64,
    /// Number of tables whose type is `TableType::Index`.
    pub index_num: u64,
    /// Number of source rows.
    pub source_num: u64,
    /// Number of sink rows.
    pub sink_num: u64,
    /// Number of function rows.
    pub function_num: u64,
    /// Number of streaming job rows, regardless of job status.
    pub streaming_job_num: u64,
    /// Total number of actors over all fragments in the shared actor info.
    pub actor_num: u64,
    /// Number of database rows.
    pub database_num: u64,
}
895
896impl CatalogControllerInner {
897 pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
898 let databases = self.list_databases().await?;
899 let schemas = self.list_schemas().await?;
900 let tables = self.list_tables().await?;
901 let sources = self.list_sources().await?;
902 let sinks = self.list_sinks().await?;
903 let subscriptions = self.list_subscriptions().await?;
904 let indexes = self.list_indexes().await?;
905 let views = self.list_views().await?;
906 let functions = self.list_functions().await?;
907 let connections = self.list_connections().await?;
908 let secrets = self.list_secrets().await?;
909
910 let users = self.list_users().await?;
911
912 Ok((
913 (
914 databases,
915 schemas,
916 tables,
917 sources,
918 sinks,
919 subscriptions,
920 indexes,
921 views,
922 functions,
923 connections,
924 secrets,
925 ),
926 users,
927 ))
928 }
929
930 async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
931 let db_objs = Database::find()
932 .find_also_related(Object)
933 .all(&self.db)
934 .await?;
935 Ok(db_objs
936 .into_iter()
937 .map(|(db, obj)| ObjectModel(db, obj.unwrap(), None).into())
938 .collect())
939 }
940
941 async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
942 let schema_objs = Schema::find()
943 .find_also_related(Object)
944 .all(&self.db)
945 .await?;
946
947 Ok(schema_objs
948 .into_iter()
949 .map(|(schema, obj)| ObjectModel(schema, obj.unwrap(), None).into())
950 .collect())
951 }
952
953 async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
954 let mut user_infos: Vec<PbUserInfo> = User::find()
955 .all(&self.db)
956 .await?
957 .into_iter()
958 .map(Into::into)
959 .collect();
960
961 for user_info in &mut user_infos {
962 user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
963 }
964 Ok(user_infos)
965 }
966
967 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
969 let table_objs = Table::find()
970 .find_also_related(Object)
971 .all(&self.db)
972 .await?;
973 let streaming_jobs = load_streaming_jobs_by_ids(
974 &self.db,
975 table_objs.iter().map(|(table, _)| table.job_id()),
976 )
977 .await?;
978
979 Ok(table_objs
980 .into_iter()
981 .map(|(table, obj)| {
982 let job_id = table.job_id();
983 let streaming_job = streaming_jobs.get(&job_id).cloned();
984 ObjectModel(table, obj.unwrap(), streaming_job).into()
985 })
986 .collect())
987 }
988
989 async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
991 let mut table_objs = Table::find()
992 .find_also_related(Object)
993 .all(&self.db)
994 .await?;
995
996 let all_streaming_jobs: HashMap<JobId, streaming_job::Model> = StreamingJob::find()
997 .all(&self.db)
998 .await?
999 .into_iter()
1000 .map(|job| (job.job_id, job))
1001 .collect();
1002
1003 let sink_ids: HashSet<SinkId> = Sink::find()
1004 .select_only()
1005 .column(sink::Column::SinkId)
1006 .into_tuple::<SinkId>()
1007 .all(&self.db)
1008 .await?
1009 .into_iter()
1010 .collect();
1011
1012 let mview_job_ids: HashSet<JobId> = table_objs
1013 .iter()
1014 .filter_map(|(table, _)| {
1015 if table.table_type == TableType::MaterializedView {
1016 Some(table.table_id.as_job_id())
1017 } else {
1018 None
1019 }
1020 })
1021 .collect();
1022
1023 table_objs.retain(|(table, _)| {
1024 let job_id = table.job_id();
1025
1026 if sink_ids.contains(&job_id.as_sink_id()) || mview_job_ids.contains(&job_id) {
1027 return true;
1028 }
1029 if let Some(streaming_job) = all_streaming_jobs.get(&job_id) {
1030 return streaming_job.job_status == JobStatus::Created
1031 || (streaming_job.create_type == CreateType::Background);
1032 }
1033 false
1034 });
1035
1036 let mut tables = Vec::with_capacity(table_objs.len());
1037 for (table, obj) in table_objs {
1038 let job_id = table.job_id();
1039 let streaming_job = all_streaming_jobs.get(&job_id).cloned();
1040 let pb_table: PbTable = ObjectModel(table, obj.unwrap(), streaming_job).into();
1041 tables.push(pb_table);
1042 }
1043 Ok(tables)
1044 }
1045
1046 async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
1048 let mut source_objs = Source::find()
1049 .find_also_related(Object)
1050 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1051 .filter(
1052 streaming_job::Column::JobStatus
1053 .is_null()
1054 .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1055 )
1056 .all(&self.db)
1057 .await?;
1058
1059 let created_table_ids: HashSet<TableId> = Table::find()
1061 .select_only()
1062 .column(table::Column::TableId)
1063 .join(JoinType::InnerJoin, table::Relation::Object1.def())
1064 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1065 .filter(
1066 table::Column::OptionalAssociatedSourceId
1067 .is_not_null()
1068 .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1069 )
1070 .into_tuple()
1071 .all(&self.db)
1072 .await?
1073 .into_iter()
1074 .collect();
1075 source_objs.retain_mut(|(source, _)| {
1076 source.optional_associated_table_id.is_none()
1077 || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
1078 });
1079
1080 Ok(source_objs
1081 .into_iter()
1082 .map(|(source, obj)| ObjectModel(source, obj.unwrap(), None).into())
1083 .collect())
1084 }
1085
1086 async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
1088 let sink_objs = Sink::find()
1089 .find_also_related(Object)
1090 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1091 .all(&self.db)
1092 .await?;
1093 let streaming_jobs = load_streaming_jobs_by_ids(
1094 &self.db,
1095 sink_objs.iter().map(|(sink, _)| sink.sink_id.as_job_id()),
1096 )
1097 .await?;
1098
1099 Ok(sink_objs
1100 .into_iter()
1101 .map(|(sink, obj)| {
1102 let streaming_job = streaming_jobs.get(&sink.sink_id.as_job_id()).cloned();
1103 ObjectModel(sink, obj.unwrap(), streaming_job).into()
1104 })
1105 .collect())
1106 }
1107
1108 async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
1110 let subscription_objs = Subscription::find()
1111 .find_also_related(Object)
1112 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
1113 .all(&self.db)
1114 .await?;
1115
1116 Ok(subscription_objs
1117 .into_iter()
1118 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap(), None).into())
1119 .collect())
1120 }
1121
1122 async fn list_views(&self) -> MetaResult<Vec<PbView>> {
1123 let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
1124
1125 Ok(view_objs
1126 .into_iter()
1127 .map(|(view, obj)| ObjectModel(view, obj.unwrap(), None).into())
1128 .collect())
1129 }
1130
1131 async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
1133 let index_objs = Index::find()
1134 .find_also_related(Object)
1135 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1136 .filter(
1137 streaming_job::Column::JobStatus
1138 .eq(JobStatus::Created)
1139 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
1140 )
1141 .all(&self.db)
1142 .await?;
1143 let streaming_jobs = load_streaming_jobs_by_ids(
1144 &self.db,
1145 index_objs
1146 .iter()
1147 .map(|(index, _)| index.index_id.as_job_id()),
1148 )
1149 .await?;
1150
1151 Ok(index_objs
1152 .into_iter()
1153 .map(|(index, obj)| {
1154 let streaming_job = streaming_jobs.get(&index.index_id.as_job_id()).cloned();
1155 ObjectModel(index, obj.unwrap(), streaming_job).into()
1156 })
1157 .collect())
1158 }
1159
1160 async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1161 let conn_objs = Connection::find()
1162 .find_also_related(Object)
1163 .all(&self.db)
1164 .await?;
1165
1166 Ok(conn_objs
1167 .into_iter()
1168 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap(), None).into())
1169 .collect())
1170 }
1171
1172 pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1173 let secret_objs = Secret::find()
1174 .find_also_related(Object)
1175 .all(&self.db)
1176 .await?;
1177 Ok(secret_objs
1178 .into_iter()
1179 .map(|(secret, obj)| ObjectModel(secret, obj.unwrap(), None).into())
1180 .collect())
1181 }
1182
1183 async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1184 let func_objs = Function::find()
1185 .find_also_related(Object)
1186 .all(&self.db)
1187 .await?;
1188
1189 Ok(func_objs
1190 .into_iter()
1191 .map(|(func, obj)| ObjectModel(func, obj.unwrap(), None).into())
1192 .collect())
1193 }
1194
1195 pub(crate) fn register_finish_notifier(
1196 &mut self,
1197 database_id: DatabaseId,
1198 id: JobId,
1199 sender: Sender<Result<NotificationVersion, String>>,
1200 ) {
1201 self.creating_table_finish_notifier
1202 .entry(database_id)
1203 .or_default()
1204 .entry(id)
1205 .or_default()
1206 .push(sender);
1207 }
1208
1209 pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
1210 let status = StreamingJob::find()
1211 .select_only()
1212 .column(streaming_job::Column::JobStatus)
1213 .filter(streaming_job::Column::JobId.eq(id))
1214 .into_tuple::<JobStatus>()
1215 .one(&self.db)
1216 .await?;
1217
1218 status
1219 .map(|status| status == JobStatus::Created)
1220 .ok_or_else(|| {
1221 MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
1222 })
1223 }
1224
1225 pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1226 if let Some(database_id) = database_id {
1227 if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1228 {
1229 for tx in creating_tables.into_values().flatten() {
1230 let _ = tx.send(Err(err.clone()));
1231 }
1232 }
1233 } else {
1234 for tx in take(&mut self.creating_table_finish_notifier)
1235 .into_values()
1236 .flatten()
1237 .flat_map(|(_, txs)| txs.into_iter())
1238 {
1239 let _ = tx.send(Err(err.clone()));
1240 }
1241 }
1242 }
1243
1244 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1245 let table_ids: Vec<TableId> = Table::find()
1246 .select_only()
1247 .filter(table::Column::TableType.is_in(vec![
1248 TableType::Table,
1249 TableType::MaterializedView,
1250 TableType::Index,
1251 ]))
1252 .column(table::Column::TableId)
1253 .into_tuple()
1254 .all(&self.db)
1255 .await?;
1256 Ok(table_ids)
1257 }
1258
1259 pub(crate) fn complete_dropped_tables(
1262 &mut self,
1263 table_ids: impl IntoIterator<Item = TableId>,
1264 ) -> Vec<PbTable> {
1265 table_ids
1266 .into_iter()
1267 .filter_map(|table_id| {
1268 self.dropped_tables.remove(&table_id).map_or_else(
1269 || {
1270 tracing::warn!(%table_id, "table not found");
1271 None
1272 },
1273 Some,
1274 )
1275 })
1276 .collect()
1277 }
1278}