1mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{
31 DEFAULT_SCHEMA_NAME, FragmentTypeFlag, FragmentTypeMask, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::TableType;
42use risingwave_meta_model::{
43 ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array, IndexId,
44 JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45 StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, TableIdArray,
46 UserId, ViewId, connection, database, fragment, function, index, object, object_dependency,
47 pending_sink_state, schema, secret, sink, source, streaming_job, subscription, table,
48 user_privilege, view,
49};
50use risingwave_pb::catalog::connection::Info as ConnectionInfo;
51use risingwave_pb::catalog::subscription::SubscriptionState;
52use risingwave_pb::catalog::table::PbTableType;
53use risingwave_pb::catalog::{
54 PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
55 PbSubscription, PbTable, PbView,
56};
57use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
58use risingwave_pb::meta::object::PbObjectInfo;
59use risingwave_pb::meta::subscribe_response::{
60 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
61};
62use risingwave_pb::meta::{PbObject, PbObjectGroup};
63use risingwave_pb::stream_plan::stream_node::NodeBody;
64use risingwave_pb::telemetry::PbTelemetryEventStage;
65use risingwave_pb::user::PbUserInfo;
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69 ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
70 IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
71 SelectColumns, TransactionTrait, Value,
72};
73use tokio::sync::oneshot::Sender;
74use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
75use tracing::info;
76
77use super::utils::{
78 check_subscription_name_duplicate, get_internal_tables_by_id, load_streaming_jobs_by_ids,
79 rename_relation, rename_relation_refer,
80};
81use crate::controller::ObjectModel;
82use crate::controller::catalog::util::update_internal_tables;
83use crate::controller::fragment::FragmentTypeMaskExt;
84use crate::controller::utils::*;
85use crate::manager::{
86 IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
87 get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
88};
89use crate::rpc::ddl_controller::DropMode;
90use crate::telemetry::{MetaTelemetryJobDesc, report_event};
91use crate::{MetaError, MetaResult};
92
/// Full catalog snapshot produced by [`CatalogControllerInner::snapshot`].
///
/// Element order: databases, schemas, tables, sources, sinks, subscriptions, indexes,
/// views, functions, connections, secrets.
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);
106
/// Shared, reference-counted handle to a [`CatalogController`].
pub type CatalogControllerRef = Arc<CatalogController>;
108
/// Controller for catalog metadata stored in the meta store.
pub struct CatalogController {
    /// Meta service environment; provides the meta store connection and the notification
    /// manager used to broadcast catalog changes.
    pub(crate) env: MetaSrvEnv,
    /// Mutable controller state behind an async `RwLock`.
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
114
/// Context for dropping the connector attached to a table.
///
/// NOTE(review): this struct is not produced or consumed in this file; the field notes
/// below are based on the field names only — confirm against the call sites.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    /// Streaming job presumably rewritten when the connector is removed.
    pub(crate) to_change_streaming_job_id: JobId,
    /// State table presumably removed together with the connector.
    pub(crate) to_remove_state_table_id: TableId,
    /// Source presumably removed together with the connector.
    pub(crate) to_remove_source_id: SourceId,
}
122
/// Ids and metadata of catalog objects removed by a drop, handed to the caller so it can
/// release the corresponding resources.
///
/// NOTE(review): this struct is filled outside this file; the field notes below describe
/// types and names only — confirm semantics against the producer.
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    /// Database the removed objects belonged to.
    pub(crate) database_id: DatabaseId,
    /// Streaming jobs that were removed.
    pub(crate) removed_streaming_job_ids: Vec<JobId>,
    /// State tables (job tables and internal tables) that were removed.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    /// Secrets that were removed.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    /// Sources that were removed.
    pub(crate) removed_source_ids: Vec<SourceId>,
    /// Fragments of each removed source, keyed by source id.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    /// All fragments that were removed.
    pub(crate) removed_fragments: HashSet<FragmentId>,

    /// Presumably maps a target fragment to the removed sink fragments feeding into it
    /// — TODO confirm direction of the mapping.
    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,

    /// Catalogs of removed sinks that back iceberg tables.
    pub(crate) removed_iceberg_table_sinks: Vec<PbSink>,

    /// Ids of removed iceberg v3 sinks.
    pub(crate) removed_iceberg_v3_sink_ids: Vec<SinkId>,
}
151
/// Result of [`CatalogController::clean_dirty_creating_jobs`].
#[derive(Default)]
pub(crate) struct CleanedDirtyStreamingJobs {
    /// Dirty streaming jobs whose objects were deleted.
    pub(crate) streaming_job_ids: Vec<JobId>,
    /// State tables whose catalogs were stashed in `dropped_tables`; only populated when
    /// the cleanup was scoped to a single database.
    pub(crate) dropped_table_ids: Vec<TableId>,
    /// Sources associated with the dirty jobs' tables that were deleted with them.
    pub(crate) source_ids: Vec<SourceId>,
}
159
160impl CatalogController {
161 pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
162 let meta_store = env.meta_store();
163 let catalog_controller = Self {
164 env,
165 inner: RwLock::new(CatalogControllerInner {
166 db: meta_store.conn,
167 creating_table_finish_notifier: HashMap::new(),
168 dropped_tables: HashMap::new(),
169 }),
170 };
171
172 catalog_controller.init().await?;
173 Ok(catalog_controller)
174 }
175
176 pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
179 self.inner.read().await
180 }
181
182 pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
183 self.inner.write().await
184 }
185}
186
/// Mutable state of [`CatalogController`], accessed through its `inner` lock.
pub struct CatalogControllerInner {
    /// Connection to the meta store database.
    pub(crate) db: DatabaseConnection,
    /// Senders waiting for creating streaming jobs to finish, grouped by database and job.
    /// `notify_finish_failed` drains them and sends an `Err` to each.
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    /// Catalogs of tables whose metadata rows have been deleted but whose drop has not yet
    /// been sent to hummock/compactor; drained by `complete_dropped_tables` and
    /// `cleanup_dropped_tables`.
    pub dropped_tables: HashMap<TableId, PbTable>,
}
199
200impl CatalogController {
201 pub(crate) async fn notify_frontend(
202 &self,
203 operation: NotificationOperation,
204 info: NotificationInfo,
205 ) -> NotificationVersion {
206 self.env
207 .notification_manager()
208 .notify_frontend(operation, info)
209 .await
210 }
211
212 pub(crate) async fn notify_frontend_relation_info(
213 &self,
214 operation: NotificationOperation,
215 relation_info: PbObjectInfo,
216 ) -> NotificationVersion {
217 self.env
218 .notification_manager()
219 .notify_frontend_object_info(operation, relation_info)
220 .await
221 }
222
223 pub(crate) async fn notify_frontend_trivial(&self) -> NotificationVersion {
230 self.env
231 .notification_manager()
232 .notify_frontend(
233 NotificationOperation::Update,
234 NotificationInfo::ObjectGroup(PbObjectGroup {
235 objects: vec![],
236 dependencies: vec![],
237 }),
238 )
239 .await
240 }
241}
242
243impl CatalogController {
244 pub async fn finish_create_subscription_catalog(
245 &self,
246 subscription_id: SubscriptionId,
247 ) -> MetaResult<()> {
248 let inner = self.inner.write().await;
249 let txn = inner.db.begin().await?;
250
251 let res = Object::update_many()
253 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
254 .col_expr(
255 object::Column::CreatedAtClusterVersion,
256 current_cluster_version().into(),
257 )
258 .filter(object::Column::Oid.eq(subscription_id))
259 .exec(&txn)
260 .await?;
261 if res.rows_affected == 0 {
262 return Err(MetaError::catalog_id_not_found(
263 "subscription",
264 subscription_id,
265 ));
266 }
267
268 let job = subscription::ActiveModel {
270 subscription_id: Set(subscription_id),
271 subscription_state: Set(SubscriptionState::Created.into()),
272 ..Default::default()
273 };
274 Subscription::update(job).exec(&txn).await?;
275
276 let _ = grant_default_privileges_automatically(&txn, subscription_id).await?;
277
278 txn.commit().await?;
279
280 Ok(())
281 }
282
283 pub async fn notify_create_subscription(
284 &self,
285 subscription_id: SubscriptionId,
286 ) -> MetaResult<NotificationVersion> {
287 let inner = self.inner.read().await;
288 let txn = inner.db.begin().await?;
289 let (subscription, obj) = Subscription::find_by_id(subscription_id)
290 .find_also_related(Object)
291 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
292 .one(&txn)
293 .await?
294 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
295
296 let dependencies =
297 list_object_dependencies_by_object_id(&txn, subscription_id.into()).await?;
298 txn.commit().await?;
299
300 let mut version = self
301 .notify_frontend(
302 NotificationOperation::Add,
303 NotificationInfo::ObjectGroup(PbObjectGroup {
304 objects: vec![PbObject {
305 object_info: PbObjectInfo::Subscription(
306 ObjectModel(subscription, obj.unwrap(), None).into(),
307 )
308 .into(),
309 }],
310 dependencies,
311 }),
312 )
313 .await;
314
315 let updated_user_ids: Vec<UserId> = UserPrivilege::find()
317 .select_only()
318 .distinct()
319 .column(user_privilege::Column::UserId)
320 .filter(user_privilege::Column::Oid.eq(subscription_id.as_object_id()))
321 .into_tuple()
322 .all(&inner.db)
323 .await?;
324
325 if !updated_user_ids.is_empty() {
326 let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
327 version = self.notify_users_update(updated_user_infos).await;
328 }
329
330 Ok(version)
331 }
332
333 pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
335 let inner = self.inner.read().await;
355 let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
356 .select_only()
357 .column(source::Column::SourceId)
358 .column(source::Column::WithProperties)
359 .column(source::Column::SourceInfo)
360 .into_tuple()
361 .all(&inner.db)
362 .await?;
363 let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
364 .select_only()
365 .column(sink::Column::SinkId)
366 .column(sink::Column::Properties)
367 .column(sink::Column::SinkFormatDesc)
368 .into_tuple()
369 .all(&inner.db)
370 .await?;
371 drop(inner);
372
373 let get_connector_from_property = |property: &Property| -> String {
374 property
375 .0
376 .get(UPSTREAM_SOURCE_KEY)
377 .cloned()
378 .unwrap_or_default()
379 };
380
381 let source_report: Vec<jsonbb::Value> = source_props_and_info
382 .iter()
383 .map(|(oid, property, info)| {
384 let connector_name = get_connector_from_property(property);
385 let mut format = None;
386 let mut encode = None;
387 if let Some(info) = info {
388 let pb_info = info.to_protobuf();
389 format = Some(pb_info.format().as_str_name());
390 encode = Some(pb_info.row_encode().as_str_name());
391 }
392 jsonbb::json!({
393 oid.to_string(): {
394 "connector": connector_name,
395 "format": format,
396 "encode": encode,
397 },
398 })
399 })
400 .collect_vec();
401
402 let sink_report: Vec<jsonbb::Value> = sink_props_and_info
403 .iter()
404 .map(|(oid, property, info)| {
405 let connector_name = get_connector_from_property(property);
406 let mut format = None;
407 let mut encode = None;
408 if let Some(info) = info {
409 let pb_info = info.to_protobuf();
410 format = Some(pb_info.format().as_str_name());
411 encode = Some(pb_info.encode().as_str_name());
412 }
413 jsonbb::json!({
414 oid.to_string(): {
415 "connector": connector_name,
416 "format": format,
417 "encode": encode,
418 },
419 })
420 })
421 .collect_vec();
422
423 Ok(jsonbb::json!({
424 "source": source_report,
425 "sink": sink_report,
426 }))
427 }
428
429 pub async fn clean_dirty_subscription(
430 &self,
431 database_id: Option<DatabaseId>,
432 ) -> MetaResult<()> {
433 let inner = self.inner.write().await;
434 let txn = inner.db.begin().await?;
435 let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
436 object::Column::Oid.not_in_subquery(
437 Query::select()
438 .column(subscription::Column::SubscriptionId)
439 .from(Subscription)
440 .and_where(
441 subscription::Column::SubscriptionState
442 .eq(SubscriptionState::Created as i32),
443 )
444 .take(),
445 ),
446 );
447
448 let filter_condition = if let Some(database_id) = database_id {
449 filter_condition.and(object::Column::DatabaseId.eq(database_id))
450 } else {
451 filter_condition
452 };
453 Object::delete_many()
454 .filter(filter_condition)
455 .exec(&txn)
456 .await?;
457 txn.commit().await?;
458 Ok(())
460 }
461
462 pub(crate) async fn clean_dirty_creating_jobs(
464 &self,
465 database_id: Option<DatabaseId>,
466 ) -> MetaResult<CleanedDirtyStreamingJobs> {
467 let mut inner = self.inner.write().await;
468 let txn = inner.db.begin().await?;
469
470 let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
471 streaming_job::Column::JobStatus
472 .eq(JobStatus::Creating)
473 .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
474 );
475
476 let filter_condition = if let Some(database_id) = database_id {
477 filter_condition.and(object::Column::DatabaseId.eq(database_id))
478 } else {
479 filter_condition
480 };
481
482 let mut dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
483 .select_only()
484 .columns([
485 object::Column::Oid,
486 object::Column::ObjType,
487 object::Column::SchemaId,
488 object::Column::DatabaseId,
489 ])
490 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
491 .filter(filter_condition)
492 .into_partial_model()
493 .all(&txn)
494 .await?;
495
496 let dirty_iceberg_jobs = find_dirty_iceberg_table_jobs(&txn, database_id).await?;
498 if !dirty_iceberg_jobs.is_empty() {
499 dirty_job_objs.extend(dirty_iceberg_jobs);
500 }
501
502 Self::clean_dirty_sink_downstreams(&txn).await?;
503
504 if dirty_job_objs.is_empty() {
505 return Ok(CleanedDirtyStreamingJobs::default());
506 }
507
508 self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
509
510 let dirty_job_ids = dirty_job_objs
511 .iter()
512 .map(|obj| obj.oid.as_job_id())
513 .collect_vec();
514 let dirty_job_table_ids = dirty_job_ids
515 .iter()
516 .map(|job_id| job_id.as_mv_table_id())
517 .collect_vec();
518
519 let all_dirty_table_ids = dirty_job_objs
522 .iter()
523 .filter(|obj| obj.obj_type == ObjectType::Table)
524 .map(|obj| obj.oid)
525 .collect_vec();
526 let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
527 .select_only()
528 .column(table::Column::TableId)
529 .column(table::Column::TableType)
530 .filter(table::Column::TableId.is_in(all_dirty_table_ids))
531 .into_tuple::<(ObjectId, TableType)>()
532 .all(&txn)
533 .await?
534 .into_iter()
535 .collect();
536
537 let dirty_background_jobs: HashSet<JobId> = streaming_job::Entity::find()
538 .select_only()
539 .column(streaming_job::Column::JobId)
540 .filter(
541 streaming_job::Column::JobId
542 .is_in(dirty_job_ids.iter().copied())
543 .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
544 )
545 .into_tuple()
546 .all(&txn)
547 .await?
548 .into_iter()
549 .collect();
550
551 let to_notify_objs = dirty_job_objs
553 .iter()
554 .filter(|obj| {
555 matches!(
556 dirty_table_type_map.get(&obj.oid),
557 Some(TableType::MaterializedView)
558 ) || dirty_background_jobs.contains(&obj.oid.as_job_id())
559 })
560 .cloned()
561 .collect_vec();
562
563 let dirty_associated_source_ids: Vec<SourceId> = Table::find()
566 .select_only()
567 .column(table::Column::OptionalAssociatedSourceId)
568 .filter(
569 table::Column::TableId
570 .is_in(dirty_job_table_ids)
571 .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
572 )
573 .into_tuple()
574 .all(&txn)
575 .await?;
576
577 let dirty_internal_state_table_ids: Vec<TableId> = Table::find()
578 .select_only()
579 .column(table::Column::TableId)
580 .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.iter().copied()))
581 .into_tuple()
582 .all(&txn)
583 .await?;
584 let dirty_state_table_ids = dirty_job_ids
585 .iter()
586 .map(|job_id| job_id.as_mv_table_id())
587 .chain(dirty_internal_state_table_ids.iter().copied())
588 .collect_vec();
589
590 let dirty_internal_table_objs = Object::find()
591 .select_only()
592 .columns([
593 object::Column::Oid,
594 object::Column::ObjType,
595 object::Column::SchemaId,
596 object::Column::DatabaseId,
597 ])
598 .join(JoinType::InnerJoin, object::Relation::Table.def())
599 .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
600 .into_partial_model()
601 .all(&txn)
602 .await?;
603
604 let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
605 .iter()
606 .map(|job_id| job_id.as_object_id())
607 .chain(
608 dirty_state_table_ids
609 .iter()
610 .copied()
611 .map(|table_id| table_id.as_object_id()),
612 )
613 .chain(
614 dirty_associated_source_ids
615 .iter()
616 .map(|source_id| source_id.as_object_id()),
617 )
618 .collect();
619
620 let dropped_tables = if database_id.is_some() {
623 Table::find()
624 .find_also_related(Object)
625 .filter(table::Column::TableId.is_in(dirty_state_table_ids.iter().copied()))
626 .all(&txn)
627 .await?
628 .into_iter()
629 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)))
630 .collect_vec()
631 } else {
632 vec![]
633 };
634 let dropped_table_ids = dropped_tables.iter().map(|table| table.id).collect_vec();
635
636 let res = Object::delete_many()
637 .filter(object::Column::Oid.is_in(to_delete_objs))
638 .exec(&txn)
639 .await?;
640 assert!(res.rows_affected > 0);
641
642 txn.commit().await?;
643 inner
644 .dropped_tables
645 .extend(dropped_tables.into_iter().map(|t| (t.id, t)));
646
647 let object_group = build_object_group_for_delete(
648 to_notify_objs
649 .into_iter()
650 .chain(dirty_internal_table_objs)
651 .collect_vec(),
652 );
653
654 let _version = self
655 .notify_frontend(NotificationOperation::Delete, object_group)
656 .await;
657
658 Ok(CleanedDirtyStreamingJobs {
659 streaming_job_ids: dirty_job_ids,
660 dropped_table_ids,
661 source_ids: dirty_associated_source_ids,
662 })
663 }
664
665 pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
666 let inner = self.inner.write().await;
667 let txn = inner.db.begin().await?;
668 ensure_object_id(ObjectType::Database, comment.database_id, &txn).await?;
669 ensure_object_id(ObjectType::Schema, comment.schema_id, &txn).await?;
670 let (table_obj, streaming_job) = Object::find_by_id(comment.table_id)
671 .find_also_related(StreamingJob)
672 .one(&txn)
673 .await?
674 .ok_or_else(|| MetaError::catalog_id_not_found("object", comment.table_id))?;
675
676 let table = if let Some(col_idx) = comment.column_index {
677 let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id)
678 .select_only()
679 .column(table::Column::Columns)
680 .into_tuple()
681 .one(&txn)
682 .await?
683 .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
684 let mut pb_columns = columns.to_protobuf();
685
686 let column = pb_columns
687 .get_mut(col_idx as usize)
688 .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
689 let column_desc = column.column_desc.as_mut().ok_or_else(|| {
690 anyhow!(
691 "column desc at index {} for table id {} not found",
692 col_idx,
693 comment.table_id
694 )
695 })?;
696 column_desc.description = comment.description;
697 table::ActiveModel {
698 table_id: Set(comment.table_id),
699 columns: Set(pb_columns.into()),
700 ..Default::default()
701 }
702 .update(&txn)
703 .await?
704 } else {
705 table::ActiveModel {
706 table_id: Set(comment.table_id),
707 description: Set(comment.description),
708 ..Default::default()
709 }
710 .update(&txn)
711 .await?
712 };
713 txn.commit().await?;
714
715 let version = self
716 .notify_frontend_relation_info(
717 NotificationOperation::Update,
718 PbObjectInfo::Table(ObjectModel(table, table_obj, streaming_job).into()),
719 )
720 .await;
721
722 Ok(version)
723 }
724
725 async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
726 if tables.is_empty() {
727 return;
728 }
729 let objects = tables
730 .into_iter()
731 .map(|t| PbObject {
732 object_info: Some(PbObjectInfo::Table(t)),
733 })
734 .collect();
735 let group = NotificationInfo::ObjectGroup(PbObjectGroup {
736 objects,
737 dependencies: vec![],
738 });
739 self.env
740 .notification_manager()
741 .notify_hummock(NotificationOperation::Delete, group.clone())
742 .await;
743 self.env
744 .notification_manager()
745 .notify_compactor(NotificationOperation::Delete, group)
746 .await;
747 }
748
749 pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
750 let mut inner = self.inner.write().await;
751 let tables = inner.complete_dropped_tables(table_ids);
752 self.notify_hummock_dropped_tables(tables).await;
753 }
754
755 pub async fn cleanup_dropped_tables(&self) {
756 let mut inner = self.inner.write().await;
757 let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
758 self.notify_hummock_dropped_tables(tables).await;
759 }
760
761 pub async fn stats(&self) -> MetaResult<CatalogStats> {
762 let inner = self.inner.read().await;
763
764 let mut table_num_map: HashMap<_, _> = Table::find()
765 .select_only()
766 .column(table::Column::TableType)
767 .column_as(table::Column::TableId.count(), "num")
768 .group_by(table::Column::TableType)
769 .having(table::Column::TableType.ne(TableType::Internal))
770 .into_tuple::<(TableType, i64)>()
771 .all(&inner.db)
772 .await?
773 .into_iter()
774 .map(|(table_type, num)| (table_type, num as u64))
775 .collect();
776
777 let source_num = Source::find().count(&inner.db).await?;
778 let sink_num = Sink::find().count(&inner.db).await?;
779 let function_num = Function::find().count(&inner.db).await?;
780 let streaming_job_num = StreamingJob::find().count(&inner.db).await?;
781
782 let actor_num = {
783 let guard = self.env.shared_actor_info.read_guard();
784 guard
785 .iter_over_fragments()
786 .map(|(_, fragment)| fragment.actors.len() as u64)
787 .sum::<u64>()
788 };
789 let database_num = Database::find().count(&inner.db).await?;
790
791 Ok(CatalogStats {
792 table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
793 mview_num: table_num_map
794 .remove(&TableType::MaterializedView)
795 .unwrap_or(0),
796 index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
797 source_num,
798 sink_num,
799 function_num,
800 streaming_job_num,
801 actor_num,
802 database_num,
803 })
804 }
805
806 pub async fn fetch_sink_with_state_table_ids(
807 &self,
808 sink_ids: HashSet<SinkId>,
809 ) -> MetaResult<HashMap<SinkId, Vec<TableId>>> {
810 let inner = self.inner.read().await;
811
812 let query = Fragment::find()
813 .select_only()
814 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
815 .filter(
816 fragment::Column::JobId
817 .is_in(sink_ids)
818 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
819 );
820
821 let rows: Vec<(JobId, TableIdArray)> = query.into_tuple().all(&inner.db).await?;
822
823 debug_assert!(rows.iter().map(|(job_id, _)| job_id).all_unique());
824
825 let result = rows
826 .into_iter()
827 .map(|(job_id, table_id_array)| (job_id.as_sink_id(), table_id_array.0))
828 .collect::<HashMap<_, _>>();
829
830 Ok(result)
831 }
832
833 pub async fn list_all_pending_sinks(
834 &self,
835 database_id: Option<DatabaseId>,
836 ) -> MetaResult<HashSet<SinkId>> {
837 let inner = self.inner.read().await;
838
839 let mut query = pending_sink_state::Entity::find()
840 .select_only()
841 .columns([pending_sink_state::Column::SinkId])
842 .filter(
843 pending_sink_state::Column::SinkState.eq(pending_sink_state::SinkState::Pending),
844 )
845 .distinct();
846
847 if let Some(db_id) = database_id {
848 query = query
849 .join(
850 JoinType::InnerJoin,
851 pending_sink_state::Relation::Object.def(),
852 )
853 .filter(object::Column::DatabaseId.eq(db_id));
854 }
855
856 let result: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;
857
858 Ok(result.into_iter().collect())
859 }
860
861 pub async fn abort_pending_sink_epochs(
862 &self,
863 sink_committed_epoch: HashMap<SinkId, u64>,
864 ) -> MetaResult<()> {
865 let inner = self.inner.write().await;
866 let txn = inner.db.begin().await?;
867
868 for (sink_id, committed_epoch) in sink_committed_epoch {
869 pending_sink_state::Entity::update_many()
870 .col_expr(
871 pending_sink_state::Column::SinkState,
872 Expr::value(pending_sink_state::SinkState::Aborted),
873 )
874 .filter(
875 pending_sink_state::Column::SinkId
876 .eq(sink_id)
877 .and(pending_sink_state::Column::Epoch.gt(committed_epoch as i64)),
878 )
879 .exec(&txn)
880 .await?;
881 }
882
883 txn.commit().await?;
884 Ok(())
885 }
886}
887
/// Catalog object counts produced by [`CatalogController::stats`].
pub struct CatalogStats {
    /// Number of tables of type `Table`.
    pub table_num: u64,
    /// Number of tables of type `MaterializedView`.
    pub mview_num: u64,
    /// Number of tables of type `Index`.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs.
    pub streaming_job_num: u64,
    /// Total number of actors over all fragments in the shared actor info.
    pub actor_num: u64,
    /// Number of databases.
    pub database_num: u64,
}
900
901impl CatalogControllerInner {
902 pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
903 let databases = self.list_databases().await?;
904 let schemas = self.list_schemas().await?;
905 let tables = self.list_tables().await?;
906 let sources = self.list_sources().await?;
907 let sinks = self.list_sinks().await?;
908 let subscriptions = self.list_subscriptions().await?;
909 let indexes = self.list_indexes().await?;
910 let views = self.list_views().await?;
911 let functions = self.list_functions().await?;
912 let connections = self.list_connections().await?;
913 let secrets = self.list_secrets().await?;
914
915 let users = self.list_users().await?;
916
917 Ok((
918 (
919 databases,
920 schemas,
921 tables,
922 sources,
923 sinks,
924 subscriptions,
925 indexes,
926 views,
927 functions,
928 connections,
929 secrets,
930 ),
931 users,
932 ))
933 }
934
935 async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
936 let db_objs = Database::find()
937 .find_also_related(Object)
938 .all(&self.db)
939 .await?;
940 Ok(db_objs
941 .into_iter()
942 .map(|(db, obj)| ObjectModel(db, obj.unwrap(), None).into())
943 .collect())
944 }
945
946 async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
947 let schema_objs = Schema::find()
948 .find_also_related(Object)
949 .all(&self.db)
950 .await?;
951
952 Ok(schema_objs
953 .into_iter()
954 .map(|(schema, obj)| ObjectModel(schema, obj.unwrap(), None).into())
955 .collect())
956 }
957
958 async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
959 let mut user_infos: Vec<PbUserInfo> = User::find()
960 .all(&self.db)
961 .await?
962 .into_iter()
963 .map(Into::into)
964 .collect();
965
966 for user_info in &mut user_infos {
967 user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
968 }
969 Ok(user_infos)
970 }
971
972 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
974 let table_objs = Table::find()
975 .find_also_related(Object)
976 .all(&self.db)
977 .await?;
978 let streaming_jobs = load_streaming_jobs_by_ids(
979 &self.db,
980 table_objs.iter().map(|(table, _)| table.job_id()),
981 )
982 .await?;
983
984 Ok(table_objs
985 .into_iter()
986 .map(|(table, obj)| {
987 let job_id = table.job_id();
988 let streaming_job = streaming_jobs.get(&job_id).cloned();
989 ObjectModel(table, obj.unwrap(), streaming_job).into()
990 })
991 .collect())
992 }
993
994 async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
996 let mut table_objs = Table::find()
997 .find_also_related(Object)
998 .all(&self.db)
999 .await?;
1000
1001 let all_streaming_jobs: HashMap<JobId, streaming_job::Model> = StreamingJob::find()
1002 .all(&self.db)
1003 .await?
1004 .into_iter()
1005 .map(|job| (job.job_id, job))
1006 .collect();
1007
1008 let sink_ids: HashSet<SinkId> = Sink::find()
1009 .select_only()
1010 .column(sink::Column::SinkId)
1011 .into_tuple::<SinkId>()
1012 .all(&self.db)
1013 .await?
1014 .into_iter()
1015 .collect();
1016
1017 let mview_job_ids: HashSet<JobId> = table_objs
1018 .iter()
1019 .filter_map(|(table, _)| {
1020 if table.table_type == TableType::MaterializedView {
1021 Some(table.table_id.as_job_id())
1022 } else {
1023 None
1024 }
1025 })
1026 .collect();
1027
1028 table_objs.retain(|(table, _)| {
1029 let job_id = table.job_id();
1030
1031 if sink_ids.contains(&job_id.as_sink_id()) || mview_job_ids.contains(&job_id) {
1032 return true;
1033 }
1034 if let Some(streaming_job) = all_streaming_jobs.get(&job_id) {
1035 return streaming_job.job_status == JobStatus::Created
1036 || (streaming_job.create_type == CreateType::Background);
1037 }
1038 false
1039 });
1040
1041 let mut tables = Vec::with_capacity(table_objs.len());
1042 for (table, obj) in table_objs {
1043 let job_id = table.job_id();
1044 let streaming_job = all_streaming_jobs.get(&job_id).cloned();
1045 let pb_table: PbTable = ObjectModel(table, obj.unwrap(), streaming_job).into();
1046 tables.push(pb_table);
1047 }
1048 Ok(tables)
1049 }
1050
1051 async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
1053 let mut source_objs = Source::find()
1054 .find_also_related(Object)
1055 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1056 .filter(
1057 streaming_job::Column::JobStatus
1058 .is_null()
1059 .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1060 )
1061 .all(&self.db)
1062 .await?;
1063
1064 let created_table_ids: HashSet<TableId> = Table::find()
1066 .select_only()
1067 .column(table::Column::TableId)
1068 .join(JoinType::InnerJoin, table::Relation::Object1.def())
1069 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1070 .filter(
1071 table::Column::OptionalAssociatedSourceId
1072 .is_not_null()
1073 .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1074 )
1075 .into_tuple()
1076 .all(&self.db)
1077 .await?
1078 .into_iter()
1079 .collect();
1080 source_objs.retain_mut(|(source, _)| {
1081 source.optional_associated_table_id.is_none()
1082 || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
1083 });
1084
1085 Ok(source_objs
1086 .into_iter()
1087 .map(|(source, obj)| ObjectModel(source, obj.unwrap(), None).into())
1088 .collect())
1089 }
1090
1091 async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
1093 let sink_objs = Sink::find()
1094 .find_also_related(Object)
1095 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1096 .all(&self.db)
1097 .await?;
1098 let streaming_jobs = load_streaming_jobs_by_ids(
1099 &self.db,
1100 sink_objs.iter().map(|(sink, _)| sink.sink_id.as_job_id()),
1101 )
1102 .await?;
1103
1104 Ok(sink_objs
1105 .into_iter()
1106 .map(|(sink, obj)| {
1107 let streaming_job = streaming_jobs.get(&sink.sink_id.as_job_id()).cloned();
1108 ObjectModel(sink, obj.unwrap(), streaming_job).into()
1109 })
1110 .collect())
1111 }
1112
1113 async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
1115 let subscription_objs = Subscription::find()
1116 .find_also_related(Object)
1117 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
1118 .all(&self.db)
1119 .await?;
1120
1121 Ok(subscription_objs
1122 .into_iter()
1123 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap(), None).into())
1124 .collect())
1125 }
1126
1127 async fn list_views(&self) -> MetaResult<Vec<PbView>> {
1128 let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
1129
1130 Ok(view_objs
1131 .into_iter()
1132 .map(|(view, obj)| ObjectModel(view, obj.unwrap(), None).into())
1133 .collect())
1134 }
1135
1136 async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
1138 let index_objs = Index::find()
1139 .find_also_related(Object)
1140 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1141 .filter(
1142 streaming_job::Column::JobStatus
1143 .eq(JobStatus::Created)
1144 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
1145 )
1146 .all(&self.db)
1147 .await?;
1148 let streaming_jobs = load_streaming_jobs_by_ids(
1149 &self.db,
1150 index_objs
1151 .iter()
1152 .map(|(index, _)| index.index_id.as_job_id()),
1153 )
1154 .await?;
1155
1156 Ok(index_objs
1157 .into_iter()
1158 .map(|(index, obj)| {
1159 let streaming_job = streaming_jobs.get(&index.index_id.as_job_id()).cloned();
1160 ObjectModel(index, obj.unwrap(), streaming_job).into()
1161 })
1162 .collect())
1163 }
1164
1165 async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1166 let conn_objs = Connection::find()
1167 .find_also_related(Object)
1168 .all(&self.db)
1169 .await?;
1170
1171 Ok(conn_objs
1172 .into_iter()
1173 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap(), None).into())
1174 .collect())
1175 }
1176
1177 pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1178 let secret_objs = Secret::find()
1179 .find_also_related(Object)
1180 .all(&self.db)
1181 .await?;
1182 Ok(secret_objs
1183 .into_iter()
1184 .map(|(secret, obj)| ObjectModel(secret, obj.unwrap(), None).into())
1185 .collect())
1186 }
1187
1188 async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1189 let func_objs = Function::find()
1190 .find_also_related(Object)
1191 .all(&self.db)
1192 .await?;
1193
1194 Ok(func_objs
1195 .into_iter()
1196 .map(|(func, obj)| ObjectModel(func, obj.unwrap(), None).into())
1197 .collect())
1198 }
1199
1200 pub(crate) fn register_finish_notifier(
1201 &mut self,
1202 database_id: DatabaseId,
1203 id: JobId,
1204 sender: Sender<Result<NotificationVersion, String>>,
1205 ) {
1206 self.creating_table_finish_notifier
1207 .entry(database_id)
1208 .or_default()
1209 .entry(id)
1210 .or_default()
1211 .push(sender);
1212 }
1213
1214 pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
1215 let status = StreamingJob::find()
1216 .select_only()
1217 .column(streaming_job::Column::JobStatus)
1218 .filter(streaming_job::Column::JobId.eq(id))
1219 .into_tuple::<JobStatus>()
1220 .one(&self.db)
1221 .await?;
1222
1223 status
1224 .map(|status| status == JobStatus::Created)
1225 .ok_or_else(|| {
1226 MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
1227 })
1228 }
1229
1230 pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1231 if let Some(database_id) = database_id {
1232 if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1233 {
1234 for tx in creating_tables.into_values().flatten() {
1235 let _ = tx.send(Err(err.clone()));
1236 }
1237 }
1238 } else {
1239 for tx in take(&mut self.creating_table_finish_notifier)
1240 .into_values()
1241 .flatten()
1242 .flat_map(|(_, txs)| txs.into_iter())
1243 {
1244 let _ = tx.send(Err(err.clone()));
1245 }
1246 }
1247 }
1248
1249 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1250 let table_ids: Vec<TableId> = Table::find()
1251 .select_only()
1252 .filter(table::Column::TableType.is_in(vec![
1253 TableType::Table,
1254 TableType::MaterializedView,
1255 TableType::Index,
1256 ]))
1257 .column(table::Column::TableId)
1258 .into_tuple()
1259 .all(&self.db)
1260 .await?;
1261 Ok(table_ids)
1262 }
1263
1264 pub(crate) fn complete_dropped_tables(
1267 &mut self,
1268 table_ids: impl IntoIterator<Item = TableId>,
1269 ) -> Vec<PbTable> {
1270 table_ids
1271 .into_iter()
1272 .filter_map(|table_id| {
1273 self.dropped_tables.remove(&table_id).map_or_else(
1274 || {
1275 tracing::warn!(%table_id, "table not found");
1276 None
1277 },
1278 Some,
1279 )
1280 })
1281 .collect()
1282 }
1283}