1mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{
31 DEFAULT_SCHEMA_NAME, FragmentTypeFlag, FragmentTypeMask, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::TableType;
42use risingwave_meta_model::{
43 ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
44 IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45 StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, TableIdArray,
46 UserId, ViewId, connection, database, fragment, function, index, object, object_dependency,
47 pending_sink_state, schema, secret, sink, source, streaming_job, subscription, table,
48 user_privilege, view,
49};
50use risingwave_pb::catalog::connection::Info as ConnectionInfo;
51use risingwave_pb::catalog::subscription::SubscriptionState;
52use risingwave_pb::catalog::table::PbTableType;
53use risingwave_pb::catalog::{
54 PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
55 PbSubscription, PbTable, PbView,
56};
57use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
58use risingwave_pb::meta::object::PbObjectInfo;
59use risingwave_pb::meta::subscribe_response::{
60 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
61};
62use risingwave_pb::meta::{PbObject, PbObjectGroup};
63use risingwave_pb::stream_plan::stream_node::NodeBody;
64use risingwave_pb::telemetry::PbTelemetryEventStage;
65use risingwave_pb::user::PbUserInfo;
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69 ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
70 IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
71 SelectColumns, TransactionTrait, Value,
72};
73use tokio::sync::oneshot::Sender;
74use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
75use tracing::info;
76
77use super::utils::{
78 check_subscription_name_duplicate, get_internal_tables_by_id, load_streaming_jobs_by_ids,
79 rename_relation, rename_relation_refer,
80};
81use crate::controller::ObjectModel;
82use crate::controller::catalog::util::update_internal_tables;
83use crate::controller::fragment::FragmentTypeMaskExt;
84use crate::controller::utils::*;
85use crate::manager::{
86 IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
87 get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
88};
89use crate::rpc::ddl_controller::DropMode;
90use crate::telemetry::{MetaTelemetryJobDesc, report_event};
91use crate::{MetaError, MetaResult};
92
/// Full catalog snapshot pushed to a subscriber, grouped by object kind.
/// The tuple order is part of the wire contract with consumers of
/// [`CatalogControllerInner::snapshot`] — do not reorder.
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);

/// Shared, reference-counted handle to the catalog controller.
pub type CatalogControllerRef = Arc<CatalogController>;
108
/// Catalog manager backed by the SQL meta store. All catalog state lives in
/// `inner`; mutations take the write lock, reads the read lock.
pub struct CatalogController {
    pub(crate) env: MetaSrvEnv,
    // Async lock: guards the DB connection plus in-memory notifier maps.
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
114
/// Context produced when a table's connector is dropped (`ALTER TABLE ...`
/// removing the source part). Fields name the affected ids; exact usage is
/// at the call sites — NOTE(review): semantics inferred from names, verify.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    // Streaming job whose definition must be updated.
    pub(crate) to_change_streaming_job_id: JobId,
    // State table backing the removed source part.
    pub(crate) to_remove_state_table_id: TableId,
    // The associated source being removed.
    pub(crate) to_remove_source_id: SourceId,
}
122
/// Resources collected while dropping catalog objects, returned to the
/// caller so it can tear down the corresponding streaming/storage state
/// after the catalog transaction commits.
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    pub(crate) database_id: DatabaseId,
    // Streaming jobs removed by the drop.
    pub(crate) removed_streaming_job_ids: Vec<JobId>,
    // State tables removed by the drop.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    // Secrets removed by the drop.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    // Sources removed by the drop.
    pub(crate) removed_source_ids: Vec<SourceId>,
    // Per-source fragments that were reading from the source — presumably
    // used to unregister from the source manager; verify at call sites.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    pub(crate) removed_actors: HashSet<ActorId>,
    pub(crate) removed_fragments: HashSet<FragmentId>,

    // NOTE(review): looks like target fragment -> removed sink fragments
    // for sink-into-table; confirm against producers of this field.
    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,

    // Dropped iceberg table sinks (full catalog needed for cleanup).
    pub(crate) removed_iceberg_table_sinks: Vec<PbSink>,
}
147
148impl CatalogController {
149 pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
150 let meta_store = env.meta_store();
151 let catalog_controller = Self {
152 env,
153 inner: RwLock::new(CatalogControllerInner {
154 db: meta_store.conn,
155 creating_table_finish_notifier: HashMap::new(),
156 dropped_tables: HashMap::new(),
157 }),
158 };
159
160 catalog_controller.init().await?;
161 Ok(catalog_controller)
162 }
163
164 pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
167 self.inner.read().await
168 }
169
170 pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
171 self.inner.write().await
172 }
173}
174
/// State guarded by [`CatalogController`]'s lock.
pub struct CatalogControllerInner {
    // Connection to the SQL meta store.
    pub(crate) db: DatabaseConnection,
    /// Waiters for creating streaming jobs, keyed by database then job id.
    /// Each sender receives the notification version on success or the error
    /// message on failure (see `notify_finish_failed`).
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    /// Tables already dropped from the catalog but not yet handed to
    /// hummock/compactor for cleanup (drained by `complete_dropped_tables`).
    pub dropped_tables: HashMap<TableId, PbTable>,
}
187
188impl CatalogController {
189 pub(crate) async fn notify_frontend(
190 &self,
191 operation: NotificationOperation,
192 info: NotificationInfo,
193 ) -> NotificationVersion {
194 self.env
195 .notification_manager()
196 .notify_frontend(operation, info)
197 .await
198 }
199
200 pub(crate) async fn notify_frontend_relation_info(
201 &self,
202 operation: NotificationOperation,
203 relation_info: PbObjectInfo,
204 ) -> NotificationVersion {
205 self.env
206 .notification_manager()
207 .notify_frontend_object_info(operation, relation_info)
208 .await
209 }
210
211 pub(crate) async fn notify_frontend_trivial(&self) -> NotificationVersion {
218 self.env
219 .notification_manager()
220 .notify_frontend(
221 NotificationOperation::Update,
222 NotificationInfo::ObjectGroup(PbObjectGroup {
223 objects: vec![],
224 dependencies: vec![],
225 }),
226 )
227 .await
228 }
229}
230
231impl CatalogController {
    /// Marks a subscription as successfully created in one transaction:
    /// stamps the object's creation time/cluster version, flips the state to
    /// `Created`, and applies default privileges.
    pub async fn finish_create_subscription_catalog(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Record creation time and the cluster version it was created under.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(subscription_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            // No object row matched: the subscription no longer exists.
            return Err(MetaError::catalog_id_not_found(
                "subscription",
                subscription_id,
            ));
        }

        // Transition the subscription to the `Created` state.
        let job = subscription::ActiveModel {
            subscription_id: Set(subscription_id),
            subscription_state: Set(SubscriptionState::Created.into()),
            ..Default::default()
        };
        Subscription::update(job).exec(&txn).await?;

        // Grant default privileges; the returned value is not needed here.
        let _ = grant_default_privileges_automatically(&txn, subscription_id).await?;

        txn.commit().await?;

        Ok(())
    }
270
    /// Announces an already-`Created` subscription (and its dependencies) to
    /// frontends; if privileges were granted on it, also pushes the affected
    /// users' updated infos. Returns the latest notification version.
    pub async fn notify_create_subscription(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        // Only a subscription in `Created` state may be announced.
        let (subscription, obj) = Subscription::find_by_id(subscription_id)
            .find_also_related(Object)
            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;

        let dependencies =
            list_object_dependencies_by_object_id(&txn, subscription_id.into()).await?;
        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Add,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: PbObjectInfo::Subscription(
                            // `obj` exists for every catalog row; unwrap is safe.
                            ObjectModel(subscription, obj.unwrap(), None).into(),
                        )
                        .into(),
                    }],
                    dependencies,
                }),
            )
            .await;

        // Users holding privileges on the new subscription need refreshed infos.
        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.eq(subscription_id.as_object_id()))
            .into_tuple()
            .all(&inner.db)
            .await?;

        if !updated_user_ids.is_empty() {
            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
            version = self.notify_users_update(updated_user_infos).await;
        }

        Ok(version)
    }
320
    /// Builds a telemetry report of connector usage: for every source and
    /// sink, its connector name plus format/encode (when info is present),
    /// keyed by the object id.
    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
        let inner = self.inner.read().await;
        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
            .select_only()
            .column(source::Column::SourceId)
            .column(source::Column::WithProperties)
            .column(source::Column::SourceInfo)
            .into_tuple()
            .all(&inner.db)
            .await?;
        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .column(sink::Column::Properties)
            .column(sink::Column::SinkFormatDesc)
            .into_tuple()
            .all(&inner.db)
            .await?;
        // Release the catalog lock before the (pure CPU) report building.
        drop(inner);

        // The connector name is stored under the `connector` key of the
        // WITH properties; missing key yields an empty string.
        let get_connector_from_property = |property: &Property| -> String {
            property
                .0
                .get(UPSTREAM_SOURCE_KEY)
                .cloned()
                .unwrap_or_default()
        };

        let source_report: Vec<jsonbb::Value> = source_props_and_info
            .iter()
            .map(|(oid, property, info)| {
                let connector_name = get_connector_from_property(property);
                let mut format = None;
                let mut encode = None;
                if let Some(info) = info {
                    let pb_info = info.to_protobuf();
                    format = Some(pb_info.format().as_str_name());
                    encode = Some(pb_info.row_encode().as_str_name());
                }
                jsonbb::json!({
                    oid.to_string(): {
                        "connector": connector_name,
                        "format": format,
                        "encode": encode,
                    },
                })
            })
            .collect_vec();

        // Same shape as the source report; note sinks use `encode()` where
        // sources use `row_encode()`.
        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
            .iter()
            .map(|(oid, property, info)| {
                let connector_name = get_connector_from_property(property);
                let mut format = None;
                let mut encode = None;
                if let Some(info) = info {
                    let pb_info = info.to_protobuf();
                    format = Some(pb_info.format().as_str_name());
                    encode = Some(pb_info.encode().as_str_name());
                }
                jsonbb::json!({
                    oid.to_string(): {
                        "connector": connector_name,
                        "format": format,
                        "encode": encode,
                    },
                })
            })
            .collect_vec();

        Ok(jsonbb::json!({
            "source": source_report,
            "sink": sink_report,
        }))
    }
416
    /// Deletes subscription objects that never reached the `Created` state,
    /// optionally scoped to one database. Only `object` rows are deleted
    /// here — presumably the subscription rows go via FK cascade; verify
    /// against the schema.
    pub async fn clean_dirty_subscription(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Subscription objects whose id is NOT among created subscriptions.
        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
            object::Column::Oid.not_in_subquery(
                Query::select()
                    .column(subscription::Column::SubscriptionId)
                    .from(Subscription)
                    .and_where(
                        subscription::Column::SubscriptionState
                            .eq(SubscriptionState::Created as i32),
                    )
                    .take(),
            ),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        Object::delete_many()
            .filter(filter_condition)
            .exec(&txn)
            .await?;
        txn.commit().await?;
        Ok(())
    }
449
    /// Garbage-collects streaming jobs stuck in a creating state (optionally
    /// scoped to one database) plus dirty iceberg table jobs, and notifies
    /// frontends about the subset that had been announced. Returns the ids
    /// of sources associated with the cleaned table jobs so the caller can
    /// unregister them.
    pub async fn clean_dirty_creating_jobs(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<Vec<SourceId>> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Dirty = never started (`Initial`), or `Creating` with a
        // foreground create type.
        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
            streaming_job::Column::JobStatus
                .eq(JobStatus::Creating)
                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };

        let mut dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .filter(filter_condition)
            .into_partial_model()
            .all(&txn)
            .await?;

        // Iceberg table jobs have their own dirtiness check.
        let dirty_iceberg_jobs = find_dirty_iceberg_table_jobs(&txn, database_id).await?;
        if !dirty_iceberg_jobs.is_empty() {
            dirty_job_objs.extend(dirty_iceberg_jobs);
        }

        // Runs unconditionally, even when no dirty job was found.
        Self::clean_dirty_sink_downstreams(&txn).await?;

        if dirty_job_objs.is_empty() {
            return Ok(vec![]);
        }

        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;

        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();

        // Classify dirty jobs: table types for the table-typed objects...
        let all_dirty_table_ids = dirty_job_objs
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table)
            .map(|obj| obj.oid)
            .collect_vec();
        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .column(table::Column::TableType)
            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
            .into_tuple::<(ObjectId, TableType)>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        // ...and which dirty jobs were background-created.
        let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobId
                    .is_in(dirty_job_ids.clone())
                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .into_tuple()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        // Only materialized views and background jobs get a Delete
        // notification — presumably only those were visible to frontends.
        let to_notify_objs = dirty_job_objs
            .into_iter()
            .filter(|obj| {
                matches!(
                    dirty_table_type_map.get(&obj.oid),
                    Some(TableType::MaterializedView)
                ) || dirty_background_jobs.contains(&obj.oid)
            })
            .collect_vec();

        // Sources associated with dirty tables (`CREATE TABLE ... WITH
        // connector`) are dropped together with the table job.
        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(dirty_job_ids.clone())
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        // State tables belonging to the dirty jobs.
        let dirty_state_table_ids: Vec<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;

        // Internal-table objects of the to-notify jobs, included in the
        // Delete notification group below.
        let dirty_internal_table_objs = Object::find()
            .select_only()
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, object::Relation::Table.def())
            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
            .into_partial_model()
            .all(&txn)
            .await?;

        // Delete jobs + their state tables + associated sources in one go.
        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
            .clone()
            .into_iter()
            .chain(
                dirty_state_table_ids
                    .into_iter()
                    .map(|table_id| table_id.as_object_id()),
            )
            .chain(
                dirty_associated_source_ids
                    .iter()
                    .map(|source_id| source_id.as_object_id()),
            )
            .collect();

        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(to_delete_objs))
            .exec(&txn)
            .await?;
        // We found dirty jobs above, so at least one row must be deleted.
        assert!(res.rows_affected > 0);

        txn.commit().await?;

        let object_group = build_object_group_for_delete(
            to_notify_objs
                .into_iter()
                .chain(dirty_internal_table_objs.into_iter())
                .collect_vec(),
        );

        let _version = self
            .notify_frontend(NotificationOperation::Delete, object_group)
            .await;

        Ok(dirty_associated_source_ids)
    }
616
    /// Applies a `COMMENT ON` to a table or, when `column_index` is set, to
    /// one of its columns, then notifies frontends with the updated table.
    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Validate the database/schema ids carried in the request.
        ensure_object_id(ObjectType::Database, comment.database_id, &txn).await?;
        ensure_object_id(ObjectType::Schema, comment.schema_id, &txn).await?;
        let (table_obj, streaming_job) = Object::find_by_id(comment.table_id)
            .find_also_related(StreamingJob)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("object", comment.table_id))?;

        let table = if let Some(col_idx) = comment.column_index {
            // Column comment: patch the description inside the column catalog.
            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id)
                .select_only()
                .column(table::Column::Columns)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
            let mut pb_columns = columns.to_protobuf();

            let column = pb_columns
                .get_mut(col_idx as usize)
                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
                anyhow!(
                    "column desc at index {} for table id {} not found",
                    col_idx,
                    comment.table_id
                )
            })?;
            column_desc.description = comment.description;
            table::ActiveModel {
                table_id: Set(comment.table_id),
                columns: Set(pb_columns.into()),
                ..Default::default()
            }
            .update(&txn)
            .await?
        } else {
            // Table-level comment.
            table::ActiveModel {
                table_id: Set(comment.table_id),
                description: Set(comment.description),
                ..Default::default()
            }
            .update(&txn)
            .await?
        };
        txn.commit().await?;

        let version = self
            .notify_frontend_relation_info(
                NotificationOperation::Update,
                PbObjectInfo::Table(ObjectModel(table, table_obj, streaming_job).into()),
            )
            .await;

        Ok(version)
    }
676
677 async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
678 if tables.is_empty() {
679 return;
680 }
681 let objects = tables
682 .into_iter()
683 .map(|t| PbObject {
684 object_info: Some(PbObjectInfo::Table(t)),
685 })
686 .collect();
687 let group = NotificationInfo::ObjectGroup(PbObjectGroup {
688 objects,
689 dependencies: vec![],
690 });
691 self.env
692 .notification_manager()
693 .notify_hummock(NotificationOperation::Delete, group.clone())
694 .await;
695 self.env
696 .notification_manager()
697 .notify_compactor(NotificationOperation::Delete, group)
698 .await;
699 }
700
701 pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
702 let mut inner = self.inner.write().await;
703 let tables = inner.complete_dropped_tables(table_ids);
704 self.notify_hummock_dropped_tables(tables).await;
705 }
706
707 pub async fn cleanup_dropped_tables(&self) {
708 let mut inner = self.inner.write().await;
709 let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
710 self.notify_hummock_dropped_tables(tables).await;
711 }
712
    /// Collects coarse catalog statistics (object counts plus the in-memory
    /// actor count) for metrics/telemetry.
    pub async fn stats(&self) -> MetaResult<CatalogStats> {
        let inner = self.inner.read().await;

        // Count tables per type, excluding internal state tables.
        // NOTE(review): the exclusion uses HAVING on the grouped column; a
        // WHERE clause would be equivalent here.
        let mut table_num_map: HashMap<_, _> = Table::find()
            .select_only()
            .column(table::Column::TableType)
            .column_as(table::Column::TableId.count(), "num")
            .group_by(table::Column::TableType)
            .having(table::Column::TableType.ne(TableType::Internal))
            .into_tuple::<(TableType, i64)>()
            .all(&inner.db)
            .await?
            .into_iter()
            .map(|(table_type, num)| (table_type, num as u64))
            .collect();

        let source_num = Source::find().count(&inner.db).await?;
        let sink_num = Sink::find().count(&inner.db).await?;
        let function_num = Function::find().count(&inner.db).await?;
        let streaming_job_num = StreamingJob::find().count(&inner.db).await?;

        // Actor count comes from the shared in-memory actor info, not the DB.
        let actor_num = {
            let guard = self.env.shared_actor_info.read_guard();
            guard
                .iter_over_fragments()
                .map(|(_, fragment)| fragment.actors.len() as u64)
                .sum::<u64>()
        };
        let database_num = Database::find().count(&inner.db).await?;

        Ok(CatalogStats {
            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
            mview_num: table_num_map
                .remove(&TableType::MaterializedView)
                .unwrap_or(0),
            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
            source_num,
            sink_num,
            function_num,
            streaming_job_num,
            actor_num,
            database_num,
        })
    }
757
    /// For each requested sink, returns the state table ids of its sink
    /// fragment (fragments whose type mask contains `Sink`).
    pub async fn fetch_sink_with_state_table_ids(
        &self,
        sink_ids: HashSet<SinkId>,
    ) -> MetaResult<HashMap<SinkId, Vec<TableId>>> {
        let inner = self.inner.read().await;

        let query = Fragment::find()
            .select_only()
            .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
            .filter(
                fragment::Column::JobId
                    .is_in(sink_ids)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
            );

        let rows: Vec<(JobId, TableIdArray)> = query.into_tuple().all(&inner.db).await?;

        // Each sink job is expected to own exactly one sink fragment, so job
        // ids must be unique here.
        debug_assert!(rows.iter().map(|(job_id, _)| job_id).all_unique());

        let result = rows
            .into_iter()
            .map(|(job_id, table_id_array)| (job_id.as_sink_id(), table_id_array.0))
            .collect::<HashMap<_, _>>();

        Ok(result)
    }
784
    /// Returns the ids of sinks that still have `Pending` epoch states
    /// recorded, optionally restricted to one database.
    pub async fn list_all_pending_sinks(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<SinkId>> {
        let inner = self.inner.read().await;

        let mut query = pending_sink_state::Entity::find()
            .select_only()
            .columns([pending_sink_state::Column::SinkId])
            .filter(
                pending_sink_state::Column::SinkState.eq(pending_sink_state::SinkState::Pending),
            )
            .distinct();

        if let Some(db_id) = database_id {
            // Join through the object table to filter by owning database.
            query = query
                .join(
                    JoinType::InnerJoin,
                    pending_sink_state::Relation::Object.def(),
                )
                .filter(object::Column::DatabaseId.eq(db_id));
        }

        let result: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;

        Ok(result.into_iter().collect())
    }
812
    /// For each sink, marks as `Aborted` every pending epoch strictly newer
    /// than the sink's committed epoch; all updates share one transaction.
    pub async fn abort_pending_sink_epochs(
        &self,
        sink_committed_epoch: HashMap<SinkId, u64>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        for (sink_id, committed_epoch) in sink_committed_epoch {
            pending_sink_state::Entity::update_many()
                .col_expr(
                    pending_sink_state::Column::SinkState,
                    Expr::value(pending_sink_state::SinkState::Aborted),
                )
                .filter(
                    pending_sink_state::Column::SinkId
                        .eq(sink_id)
                        // Epochs are stored as i64 in the DB; cast mirrors that.
                        .and(pending_sink_state::Column::Epoch.gt(committed_epoch as i64)),
                )
                .exec(&txn)
                .await?;
        }

        txn.commit().await?;
        Ok(())
    }
838}
839
/// Coarse catalog counts produced by [`CatalogController::stats`].
pub struct CatalogStats {
    pub table_num: u64,
    pub mview_num: u64,
    pub index_num: u64,
    pub source_num: u64,
    pub sink_num: u64,
    pub function_num: u64,
    pub streaming_job_num: u64,
    // Counted from in-memory shared actor info, not the meta store.
    pub actor_num: u64,
    pub database_num: u64,
}
852
853impl CatalogControllerInner {
    /// Builds a full catalog snapshot plus all user infos, e.g. for a newly
    /// subscribed frontend. Queries run sequentially on `self.db` without a
    /// surrounding transaction.
    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
        let databases = self.list_databases().await?;
        let schemas = self.list_schemas().await?;
        let tables = self.list_tables().await?;
        let sources = self.list_sources().await?;
        let sinks = self.list_sinks().await?;
        let subscriptions = self.list_subscriptions().await?;
        let indexes = self.list_indexes().await?;
        let views = self.list_views().await?;
        let functions = self.list_functions().await?;
        let connections = self.list_connections().await?;
        let secrets = self.list_secrets().await?;

        let users = self.list_users().await?;

        // Tuple order must match the `Catalog` type alias.
        Ok((
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
        ))
    }
886
887 async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
888 let db_objs = Database::find()
889 .find_also_related(Object)
890 .all(&self.db)
891 .await?;
892 Ok(db_objs
893 .into_iter()
894 .map(|(db, obj)| ObjectModel(db, obj.unwrap(), None).into())
895 .collect())
896 }
897
898 async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
899 let schema_objs = Schema::find()
900 .find_also_related(Object)
901 .all(&self.db)
902 .await?;
903
904 Ok(schema_objs
905 .into_iter()
906 .map(|(schema, obj)| ObjectModel(schema, obj.unwrap(), None).into())
907 .collect())
908 }
909
    /// Lists all users with their granted privileges populated (one extra
    /// query per user).
    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
        let mut user_infos: Vec<PbUserInfo> = User::find()
            .all(&self.db)
            .await?
            .into_iter()
            .map(Into::into)
            .collect();

        for user_info in &mut user_infos {
            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
        }
        Ok(user_infos)
    }
923
    /// Lists every table row — including internal state tables — joined with
    /// its object row and owning streaming job (when any).
    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
        let table_objs = Table::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        // Batch-load the owning streaming jobs to avoid per-table queries.
        let streaming_jobs = load_streaming_jobs_by_ids(
            &self.db,
            table_objs.iter().map(|(table, _)| table.job_id()),
        )
        .await?;

        Ok(table_objs
            .into_iter()
            .map(|(table, obj)| {
                let job_id = table.job_id();
                let streaming_job = streaming_jobs.get(&job_id).cloned();
                ObjectModel(table, obj.unwrap(), streaming_job).into()
            })
            .collect())
    }
945
    /// Lists the tables visible to frontends. A table row is kept when its
    /// owning job is a sink job or a materialized view, or when the job is
    /// `Created` or has the `Background` create type.
    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
        let mut table_objs = Table::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        let all_streaming_jobs: HashMap<JobId, streaming_job::Model> = StreamingJob::find()
            .all(&self.db)
            .await?
            .into_iter()
            .map(|job| (job.job_id, job))
            .collect();

        let sink_ids: HashSet<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .into_tuple::<SinkId>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        // Jobs of materialized-view tables in the current result set.
        let mview_job_ids: HashSet<JobId> = table_objs
            .iter()
            .filter_map(|(table, _)| {
                if table.table_type == TableType::MaterializedView {
                    Some(table.table_id.as_job_id())
                } else {
                    None
                }
            })
            .collect();

        table_objs.retain(|(table, _)| {
            let job_id = table.job_id();

            // NOTE(review): sink jobs and mviews are kept regardless of job
            // status — presumably visible even while still creating; confirm.
            if sink_ids.contains(&job_id.as_sink_id()) || mview_job_ids.contains(&job_id) {
                return true;
            }
            if let Some(streaming_job) = all_streaming_jobs.get(&job_id) {
                return streaming_job.job_status == JobStatus::Created
                    || (streaming_job.create_type == CreateType::Background);
            }
            false
        });

        let mut tables = Vec::with_capacity(table_objs.len());
        for (table, obj) in table_objs {
            let job_id = table.job_id();
            let streaming_job = all_streaming_jobs.get(&job_id).cloned();
            let pb_table: PbTable = ObjectModel(table, obj.unwrap(), streaming_job).into();
            tables.push(pb_table);
        }
        Ok(tables)
    }
1002
1003 async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
1005 let mut source_objs = Source::find()
1006 .find_also_related(Object)
1007 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1008 .filter(
1009 streaming_job::Column::JobStatus
1010 .is_null()
1011 .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1012 )
1013 .all(&self.db)
1014 .await?;
1015
1016 let created_table_ids: HashSet<TableId> = Table::find()
1018 .select_only()
1019 .column(table::Column::TableId)
1020 .join(JoinType::InnerJoin, table::Relation::Object1.def())
1021 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1022 .filter(
1023 table::Column::OptionalAssociatedSourceId
1024 .is_not_null()
1025 .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1026 )
1027 .into_tuple()
1028 .all(&self.db)
1029 .await?
1030 .into_iter()
1031 .collect();
1032 source_objs.retain_mut(|(source, _)| {
1033 source.optional_associated_table_id.is_none()
1034 || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
1035 });
1036
1037 Ok(source_objs
1038 .into_iter()
1039 .map(|(source, obj)| ObjectModel(source, obj.unwrap(), None).into())
1040 .collect())
1041 }
1042
    /// Lists all sinks, each joined with its object row and streaming job.
    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
        let sink_objs = Sink::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .all(&self.db)
            .await?;
        // Batch-load the owning streaming jobs to avoid per-sink queries.
        let streaming_jobs = load_streaming_jobs_by_ids(
            &self.db,
            sink_objs.iter().map(|(sink, _)| sink.sink_id.as_job_id()),
        )
        .await?;

        Ok(sink_objs
            .into_iter()
            .map(|(sink, obj)| {
                let streaming_job = streaming_jobs.get(&sink.sink_id.as_job_id()).cloned();
                ObjectModel(sink, obj.unwrap(), streaming_job).into()
            })
            .collect())
    }
1064
1065 async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
1067 let subscription_objs = Subscription::find()
1068 .find_also_related(Object)
1069 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
1070 .all(&self.db)
1071 .await?;
1072
1073 Ok(subscription_objs
1074 .into_iter()
1075 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap(), None).into())
1076 .collect())
1077 }
1078
1079 async fn list_views(&self) -> MetaResult<Vec<PbView>> {
1080 let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
1081
1082 Ok(view_objs
1083 .into_iter()
1084 .map(|(view, obj)| ObjectModel(view, obj.unwrap(), None).into())
1085 .collect())
1086 }
1087
    /// Lists indexes whose streaming job is `Created`, or was created with
    /// the `Background` create type.
    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
        let index_objs = Index::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .all(&self.db)
            .await?;
        // Batch-load the owning streaming jobs to avoid per-index queries.
        let streaming_jobs = load_streaming_jobs_by_ids(
            &self.db,
            index_objs
                .iter()
                .map(|(index, _)| index.index_id.as_job_id()),
        )
        .await?;

        Ok(index_objs
            .into_iter()
            .map(|(index, obj)| {
                let streaming_job = streaming_jobs.get(&index.index_id.as_job_id()).cloned();
                ObjectModel(index, obj.unwrap(), streaming_job).into()
            })
            .collect())
    }
1116
1117 async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1118 let conn_objs = Connection::find()
1119 .find_also_related(Object)
1120 .all(&self.db)
1121 .await?;
1122
1123 Ok(conn_objs
1124 .into_iter()
1125 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap(), None).into())
1126 .collect())
1127 }
1128
1129 pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1130 let secret_objs = Secret::find()
1131 .find_also_related(Object)
1132 .all(&self.db)
1133 .await?;
1134 Ok(secret_objs
1135 .into_iter()
1136 .map(|(secret, obj)| ObjectModel(secret, obj.unwrap(), None).into())
1137 .collect())
1138 }
1139
1140 async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1141 let func_objs = Function::find()
1142 .find_also_related(Object)
1143 .all(&self.db)
1144 .await?;
1145
1146 Ok(func_objs
1147 .into_iter()
1148 .map(|(func, obj)| ObjectModel(func, obj.unwrap(), None).into())
1149 .collect())
1150 }
1151
1152 pub(crate) fn register_finish_notifier(
1153 &mut self,
1154 database_id: DatabaseId,
1155 id: JobId,
1156 sender: Sender<Result<NotificationVersion, String>>,
1157 ) {
1158 self.creating_table_finish_notifier
1159 .entry(database_id)
1160 .or_default()
1161 .entry(id)
1162 .or_default()
1163 .push(sender);
1164 }
1165
1166 pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
1167 let status = StreamingJob::find()
1168 .select_only()
1169 .column(streaming_job::Column::JobStatus)
1170 .filter(streaming_job::Column::JobId.eq(id))
1171 .into_tuple::<JobStatus>()
1172 .one(&self.db)
1173 .await?;
1174
1175 status
1176 .map(|status| status == JobStatus::Created)
1177 .ok_or_else(|| {
1178 MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
1179 })
1180 }
1181
1182 pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1183 if let Some(database_id) = database_id {
1184 if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1185 {
1186 for tx in creating_tables.into_values().flatten() {
1187 let _ = tx.send(Err(err.clone()));
1188 }
1189 }
1190 } else {
1191 for tx in take(&mut self.creating_table_finish_notifier)
1192 .into_values()
1193 .flatten()
1194 .flat_map(|(_, txs)| txs.into_iter())
1195 {
1196 let _ = tx.send(Err(err.clone()));
1197 }
1198 }
1199 }
1200
1201 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1202 let table_ids: Vec<TableId> = Table::find()
1203 .select_only()
1204 .filter(table::Column::TableType.is_in(vec![
1205 TableType::Table,
1206 TableType::MaterializedView,
1207 TableType::Index,
1208 ]))
1209 .column(table::Column::TableId)
1210 .into_tuple()
1211 .all(&self.db)
1212 .await?;
1213 Ok(table_ids)
1214 }
1215
1216 pub(crate) fn complete_dropped_tables(
1219 &mut self,
1220 table_ids: impl IntoIterator<Item = TableId>,
1221 ) -> Vec<PbTable> {
1222 table_ids
1223 .into_iter()
1224 .filter_map(|table_id| {
1225 self.dropped_tables.remove(&table_id).map_or_else(
1226 || {
1227 tracing::warn!(%table_id, "table not found");
1228 None
1229 },
1230 Some,
1231 )
1232 })
1233 .collect()
1234 }
1235}