1mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::{Context, anyhow};
29use itertools::Itertools;
30use risingwave_common::catalog::{
31 DEFAULT_SCHEMA_NAME, FragmentTypeFlag, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::secret::LocalSecretManager;
35use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
36use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
37use risingwave_connector::source::cdc::build_cdc_table_id;
38use risingwave_meta_model::object::ObjectType;
39use risingwave_meta_model::prelude::*;
40use risingwave_meta_model::table::{RefreshState, TableType};
41use risingwave_meta_model::{
42 ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
43 IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
44 StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
45 connection, database, fragment, function, index, object, object_dependency, schema, secret,
46 sink, source, streaming_job, subscription, table, user_privilege, view,
47};
48use risingwave_pb::catalog::connection::Info as ConnectionInfo;
49use risingwave_pb::catalog::subscription::SubscriptionState;
50use risingwave_pb::catalog::table::PbTableType;
51use risingwave_pb::catalog::{
52 PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
53 PbStreamJobStatus, PbSubscription, PbTable, PbView,
54};
55use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
56use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
57use risingwave_pb::meta::object::PbObjectInfo;
58use risingwave_pb::meta::subscribe_response::{
59 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
60};
61use risingwave_pb::meta::{PbObject, PbObjectGroup};
62use risingwave_pb::stream_plan::stream_node::NodeBody;
63use risingwave_pb::telemetry::PbTelemetryEventStage;
64use risingwave_pb::user::PbUserInfo;
65use sea_orm::ActiveValue::Set;
66use sea_orm::sea_query::{Expr, Query, SimpleExpr};
67use sea_orm::{
68 ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
69 IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
70 SelectColumns, TransactionTrait, Value,
71};
72use tokio::sync::oneshot::Sender;
73use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
74use tracing::info;
75
76use super::utils::{
77 check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
78 rename_relation_refer,
79};
80use crate::controller::ObjectModel;
81use crate::controller::catalog::util::update_internal_tables;
82use crate::controller::utils::*;
83use crate::manager::{
84 IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
85 get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
86};
87use crate::rpc::ddl_controller::DropMode;
88use crate::telemetry::{MetaTelemetryJobDesc, report_event};
89use crate::{MetaError, MetaResult};
90
/// A full catalog snapshot, bundling every user-visible object kind.
///
/// The tuple order matches the fields produced by
/// `CatalogControllerInner::snapshot`: databases, schemas, tables, sources,
/// sinks, subscriptions, indexes, views, functions, connections, secrets.
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);
104
/// Shared handle to the catalog controller.
pub type CatalogControllerRef = Arc<CatalogController>;
106
/// Catalog manager backed by the SQL meta store.
pub struct CatalogController {
    // Meta-service environment (notification manager, meta store, ...).
    pub(crate) env: MetaSrvEnv,
    // Mutable controller state, guarded by an async RwLock.
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
112
/// Context describing a table whose connector (associated source) is being
/// dropped: the streaming job to modify and the state-table/source objects to
/// remove.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    // The streaming job whose definition changes.
    pub(crate) to_change_streaming_job_id: ObjectId,
    // State table backing the connector, to be removed.
    pub(crate) to_remove_state_table_id: TableId,
    // The associated source object, to be removed.
    pub(crate) to_remove_source_id: SourceId,
}
120
/// Resources released by a drop/cleanup operation, handed back to the caller
/// so that runtime state (actors, fragments, secrets, source splits, ...) can
/// be torn down outside the catalog transaction.
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    // Database the released objects belonged to.
    pub(crate) database_id: DatabaseId,
    // Streaming job objects that were removed.
    pub(crate) removed_streaming_job_ids: Vec<ObjectId>,
    // State tables (including internal tables) that were removed.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    // Secrets that were removed.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    // Sources that were removed.
    pub(crate) removed_source_ids: Vec<SourceId>,
    // For each removed source, the fragments that consumed it.
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    // Actors that were removed.
    pub(crate) removed_actors: HashSet<ActorId>,
    // Fragments that were removed.
    pub(crate) removed_fragments: HashSet<FragmentId>,

    // Removed sink fragments keyed by their target (downstream) fragment.
    pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
}
142
143impl CatalogController {
144 pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
145 let meta_store = env.meta_store();
146 let catalog_controller = Self {
147 env,
148 inner: RwLock::new(CatalogControllerInner {
149 db: meta_store.conn,
150 creating_table_finish_notifier: HashMap::new(),
151 dropped_tables: HashMap::new(),
152 }),
153 };
154
155 catalog_controller.init().await?;
156 Ok(catalog_controller)
157 }
158
159 pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
162 self.inner.read().await
163 }
164
165 pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
166 self.inner.write().await
167 }
168}
169
/// Mutable state of the catalog controller.
pub struct CatalogControllerInner {
    // Connection to the SQL meta store.
    pub(crate) db: DatabaseConnection,
    #[expect(clippy::type_complexity)]
    // Completion channels for creating streaming jobs, keyed by database id
    // and then by the job's object id. On failure/cancellation each sender
    // receives an `Err` (see `notify_finish_failed` / `notify_cancelled`).
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<ObjectId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    // Cache of dropped table catalogs, drained by `complete_dropped_tables`.
    pub dropped_tables: HashMap<TableId, PbTable>,
}
182
183impl CatalogController {
184 pub(crate) async fn notify_frontend(
185 &self,
186 operation: NotificationOperation,
187 info: NotificationInfo,
188 ) -> NotificationVersion {
189 self.env
190 .notification_manager()
191 .notify_frontend(operation, info)
192 .await
193 }
194
195 pub(crate) async fn notify_frontend_relation_info(
196 &self,
197 operation: NotificationOperation,
198 relation_info: PbObjectInfo,
199 ) -> NotificationVersion {
200 self.env
201 .notification_manager()
202 .notify_frontend_object_info(operation, relation_info)
203 .await
204 }
205
206 pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
207 self.env.notification_manager().current_version().await
208 }
209}
210
impl CatalogController {
    /// Finalizes a subscription whose creation has completed.
    ///
    /// Within a single transaction: refreshes the object's `created_at`
    /// timestamp and cluster version, flips the subscription state to
    /// `Created`, and grants default privileges for the new object.
    ///
    /// Returns a "catalog id not found" error if no object row matches
    /// `subscription_id`.
    pub async fn finish_create_subscription_catalog(&self, subscription_id: u32) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let job_id = subscription_id as i32;

        // Stamp the object with the final creation time and cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("subscription", job_id));
        }

        // Mark the subscription itself as created.
        let job = subscription::ActiveModel {
            subscription_id: Set(job_id),
            subscription_state: Set(SubscriptionState::Created.into()),
            ..Default::default()
        };
        job.update(&txn).await?;

        // Grant default privileges; the returned value is not needed here.
        let _ = grant_default_privileges_automatically(&txn, job_id).await?;

        txn.commit().await?;

        Ok(())
    }

    /// Notifies frontends about a subscription that has reached the `Created`
    /// state.
    ///
    /// Sends an `Add` notification carrying the subscription catalog; if any
    /// users hold privileges on the subscription, additionally pushes a
    /// user-info update. Returns the latest notification version produced.
    pub async fn notify_create_subscription(
        &self,
        subscription_id: u32,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.read().await;
        let job_id = subscription_id as i32;
        // Only a subscription already in `Created` state qualifies.
        let (subscription, obj) = Subscription::find_by_id(job_id)
            .find_also_related(Object)
            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
            .one(&inner.db)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Add,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: PbObjectInfo::Subscription(
                            ObjectModel(subscription, obj.unwrap()).into(),
                        )
                        .into(),
                    }],
                }),
            )
            .await;

        // Users holding privileges on this subscription need their user info
        // re-broadcast so frontends see the grants.
        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.eq(subscription_id as ObjectId))
            .into_tuple()
            .all(&inner.db)
            .await?;

        if !updated_user_ids.is_empty() {
            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
            version = self.notify_users_update(updated_user_infos).await;
        }

        Ok(version)
    }

    /// Builds a JSON report of connector usage across all sources and sinks.
    ///
    /// The result has the shape `{"source": [...], "sink": [...]}`, where each
    /// entry maps the object id to its connector name plus format/encode (when
    /// the source info or sink format descriptor is present).
    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
        let inner = self.inner.read().await;
        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
            .select_only()
            .column(source::Column::SourceId)
            .column(source::Column::WithProperties)
            .column(source::Column::SourceInfo)
            .into_tuple()
            .all(&inner.db)
            .await?;
        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .column(sink::Column::Properties)
            .column(sink::Column::SinkFormatDesc)
            .into_tuple()
            .all(&inner.db)
            .await?;
        // Release the read guard before the pure in-memory aggregation below.
        drop(inner);

        // The connector name is stored under the `connector` key of the
        // WITH properties.
        let get_connector_from_property = |property: &Property| -> String {
            property
                .0
                .get(UPSTREAM_SOURCE_KEY)
                .cloned()
                .unwrap_or_default()
        };

        let source_report: Vec<jsonbb::Value> = source_props_and_info
            .iter()
            .map(|(oid, property, info)| {
                let connector_name = get_connector_from_property(property);
                let mut format = None;
                let mut encode = None;
                if let Some(info) = info {
                    let pb_info = info.to_protobuf();
                    format = Some(pb_info.format().as_str_name());
                    encode = Some(pb_info.row_encode().as_str_name());
                }
                jsonbb::json!({
                    oid.to_string(): {
                        "connector": connector_name,
                        "format": format,
                        "encode": encode,
                    },
                })
            })
            .collect_vec();

        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
            .iter()
            .map(|(oid, property, info)| {
                let connector_name = get_connector_from_property(property);
                let mut format = None;
                let mut encode = None;
                if let Some(info) = info {
                    let pb_info = info.to_protobuf();
                    format = Some(pb_info.format().as_str_name());
                    encode = Some(pb_info.encode().as_str_name());
                }
                jsonbb::json!({
                    oid.to_string(): {
                        "connector": connector_name,
                        "format": format,
                        "encode": encode,
                    },
                })
            })
            .collect_vec();

        Ok(jsonbb::json!({
            "source": source_report,
            "sink": sink_report,
        }))
    }

    /// Deletes objects of subscriptions that never reached the `Created`
    /// state (dirty leftovers from interrupted creation), optionally scoped
    /// to a single database.
    pub async fn clean_dirty_subscription(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Subscription objects whose id is NOT among the created subscriptions.
        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
            object::Column::Oid.not_in_subquery(
                Query::select()
                    .column(subscription::Column::SubscriptionId)
                    .from(Subscription)
                    .and_where(
                        subscription::Column::SubscriptionState
                            .eq(SubscriptionState::Created as i32),
                    )
                    .take(),
            ),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        Object::delete_many()
            .filter(filter_condition)
            .exec(&txn)
            .await?;
        txn.commit().await?;
        Ok(())
    }

    /// Cleans up streaming jobs left in a dirty creating state, optionally
    /// scoped to a single database.
    ///
    /// A job counts as dirty when its status is `Initial`, or `Creating` with
    /// a `Foreground` create type. Dirty jobs, their state tables, and any
    /// associated source objects are deleted; frontends are notified about the
    /// deleted objects that were previously announced (materialized views and
    /// background jobs, plus their internal tables) and about tables updated
    /// by `Self::clean_dirty_sink_downstreams`.
    ///
    /// Returns the ids of source objects that were associated with the dirty
    /// tables, so the caller can perform source-side cleanup.
    pub async fn clean_dirty_creating_jobs(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<Vec<SourceId>> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
            streaming_job::Column::JobStatus
                .eq(JobStatus::Creating)
                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };

        // All dirty job rows joined with their object metadata.
        let dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .filter(filter_condition)
            .into_partial_model()
            .all(&txn)
            .await?;

        // Repair sink-into-table downstreams; the touched tables must be
        // re-announced to frontends below.
        let updated_table_ids = Self::clean_dirty_sink_downstreams(&txn).await?;
        let updated_table_objs = if !updated_table_ids.is_empty() {
            Table::find()
                .find_also_related(Object)
                .filter(table::Column::TableId.is_in(updated_table_ids))
                .all(&txn)
                .await?
        } else {
            vec![]
        };

        if dirty_job_objs.is_empty() {
            // Nothing dirty to delete; still commit and broadcast any table
            // updates produced by the sink-downstream cleanup.
            if !updated_table_objs.is_empty() {
                txn.commit().await?;

                self.notify_frontend(
                    NotificationOperation::Update,
                    NotificationInfo::ObjectGroup(PbObjectGroup {
                        objects: updated_table_objs
                            .into_iter()
                            .map(|(t, obj)| PbObject {
                                object_info: PbObjectInfo::Table(
                                    ObjectModel(t, obj.unwrap()).into(),
                                )
                                .into(),
                            })
                            .collect(),
                    }),
                )
                .await;
            }

            return Ok(vec![]);
        }

        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;

        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();

        // Look up the table type of every dirty table-typed job: only
        // materialized views (and background jobs, collected next) are
        // included in the delete notification below.
        let all_dirty_table_ids = dirty_job_objs
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table)
            .map(|obj| obj.oid)
            .collect_vec();
        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .column(table::Column::TableType)
            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
            .into_tuple::<(ObjectId, TableType)>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobId
                    .is_in(dirty_job_ids.clone())
                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .into_tuple()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        // Jobs whose deletion must be broadcast: materialized views and
        // background-created jobs.
        let to_notify_objs = dirty_job_objs
            .into_iter()
            .filter(|obj| {
                matches!(
                    dirty_table_type_map.get(&obj.oid),
                    Some(TableType::MaterializedView)
                ) || dirty_background_jobs.contains(&obj.oid)
            })
            .collect_vec();

        // Tables with an associated source: those source objects must be
        // deleted as well and are returned to the caller.
        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(dirty_job_ids.clone())
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        // All state tables belonging to the dirty jobs.
        let dirty_state_table_ids: Vec<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;

        // Internal tables of the jobs being notified; they go into the same
        // delete notification.
        let dirty_internal_table_objs = Object::find()
            .select_only()
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, object::Relation::Table.def())
            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
            .into_partial_model()
            .all(&txn)
            .await?;

        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
            .clone()
            .into_iter()
            .chain(dirty_state_table_ids.into_iter())
            .chain(dirty_associated_source_ids.clone().into_iter())
            .collect();

        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(to_delete_objs))
            .exec(&txn)
            .await?;
        // At minimum the dirty job objects themselves must have been deleted.
        assert!(res.rows_affected > 0);

        txn.commit().await?;

        let object_group = build_object_group_for_delete(
            to_notify_objs
                .into_iter()
                .chain(dirty_internal_table_objs.into_iter())
                .collect_vec(),
        );

        let _version = self
            .notify_frontend(NotificationOperation::Delete, object_group)
            .await;

        // Broadcast the tables updated by the sink-downstream cleanup.
        if !updated_table_objs.is_empty() {
            self.notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: updated_table_objs
                        .into_iter()
                        .map(|(t, obj)| PbObject {
                            object_info: PbObjectInfo::Table(ObjectModel(t, obj.unwrap()).into())
                                .into(),
                        })
                        .collect(),
                }),
            )
            .await;
        }

        Ok(dirty_associated_source_ids)
    }

    /// Resets tables stuck in the `Refreshing` state back to `Idle`,
    /// optionally scoped to a single database.
    pub async fn reset_refreshing_tables(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let filter_condition = table::Column::RefreshState.eq(RefreshState::Refreshing);

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };

        // Join with Object so the database filter above can apply.
        let table_ids: Vec<TableId> = Table::find()
            .find_also_related(Object)
            .filter(filter_condition)
            .all(&txn)
            .await
            .context("reset_refreshing_tables: finding table ids")?
            .into_iter()
            .map(|(t, _)| t.table_id)
            .collect();

        let res = Table::update_many()
            .col_expr(table::Column::RefreshState, Expr::value(RefreshState::Idle))
            .filter(table::Column::TableId.is_in(table_ids))
            .exec(&txn)
            .await
            .context("reset_refreshing_tables: update refresh state")?;

        txn.commit().await?;

        tracing::debug!(
            "reset refreshing tables: {} tables updated",
            res.rows_affected
        );

        Ok(())
    }

    /// Stores a comment on a table or on one of its columns and notifies
    /// frontends of the updated table catalog.
    ///
    /// When `comment.column_index` is set the description is written into the
    /// matching column descriptor; otherwise it becomes the table's own
    /// description. Returns the notification version of the update broadcast.
    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Validate that the referenced database/schema/table all exist.
        ensure_object_id(ObjectType::Database, comment.database_id as _, &txn).await?;
        ensure_object_id(ObjectType::Schema, comment.schema_id as _, &txn).await?;
        let table_obj = Object::find_by_id(comment.table_id as ObjectId)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;

        let table = if let Some(col_idx) = comment.column_index {
            // Column comment: rewrite the column catalog with the new
            // description in place.
            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id as TableId)
                .select_only()
                .column(table::Column::Columns)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
            let mut pb_columns = columns.to_protobuf();

            let column = pb_columns
                .get_mut(col_idx as usize)
                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
                anyhow!(
                    "column desc at index {} for table id {} not found",
                    col_idx,
                    comment.table_id
                )
            })?;
            column_desc.description = comment.description;
            table::ActiveModel {
                table_id: Set(comment.table_id as _),
                columns: Set(pb_columns.into()),
                ..Default::default()
            }
            .update(&txn)
            .await?
        } else {
            // Table comment: stored directly on the table row.
            table::ActiveModel {
                table_id: Set(comment.table_id as _),
                description: Set(comment.description),
                ..Default::default()
            }
            .update(&txn)
            .await?
        };
        txn.commit().await?;

        let version = self
            .notify_frontend_relation_info(
                NotificationOperation::Update,
                PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
            )
            .await;

        Ok(version)
    }

    /// Drains the given table ids from the dropped-tables cache, returning
    /// their cached catalogs. See
    /// `CatalogControllerInner::complete_dropped_tables`.
    pub async fn complete_dropped_tables(
        &self,
        table_ids: impl Iterator<Item = TableId>,
    ) -> Vec<PbTable> {
        let mut inner = self.inner.write().await;
        inner.complete_dropped_tables(table_ids)
    }
}
730
/// Aggregate object counts over the catalog, as produced by
/// `CatalogControllerInner::stats`.
pub struct CatalogStats {
    pub table_num: u64,
    pub mview_num: u64,
    pub index_num: u64,
    pub source_num: u64,
    pub sink_num: u64,
    pub function_num: u64,
    pub streaming_job_num: u64,
    pub actor_num: u64,
}
742
impl CatalogControllerInner {
    /// Returns a full catalog snapshot (all user-visible object kinds)
    /// together with all user infos.
    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
        let databases = self.list_databases().await?;
        let schemas = self.list_schemas().await?;
        let tables = self.list_tables().await?;
        let sources = self.list_sources().await?;
        let sinks = self.list_sinks().await?;
        let subscriptions = self.list_subscriptions().await?;
        let indexes = self.list_indexes().await?;
        let views = self.list_views().await?;
        let functions = self.list_functions().await?;
        let connections = self.list_connections().await?;
        let secrets = self.list_secrets().await?;

        let users = self.list_users().await?;

        Ok((
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
        ))
    }

    /// Computes aggregate catalog statistics (object counts per kind).
    pub async fn stats(&self) -> MetaResult<CatalogStats> {
        // Count tables per table type, excluding internal state tables.
        let mut table_num_map: HashMap<_, _> = Table::find()
            .select_only()
            .column(table::Column::TableType)
            .column_as(table::Column::TableId.count(), "num")
            .group_by(table::Column::TableType)
            .having(table::Column::TableType.ne(TableType::Internal))
            .into_tuple::<(TableType, i64)>()
            .all(&self.db)
            .await?
            .into_iter()
            .map(|(table_type, num)| (table_type, num as u64))
            .collect();

        let source_num = Source::find().count(&self.db).await?;
        let sink_num = Sink::find().count(&self.db).await?;
        let function_num = Function::find().count(&self.db).await?;
        let streaming_job_num = StreamingJob::find().count(&self.db).await?;
        let actor_num = Actor::find().count(&self.db).await?;

        Ok(CatalogStats {
            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
            mview_num: table_num_map
                .remove(&TableType::MaterializedView)
                .unwrap_or(0),
            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
            source_num,
            sink_num,
            function_num,
            streaming_job_num,
            actor_num,
        })
    }

    /// Lists all databases.
    async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
        let db_objs = Database::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        Ok(db_objs
            .into_iter()
            .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
            .collect())
    }

    /// Lists all schemas.
    async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
        let schema_objs = Schema::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(schema_objs
            .into_iter()
            .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
            .collect())
    }

    /// Lists all users, with their grant privileges populated.
    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
        let mut user_infos: Vec<PbUserInfo> = User::find()
            .all(&self.db)
            .await?
            .into_iter()
            .map(Into::into)
            .collect();

        // Privileges live in a separate table; fetch them per user.
        for user_info in &mut user_infos {
            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
        }
        Ok(user_infos)
    }

    /// Lists ALL tables, including internal state tables, with no
    /// visibility filtering.
    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
        let table_objs = Table::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(table_objs
            .into_iter()
            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
            .collect())
    }

    /// Lists the tables visible to frontends: those whose streaming job has
    /// been created, plus materialized views and background-created jobs even
    /// while still creating. Internal tables belonging to any of those jobs
    /// are included as well, and every returned table carries the stream job
    /// status of its owning job.
    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
        let table_objs = Table::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus.eq(JobStatus::Created).or(
                    table::Column::TableType
                        .eq(TableType::MaterializedView)
                        .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
                ),
            )
            .all(&self.db)
            .await?;

        // Status of every created or background job, used to annotate the
        // returned tables.
        let job_statuses: HashMap<ObjectId, JobStatus> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .column(streaming_job::Column::JobStatus)
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .into_tuple::<(ObjectId, JobStatus)>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        let job_ids: HashSet<ObjectId> = table_objs
            .iter()
            .map(|(t, _)| t.table_id)
            .chain(job_statuses.keys().cloned())
            .collect();

        // Internal state tables belonging to any of the visible jobs.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(
                table::Column::TableType
                    .eq(TableType::Internal)
                    .and(table::Column::BelongsToJobId.is_in(job_ids)),
            )
            .all(&self.db)
            .await?;

        Ok(table_objs
            .into_iter()
            .chain(internal_table_objs.into_iter())
            .map(|(table, obj)| {
                // Internal tables inherit the status of the job they belong
                // to; jobs missing from the map are treated as still creating.
                let status: PbStreamJobStatus = if table.table_type == TableType::Internal {
                    (*job_statuses
                        .get(&table.belongs_to_job_id.unwrap())
                        .unwrap_or(&JobStatus::Creating))
                    .into()
                } else {
                    (*job_statuses
                        .get(&table.table_id)
                        .unwrap_or(&JobStatus::Creating))
                    .into()
                };
                let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
                pb_table.stream_job_status = status.into();
                pb_table
            })
            .collect())
    }

    /// Lists the sources visible to frontends: sources without a streaming
    /// job, or whose job has been created. Sources associated with a table
    /// are hidden until that table's job reaches `Created`.
    async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        let mut source_objs = Source::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .is_null()
                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
            )
            .all(&self.db)
            .await?;

        // Tables with an associated source whose job has reached `Created`.
        let created_table_ids: HashSet<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                table::Column::OptionalAssociatedSourceId
                    .is_not_null()
                    .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
            )
            .into_tuple()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();
        // Keep standalone sources, and table-associated sources only when
        // the owning table's job is created.
        source_objs.retain_mut(|(source, _)| {
            source.optional_associated_table_id.is_none()
                || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
        });

        Ok(source_objs
            .into_iter()
            .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
            .collect())
    }

    /// Lists the sinks visible to frontends: created jobs plus
    /// background-created ones, each annotated with its stream job status.
    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
        let sink_objs = Sink::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .all(&self.db)
            .await?;

        // Of the sinks above, those whose job is still creating.
        let creating_sinks: HashSet<_> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Creating)
                    .and(
                        streaming_job::Column::JobId
                            .is_in(sink_objs.iter().map(|(sink, _)| sink.sink_id)),
                    ),
            )
            .into_tuple::<SinkId>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        Ok(sink_objs
            .into_iter()
            .map(|(sink, obj)| {
                let is_creating = creating_sinks.contains(&sink.sink_id);
                let mut pb_sink: PbSink = ObjectModel(sink, obj.unwrap()).into();
                pb_sink.stream_job_status = if is_creating {
                    PbStreamJobStatus::Creating.into()
                } else {
                    PbStreamJobStatus::Created.into()
                };
                pb_sink
            })
            .collect())
    }

    /// Lists subscriptions that have reached the `Created` state.
    async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
        let subscription_objs = Subscription::find()
            .find_also_related(Object)
            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
            .all(&self.db)
            .await?;

        Ok(subscription_objs
            .into_iter()
            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
            .collect())
    }

    /// Lists all views.
    async fn list_views(&self) -> MetaResult<Vec<PbView>> {
        let view_objs = View::find().find_also_related(Object).all(&self.db).await?;

        Ok(view_objs
            .into_iter()
            .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
            .collect())
    }

    /// Lists the indexes visible to frontends: created jobs plus
    /// background-created ones, each annotated with its stream job status.
    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
        let index_objs = Index::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .all(&self.db)
            .await?;

        // Of the indexes above, those whose job is still creating.
        let creating_indexes: HashSet<_> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Creating)
                    .and(
                        streaming_job::Column::JobId
                            .is_in(index_objs.iter().map(|(index, _)| index.index_id)),
                    ),
            )
            .into_tuple::<IndexId>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        Ok(index_objs
            .into_iter()
            .map(|(index, obj)| {
                let is_creating = creating_indexes.contains(&index.index_id);
                let mut pb_index: PbIndex = ObjectModel(index, obj.unwrap()).into();
                pb_index.stream_job_status = if is_creating {
                    PbStreamJobStatus::Creating.into()
                } else {
                    PbStreamJobStatus::Created.into()
                };
                pb_index
            })
            .collect())
    }

    /// Lists all connections.
    async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
        let conn_objs = Connection::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(conn_objs
            .into_iter()
            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
            .collect())
    }

    /// Lists all secrets.
    pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
        let secret_objs = Secret::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        Ok(secret_objs
            .into_iter()
            .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
            .collect())
    }

    /// Lists all functions.
    async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
        let func_objs = Function::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(func_objs
            .into_iter()
            .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
            .collect())
    }

    /// Registers a oneshot sender to be signalled when the creating job `id`
    /// in `database_id` finishes (or fails/cancels).
    pub(crate) fn register_finish_notifier(
        &mut self,
        database_id: DatabaseId,
        id: ObjectId,
        sender: Sender<Result<NotificationVersion, String>>,
    ) {
        self.creating_table_finish_notifier
            .entry(database_id)
            .or_default()
            .entry(id)
            .or_default()
            .push(sender);
    }

    /// Returns whether the streaming job `id` has reached the `Created`
    /// status; errors if the job row no longer exists (e.g. after
    /// cancellation/drop).
    pub(crate) async fn streaming_job_is_finished(&mut self, id: i32) -> MetaResult<bool> {
        let status = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobStatus)
            .filter(streaming_job::Column::JobId.eq(id))
            .into_tuple::<JobStatus>()
            .one(&self.db)
            .await?;

        status
            .map(|status| status == JobStatus::Created)
            .ok_or_else(|| {
                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
            })
    }

    /// Sends `Err(err)` to every registered finish notifier — for one
    /// database if `database_id` is given, otherwise for all databases —
    /// removing the notifiers in the process.
    pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
        if let Some(database_id) = database_id {
            if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
            {
                for tx in creating_tables.into_values().flatten() {
                    let _ = tx.send(Err(err.clone()));
                }
            }
        } else {
            // No database given: drain the whole map and fail every waiter.
            for tx in take(&mut self.creating_table_finish_notifier)
                .into_values()
                .flatten()
                .flat_map(|(_, txs)| txs.into_iter())
            {
                let _ = tx.send(Err(err.clone()));
            }
        }
    }

    /// Sends a "Cancelled" error to the finish notifiers registered for the
    /// single job `id` in `database_id`, removing them in the process.
    pub(crate) fn notify_cancelled(&mut self, database_id: DatabaseId, id: ObjectId) {
        if let Some(creating_tables) = self.creating_table_finish_notifier.get_mut(&database_id)
            && let Some(tx_list) = creating_tables.remove(&id)
        {
            for tx in tx_list {
                let _ = tx.send(Err("Cancelled".to_owned()));
            }
        }
    }

    /// Lists the ids of tables that participate in time travel: user tables,
    /// materialized views, and indexes (internal tables excluded).
    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
        let table_ids: Vec<TableId> = Table::find()
            .select_only()
            .filter(table::Column::TableType.is_in(vec![
                TableType::Table,
                TableType::MaterializedView,
                TableType::Index,
            ]))
            .column(table::Column::TableId)
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(table_ids)
    }

    /// Removes the given table ids from the dropped-tables cache and returns
    /// their cached catalogs; ids not present in the cache are skipped with a
    /// warning.
    pub(crate) fn complete_dropped_tables(
        &mut self,
        table_ids: impl Iterator<Item = TableId>,
    ) -> Vec<PbTable> {
        table_ids
            .filter_map(|table_id| {
                self.dropped_tables.remove(&table_id).map_or_else(
                    || {
                        tracing::warn!(table_id, "table not found");
                        None
                    },
                    Some,
                )
            })
            .collect()
    }
}