1mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{
31 DEFAULT_SCHEMA_NAME, FragmentTypeFlag, FragmentTypeMask, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::TableType;
42use risingwave_meta_model::{
43 ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
44 IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45 StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, TableIdArray,
46 UserId, ViewId, connection, database, fragment, function, index, object, object_dependency,
47 pending_sink_state, schema, secret, sink, source, streaming_job, subscription, table,
48 user_privilege, view,
49};
50use risingwave_pb::catalog::connection::Info as ConnectionInfo;
51use risingwave_pb::catalog::subscription::SubscriptionState;
52use risingwave_pb::catalog::table::PbTableType;
53use risingwave_pb::catalog::{
54 PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
55 PbStreamJobStatus, PbSubscription, PbTable, PbView,
56};
57use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
58use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
59use risingwave_pb::meta::object::PbObjectInfo;
60use risingwave_pb::meta::subscribe_response::{
61 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
62};
63use risingwave_pb::meta::{PbObject, PbObjectGroup};
64use risingwave_pb::stream_plan::stream_node::NodeBody;
65use risingwave_pb::telemetry::PbTelemetryEventStage;
66use risingwave_pb::user::PbUserInfo;
67use sea_orm::ActiveValue::Set;
68use sea_orm::sea_query::{Expr, Query, SimpleExpr};
69use sea_orm::{
70 ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
71 IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
72 SelectColumns, TransactionTrait, Value,
73};
74use tokio::sync::oneshot::Sender;
75use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
76use tracing::info;
77
78use super::utils::{
79 check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
80 rename_relation_refer,
81};
82use crate::controller::ObjectModel;
83use crate::controller::catalog::util::update_internal_tables;
84use crate::controller::fragment::FragmentTypeMaskExt;
85use crate::controller::utils::*;
86use crate::manager::{
87 IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
88 get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
89};
90use crate::rpc::ddl_controller::DropMode;
91use crate::telemetry::{MetaTelemetryJobDesc, report_event};
92use crate::{MetaError, MetaResult};
93
/// Full catalog snapshot as produced by [`CatalogControllerInner::snapshot`].
///
/// Element order: databases, schemas, tables, sources, sinks, subscriptions, indexes, views,
/// functions, connections, secrets.
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);
107
/// Shared, reference-counted handle to a [`CatalogController`].
pub type CatalogControllerRef = Arc<CatalogController>;
109
/// Catalog controller backed by the SQL meta store.
///
/// All catalog queries go through `inner`; the write paths in this module take its write guard
/// before opening a transaction, while pure listings take the read guard.
pub struct CatalogController {
    /// Meta server environment: provides the meta store and the notification manager.
    pub(crate) env: MetaSrvEnv,
    /// Lock-protected controller state (database connection and in-memory bookkeeping).
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
115
/// Identifiers involved in dropping the connector of a table.
///
/// NOTE(review): this is populated outside this file; the field descriptions below follow the
/// field names — confirm against the producer.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    /// Streaming job whose definition has to be changed.
    pub(crate) to_change_streaming_job_id: JobId,
    /// State table to remove.
    pub(crate) to_remove_state_table_id: TableId,
    /// Source to remove.
    pub(crate) to_remove_source_id: SourceId,
}
123
/// Identifiers and catalogs of objects removed by a drop operation, handed back so that the
/// caller can release the associated runtime resources.
///
/// NOTE(review): this is populated outside this file; the field descriptions below follow the
/// field names and types — confirm against the producer.
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    /// Database the drop was performed in (presumably shared by all removed objects).
    pub(crate) database_id: DatabaseId,
    /// Streaming jobs removed by the drop.
    pub(crate) removed_streaming_job_ids: Vec<JobId>,
    /// State tables removed by the drop.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    /// Secrets removed by the drop.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    /// Sources removed by the drop.
    pub(crate) removed_source_ids: Vec<SourceId>,
    /// Fragments associated with each removed source.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    /// Actors removed by the drop.
    pub(crate) removed_actors: HashSet<ActorId>,
    /// Fragments removed by the drop.
    pub(crate) removed_fragments: HashSet<FragmentId>,

    /// Removed sink fragments grouped by a target fragment id (presumably the downstream
    /// fragment the sinks wrote into — TODO confirm).
    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,

    /// Sink catalogs removed together with iceberg tables (presumably — TODO confirm).
    pub(crate) removed_iceberg_table_sinks: Vec<PbSink>,
}
148
149impl CatalogController {
150 pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
151 let meta_store = env.meta_store();
152 let catalog_controller = Self {
153 env,
154 inner: RwLock::new(CatalogControllerInner {
155 db: meta_store.conn,
156 creating_table_finish_notifier: HashMap::new(),
157 dropped_tables: HashMap::new(),
158 }),
159 };
160
161 catalog_controller.init().await?;
162 Ok(catalog_controller)
163 }
164
165 pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
168 self.inner.read().await
169 }
170
171 pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
172 self.inner.write().await
173 }
174}
175
/// State of [`CatalogController`] guarded by its `RwLock`.
pub struct CatalogControllerInner {
    /// Database connection taken from the meta store (`meta_store.conn`).
    pub(crate) db: DatabaseConnection,
    /// Senders registered per database and per streaming job via `register_finish_notifier`.
    /// Each sender carries either a notification version or an error message; the error
    /// variant is sent by `notify_finish_failed`.
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    /// Catalogs of dropped tables, kept until `complete_dropped_tables` /
    /// `cleanup_dropped_tables` remove them and notify hummock and the compactor.
    pub dropped_tables: HashMap<TableId, PbTable>,
}
188
189impl CatalogController {
190 pub(crate) async fn notify_frontend(
191 &self,
192 operation: NotificationOperation,
193 info: NotificationInfo,
194 ) -> NotificationVersion {
195 self.env
196 .notification_manager()
197 .notify_frontend(operation, info)
198 .await
199 }
200
201 pub(crate) async fn notify_frontend_relation_info(
202 &self,
203 operation: NotificationOperation,
204 relation_info: PbObjectInfo,
205 ) -> NotificationVersion {
206 self.env
207 .notification_manager()
208 .notify_frontend_object_info(operation, relation_info)
209 .await
210 }
211
212 pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
213 self.env.notification_manager().current_version().await
214 }
215}
216
217impl CatalogController {
218 pub async fn finish_create_subscription_catalog(
219 &self,
220 subscription_id: SubscriptionId,
221 ) -> MetaResult<()> {
222 let inner = self.inner.write().await;
223 let txn = inner.db.begin().await?;
224
225 let res = Object::update_many()
227 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
228 .col_expr(
229 object::Column::CreatedAtClusterVersion,
230 current_cluster_version().into(),
231 )
232 .filter(object::Column::Oid.eq(subscription_id))
233 .exec(&txn)
234 .await?;
235 if res.rows_affected == 0 {
236 return Err(MetaError::catalog_id_not_found(
237 "subscription",
238 subscription_id,
239 ));
240 }
241
242 let job = subscription::ActiveModel {
244 subscription_id: Set(subscription_id),
245 subscription_state: Set(SubscriptionState::Created.into()),
246 ..Default::default()
247 };
248 job.update(&txn).await?;
249
250 let _ = grant_default_privileges_automatically(&txn, subscription_id).await?;
251
252 txn.commit().await?;
253
254 Ok(())
255 }
256
257 pub async fn notify_create_subscription(
258 &self,
259 subscription_id: SubscriptionId,
260 ) -> MetaResult<NotificationVersion> {
261 let inner = self.inner.read().await;
262 let (subscription, obj) = Subscription::find_by_id(subscription_id)
263 .find_also_related(Object)
264 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
265 .one(&inner.db)
266 .await?
267 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
268
269 let mut version = self
270 .notify_frontend(
271 NotificationOperation::Add,
272 NotificationInfo::ObjectGroup(PbObjectGroup {
273 objects: vec![PbObject {
274 object_info: PbObjectInfo::Subscription(
275 ObjectModel(subscription, obj.unwrap()).into(),
276 )
277 .into(),
278 }],
279 }),
280 )
281 .await;
282
283 let updated_user_ids: Vec<UserId> = UserPrivilege::find()
285 .select_only()
286 .distinct()
287 .column(user_privilege::Column::UserId)
288 .filter(user_privilege::Column::Oid.eq(subscription_id.as_object_id()))
289 .into_tuple()
290 .all(&inner.db)
291 .await?;
292
293 if !updated_user_ids.is_empty() {
294 let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
295 version = self.notify_users_update(updated_user_infos).await;
296 }
297
298 Ok(version)
299 }
300
301 pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
303 let inner = self.inner.read().await;
323 let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
324 .select_only()
325 .column(source::Column::SourceId)
326 .column(source::Column::WithProperties)
327 .column(source::Column::SourceInfo)
328 .into_tuple()
329 .all(&inner.db)
330 .await?;
331 let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
332 .select_only()
333 .column(sink::Column::SinkId)
334 .column(sink::Column::Properties)
335 .column(sink::Column::SinkFormatDesc)
336 .into_tuple()
337 .all(&inner.db)
338 .await?;
339 drop(inner);
340
341 let get_connector_from_property = |property: &Property| -> String {
342 property
343 .0
344 .get(UPSTREAM_SOURCE_KEY)
345 .cloned()
346 .unwrap_or_default()
347 };
348
349 let source_report: Vec<jsonbb::Value> = source_props_and_info
350 .iter()
351 .map(|(oid, property, info)| {
352 let connector_name = get_connector_from_property(property);
353 let mut format = None;
354 let mut encode = None;
355 if let Some(info) = info {
356 let pb_info = info.to_protobuf();
357 format = Some(pb_info.format().as_str_name());
358 encode = Some(pb_info.row_encode().as_str_name());
359 }
360 jsonbb::json!({
361 oid.to_string(): {
362 "connector": connector_name,
363 "format": format,
364 "encode": encode,
365 },
366 })
367 })
368 .collect_vec();
369
370 let sink_report: Vec<jsonbb::Value> = sink_props_and_info
371 .iter()
372 .map(|(oid, property, info)| {
373 let connector_name = get_connector_from_property(property);
374 let mut format = None;
375 let mut encode = None;
376 if let Some(info) = info {
377 let pb_info = info.to_protobuf();
378 format = Some(pb_info.format().as_str_name());
379 encode = Some(pb_info.encode().as_str_name());
380 }
381 jsonbb::json!({
382 oid.to_string(): {
383 "connector": connector_name,
384 "format": format,
385 "encode": encode,
386 },
387 })
388 })
389 .collect_vec();
390
391 Ok(jsonbb::json!({
392 "source": source_report,
393 "sink": sink_report,
394 }))
395 }
396
397 pub async fn clean_dirty_subscription(
398 &self,
399 database_id: Option<DatabaseId>,
400 ) -> MetaResult<()> {
401 let inner = self.inner.write().await;
402 let txn = inner.db.begin().await?;
403 let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
404 object::Column::Oid.not_in_subquery(
405 Query::select()
406 .column(subscription::Column::SubscriptionId)
407 .from(Subscription)
408 .and_where(
409 subscription::Column::SubscriptionState
410 .eq(SubscriptionState::Created as i32),
411 )
412 .take(),
413 ),
414 );
415
416 let filter_condition = if let Some(database_id) = database_id {
417 filter_condition.and(object::Column::DatabaseId.eq(database_id))
418 } else {
419 filter_condition
420 };
421 Object::delete_many()
422 .filter(filter_condition)
423 .exec(&txn)
424 .await?;
425 txn.commit().await?;
426 Ok(())
428 }
429
430 pub async fn clean_dirty_creating_jobs(
432 &self,
433 database_id: Option<DatabaseId>,
434 ) -> MetaResult<Vec<SourceId>> {
435 let inner = self.inner.write().await;
436 let txn = inner.db.begin().await?;
437
438 let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
439 streaming_job::Column::JobStatus
440 .eq(JobStatus::Creating)
441 .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
442 );
443
444 let filter_condition = if let Some(database_id) = database_id {
445 filter_condition.and(object::Column::DatabaseId.eq(database_id))
446 } else {
447 filter_condition
448 };
449
450 let mut dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
451 .select_only()
452 .column(streaming_job::Column::JobId)
453 .columns([
454 object::Column::Oid,
455 object::Column::ObjType,
456 object::Column::SchemaId,
457 object::Column::DatabaseId,
458 ])
459 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
460 .filter(filter_condition)
461 .into_partial_model()
462 .all(&txn)
463 .await?;
464
465 let dirty_iceberg_jobs = find_dirty_iceberg_table_jobs(&txn, database_id).await?;
467 if !dirty_iceberg_jobs.is_empty() {
468 dirty_job_objs.extend(dirty_iceberg_jobs);
469 }
470
471 Self::clean_dirty_sink_downstreams(&txn).await?;
472
473 if dirty_job_objs.is_empty() {
474 return Ok(vec![]);
475 }
476
477 self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
478
479 let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();
480
481 let all_dirty_table_ids = dirty_job_objs
484 .iter()
485 .filter(|obj| obj.obj_type == ObjectType::Table)
486 .map(|obj| obj.oid)
487 .collect_vec();
488 let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
489 .select_only()
490 .column(table::Column::TableId)
491 .column(table::Column::TableType)
492 .filter(table::Column::TableId.is_in(all_dirty_table_ids))
493 .into_tuple::<(ObjectId, TableType)>()
494 .all(&txn)
495 .await?
496 .into_iter()
497 .collect();
498
499 let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
500 .select_only()
501 .column(streaming_job::Column::JobId)
502 .filter(
503 streaming_job::Column::JobId
504 .is_in(dirty_job_ids.clone())
505 .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
506 )
507 .into_tuple()
508 .all(&txn)
509 .await?
510 .into_iter()
511 .collect();
512
513 let to_notify_objs = dirty_job_objs
515 .into_iter()
516 .filter(|obj| {
517 matches!(
518 dirty_table_type_map.get(&obj.oid),
519 Some(TableType::MaterializedView)
520 ) || dirty_background_jobs.contains(&obj.oid)
521 })
522 .collect_vec();
523
524 let dirty_associated_source_ids: Vec<SourceId> = Table::find()
527 .select_only()
528 .column(table::Column::OptionalAssociatedSourceId)
529 .filter(
530 table::Column::TableId
531 .is_in(dirty_job_ids.clone())
532 .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
533 )
534 .into_tuple()
535 .all(&txn)
536 .await?;
537
538 let dirty_state_table_ids: Vec<TableId> = Table::find()
539 .select_only()
540 .column(table::Column::TableId)
541 .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
542 .into_tuple()
543 .all(&txn)
544 .await?;
545
546 let dirty_internal_table_objs = Object::find()
547 .select_only()
548 .columns([
549 object::Column::Oid,
550 object::Column::ObjType,
551 object::Column::SchemaId,
552 object::Column::DatabaseId,
553 ])
554 .join(JoinType::InnerJoin, object::Relation::Table.def())
555 .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
556 .into_partial_model()
557 .all(&txn)
558 .await?;
559
560 let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
561 .clone()
562 .into_iter()
563 .chain(
564 dirty_state_table_ids
565 .into_iter()
566 .map(|table_id| table_id.as_object_id()),
567 )
568 .chain(
569 dirty_associated_source_ids
570 .iter()
571 .map(|source_id| source_id.as_object_id()),
572 )
573 .collect();
574
575 let res = Object::delete_many()
576 .filter(object::Column::Oid.is_in(to_delete_objs))
577 .exec(&txn)
578 .await?;
579 assert!(res.rows_affected > 0);
580
581 txn.commit().await?;
582
583 let object_group = build_object_group_for_delete(
584 to_notify_objs
585 .into_iter()
586 .chain(dirty_internal_table_objs.into_iter())
587 .collect_vec(),
588 );
589
590 let _version = self
591 .notify_frontend(NotificationOperation::Delete, object_group)
592 .await;
593
594 Ok(dirty_associated_source_ids)
595 }
596
597 pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
598 let inner = self.inner.write().await;
599 let txn = inner.db.begin().await?;
600 ensure_object_id(ObjectType::Database, comment.database_id, &txn).await?;
601 ensure_object_id(ObjectType::Schema, comment.schema_id, &txn).await?;
602 let table_obj = Object::find_by_id(comment.table_id)
603 .one(&txn)
604 .await?
605 .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
606
607 let table = if let Some(col_idx) = comment.column_index {
608 let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id)
609 .select_only()
610 .column(table::Column::Columns)
611 .into_tuple()
612 .one(&txn)
613 .await?
614 .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
615 let mut pb_columns = columns.to_protobuf();
616
617 let column = pb_columns
618 .get_mut(col_idx as usize)
619 .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
620 let column_desc = column.column_desc.as_mut().ok_or_else(|| {
621 anyhow!(
622 "column desc at index {} for table id {} not found",
623 col_idx,
624 comment.table_id
625 )
626 })?;
627 column_desc.description = comment.description;
628 table::ActiveModel {
629 table_id: Set(comment.table_id),
630 columns: Set(pb_columns.into()),
631 ..Default::default()
632 }
633 .update(&txn)
634 .await?
635 } else {
636 table::ActiveModel {
637 table_id: Set(comment.table_id),
638 description: Set(comment.description),
639 ..Default::default()
640 }
641 .update(&txn)
642 .await?
643 };
644 txn.commit().await?;
645
646 let version = self
647 .notify_frontend_relation_info(
648 NotificationOperation::Update,
649 PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
650 )
651 .await;
652
653 Ok(version)
654 }
655
656 async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
657 if tables.is_empty() {
658 return;
659 }
660 let objects = tables
661 .into_iter()
662 .map(|t| PbObject {
663 object_info: Some(PbObjectInfo::Table(t)),
664 })
665 .collect();
666 let group = NotificationInfo::ObjectGroup(PbObjectGroup { objects });
667 self.env
668 .notification_manager()
669 .notify_hummock(NotificationOperation::Delete, group.clone())
670 .await;
671 self.env
672 .notification_manager()
673 .notify_compactor(NotificationOperation::Delete, group)
674 .await;
675 }
676
677 pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
678 let mut inner = self.inner.write().await;
679 let tables = inner.complete_dropped_tables(table_ids);
680 self.notify_hummock_dropped_tables(tables).await;
681 }
682
683 pub async fn cleanup_dropped_tables(&self) {
684 let mut inner = self.inner.write().await;
685 let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
686 self.notify_hummock_dropped_tables(tables).await;
687 }
688
689 pub async fn stats(&self) -> MetaResult<CatalogStats> {
690 let inner = self.inner.read().await;
691
692 let mut table_num_map: HashMap<_, _> = Table::find()
693 .select_only()
694 .column(table::Column::TableType)
695 .column_as(table::Column::TableId.count(), "num")
696 .group_by(table::Column::TableType)
697 .having(table::Column::TableType.ne(TableType::Internal))
698 .into_tuple::<(TableType, i64)>()
699 .all(&inner.db)
700 .await?
701 .into_iter()
702 .map(|(table_type, num)| (table_type, num as u64))
703 .collect();
704
705 let source_num = Source::find().count(&inner.db).await?;
706 let sink_num = Sink::find().count(&inner.db).await?;
707 let function_num = Function::find().count(&inner.db).await?;
708 let streaming_job_num = StreamingJob::find().count(&inner.db).await?;
709
710 let actor_num = {
711 let guard = self.env.shared_actor_info.read_guard();
712 guard
713 .iter_over_fragments()
714 .map(|(_, fragment)| fragment.actors.len() as u64)
715 .sum::<u64>()
716 };
717 let database_num = Database::find().count(&inner.db).await?;
718
719 Ok(CatalogStats {
720 table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
721 mview_num: table_num_map
722 .remove(&TableType::MaterializedView)
723 .unwrap_or(0),
724 index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
725 source_num,
726 sink_num,
727 function_num,
728 streaming_job_num,
729 actor_num,
730 database_num,
731 })
732 }
733
734 pub async fn fetch_sink_with_state_table_ids(
735 &self,
736 sink_ids: HashSet<SinkId>,
737 ) -> MetaResult<HashMap<SinkId, Vec<TableId>>> {
738 let inner = self.inner.read().await;
739
740 let query = Fragment::find()
741 .select_only()
742 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
743 .filter(
744 fragment::Column::JobId
745 .is_in(sink_ids)
746 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
747 );
748
749 let rows: Vec<(JobId, TableIdArray)> = query.into_tuple().all(&inner.db).await?;
750
751 debug_assert!(rows.iter().map(|(job_id, _)| job_id).all_unique());
752
753 let result = rows
754 .into_iter()
755 .map(|(job_id, table_id_array)| (job_id.as_sink_id(), table_id_array.0))
756 .collect::<HashMap<_, _>>();
757
758 Ok(result)
759 }
760
761 pub async fn list_all_pending_sinks(
762 &self,
763 database_id: Option<DatabaseId>,
764 ) -> MetaResult<HashSet<SinkId>> {
765 let inner = self.inner.read().await;
766
767 let mut query = pending_sink_state::Entity::find()
768 .select_only()
769 .columns([pending_sink_state::Column::SinkId])
770 .filter(
771 pending_sink_state::Column::SinkState.eq(pending_sink_state::SinkState::Pending),
772 )
773 .distinct();
774
775 if let Some(db_id) = database_id {
776 query = query
777 .join(
778 JoinType::InnerJoin,
779 pending_sink_state::Relation::Object.def(),
780 )
781 .filter(object::Column::DatabaseId.eq(db_id));
782 }
783
784 let result: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;
785
786 Ok(result.into_iter().collect())
787 }
788
789 pub async fn abort_pending_sink_epochs(
790 &self,
791 sink_committed_epoch: HashMap<SinkId, u64>,
792 ) -> MetaResult<()> {
793 let inner = self.inner.write().await;
794 let txn = inner.db.begin().await?;
795
796 for (sink_id, committed_epoch) in sink_committed_epoch {
797 pending_sink_state::Entity::update_many()
798 .col_expr(
799 pending_sink_state::Column::SinkState,
800 Expr::value(pending_sink_state::SinkState::Aborted),
801 )
802 .filter(
803 pending_sink_state::Column::SinkId
804 .eq(sink_id)
805 .and(pending_sink_state::Column::Epoch.gt(committed_epoch as i64)),
806 )
807 .exec(&txn)
808 .await?;
809 }
810
811 txn.commit().await?;
812 Ok(())
813 }
814}
815
/// Catalog object counts returned by [`CatalogController::stats`].
pub struct CatalogStats {
    /// Number of tables of type `TableType::Table`.
    pub table_num: u64,
    /// Number of tables of type `TableType::MaterializedView`.
    pub mview_num: u64,
    /// Number of tables of type `TableType::Index`.
    pub index_num: u64,
    /// Number of source rows.
    pub source_num: u64,
    /// Number of sink rows.
    pub sink_num: u64,
    /// Number of function rows.
    pub function_num: u64,
    /// Number of streaming job rows.
    pub streaming_job_num: u64,
    /// Sum of actor counts over all fragments in the shared actor info.
    pub actor_num: u64,
    /// Number of database rows.
    pub database_num: u64,
}
828
829impl CatalogControllerInner {
830 pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
831 let databases = self.list_databases().await?;
832 let schemas = self.list_schemas().await?;
833 let tables = self.list_tables().await?;
834 let sources = self.list_sources().await?;
835 let sinks = self.list_sinks().await?;
836 let subscriptions = self.list_subscriptions().await?;
837 let indexes = self.list_indexes().await?;
838 let views = self.list_views().await?;
839 let functions = self.list_functions().await?;
840 let connections = self.list_connections().await?;
841 let secrets = self.list_secrets().await?;
842
843 let users = self.list_users().await?;
844
845 Ok((
846 (
847 databases,
848 schemas,
849 tables,
850 sources,
851 sinks,
852 subscriptions,
853 indexes,
854 views,
855 functions,
856 connections,
857 secrets,
858 ),
859 users,
860 ))
861 }
862
863 async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
864 let db_objs = Database::find()
865 .find_also_related(Object)
866 .all(&self.db)
867 .await?;
868 Ok(db_objs
869 .into_iter()
870 .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
871 .collect())
872 }
873
874 async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
875 let schema_objs = Schema::find()
876 .find_also_related(Object)
877 .all(&self.db)
878 .await?;
879
880 Ok(schema_objs
881 .into_iter()
882 .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
883 .collect())
884 }
885
886 async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
887 let mut user_infos: Vec<PbUserInfo> = User::find()
888 .all(&self.db)
889 .await?
890 .into_iter()
891 .map(Into::into)
892 .collect();
893
894 for user_info in &mut user_infos {
895 user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
896 }
897 Ok(user_infos)
898 }
899
900 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
902 let table_objs = Table::find()
903 .find_also_related(Object)
904 .all(&self.db)
905 .await?;
906
907 Ok(table_objs
908 .into_iter()
909 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
910 .collect())
911 }
912
913 async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
915 let table_objs = Table::find()
916 .find_also_related(Object)
917 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
918 .filter(
919 streaming_job::Column::JobStatus.eq(JobStatus::Created).or(
920 table::Column::TableType
921 .eq(TableType::MaterializedView)
922 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
923 ),
924 )
925 .all(&self.db)
926 .await?;
927
928 let job_statuses: HashMap<JobId, JobStatus> = StreamingJob::find()
929 .select_only()
930 .column(streaming_job::Column::JobId)
931 .column(streaming_job::Column::JobStatus)
932 .filter(
933 streaming_job::Column::JobStatus
934 .eq(JobStatus::Created)
935 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
936 )
937 .into_tuple::<(JobId, JobStatus)>()
938 .all(&self.db)
939 .await?
940 .into_iter()
941 .collect();
942
943 let sink_ids: HashSet<SinkId> = Sink::find()
944 .select_only()
945 .column(sink::Column::SinkId)
946 .into_tuple::<SinkId>()
947 .all(&self.db)
948 .await?
949 .into_iter()
950 .collect();
951
952 let job_ids: HashSet<JobId> = table_objs
953 .iter()
954 .map(|(t, _)| t.table_id.as_job_id())
955 .chain(job_statuses.keys().cloned())
956 .chain(sink_ids.into_iter().map(|s| s.as_job_id()))
957 .collect();
958
959 let internal_table_objs = Table::find()
960 .find_also_related(Object)
961 .filter(
962 table::Column::TableType
963 .eq(TableType::Internal)
964 .and(table::Column::BelongsToJobId.is_in(job_ids)),
965 )
966 .all(&self.db)
967 .await?;
968
969 Ok(table_objs
970 .into_iter()
971 .chain(internal_table_objs.into_iter())
972 .map(|(table, obj)| {
973 let status: PbStreamJobStatus = if table.table_type == TableType::Internal {
976 (*job_statuses
977 .get(&table.belongs_to_job_id.unwrap())
978 .unwrap_or(&JobStatus::Creating))
979 .into()
980 } else {
981 (*job_statuses
982 .get(&table.table_id.as_job_id())
983 .unwrap_or(&JobStatus::Creating))
984 .into()
985 };
986 let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
987 pb_table.stream_job_status = status.into();
988 pb_table
989 })
990 .collect())
991 }
992
993 async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
995 let mut source_objs = Source::find()
996 .find_also_related(Object)
997 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
998 .filter(
999 streaming_job::Column::JobStatus
1000 .is_null()
1001 .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1002 )
1003 .all(&self.db)
1004 .await?;
1005
1006 let created_table_ids: HashSet<TableId> = Table::find()
1008 .select_only()
1009 .column(table::Column::TableId)
1010 .join(JoinType::InnerJoin, table::Relation::Object1.def())
1011 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1012 .filter(
1013 table::Column::OptionalAssociatedSourceId
1014 .is_not_null()
1015 .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
1016 )
1017 .into_tuple()
1018 .all(&self.db)
1019 .await?
1020 .into_iter()
1021 .collect();
1022 source_objs.retain_mut(|(source, _)| {
1023 source.optional_associated_table_id.is_none()
1024 || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
1025 });
1026
1027 Ok(source_objs
1028 .into_iter()
1029 .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
1030 .collect())
1031 }
1032
1033 async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
1035 let sink_objs = Sink::find()
1036 .find_also_related(Object)
1037 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1038 .all(&self.db)
1039 .await?;
1040
1041 let creating_sinks: HashSet<_> = StreamingJob::find()
1042 .select_only()
1043 .column(streaming_job::Column::JobId)
1044 .filter(
1045 streaming_job::Column::JobStatus
1046 .eq(JobStatus::Creating)
1047 .and(
1048 streaming_job::Column::JobId
1049 .is_in(sink_objs.iter().map(|(sink, _)| sink.sink_id)),
1050 ),
1051 )
1052 .into_tuple::<SinkId>()
1053 .all(&self.db)
1054 .await?
1055 .into_iter()
1056 .collect();
1057
1058 Ok(sink_objs
1059 .into_iter()
1060 .map(|(sink, obj)| {
1061 let is_creating = creating_sinks.contains(&sink.sink_id);
1062 let mut pb_sink: PbSink = ObjectModel(sink, obj.unwrap()).into();
1063 pb_sink.stream_job_status = if is_creating {
1064 PbStreamJobStatus::Creating.into()
1065 } else {
1066 PbStreamJobStatus::Created.into()
1067 };
1068 pb_sink
1069 })
1070 .collect())
1071 }
1072
1073 async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
1075 let subscription_objs = Subscription::find()
1076 .find_also_related(Object)
1077 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
1078 .all(&self.db)
1079 .await?;
1080
1081 Ok(subscription_objs
1082 .into_iter()
1083 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
1084 .collect())
1085 }
1086
1087 async fn list_views(&self) -> MetaResult<Vec<PbView>> {
1088 let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
1089
1090 Ok(view_objs
1091 .into_iter()
1092 .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
1093 .collect())
1094 }
1095
1096 async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
1098 let index_objs = Index::find()
1099 .find_also_related(Object)
1100 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1101 .filter(
1102 streaming_job::Column::JobStatus
1103 .eq(JobStatus::Created)
1104 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
1105 )
1106 .all(&self.db)
1107 .await?;
1108
1109 let creating_indexes: HashSet<_> = StreamingJob::find()
1110 .select_only()
1111 .column(streaming_job::Column::JobId)
1112 .filter(
1113 streaming_job::Column::JobStatus
1114 .eq(JobStatus::Creating)
1115 .and(
1116 streaming_job::Column::JobId
1117 .is_in(index_objs.iter().map(|(index, _)| index.index_id)),
1118 ),
1119 )
1120 .into_tuple::<IndexId>()
1121 .all(&self.db)
1122 .await?
1123 .into_iter()
1124 .collect();
1125
1126 Ok(index_objs
1127 .into_iter()
1128 .map(|(index, obj)| {
1129 let is_creating = creating_indexes.contains(&index.index_id);
1130 let mut pb_index: PbIndex = ObjectModel(index, obj.unwrap()).into();
1131 pb_index.stream_job_status = if is_creating {
1132 PbStreamJobStatus::Creating.into()
1133 } else {
1134 PbStreamJobStatus::Created.into()
1135 };
1136 pb_index
1137 })
1138 .collect())
1139 }
1140
1141 async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1142 let conn_objs = Connection::find()
1143 .find_also_related(Object)
1144 .all(&self.db)
1145 .await?;
1146
1147 Ok(conn_objs
1148 .into_iter()
1149 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
1150 .collect())
1151 }
1152
1153 pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1154 let secret_objs = Secret::find()
1155 .find_also_related(Object)
1156 .all(&self.db)
1157 .await?;
1158 Ok(secret_objs
1159 .into_iter()
1160 .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
1161 .collect())
1162 }
1163
1164 async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1165 let func_objs = Function::find()
1166 .find_also_related(Object)
1167 .all(&self.db)
1168 .await?;
1169
1170 Ok(func_objs
1171 .into_iter()
1172 .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
1173 .collect())
1174 }
1175
1176 pub(crate) fn register_finish_notifier(
1177 &mut self,
1178 database_id: DatabaseId,
1179 id: JobId,
1180 sender: Sender<Result<NotificationVersion, String>>,
1181 ) {
1182 self.creating_table_finish_notifier
1183 .entry(database_id)
1184 .or_default()
1185 .entry(id)
1186 .or_default()
1187 .push(sender);
1188 }
1189
1190 pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
1191 let status = StreamingJob::find()
1192 .select_only()
1193 .column(streaming_job::Column::JobStatus)
1194 .filter(streaming_job::Column::JobId.eq(id))
1195 .into_tuple::<JobStatus>()
1196 .one(&self.db)
1197 .await?;
1198
1199 status
1200 .map(|status| status == JobStatus::Created)
1201 .ok_or_else(|| {
1202 MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
1203 })
1204 }
1205
1206 pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1207 if let Some(database_id) = database_id {
1208 if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1209 {
1210 for tx in creating_tables.into_values().flatten() {
1211 let _ = tx.send(Err(err.clone()));
1212 }
1213 }
1214 } else {
1215 for tx in take(&mut self.creating_table_finish_notifier)
1216 .into_values()
1217 .flatten()
1218 .flat_map(|(_, txs)| txs.into_iter())
1219 {
1220 let _ = tx.send(Err(err.clone()));
1221 }
1222 }
1223 }
1224
1225 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1226 let table_ids: Vec<TableId> = Table::find()
1227 .select_only()
1228 .filter(table::Column::TableType.is_in(vec![
1229 TableType::Table,
1230 TableType::MaterializedView,
1231 TableType::Index,
1232 ]))
1233 .column(table::Column::TableId)
1234 .into_tuple()
1235 .all(&self.db)
1236 .await?;
1237 Ok(table_ids)
1238 }
1239
1240 pub(crate) fn complete_dropped_tables(
1243 &mut self,
1244 table_ids: impl IntoIterator<Item = TableId>,
1245 ) -> Vec<PbTable> {
1246 table_ids
1247 .into_iter()
1248 .filter_map(|table_id| {
1249 self.dropped_tables.remove(&table_id).map_or_else(
1250 || {
1251 tracing::warn!(%table_id, "table not found");
1252 None
1253 },
1254 Some,
1255 )
1256 })
1257 .collect()
1258 }
1259}