1mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{
31 DEFAULT_SCHEMA_NAME, FragmentTypeFlag, FragmentTypeMask, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::TableType;
42use risingwave_meta_model::{
43 ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array, IndexId,
44 JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45 StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, TableIdArray,
46 UserId, ViewId, connection, database, fragment, function, index, object, object_dependency,
47 pending_sink_state, schema, secret, sink, source, streaming_job, subscription, table,
48 user_privilege, view,
49};
50use risingwave_pb::catalog::connection::Info as ConnectionInfo;
51use risingwave_pb::catalog::subscription::SubscriptionState;
52use risingwave_pb::catalog::table::PbTableType;
53use risingwave_pb::catalog::{
54 PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
55 PbSubscription, PbTable, PbView,
56};
57use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
58use risingwave_pb::meta::object::PbObjectInfo;
59use risingwave_pb::meta::subscribe_response::{
60 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
61};
62use risingwave_pb::meta::{PbObject, PbObjectGroup};
63use risingwave_pb::stream_plan::stream_node::NodeBody;
64use risingwave_pb::telemetry::PbTelemetryEventStage;
65use risingwave_pb::user::PbUserInfo;
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69 ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
70 IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
71 SelectColumns, TransactionTrait, Value,
72};
73use tokio::sync::oneshot::Sender;
74use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
75use tracing::info;
76
77use super::utils::{
78 check_subscription_name_duplicate, get_internal_tables_by_id, load_streaming_jobs_by_ids,
79 rename_relation, rename_relation_refer,
80};
81use crate::controller::ObjectModel;
82use crate::controller::catalog::util::update_internal_tables;
83use crate::controller::fragment::FragmentTypeMaskExt;
84use crate::controller::utils::*;
85use crate::manager::{
86 IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
87 get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
88};
89use crate::rpc::ddl_controller::DropMode;
90use crate::telemetry::{MetaTelemetryJobDesc, report_event};
91use crate::{MetaError, MetaResult};
92
/// Full catalog snapshot in the order the frontend consumes it:
/// databases, schemas, tables, sources, sinks, subscriptions, indexes,
/// views, functions, connections, secrets.
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);
106
/// Shared, reference-counted handle to the [`CatalogController`].
pub type CatalogControllerRef = Arc<CatalogController>;
108
/// Controller for catalog DDL state backed by the SQL meta store.
/// All catalog reads and writes go through `inner` under an async `RwLock`.
pub struct CatalogController {
    // Meta service environment (notification manager, meta store, etc.).
    pub(crate) env: MetaSrvEnv,
    // Mutable controller state; see [`CatalogControllerInner`].
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
114
/// Context produced when dropping a table's connector: identifies the
/// streaming job to be altered and the source/state table to be removed.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    // The streaming job whose definition changes when the connector is dropped.
    pub(crate) to_change_streaming_job_id: JobId,
    // State table backing the dropped source part; to be removed.
    pub(crate) to_remove_state_table_id: TableId,
    // The associated source catalog entry to be removed.
    pub(crate) to_remove_source_id: SourceId,
}
122
/// Everything that must be released/cleaned up after a drop (of a database,
/// schema, or streaming object): ids of removed jobs, state tables, secrets,
/// sources and fragments, plus sink-related cleanup information.
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    // Database the dropped objects belonged to.
    pub(crate) database_id: DatabaseId,
    // Streaming jobs removed by the drop.
    pub(crate) removed_streaming_job_ids: Vec<JobId>,
    // State tables (including internal tables) removed by the drop.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    // Secrets removed by the drop.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    // Sources removed by the drop.
    pub(crate) removed_source_ids: Vec<SourceId>,
    // Source fragments removed, keyed by source id.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    // All fragments removed by the drop.
    pub(crate) removed_fragments: HashSet<FragmentId>,

    // Removed sink fragments grouped by their downstream (target) fragment.
    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,

    // Iceberg table sinks removed; kept as full catalogs for external cleanup.
    pub(crate) removed_iceberg_table_sinks: Vec<PbSink>,
}
146
147impl CatalogController {
148 pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
149 let meta_store = env.meta_store();
150 let catalog_controller = Self {
151 env,
152 inner: RwLock::new(CatalogControllerInner {
153 db: meta_store.conn,
154 creating_table_finish_notifier: HashMap::new(),
155 dropped_tables: HashMap::new(),
156 }),
157 };
158
159 catalog_controller.init().await?;
160 Ok(catalog_controller)
161 }
162
163 pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
166 self.inner.read().await
167 }
168
169 pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
170 self.inner.write().await
171 }
172}
173
/// State guarded by `CatalogController::inner`.
pub struct CatalogControllerInner {
    // Connection to the SQL meta store.
    pub(crate) db: DatabaseConnection,
    // Per-database map from a creating streaming job id to the oneshot
    // senders that are waiting for it to finish. On success a notification
    // version is sent; on failure an error string.
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    // Tables dropped from the catalog but whose state is not yet reclaimed;
    // drained by `complete_dropped_tables` / `cleanup_dropped_tables`.
    pub dropped_tables: HashMap<TableId, PbTable>,
}
186
187impl CatalogController {
188 pub(crate) async fn notify_frontend(
189 &self,
190 operation: NotificationOperation,
191 info: NotificationInfo,
192 ) -> NotificationVersion {
193 self.env
194 .notification_manager()
195 .notify_frontend(operation, info)
196 .await
197 }
198
199 pub(crate) async fn notify_frontend_relation_info(
200 &self,
201 operation: NotificationOperation,
202 relation_info: PbObjectInfo,
203 ) -> NotificationVersion {
204 self.env
205 .notification_manager()
206 .notify_frontend_object_info(operation, relation_info)
207 .await
208 }
209
210 pub(crate) async fn notify_frontend_trivial(&self) -> NotificationVersion {
217 self.env
218 .notification_manager()
219 .notify_frontend(
220 NotificationOperation::Update,
221 NotificationInfo::ObjectGroup(PbObjectGroup {
222 objects: vec![],
223 dependencies: vec![],
224 }),
225 )
226 .await
227 }
228}
229
230impl CatalogController {
    /// Finalizes a subscription's catalog entry in one transaction: stamps
    /// the object's creation time/cluster version, flips the subscription
    /// state to `Created`, and grants default privileges.
    ///
    /// Returns `catalog_id_not_found` if no object row matches the id.
    pub async fn finish_create_subscription_catalog(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Stamp creation metadata on the object row; zero affected rows means
        // the subscription object no longer exists.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(subscription_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found(
                "subscription",
                subscription_id,
            ));
        }

        // Mark the subscription itself as fully created.
        let job = subscription::ActiveModel {
            subscription_id: Set(subscription_id),
            subscription_state: Set(SubscriptionState::Created.into()),
            ..Default::default()
        };
        Subscription::update(job).exec(&txn).await?;

        // The notification version from privilege granting is intentionally
        // ignored; notification happens in `notify_create_subscription`.
        let _ = grant_default_privileges_automatically(&txn, subscription_id).await?;

        txn.commit().await?;

        Ok(())
    }
269
    /// Notifies frontends about a subscription that has reached the `Created`
    /// state, including its dependencies and any user-privilege updates that
    /// were granted on it. Returns the final notification version.
    pub async fn notify_create_subscription(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        // Only a subscription already in `Created` state may be announced.
        let (subscription, obj) = Subscription::find_by_id(subscription_id)
            .find_also_related(Object)
            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;

        let dependencies =
            list_object_dependencies_by_object_id(&txn, subscription_id.into()).await?;
        txn.commit().await?;

        // Announce the new subscription object (plus its dependencies).
        let mut version = self
            .notify_frontend(
                NotificationOperation::Add,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: PbObjectInfo::Subscription(
                            ObjectModel(subscription, obj.unwrap(), None).into(),
                        )
                        .into(),
                    }],
                    dependencies,
                }),
            )
            .await;

        // Users that received privileges on this subscription (e.g. via
        // default-privilege granting) must also be re-announced.
        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.eq(subscription_id.as_object_id()))
            .into_tuple()
            .all(&inner.db)
            .await?;

        if !updated_user_ids.is_empty() {
            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
            version = self.notify_users_update(updated_user_infos).await;
        }

        Ok(version)
    }
319
320 pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
322 let inner = self.inner.read().await;
342 let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
343 .select_only()
344 .column(source::Column::SourceId)
345 .column(source::Column::WithProperties)
346 .column(source::Column::SourceInfo)
347 .into_tuple()
348 .all(&inner.db)
349 .await?;
350 let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
351 .select_only()
352 .column(sink::Column::SinkId)
353 .column(sink::Column::Properties)
354 .column(sink::Column::SinkFormatDesc)
355 .into_tuple()
356 .all(&inner.db)
357 .await?;
358 drop(inner);
359
360 let get_connector_from_property = |property: &Property| -> String {
361 property
362 .0
363 .get(UPSTREAM_SOURCE_KEY)
364 .cloned()
365 .unwrap_or_default()
366 };
367
368 let source_report: Vec<jsonbb::Value> = source_props_and_info
369 .iter()
370 .map(|(oid, property, info)| {
371 let connector_name = get_connector_from_property(property);
372 let mut format = None;
373 let mut encode = None;
374 if let Some(info) = info {
375 let pb_info = info.to_protobuf();
376 format = Some(pb_info.format().as_str_name());
377 encode = Some(pb_info.row_encode().as_str_name());
378 }
379 jsonbb::json!({
380 oid.to_string(): {
381 "connector": connector_name,
382 "format": format,
383 "encode": encode,
384 },
385 })
386 })
387 .collect_vec();
388
389 let sink_report: Vec<jsonbb::Value> = sink_props_and_info
390 .iter()
391 .map(|(oid, property, info)| {
392 let connector_name = get_connector_from_property(property);
393 let mut format = None;
394 let mut encode = None;
395 if let Some(info) = info {
396 let pb_info = info.to_protobuf();
397 format = Some(pb_info.format().as_str_name());
398 encode = Some(pb_info.encode().as_str_name());
399 }
400 jsonbb::json!({
401 oid.to_string(): {
402 "connector": connector_name,
403 "format": format,
404 "encode": encode,
405 },
406 })
407 })
408 .collect_vec();
409
410 Ok(jsonbb::json!({
411 "source": source_report,
412 "sink": sink_report,
413 }))
414 }
415
    /// Deletes subscription objects that never reached the `Created` state
    /// (e.g. left over from an interrupted creation), optionally scoped to a
    /// single database.
    pub async fn clean_dirty_subscription(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Subscription objects whose subscription row is NOT in `Created`
        // state (or is missing) are considered dirty.
        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
            object::Column::Oid.not_in_subquery(
                Query::select()
                    .column(subscription::Column::SubscriptionId)
                    .from(Subscription)
                    .and_where(
                        subscription::Column::SubscriptionState
                            .eq(SubscriptionState::Created as i32),
                    )
                    .take(),
            ),
        );

        // Narrow to one database when requested.
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        Object::delete_many()
            .filter(filter_condition)
            .exec(&txn)
            .await?;
        txn.commit().await?;
        Ok(())
    }
448
    /// Cleans up streaming jobs left in a dirty creating state (e.g. after a
    /// meta-node restart), optionally scoped to one database.
    ///
    /// Dirty means: status `Initial`, or status `Creating` with a foreground
    /// create type; dirty iceberg table jobs are included as well. Deletes the
    /// jobs plus their state tables and associated sources, notifies frontends
    /// about objects they may have already seen (materialized views and
    /// background jobs), and returns the ids of associated sources whose
    /// cleanup the caller must continue.
    pub async fn clean_dirty_creating_jobs(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<Vec<SourceId>> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Dirty = never started (`Initial`), or foreground job still creating.
        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
            streaming_job::Column::JobStatus
                .eq(JobStatus::Creating)
                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };

        let mut dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .filter(filter_condition)
            .into_partial_model()
            .all(&txn)
            .await?;

        // Iceberg table jobs have their own dirtiness criteria.
        let dirty_iceberg_jobs = find_dirty_iceberg_table_jobs(&txn, database_id).await?;
        if !dirty_iceberg_jobs.is_empty() {
            dirty_job_objs.extend(dirty_iceberg_jobs);
        }

        // Always run regardless of whether dirty jobs were found.
        Self::clean_dirty_sink_downstreams(&txn).await?;

        if dirty_job_objs.is_empty() {
            return Ok(vec![]);
        }

        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;

        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();

        // Table types of the dirty table objects, used to decide which
        // deletions frontends must be told about.
        let all_dirty_table_ids = dirty_job_objs
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table)
            .map(|obj| obj.oid)
            .collect_vec();
        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .column(table::Column::TableType)
            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
            .into_tuple::<(ObjectId, TableType)>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobId
                    .is_in(dirty_job_ids.clone())
                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .into_tuple()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        // Only materialized views and background jobs were ever announced to
        // frontends, so only their deletion needs to be announced.
        let to_notify_objs = dirty_job_objs
            .into_iter()
            .filter(|obj| {
                matches!(
                    dirty_table_type_map.get(&obj.oid),
                    Some(TableType::MaterializedView)
                ) || dirty_background_jobs.contains(&obj.oid)
            })
            .collect_vec();

        // Sources created together with a table (CREATE TABLE ... WITH
        // connector) must be dropped alongside the dirty table job.
        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(dirty_job_ids.clone())
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        let dirty_state_table_ids: Vec<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;

        // Internal table objects of the announced jobs; their deletion is
        // announced too.
        let dirty_internal_table_objs = Object::find()
            .select_only()
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, object::Relation::Table.def())
            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
            .into_partial_model()
            .all(&txn)
            .await?;

        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
            .clone()
            .into_iter()
            .chain(
                dirty_state_table_ids
                    .into_iter()
                    .map(|table_id| table_id.as_object_id()),
            )
            .chain(
                dirty_associated_source_ids
                    .iter()
                    .map(|source_id| source_id.as_object_id()),
            )
            .collect();

        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(to_delete_objs))
            .exec(&txn)
            .await?;
        // We know dirty_job_objs is non-empty here, so something was deleted.
        assert!(res.rows_affected > 0);

        txn.commit().await?;

        let object_group = build_object_group_for_delete(
            to_notify_objs
                .into_iter()
                .chain(dirty_internal_table_objs.into_iter())
                .collect_vec(),
        );

        let _version = self
            .notify_frontend(NotificationOperation::Delete, object_group)
            .await;

        Ok(dirty_associated_source_ids)
    }
615
    /// Applies a `COMMENT ON` statement: with `column_index` set the comment
    /// goes on that column's catalog description, otherwise on the table
    /// itself. Frontends are notified of the updated table catalog.
    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Validate that the referenced database/schema still exist.
        ensure_object_id(ObjectType::Database, comment.database_id, &txn).await?;
        ensure_object_id(ObjectType::Schema, comment.schema_id, &txn).await?;
        let (table_obj, streaming_job) = Object::find_by_id(comment.table_id)
            .find_also_related(StreamingJob)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("object", comment.table_id))?;

        let table = if let Some(col_idx) = comment.column_index {
            // Column comment: rewrite the column catalog array with the new
            // description on the addressed column.
            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id)
                .select_only()
                .column(table::Column::Columns)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
            let mut pb_columns = columns.to_protobuf();

            let column = pb_columns
                .get_mut(col_idx as usize)
                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
                anyhow!(
                    "column desc at index {} for table id {} not found",
                    col_idx,
                    comment.table_id
                )
            })?;
            column_desc.description = comment.description;
            table::ActiveModel {
                table_id: Set(comment.table_id),
                columns: Set(pb_columns.into()),
                ..Default::default()
            }
            .update(&txn)
            .await?
        } else {
            // Table-level comment: update the table's own description.
            table::ActiveModel {
                table_id: Set(comment.table_id),
                description: Set(comment.description),
                ..Default::default()
            }
            .update(&txn)
            .await?
        };
        txn.commit().await?;

        let version = self
            .notify_frontend_relation_info(
                NotificationOperation::Update,
                PbObjectInfo::Table(ObjectModel(table, table_obj, streaming_job).into()),
            )
            .await;

        Ok(version)
    }
675
676 async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
677 if tables.is_empty() {
678 return;
679 }
680 let objects = tables
681 .into_iter()
682 .map(|t| PbObject {
683 object_info: Some(PbObjectInfo::Table(t)),
684 })
685 .collect();
686 let group = NotificationInfo::ObjectGroup(PbObjectGroup {
687 objects,
688 dependencies: vec![],
689 });
690 self.env
691 .notification_manager()
692 .notify_hummock(NotificationOperation::Delete, group.clone())
693 .await;
694 self.env
695 .notification_manager()
696 .notify_compactor(NotificationOperation::Delete, group)
697 .await;
698 }
699
700 pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
701 let mut inner = self.inner.write().await;
702 let tables = inner.complete_dropped_tables(table_ids);
703 self.notify_hummock_dropped_tables(tables).await;
704 }
705
706 pub async fn cleanup_dropped_tables(&self) {
707 let mut inner = self.inner.write().await;
708 let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
709 self.notify_hummock_dropped_tables(tables).await;
710 }
711
    /// Computes catalog statistics for metrics/telemetry: object counts per
    /// kind plus the current total actor count.
    pub async fn stats(&self) -> MetaResult<CatalogStats> {
        let inner = self.inner.read().await;

        // Count tables grouped by type, excluding internal state tables.
        let mut table_num_map: HashMap<_, _> = Table::find()
            .select_only()
            .column(table::Column::TableType)
            .column_as(table::Column::TableId.count(), "num")
            .group_by(table::Column::TableType)
            .having(table::Column::TableType.ne(TableType::Internal))
            .into_tuple::<(TableType, i64)>()
            .all(&inner.db)
            .await?
            .into_iter()
            .map(|(table_type, num)| (table_type, num as u64))
            .collect();

        let source_num = Source::find().count(&inner.db).await?;
        let sink_num = Sink::find().count(&inner.db).await?;
        let function_num = Function::find().count(&inner.db).await?;
        let streaming_job_num = StreamingJob::find().count(&inner.db).await?;

        // Actor count comes from the in-memory shared actor info, not the DB.
        let actor_num = {
            let guard = self.env.shared_actor_info.read_guard();
            guard
                .iter_over_fragments()
                .map(|(_, fragment)| fragment.actors.len() as u64)
                .sum::<u64>()
        };
        let database_num = Database::find().count(&inner.db).await?;

        Ok(CatalogStats {
            // Missing map entries mean zero tables of that type exist.
            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
            mview_num: table_num_map
                .remove(&TableType::MaterializedView)
                .unwrap_or(0),
            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
            source_num,
            sink_num,
            function_num,
            streaming_job_num,
            actor_num,
            database_num,
        })
    }
756
    /// For each requested sink, returns the state table ids of its sink
    /// fragment. Sinks without a matching sink fragment are absent from the
    /// result map.
    pub async fn fetch_sink_with_state_table_ids(
        &self,
        sink_ids: HashSet<SinkId>,
    ) -> MetaResult<HashMap<SinkId, Vec<TableId>>> {
        let inner = self.inner.read().await;

        // Fragments belonging to the given sink jobs that carry the Sink
        // fragment-type flag.
        let query = Fragment::find()
            .select_only()
            .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
            .filter(
                fragment::Column::JobId
                    .is_in(sink_ids)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
            );

        let rows: Vec<(JobId, TableIdArray)> = query.into_tuple().all(&inner.db).await?;

        // A sink job is expected to have exactly one sink fragment.
        debug_assert!(rows.iter().map(|(job_id, _)| job_id).all_unique());

        let result = rows
            .into_iter()
            .map(|(job_id, table_id_array)| (job_id.as_sink_id(), table_id_array.0))
            .collect::<HashMap<_, _>>();

        Ok(result)
    }
783
    /// Returns ids of all sinks that have at least one epoch in the
    /// `Pending` sink state, optionally restricted to one database.
    pub async fn list_all_pending_sinks(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<SinkId>> {
        let inner = self.inner.read().await;

        let mut query = pending_sink_state::Entity::find()
            .select_only()
            .columns([pending_sink_state::Column::SinkId])
            .filter(
                pending_sink_state::Column::SinkState.eq(pending_sink_state::SinkState::Pending),
            )
            // A sink may have many pending epochs; report each id once.
            .distinct();

        // Scope to one database by joining through the object table.
        if let Some(db_id) = database_id {
            query = query
                .join(
                    JoinType::InnerJoin,
                    pending_sink_state::Relation::Object.def(),
                )
                .filter(object::Column::DatabaseId.eq(db_id));
        }

        let result: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;

        Ok(result.into_iter().collect())
    }
811
    /// For each sink, marks pending epochs newer than the given committed
    /// epoch as `Aborted`, all within a single transaction.
    pub async fn abort_pending_sink_epochs(
        &self,
        sink_committed_epoch: HashMap<SinkId, u64>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        for (sink_id, committed_epoch) in sink_committed_epoch {
            pending_sink_state::Entity::update_many()
                .col_expr(
                    pending_sink_state::Column::SinkState,
                    Expr::value(pending_sink_state::SinkState::Aborted),
                )
                .filter(
                    pending_sink_state::Column::SinkId
                        .eq(sink_id)
                        // Epochs at or before the committed epoch stay as-is.
                        .and(pending_sink_state::Column::Epoch.gt(committed_epoch as i64)),
                )
                .exec(&txn)
                .await?;
        }

        txn.commit().await?;
        Ok(())
    }
837}
838
/// Aggregated catalog counts reported by [`CatalogController::stats`].
/// Table counts exclude internal state tables.
pub struct CatalogStats {
    pub table_num: u64,
    pub mview_num: u64,
    pub index_num: u64,
    pub source_num: u64,
    pub sink_num: u64,
    pub function_num: u64,
    pub streaming_job_num: u64,
    // Total actor count across all fragments (from in-memory actor info).
    pub actor_num: u64,
    pub database_num: u64,
}
851
852impl CatalogControllerInner {
    /// Builds a full catalog snapshot (all user-visible objects) plus all
    /// user infos, e.g. for bootstrapping a newly connected frontend.
    ///
    /// NOTE(review): the individual list queries run without a shared
    /// transaction, so the snapshot is not a single consistent read —
    /// presumably acceptable because callers hold the catalog lock; confirm.
    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
        let databases = self.list_databases().await?;
        let schemas = self.list_schemas().await?;
        let tables = self.list_tables().await?;
        let sources = self.list_sources().await?;
        let sinks = self.list_sinks().await?;
        let subscriptions = self.list_subscriptions().await?;
        let indexes = self.list_indexes().await?;
        let views = self.list_views().await?;
        let functions = self.list_functions().await?;
        let connections = self.list_connections().await?;
        let secrets = self.list_secrets().await?;

        let users = self.list_users().await?;

        Ok((
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
        ))
    }
885
886 async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
887 let db_objs = Database::find()
888 .find_also_related(Object)
889 .all(&self.db)
890 .await?;
891 Ok(db_objs
892 .into_iter()
893 .map(|(db, obj)| ObjectModel(db, obj.unwrap(), None).into())
894 .collect())
895 }
896
897 async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
898 let schema_objs = Schema::find()
899 .find_also_related(Object)
900 .all(&self.db)
901 .await?;
902
903 Ok(schema_objs
904 .into_iter()
905 .map(|(schema, obj)| ObjectModel(schema, obj.unwrap(), None).into())
906 .collect())
907 }
908
    /// Lists all users with their grant privileges filled in.
    ///
    /// NOTE(review): privileges are fetched with one query per user (N+1);
    /// acceptable while user counts stay small — revisit if that changes.
    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
        let mut user_infos: Vec<PbUserInfo> = User::find()
            .all(&self.db)
            .await?
            .into_iter()
            .map(Into::into)
            .collect();

        for user_info in &mut user_infos {
            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
        }
        Ok(user_infos)
    }
922
    /// Lists every table row (including internal state tables), each joined
    /// with its object row and, when present, its owning streaming job.
    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
        let table_objs = Table::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        // Bulk-load the owning streaming jobs to avoid per-table queries.
        let streaming_jobs = load_streaming_jobs_by_ids(
            &self.db,
            table_objs.iter().map(|(table, _)| table.job_id()),
        )
        .await?;

        Ok(table_objs
            .into_iter()
            .map(|(table, obj)| {
                let job_id = table.job_id();
                let streaming_job = streaming_jobs.get(&job_id).cloned();
                ObjectModel(table, obj.unwrap(), streaming_job).into()
            })
            .collect())
    }
944
    /// Lists tables visible to users: a table is kept when it belongs to a
    /// sink job or a materialized-view job, or when its owning streaming job
    /// is fully created or is a background job.
    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
        let mut table_objs = Table::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        let all_streaming_jobs: HashMap<JobId, streaming_job::Model> = StreamingJob::find()
            .all(&self.db)
            .await?
            .into_iter()
            .map(|job| (job.job_id, job))
            .collect();

        let sink_ids: HashSet<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .into_tuple::<SinkId>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        // Job ids of materialized views among the fetched tables.
        let mview_job_ids: HashSet<JobId> = table_objs
            .iter()
            .filter_map(|(table, _)| {
                if table.table_type == TableType::MaterializedView {
                    Some(table.table_id.as_job_id())
                } else {
                    None
                }
            })
            .collect();

        table_objs.retain(|(table, _)| {
            let job_id = table.job_id();

            // Tables owned by sink jobs or mview jobs are always kept.
            if sink_ids.contains(&job_id.as_sink_id()) || mview_job_ids.contains(&job_id) {
                return true;
            }
            // Otherwise keep only created jobs and background-creating jobs.
            if let Some(streaming_job) = all_streaming_jobs.get(&job_id) {
                return streaming_job.job_status == JobStatus::Created
                    || (streaming_job.create_type == CreateType::Background);
            }
            // No owning streaming job found: drop from the listing.
            false
        });

        let mut tables = Vec::with_capacity(table_objs.len());
        for (table, obj) in table_objs {
            let job_id = table.job_id();
            let streaming_job = all_streaming_jobs.get(&job_id).cloned();
            let pb_table: PbTable = ObjectModel(table, obj.unwrap(), streaming_job).into();
            tables.push(pb_table);
        }
        Ok(tables)
    }
1001
1002 async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
1004 let mut source_objs = Source::find()
1005 .find_also_related(Object)
1006 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1007 .filter(
1008 streaming_job::Column::JobStatus
1009 .is_null()
1010 .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1011 )
1012 .all(&self.db)
1013 .await?;
1014
1015 let created_table_ids: HashSet<TableId> = Table::find()
1017 .select_only()
1018 .column(table::Column::TableId)
1019 .join(JoinType::InnerJoin, table::Relation::Object1.def())
1020 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1021 .filter(
1022 table::Column::OptionalAssociatedSourceId
1023 .is_not_null()
1024 .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1025 )
1026 .into_tuple()
1027 .all(&self.db)
1028 .await?
1029 .into_iter()
1030 .collect();
1031 source_objs.retain_mut(|(source, _)| {
1032 source.optional_associated_table_id.is_none()
1033 || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
1034 });
1035
1036 Ok(source_objs
1037 .into_iter()
1038 .map(|(source, obj)| ObjectModel(source, obj.unwrap(), None).into())
1039 .collect())
1040 }
1041
1042 async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
1044 let sink_objs = Sink::find()
1045 .find_also_related(Object)
1046 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1047 .all(&self.db)
1048 .await?;
1049 let streaming_jobs = load_streaming_jobs_by_ids(
1050 &self.db,
1051 sink_objs.iter().map(|(sink, _)| sink.sink_id.as_job_id()),
1052 )
1053 .await?;
1054
1055 Ok(sink_objs
1056 .into_iter()
1057 .map(|(sink, obj)| {
1058 let streaming_job = streaming_jobs.get(&sink.sink_id.as_job_id()).cloned();
1059 ObjectModel(sink, obj.unwrap(), streaming_job).into()
1060 })
1061 .collect())
1062 }
1063
1064 async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
1066 let subscription_objs = Subscription::find()
1067 .find_also_related(Object)
1068 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
1069 .all(&self.db)
1070 .await?;
1071
1072 Ok(subscription_objs
1073 .into_iter()
1074 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap(), None).into())
1075 .collect())
1076 }
1077
1078 async fn list_views(&self) -> MetaResult<Vec<PbView>> {
1079 let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
1080
1081 Ok(view_objs
1082 .into_iter()
1083 .map(|(view, obj)| ObjectModel(view, obj.unwrap(), None).into())
1084 .collect())
1085 }
1086
    /// Lists indexes whose creating job has finished or is a background job,
    /// each joined with its object row and owning streaming job.
    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
        let index_objs = Index::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .all(&self.db)
            .await?;
        // Bulk-load owning streaming jobs to avoid per-index queries.
        let streaming_jobs = load_streaming_jobs_by_ids(
            &self.db,
            index_objs
                .iter()
                .map(|(index, _)| index.index_id.as_job_id()),
        )
        .await?;

        Ok(index_objs
            .into_iter()
            .map(|(index, obj)| {
                let streaming_job = streaming_jobs.get(&index.index_id.as_job_id()).cloned();
                ObjectModel(index, obj.unwrap(), streaming_job).into()
            })
            .collect())
    }
1115
1116 async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1117 let conn_objs = Connection::find()
1118 .find_also_related(Object)
1119 .all(&self.db)
1120 .await?;
1121
1122 Ok(conn_objs
1123 .into_iter()
1124 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap(), None).into())
1125 .collect())
1126 }
1127
1128 pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1129 let secret_objs = Secret::find()
1130 .find_also_related(Object)
1131 .all(&self.db)
1132 .await?;
1133 Ok(secret_objs
1134 .into_iter()
1135 .map(|(secret, obj)| ObjectModel(secret, obj.unwrap(), None).into())
1136 .collect())
1137 }
1138
1139 async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1140 let func_objs = Function::find()
1141 .find_also_related(Object)
1142 .all(&self.db)
1143 .await?;
1144
1145 Ok(func_objs
1146 .into_iter()
1147 .map(|(func, obj)| ObjectModel(func, obj.unwrap(), None).into())
1148 .collect())
1149 }
1150
1151 pub(crate) fn register_finish_notifier(
1152 &mut self,
1153 database_id: DatabaseId,
1154 id: JobId,
1155 sender: Sender<Result<NotificationVersion, String>>,
1156 ) {
1157 self.creating_table_finish_notifier
1158 .entry(database_id)
1159 .or_default()
1160 .entry(id)
1161 .or_default()
1162 .push(sender);
1163 }
1164
    /// Returns whether the streaming job has reached the `Created` status.
    /// Errors when no such job row exists (e.g. it was cancelled or dropped).
    // NOTE(review): takes `&mut self` although nothing is mutated — presumably
    // to serialize with writers at the call site; confirm before relaxing.
    pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
        let status = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobStatus)
            .filter(streaming_job::Column::JobId.eq(id))
            .into_tuple::<JobStatus>()
            .one(&self.db)
            .await?;

        status
            .map(|status| status == JobStatus::Created)
            .ok_or_else(|| {
                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
            })
    }
1180
1181 pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1182 if let Some(database_id) = database_id {
1183 if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1184 {
1185 for tx in creating_tables.into_values().flatten() {
1186 let _ = tx.send(Err(err.clone()));
1187 }
1188 }
1189 } else {
1190 for tx in take(&mut self.creating_table_finish_notifier)
1191 .into_values()
1192 .flatten()
1193 .flat_map(|(_, txs)| txs.into_iter())
1194 {
1195 let _ = tx.send(Err(err.clone()));
1196 }
1197 }
1198 }
1199
1200 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1201 let table_ids: Vec<TableId> = Table::find()
1202 .select_only()
1203 .filter(table::Column::TableType.is_in(vec![
1204 TableType::Table,
1205 TableType::MaterializedView,
1206 TableType::Index,
1207 ]))
1208 .column(table::Column::TableId)
1209 .into_tuple()
1210 .all(&self.db)
1211 .await?;
1212 Ok(table_ids)
1213 }
1214
1215 pub(crate) fn complete_dropped_tables(
1218 &mut self,
1219 table_ids: impl IntoIterator<Item = TableId>,
1220 ) -> Vec<PbTable> {
1221 table_ids
1222 .into_iter()
1223 .filter_map(|table_id| {
1224 self.dropped_tables.remove(&table_id).map_or_else(
1225 || {
1226 tracing::warn!(%table_id, "table not found");
1227 None
1228 },
1229 Some,
1230 )
1231 })
1232 .collect()
1233 }
1234}