1mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::{Context, anyhow};
29use itertools::Itertools;
30use risingwave_common::catalog::{
31 DEFAULT_SCHEMA_NAME, FragmentTypeFlag, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::{RefreshState, TableType};
42use risingwave_meta_model::{
43 ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
44 IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45 StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
46 connection, database, fragment, function, index, object, object_dependency, schema, secret,
47 sink, source, streaming_job, subscription, table, user_privilege, view,
48};
49use risingwave_pb::catalog::connection::Info as ConnectionInfo;
50use risingwave_pb::catalog::subscription::SubscriptionState;
51use risingwave_pb::catalog::table::PbTableType;
52use risingwave_pb::catalog::{
53 PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
54 PbStreamJobStatus, PbSubscription, PbTable, PbView,
55};
56use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
57use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
58use risingwave_pb::meta::object::PbObjectInfo;
59use risingwave_pb::meta::subscribe_response::{
60 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
61};
62use risingwave_pb::meta::{PbObject, PbObjectGroup};
63use risingwave_pb::stream_plan::stream_node::NodeBody;
64use risingwave_pb::telemetry::PbTelemetryEventStage;
65use risingwave_pb::user::PbUserInfo;
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69 ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
70 IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
71 SelectColumns, TransactionTrait, Value,
72};
73use tokio::sync::oneshot::Sender;
74use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
75use tracing::info;
76
77use super::utils::{
78 check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
79 rename_relation_refer,
80};
81use crate::controller::ObjectModel;
82use crate::controller::catalog::util::update_internal_tables;
83use crate::controller::utils::*;
84use crate::manager::{
85 IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
86 get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
87};
88use crate::rpc::ddl_controller::DropMode;
89use crate::telemetry::{MetaTelemetryJobDesc, report_event};
90use crate::{MetaError, MetaResult};
91
92pub type Catalog = (
93 Vec<PbDatabase>,
94 Vec<PbSchema>,
95 Vec<PbTable>,
96 Vec<PbSource>,
97 Vec<PbSink>,
98 Vec<PbSubscription>,
99 Vec<PbIndex>,
100 Vec<PbView>,
101 Vec<PbFunction>,
102 Vec<PbConnection>,
103 Vec<PbSecret>,
104);
105
106pub type CatalogControllerRef = Arc<CatalogController>;
107
108pub struct CatalogController {
110 pub(crate) env: MetaSrvEnv,
111 pub(crate) inner: RwLock<CatalogControllerInner>,
112}
113
114#[derive(Clone, Default, Debug)]
115pub struct DropTableConnectorContext {
116 pub(crate) to_change_streaming_job_id: JobId,
118 pub(crate) to_remove_state_table_id: TableId,
119 pub(crate) to_remove_source_id: SourceId,
120}
121
122#[derive(Clone, Default, Debug)]
123pub struct ReleaseContext {
124 pub(crate) database_id: DatabaseId,
125 pub(crate) removed_streaming_job_ids: Vec<JobId>,
126 pub(crate) removed_state_table_ids: Vec<TableId>,
128
129 pub(crate) removed_secret_ids: Vec<SecretId>,
131 pub(crate) removed_source_ids: Vec<SourceId>,
133 pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
136
137 pub(crate) removed_actors: HashSet<ActorId>,
138 pub(crate) removed_fragments: HashSet<FragmentId>,
139
140 pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
142
143 pub(crate) removed_iceberg_table_sinks: Vec<PbSink>,
145}
146
147impl CatalogController {
148 pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
149 let meta_store = env.meta_store();
150 let catalog_controller = Self {
151 env,
152 inner: RwLock::new(CatalogControllerInner {
153 db: meta_store.conn,
154 creating_table_finish_notifier: HashMap::new(),
155 dropped_tables: HashMap::new(),
156 }),
157 };
158
159 catalog_controller.init().await?;
160 Ok(catalog_controller)
161 }
162
163 pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
166 self.inner.read().await
167 }
168
169 pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
170 self.inner.write().await
171 }
172}
173
174pub struct CatalogControllerInner {
175 pub(crate) db: DatabaseConnection,
176 #[expect(clippy::type_complexity)]
181 pub creating_table_finish_notifier:
182 HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
183 pub dropped_tables: HashMap<TableId, PbTable>,
185}
186
187impl CatalogController {
188 pub(crate) async fn notify_frontend(
189 &self,
190 operation: NotificationOperation,
191 info: NotificationInfo,
192 ) -> NotificationVersion {
193 self.env
194 .notification_manager()
195 .notify_frontend(operation, info)
196 .await
197 }
198
199 pub(crate) async fn notify_frontend_relation_info(
200 &self,
201 operation: NotificationOperation,
202 relation_info: PbObjectInfo,
203 ) -> NotificationVersion {
204 self.env
205 .notification_manager()
206 .notify_frontend_object_info(operation, relation_info)
207 .await
208 }
209
210 pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
211 self.env.notification_manager().current_version().await
212 }
213}
214
215impl CatalogController {
216 pub async fn finish_create_subscription_catalog(
217 &self,
218 subscription_id: SubscriptionId,
219 ) -> MetaResult<()> {
220 let inner = self.inner.write().await;
221 let txn = inner.db.begin().await?;
222
223 let res = Object::update_many()
225 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
226 .col_expr(
227 object::Column::CreatedAtClusterVersion,
228 current_cluster_version().into(),
229 )
230 .filter(object::Column::Oid.eq(subscription_id))
231 .exec(&txn)
232 .await?;
233 if res.rows_affected == 0 {
234 return Err(MetaError::catalog_id_not_found(
235 "subscription",
236 subscription_id,
237 ));
238 }
239
240 let job = subscription::ActiveModel {
242 subscription_id: Set(subscription_id),
243 subscription_state: Set(SubscriptionState::Created.into()),
244 ..Default::default()
245 };
246 job.update(&txn).await?;
247
248 let _ = grant_default_privileges_automatically(&txn, subscription_id).await?;
249
250 txn.commit().await?;
251
252 Ok(())
253 }
254
255 pub async fn notify_create_subscription(
256 &self,
257 subscription_id: SubscriptionId,
258 ) -> MetaResult<NotificationVersion> {
259 let inner = self.inner.read().await;
260 let (subscription, obj) = Subscription::find_by_id(subscription_id)
261 .find_also_related(Object)
262 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
263 .one(&inner.db)
264 .await?
265 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
266
267 let mut version = self
268 .notify_frontend(
269 NotificationOperation::Add,
270 NotificationInfo::ObjectGroup(PbObjectGroup {
271 objects: vec![PbObject {
272 object_info: PbObjectInfo::Subscription(
273 ObjectModel(subscription, obj.unwrap()).into(),
274 )
275 .into(),
276 }],
277 }),
278 )
279 .await;
280
281 let updated_user_ids: Vec<UserId> = UserPrivilege::find()
283 .select_only()
284 .distinct()
285 .column(user_privilege::Column::UserId)
286 .filter(user_privilege::Column::Oid.eq(subscription_id.as_object_id()))
287 .into_tuple()
288 .all(&inner.db)
289 .await?;
290
291 if !updated_user_ids.is_empty() {
292 let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
293 version = self.notify_users_update(updated_user_infos).await;
294 }
295
296 Ok(version)
297 }
298
299 pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
301 let inner = self.inner.read().await;
321 let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
322 .select_only()
323 .column(source::Column::SourceId)
324 .column(source::Column::WithProperties)
325 .column(source::Column::SourceInfo)
326 .into_tuple()
327 .all(&inner.db)
328 .await?;
329 let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
330 .select_only()
331 .column(sink::Column::SinkId)
332 .column(sink::Column::Properties)
333 .column(sink::Column::SinkFormatDesc)
334 .into_tuple()
335 .all(&inner.db)
336 .await?;
337 drop(inner);
338
339 let get_connector_from_property = |property: &Property| -> String {
340 property
341 .0
342 .get(UPSTREAM_SOURCE_KEY)
343 .cloned()
344 .unwrap_or_default()
345 };
346
347 let source_report: Vec<jsonbb::Value> = source_props_and_info
348 .iter()
349 .map(|(oid, property, info)| {
350 let connector_name = get_connector_from_property(property);
351 let mut format = None;
352 let mut encode = None;
353 if let Some(info) = info {
354 let pb_info = info.to_protobuf();
355 format = Some(pb_info.format().as_str_name());
356 encode = Some(pb_info.row_encode().as_str_name());
357 }
358 jsonbb::json!({
359 oid.to_string(): {
360 "connector": connector_name,
361 "format": format,
362 "encode": encode,
363 },
364 })
365 })
366 .collect_vec();
367
368 let sink_report: Vec<jsonbb::Value> = sink_props_and_info
369 .iter()
370 .map(|(oid, property, info)| {
371 let connector_name = get_connector_from_property(property);
372 let mut format = None;
373 let mut encode = None;
374 if let Some(info) = info {
375 let pb_info = info.to_protobuf();
376 format = Some(pb_info.format().as_str_name());
377 encode = Some(pb_info.encode().as_str_name());
378 }
379 jsonbb::json!({
380 oid.to_string(): {
381 "connector": connector_name,
382 "format": format,
383 "encode": encode,
384 },
385 })
386 })
387 .collect_vec();
388
389 Ok(jsonbb::json!({
390 "source": source_report,
391 "sink": sink_report,
392 }))
393 }
394
395 pub async fn clean_dirty_subscription(
396 &self,
397 database_id: Option<DatabaseId>,
398 ) -> MetaResult<()> {
399 let inner = self.inner.write().await;
400 let txn = inner.db.begin().await?;
401 let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
402 object::Column::Oid.not_in_subquery(
403 Query::select()
404 .column(subscription::Column::SubscriptionId)
405 .from(Subscription)
406 .and_where(
407 subscription::Column::SubscriptionState
408 .eq(SubscriptionState::Created as i32),
409 )
410 .take(),
411 ),
412 );
413
414 let filter_condition = if let Some(database_id) = database_id {
415 filter_condition.and(object::Column::DatabaseId.eq(database_id))
416 } else {
417 filter_condition
418 };
419 Object::delete_many()
420 .filter(filter_condition)
421 .exec(&txn)
422 .await?;
423 txn.commit().await?;
424 Ok(())
426 }
427
428 pub async fn clean_dirty_creating_jobs(
430 &self,
431 database_id: Option<DatabaseId>,
432 ) -> MetaResult<Vec<SourceId>> {
433 let inner = self.inner.write().await;
434 let txn = inner.db.begin().await?;
435
436 let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
437 streaming_job::Column::JobStatus
438 .eq(JobStatus::Creating)
439 .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
440 );
441
442 let filter_condition = if let Some(database_id) = database_id {
443 filter_condition.and(object::Column::DatabaseId.eq(database_id))
444 } else {
445 filter_condition
446 };
447
448 let mut dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
449 .select_only()
450 .column(streaming_job::Column::JobId)
451 .columns([
452 object::Column::Oid,
453 object::Column::ObjType,
454 object::Column::SchemaId,
455 object::Column::DatabaseId,
456 ])
457 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
458 .filter(filter_condition)
459 .into_partial_model()
460 .all(&txn)
461 .await?;
462
463 let dirty_iceberg_jobs = find_dirty_iceberg_table_jobs(&txn, database_id).await?;
465 if !dirty_iceberg_jobs.is_empty() {
466 dirty_job_objs.extend(dirty_iceberg_jobs);
467 }
468
469 Self::clean_dirty_sink_downstreams(&txn).await?;
470
471 if dirty_job_objs.is_empty() {
472 return Ok(vec![]);
473 }
474
475 self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
476
477 let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();
478
479 let all_dirty_table_ids = dirty_job_objs
482 .iter()
483 .filter(|obj| obj.obj_type == ObjectType::Table)
484 .map(|obj| obj.oid)
485 .collect_vec();
486 let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
487 .select_only()
488 .column(table::Column::TableId)
489 .column(table::Column::TableType)
490 .filter(table::Column::TableId.is_in(all_dirty_table_ids))
491 .into_tuple::<(ObjectId, TableType)>()
492 .all(&txn)
493 .await?
494 .into_iter()
495 .collect();
496
497 let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
498 .select_only()
499 .column(streaming_job::Column::JobId)
500 .filter(
501 streaming_job::Column::JobId
502 .is_in(dirty_job_ids.clone())
503 .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
504 )
505 .into_tuple()
506 .all(&txn)
507 .await?
508 .into_iter()
509 .collect();
510
511 let to_notify_objs = dirty_job_objs
513 .into_iter()
514 .filter(|obj| {
515 matches!(
516 dirty_table_type_map.get(&obj.oid),
517 Some(TableType::MaterializedView)
518 ) || dirty_background_jobs.contains(&obj.oid)
519 })
520 .collect_vec();
521
522 let dirty_associated_source_ids: Vec<SourceId> = Table::find()
525 .select_only()
526 .column(table::Column::OptionalAssociatedSourceId)
527 .filter(
528 table::Column::TableId
529 .is_in(dirty_job_ids.clone())
530 .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
531 )
532 .into_tuple()
533 .all(&txn)
534 .await?;
535
536 let dirty_state_table_ids: Vec<TableId> = Table::find()
537 .select_only()
538 .column(table::Column::TableId)
539 .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
540 .into_tuple()
541 .all(&txn)
542 .await?;
543
544 let dirty_internal_table_objs = Object::find()
545 .select_only()
546 .columns([
547 object::Column::Oid,
548 object::Column::ObjType,
549 object::Column::SchemaId,
550 object::Column::DatabaseId,
551 ])
552 .join(JoinType::InnerJoin, object::Relation::Table.def())
553 .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
554 .into_partial_model()
555 .all(&txn)
556 .await?;
557
558 let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
559 .clone()
560 .into_iter()
561 .chain(
562 dirty_state_table_ids
563 .into_iter()
564 .map(|table_id| table_id.as_object_id()),
565 )
566 .chain(
567 dirty_associated_source_ids
568 .iter()
569 .map(|source_id| source_id.as_object_id()),
570 )
571 .collect();
572
573 let res = Object::delete_many()
574 .filter(object::Column::Oid.is_in(to_delete_objs))
575 .exec(&txn)
576 .await?;
577 assert!(res.rows_affected > 0);
578
579 txn.commit().await?;
580
581 let object_group = build_object_group_for_delete(
582 to_notify_objs
583 .into_iter()
584 .chain(dirty_internal_table_objs.into_iter())
585 .collect_vec(),
586 );
587
588 let _version = self
589 .notify_frontend(NotificationOperation::Delete, object_group)
590 .await;
591
592 Ok(dirty_associated_source_ids)
593 }
594
595 pub async fn reset_refreshing_tables(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
597 let inner = self.inner.write().await;
598 let txn = inner.db.begin().await?;
599
600 let filter_condition = table::Column::RefreshState.eq(RefreshState::Refreshing);
604
605 let filter_condition = if let Some(database_id) = database_id {
606 filter_condition.and(object::Column::DatabaseId.eq(database_id))
607 } else {
608 filter_condition
609 };
610
611 let table_ids: Vec<TableId> = Table::find()
612 .find_also_related(Object)
613 .filter(filter_condition)
614 .all(&txn)
615 .await
616 .context("reset_refreshing_tables: finding table ids")?
617 .into_iter()
618 .map(|(t, _)| t.table_id)
619 .collect();
620
621 let res = Table::update_many()
622 .col_expr(table::Column::RefreshState, Expr::value(RefreshState::Idle))
623 .filter(table::Column::TableId.is_in(table_ids))
624 .exec(&txn)
625 .await
626 .context("reset_refreshing_tables: update refresh state")?;
627
628 txn.commit().await?;
629
630 tracing::debug!(
631 "reset refreshing tables: {} tables updated",
632 res.rows_affected
633 );
634
635 Ok(())
636 }
637
638 pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
639 let inner = self.inner.write().await;
640 let txn = inner.db.begin().await?;
641 ensure_object_id(ObjectType::Database, comment.database_id, &txn).await?;
642 ensure_object_id(ObjectType::Schema, comment.schema_id, &txn).await?;
643 let table_obj = Object::find_by_id(comment.table_id)
644 .one(&txn)
645 .await?
646 .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
647
648 let table = if let Some(col_idx) = comment.column_index {
649 let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id)
650 .select_only()
651 .column(table::Column::Columns)
652 .into_tuple()
653 .one(&txn)
654 .await?
655 .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
656 let mut pb_columns = columns.to_protobuf();
657
658 let column = pb_columns
659 .get_mut(col_idx as usize)
660 .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
661 let column_desc = column.column_desc.as_mut().ok_or_else(|| {
662 anyhow!(
663 "column desc at index {} for table id {} not found",
664 col_idx,
665 comment.table_id
666 )
667 })?;
668 column_desc.description = comment.description;
669 table::ActiveModel {
670 table_id: Set(comment.table_id),
671 columns: Set(pb_columns.into()),
672 ..Default::default()
673 }
674 .update(&txn)
675 .await?
676 } else {
677 table::ActiveModel {
678 table_id: Set(comment.table_id),
679 description: Set(comment.description),
680 ..Default::default()
681 }
682 .update(&txn)
683 .await?
684 };
685 txn.commit().await?;
686
687 let version = self
688 .notify_frontend_relation_info(
689 NotificationOperation::Update,
690 PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
691 )
692 .await;
693
694 Ok(version)
695 }
696
697 async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
698 if tables.is_empty() {
699 return;
700 }
701 let objects = tables
702 .into_iter()
703 .map(|t| PbObject {
704 object_info: Some(PbObjectInfo::Table(t)),
705 })
706 .collect();
707 let group = NotificationInfo::ObjectGroup(PbObjectGroup { objects });
708 self.env
709 .notification_manager()
710 .notify_hummock(NotificationOperation::Delete, group.clone())
711 .await;
712 self.env
713 .notification_manager()
714 .notify_compactor(NotificationOperation::Delete, group)
715 .await;
716 }
717
718 pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
719 let mut inner = self.inner.write().await;
720 let tables = inner.complete_dropped_tables(table_ids);
721 self.notify_hummock_dropped_tables(tables).await;
722 }
723
724 pub async fn cleanup_dropped_tables(&self) {
725 let mut inner = self.inner.write().await;
726 let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
727 self.notify_hummock_dropped_tables(tables).await;
728 }
729
730 pub async fn stats(&self) -> MetaResult<CatalogStats> {
731 let inner = self.inner.read().await;
732
733 let mut table_num_map: HashMap<_, _> = Table::find()
734 .select_only()
735 .column(table::Column::TableType)
736 .column_as(table::Column::TableId.count(), "num")
737 .group_by(table::Column::TableType)
738 .having(table::Column::TableType.ne(TableType::Internal))
739 .into_tuple::<(TableType, i64)>()
740 .all(&inner.db)
741 .await?
742 .into_iter()
743 .map(|(table_type, num)| (table_type, num as u64))
744 .collect();
745
746 let source_num = Source::find().count(&inner.db).await?;
747 let sink_num = Sink::find().count(&inner.db).await?;
748 let function_num = Function::find().count(&inner.db).await?;
749 let streaming_job_num = StreamingJob::find().count(&inner.db).await?;
750
751 let actor_num = {
752 let guard = self.env.shared_actor_info.read_guard();
753 guard
754 .iter_over_fragments()
755 .map(|(_, fragment)| fragment.actors.len() as u64)
756 .sum::<u64>()
757 };
758
759 Ok(CatalogStats {
760 table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
761 mview_num: table_num_map
762 .remove(&TableType::MaterializedView)
763 .unwrap_or(0),
764 index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
765 source_num,
766 sink_num,
767 function_num,
768 streaming_job_num,
769 actor_num,
770 })
771 }
772}
773
/// Object counts produced by `CatalogController::stats`.
pub struct CatalogStats {
    /// Number of tables of type `Table`.
    pub table_num: u64,
    /// Number of tables of type `MaterializedView`.
    pub mview_num: u64,
    /// Number of tables of type `Index`.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs.
    pub streaming_job_num: u64,
    /// Total number of actors over all fragments.
    pub actor_num: u64,
}
785
786impl CatalogControllerInner {
787 pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
788 let databases = self.list_databases().await?;
789 let schemas = self.list_schemas().await?;
790 let tables = self.list_tables().await?;
791 let sources = self.list_sources().await?;
792 let sinks = self.list_sinks().await?;
793 let subscriptions = self.list_subscriptions().await?;
794 let indexes = self.list_indexes().await?;
795 let views = self.list_views().await?;
796 let functions = self.list_functions().await?;
797 let connections = self.list_connections().await?;
798 let secrets = self.list_secrets().await?;
799
800 let users = self.list_users().await?;
801
802 Ok((
803 (
804 databases,
805 schemas,
806 tables,
807 sources,
808 sinks,
809 subscriptions,
810 indexes,
811 views,
812 functions,
813 connections,
814 secrets,
815 ),
816 users,
817 ))
818 }
819
820 async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
821 let db_objs = Database::find()
822 .find_also_related(Object)
823 .all(&self.db)
824 .await?;
825 Ok(db_objs
826 .into_iter()
827 .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
828 .collect())
829 }
830
831 async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
832 let schema_objs = Schema::find()
833 .find_also_related(Object)
834 .all(&self.db)
835 .await?;
836
837 Ok(schema_objs
838 .into_iter()
839 .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
840 .collect())
841 }
842
843 async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
844 let mut user_infos: Vec<PbUserInfo> = User::find()
845 .all(&self.db)
846 .await?
847 .into_iter()
848 .map(Into::into)
849 .collect();
850
851 for user_info in &mut user_infos {
852 user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
853 }
854 Ok(user_infos)
855 }
856
857 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
859 let table_objs = Table::find()
860 .find_also_related(Object)
861 .all(&self.db)
862 .await?;
863
864 Ok(table_objs
865 .into_iter()
866 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
867 .collect())
868 }
869
870 async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
872 let table_objs = Table::find()
873 .find_also_related(Object)
874 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
875 .filter(
876 streaming_job::Column::JobStatus.eq(JobStatus::Created).or(
877 table::Column::TableType
878 .eq(TableType::MaterializedView)
879 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
880 ),
881 )
882 .all(&self.db)
883 .await?;
884
885 let job_statuses: HashMap<JobId, JobStatus> = StreamingJob::find()
886 .select_only()
887 .column(streaming_job::Column::JobId)
888 .column(streaming_job::Column::JobStatus)
889 .filter(
890 streaming_job::Column::JobStatus
891 .eq(JobStatus::Created)
892 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
893 )
894 .into_tuple::<(JobId, JobStatus)>()
895 .all(&self.db)
896 .await?
897 .into_iter()
898 .collect();
899
900 let job_ids: HashSet<JobId> = table_objs
901 .iter()
902 .map(|(t, _)| t.table_id.as_job_id())
903 .chain(job_statuses.keys().cloned())
904 .collect();
905
906 let internal_table_objs = Table::find()
907 .find_also_related(Object)
908 .filter(
909 table::Column::TableType
910 .eq(TableType::Internal)
911 .and(table::Column::BelongsToJobId.is_in(job_ids)),
912 )
913 .all(&self.db)
914 .await?;
915
916 Ok(table_objs
917 .into_iter()
918 .chain(internal_table_objs.into_iter())
919 .map(|(table, obj)| {
920 let status: PbStreamJobStatus = if table.table_type == TableType::Internal {
923 (*job_statuses
924 .get(&table.belongs_to_job_id.unwrap())
925 .unwrap_or(&JobStatus::Creating))
926 .into()
927 } else {
928 (*job_statuses
929 .get(&table.table_id.as_job_id())
930 .unwrap_or(&JobStatus::Creating))
931 .into()
932 };
933 let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
934 pb_table.stream_job_status = status.into();
935 pb_table
936 })
937 .collect())
938 }
939
940 async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
942 let mut source_objs = Source::find()
943 .find_also_related(Object)
944 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
945 .filter(
946 streaming_job::Column::JobStatus
947 .is_null()
948 .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
949 )
950 .all(&self.db)
951 .await?;
952
953 let created_table_ids: HashSet<TableId> = Table::find()
955 .select_only()
956 .column(table::Column::TableId)
957 .join(JoinType::InnerJoin, table::Relation::Object1.def())
958 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
959 .filter(
960 table::Column::OptionalAssociatedSourceId
961 .is_not_null()
962 .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
963 )
964 .into_tuple()
965 .all(&self.db)
966 .await?
967 .into_iter()
968 .collect();
969 source_objs.retain_mut(|(source, _)| {
970 source.optional_associated_table_id.is_none()
971 || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
972 });
973
974 Ok(source_objs
975 .into_iter()
976 .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
977 .collect())
978 }
979
980 async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
982 let sink_objs = Sink::find()
983 .find_also_related(Object)
984 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
985 .filter(
986 streaming_job::Column::JobStatus
987 .eq(JobStatus::Created)
988 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
989 )
990 .all(&self.db)
991 .await?;
992
993 let creating_sinks: HashSet<_> = StreamingJob::find()
994 .select_only()
995 .column(streaming_job::Column::JobId)
996 .filter(
997 streaming_job::Column::JobStatus
998 .eq(JobStatus::Creating)
999 .and(
1000 streaming_job::Column::JobId
1001 .is_in(sink_objs.iter().map(|(sink, _)| sink.sink_id)),
1002 ),
1003 )
1004 .into_tuple::<SinkId>()
1005 .all(&self.db)
1006 .await?
1007 .into_iter()
1008 .collect();
1009
1010 Ok(sink_objs
1011 .into_iter()
1012 .map(|(sink, obj)| {
1013 let is_creating = creating_sinks.contains(&sink.sink_id);
1014 let mut pb_sink: PbSink = ObjectModel(sink, obj.unwrap()).into();
1015 pb_sink.stream_job_status = if is_creating {
1016 PbStreamJobStatus::Creating.into()
1017 } else {
1018 PbStreamJobStatus::Created.into()
1019 };
1020 pb_sink
1021 })
1022 .collect())
1023 }
1024
1025 async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
1027 let subscription_objs = Subscription::find()
1028 .find_also_related(Object)
1029 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
1030 .all(&self.db)
1031 .await?;
1032
1033 Ok(subscription_objs
1034 .into_iter()
1035 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
1036 .collect())
1037 }
1038
1039 async fn list_views(&self) -> MetaResult<Vec<PbView>> {
1040 let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
1041
1042 Ok(view_objs
1043 .into_iter()
1044 .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
1045 .collect())
1046 }
1047
1048 async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
1050 let index_objs = Index::find()
1051 .find_also_related(Object)
1052 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1053 .filter(
1054 streaming_job::Column::JobStatus
1055 .eq(JobStatus::Created)
1056 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
1057 )
1058 .all(&self.db)
1059 .await?;
1060
1061 let creating_indexes: HashSet<_> = StreamingJob::find()
1062 .select_only()
1063 .column(streaming_job::Column::JobId)
1064 .filter(
1065 streaming_job::Column::JobStatus
1066 .eq(JobStatus::Creating)
1067 .and(
1068 streaming_job::Column::JobId
1069 .is_in(index_objs.iter().map(|(index, _)| index.index_id)),
1070 ),
1071 )
1072 .into_tuple::<IndexId>()
1073 .all(&self.db)
1074 .await?
1075 .into_iter()
1076 .collect();
1077
1078 Ok(index_objs
1079 .into_iter()
1080 .map(|(index, obj)| {
1081 let is_creating = creating_indexes.contains(&index.index_id);
1082 let mut pb_index: PbIndex = ObjectModel(index, obj.unwrap()).into();
1083 pb_index.stream_job_status = if is_creating {
1084 PbStreamJobStatus::Creating.into()
1085 } else {
1086 PbStreamJobStatus::Created.into()
1087 };
1088 pb_index
1089 })
1090 .collect())
1091 }
1092
1093 async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1094 let conn_objs = Connection::find()
1095 .find_also_related(Object)
1096 .all(&self.db)
1097 .await?;
1098
1099 Ok(conn_objs
1100 .into_iter()
1101 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
1102 .collect())
1103 }
1104
1105 pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1106 let secret_objs = Secret::find()
1107 .find_also_related(Object)
1108 .all(&self.db)
1109 .await?;
1110 Ok(secret_objs
1111 .into_iter()
1112 .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
1113 .collect())
1114 }
1115
1116 async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1117 let func_objs = Function::find()
1118 .find_also_related(Object)
1119 .all(&self.db)
1120 .await?;
1121
1122 Ok(func_objs
1123 .into_iter()
1124 .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
1125 .collect())
1126 }
1127
1128 pub(crate) fn register_finish_notifier(
1129 &mut self,
1130 database_id: DatabaseId,
1131 id: JobId,
1132 sender: Sender<Result<NotificationVersion, String>>,
1133 ) {
1134 self.creating_table_finish_notifier
1135 .entry(database_id)
1136 .or_default()
1137 .entry(id)
1138 .or_default()
1139 .push(sender);
1140 }
1141
1142 pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
1143 let status = StreamingJob::find()
1144 .select_only()
1145 .column(streaming_job::Column::JobStatus)
1146 .filter(streaming_job::Column::JobId.eq(id))
1147 .into_tuple::<JobStatus>()
1148 .one(&self.db)
1149 .await?;
1150
1151 status
1152 .map(|status| status == JobStatus::Created)
1153 .ok_or_else(|| {
1154 MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
1155 })
1156 }
1157
1158 pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1159 if let Some(database_id) = database_id {
1160 if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1161 {
1162 for tx in creating_tables.into_values().flatten() {
1163 let _ = tx.send(Err(err.clone()));
1164 }
1165 }
1166 } else {
1167 for tx in take(&mut self.creating_table_finish_notifier)
1168 .into_values()
1169 .flatten()
1170 .flat_map(|(_, txs)| txs.into_iter())
1171 {
1172 let _ = tx.send(Err(err.clone()));
1173 }
1174 }
1175 }
1176
1177 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1178 let table_ids: Vec<TableId> = Table::find()
1179 .select_only()
1180 .filter(table::Column::TableType.is_in(vec![
1181 TableType::Table,
1182 TableType::MaterializedView,
1183 TableType::Index,
1184 ]))
1185 .column(table::Column::TableId)
1186 .into_tuple()
1187 .all(&self.db)
1188 .await?;
1189 Ok(table_ids)
1190 }
1191
1192 pub(crate) fn complete_dropped_tables(
1195 &mut self,
1196 table_ids: impl IntoIterator<Item = TableId>,
1197 ) -> Vec<PbTable> {
1198 table_ids
1199 .into_iter()
1200 .filter_map(|table_id| {
1201 self.dropped_tables.remove(&table_id).map_or_else(
1202 || {
1203 tracing::warn!(%table_id, "table not found");
1204 None
1205 },
1206 Some,
1207 )
1208 })
1209 .collect()
1210 }
1211}