1mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::{Context, anyhow};
29use itertools::Itertools;
30use risingwave_common::catalog::{
31 DEFAULT_SCHEMA_NAME, FragmentTypeFlag, SYSTEM_SCHEMAS, TableOption,
32};
33use risingwave_common::current_cluster_version;
34use risingwave_common::id::JobId;
35use risingwave_common::secret::LocalSecretManager;
36use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
37use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::*;
41use risingwave_meta_model::table::{RefreshState, TableType};
42use risingwave_meta_model::{
43 ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
44 IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
45 StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
46 connection, database, fragment, function, index, object, object_dependency, schema, secret,
47 sink, source, streaming_job, subscription, table, user_privilege, view,
48};
49use risingwave_pb::catalog::connection::Info as ConnectionInfo;
50use risingwave_pb::catalog::subscription::SubscriptionState;
51use risingwave_pb::catalog::table::PbTableType;
52use risingwave_pb::catalog::{
53 PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
54 PbStreamJobStatus, PbSubscription, PbTable, PbView,
55};
56use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
57use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
58use risingwave_pb::meta::object::PbObjectInfo;
59use risingwave_pb::meta::subscribe_response::{
60 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
61};
62use risingwave_pb::meta::{PbObject, PbObjectGroup};
63use risingwave_pb::stream_plan::stream_node::NodeBody;
64use risingwave_pb::telemetry::PbTelemetryEventStage;
65use risingwave_pb::user::PbUserInfo;
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69 ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
70 IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
71 SelectColumns, TransactionTrait, Value,
72};
73use tokio::sync::oneshot::Sender;
74use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
75use tracing::info;
76
77use super::utils::{
78 check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
79 rename_relation_refer,
80};
81use crate::controller::ObjectModel;
82use crate::controller::catalog::util::update_internal_tables;
83use crate::controller::utils::*;
84use crate::manager::{
85 IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
86 get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
87};
88use crate::rpc::ddl_controller::DropMode;
89use crate::telemetry::{MetaTelemetryJobDesc, report_event};
90use crate::{MetaError, MetaResult};
91
92pub type Catalog = (
93 Vec<PbDatabase>,
94 Vec<PbSchema>,
95 Vec<PbTable>,
96 Vec<PbSource>,
97 Vec<PbSink>,
98 Vec<PbSubscription>,
99 Vec<PbIndex>,
100 Vec<PbView>,
101 Vec<PbFunction>,
102 Vec<PbConnection>,
103 Vec<PbSecret>,
104);
105
106pub type CatalogControllerRef = Arc<CatalogController>;
107
108pub struct CatalogController {
110 pub(crate) env: MetaSrvEnv,
111 pub(crate) inner: RwLock<CatalogControllerInner>,
112}
113
114#[derive(Clone, Default, Debug)]
115pub struct DropTableConnectorContext {
116 pub(crate) to_change_streaming_job_id: JobId,
118 pub(crate) to_remove_state_table_id: TableId,
119 pub(crate) to_remove_source_id: SourceId,
120}
121
122#[derive(Clone, Default, Debug)]
123pub struct ReleaseContext {
124 pub(crate) database_id: DatabaseId,
125 pub(crate) removed_streaming_job_ids: Vec<JobId>,
126 pub(crate) removed_state_table_ids: Vec<TableId>,
128
129 pub(crate) removed_secret_ids: Vec<SecretId>,
131 pub(crate) removed_source_ids: Vec<SourceId>,
133 pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
136
137 pub(crate) removed_actors: HashSet<ActorId>,
138 pub(crate) removed_fragments: HashSet<FragmentId>,
139
140 pub(crate) removed_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
142}
143
144impl CatalogController {
145 pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
146 let meta_store = env.meta_store();
147 let catalog_controller = Self {
148 env,
149 inner: RwLock::new(CatalogControllerInner {
150 db: meta_store.conn,
151 creating_table_finish_notifier: HashMap::new(),
152 dropped_tables: HashMap::new(),
153 }),
154 };
155
156 catalog_controller.init().await?;
157 Ok(catalog_controller)
158 }
159
160 pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
163 self.inner.read().await
164 }
165
166 pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
167 self.inner.write().await
168 }
169}
170
171pub struct CatalogControllerInner {
172 pub(crate) db: DatabaseConnection,
173 #[expect(clippy::type_complexity)]
178 pub creating_table_finish_notifier:
179 HashMap<DatabaseId, HashMap<JobId, Vec<Sender<Result<NotificationVersion, String>>>>>,
180 pub dropped_tables: HashMap<TableId, PbTable>,
182}
183
184impl CatalogController {
185 pub(crate) async fn notify_frontend(
186 &self,
187 operation: NotificationOperation,
188 info: NotificationInfo,
189 ) -> NotificationVersion {
190 self.env
191 .notification_manager()
192 .notify_frontend(operation, info)
193 .await
194 }
195
196 pub(crate) async fn notify_frontend_relation_info(
197 &self,
198 operation: NotificationOperation,
199 relation_info: PbObjectInfo,
200 ) -> NotificationVersion {
201 self.env
202 .notification_manager()
203 .notify_frontend_object_info(operation, relation_info)
204 .await
205 }
206
207 pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
208 self.env.notification_manager().current_version().await
209 }
210}
211
212impl CatalogController {
213 pub async fn finish_create_subscription_catalog(&self, subscription_id: u32) -> MetaResult<()> {
214 let inner = self.inner.write().await;
215 let txn = inner.db.begin().await?;
216 let job_id = subscription_id as i32;
217
218 let res = Object::update_many()
220 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
221 .col_expr(
222 object::Column::CreatedAtClusterVersion,
223 current_cluster_version().into(),
224 )
225 .filter(object::Column::Oid.eq(job_id))
226 .exec(&txn)
227 .await?;
228 if res.rows_affected == 0 {
229 return Err(MetaError::catalog_id_not_found("subscription", job_id));
230 }
231
232 let job = subscription::ActiveModel {
234 subscription_id: Set(job_id),
235 subscription_state: Set(SubscriptionState::Created.into()),
236 ..Default::default()
237 };
238 job.update(&txn).await?;
239
240 let _ = grant_default_privileges_automatically(&txn, job_id).await?;
241
242 txn.commit().await?;
243
244 Ok(())
245 }
246
247 pub async fn notify_create_subscription(
248 &self,
249 subscription_id: u32,
250 ) -> MetaResult<NotificationVersion> {
251 let inner = self.inner.read().await;
252 let job_id = subscription_id as i32;
253 let (subscription, obj) = Subscription::find_by_id(job_id)
254 .find_also_related(Object)
255 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
256 .one(&inner.db)
257 .await?
258 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?;
259
260 let mut version = self
261 .notify_frontend(
262 NotificationOperation::Add,
263 NotificationInfo::ObjectGroup(PbObjectGroup {
264 objects: vec![PbObject {
265 object_info: PbObjectInfo::Subscription(
266 ObjectModel(subscription, obj.unwrap()).into(),
267 )
268 .into(),
269 }],
270 }),
271 )
272 .await;
273
274 let updated_user_ids: Vec<UserId> = UserPrivilege::find()
276 .select_only()
277 .distinct()
278 .column(user_privilege::Column::UserId)
279 .filter(user_privilege::Column::Oid.eq(subscription_id as ObjectId))
280 .into_tuple()
281 .all(&inner.db)
282 .await?;
283
284 if !updated_user_ids.is_empty() {
285 let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
286 version = self.notify_users_update(updated_user_infos).await;
287 }
288
289 Ok(version)
290 }
291
292 pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
294 let inner = self.inner.read().await;
314 let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
315 .select_only()
316 .column(source::Column::SourceId)
317 .column(source::Column::WithProperties)
318 .column(source::Column::SourceInfo)
319 .into_tuple()
320 .all(&inner.db)
321 .await?;
322 let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
323 .select_only()
324 .column(sink::Column::SinkId)
325 .column(sink::Column::Properties)
326 .column(sink::Column::SinkFormatDesc)
327 .into_tuple()
328 .all(&inner.db)
329 .await?;
330 drop(inner);
331
332 let get_connector_from_property = |property: &Property| -> String {
333 property
334 .0
335 .get(UPSTREAM_SOURCE_KEY)
336 .cloned()
337 .unwrap_or_default()
338 };
339
340 let source_report: Vec<jsonbb::Value> = source_props_and_info
341 .iter()
342 .map(|(oid, property, info)| {
343 let connector_name = get_connector_from_property(property);
344 let mut format = None;
345 let mut encode = None;
346 if let Some(info) = info {
347 let pb_info = info.to_protobuf();
348 format = Some(pb_info.format().as_str_name());
349 encode = Some(pb_info.row_encode().as_str_name());
350 }
351 jsonbb::json!({
352 oid.to_string(): {
353 "connector": connector_name,
354 "format": format,
355 "encode": encode,
356 },
357 })
358 })
359 .collect_vec();
360
361 let sink_report: Vec<jsonbb::Value> = sink_props_and_info
362 .iter()
363 .map(|(oid, property, info)| {
364 let connector_name = get_connector_from_property(property);
365 let mut format = None;
366 let mut encode = None;
367 if let Some(info) = info {
368 let pb_info = info.to_protobuf();
369 format = Some(pb_info.format().as_str_name());
370 encode = Some(pb_info.encode().as_str_name());
371 }
372 jsonbb::json!({
373 oid.to_string(): {
374 "connector": connector_name,
375 "format": format,
376 "encode": encode,
377 },
378 })
379 })
380 .collect_vec();
381
382 Ok(jsonbb::json!({
383 "source": source_report,
384 "sink": sink_report,
385 }))
386 }
387
388 pub async fn clean_dirty_subscription(
389 &self,
390 database_id: Option<DatabaseId>,
391 ) -> MetaResult<()> {
392 let inner = self.inner.write().await;
393 let txn = inner.db.begin().await?;
394 let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
395 object::Column::Oid.not_in_subquery(
396 Query::select()
397 .column(subscription::Column::SubscriptionId)
398 .from(Subscription)
399 .and_where(
400 subscription::Column::SubscriptionState
401 .eq(SubscriptionState::Created as i32),
402 )
403 .take(),
404 ),
405 );
406
407 let filter_condition = if let Some(database_id) = database_id {
408 filter_condition.and(object::Column::DatabaseId.eq(database_id))
409 } else {
410 filter_condition
411 };
412 Object::delete_many()
413 .filter(filter_condition)
414 .exec(&txn)
415 .await?;
416 txn.commit().await?;
417 Ok(())
419 }
420
421 pub async fn clean_dirty_creating_jobs(
423 &self,
424 database_id: Option<DatabaseId>,
425 ) -> MetaResult<Vec<SourceId>> {
426 let inner = self.inner.write().await;
427 let txn = inner.db.begin().await?;
428
429 let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
430 streaming_job::Column::JobStatus
431 .eq(JobStatus::Creating)
432 .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
433 );
434
435 let filter_condition = if let Some(database_id) = database_id {
436 filter_condition.and(object::Column::DatabaseId.eq(database_id))
437 } else {
438 filter_condition
439 };
440
441 let dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
442 .select_only()
443 .column(streaming_job::Column::JobId)
444 .columns([
445 object::Column::Oid,
446 object::Column::ObjType,
447 object::Column::SchemaId,
448 object::Column::DatabaseId,
449 ])
450 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
451 .filter(filter_condition)
452 .into_partial_model()
453 .all(&txn)
454 .await?;
455
456 Self::clean_dirty_sink_downstreams(&txn).await?;
457
458 if dirty_job_objs.is_empty() {
459 return Ok(vec![]);
460 }
461
462 self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
463
464 let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();
465
466 let all_dirty_table_ids = dirty_job_objs
469 .iter()
470 .filter(|obj| obj.obj_type == ObjectType::Table)
471 .map(|obj| obj.oid)
472 .collect_vec();
473 let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
474 .select_only()
475 .column(table::Column::TableId)
476 .column(table::Column::TableType)
477 .filter(table::Column::TableId.is_in(all_dirty_table_ids))
478 .into_tuple::<(ObjectId, TableType)>()
479 .all(&txn)
480 .await?
481 .into_iter()
482 .collect();
483
484 let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
485 .select_only()
486 .column(streaming_job::Column::JobId)
487 .filter(
488 streaming_job::Column::JobId
489 .is_in(dirty_job_ids.clone())
490 .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
491 )
492 .into_tuple()
493 .all(&txn)
494 .await?
495 .into_iter()
496 .collect();
497
498 let to_notify_objs = dirty_job_objs
500 .into_iter()
501 .filter(|obj| {
502 matches!(
503 dirty_table_type_map.get(&obj.oid),
504 Some(TableType::MaterializedView)
505 ) || dirty_background_jobs.contains(&obj.oid)
506 })
507 .collect_vec();
508
509 let dirty_associated_source_ids: Vec<SourceId> = Table::find()
512 .select_only()
513 .column(table::Column::OptionalAssociatedSourceId)
514 .filter(
515 table::Column::TableId
516 .is_in(dirty_job_ids.clone())
517 .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
518 )
519 .into_tuple()
520 .all(&txn)
521 .await?;
522
523 let dirty_state_table_ids: Vec<TableId> = Table::find()
524 .select_only()
525 .column(table::Column::TableId)
526 .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
527 .into_tuple()
528 .all(&txn)
529 .await?;
530
531 let dirty_internal_table_objs = Object::find()
532 .select_only()
533 .columns([
534 object::Column::Oid,
535 object::Column::ObjType,
536 object::Column::SchemaId,
537 object::Column::DatabaseId,
538 ])
539 .join(JoinType::InnerJoin, object::Relation::Table.def())
540 .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
541 .into_partial_model()
542 .all(&txn)
543 .await?;
544
545 let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
546 .clone()
547 .into_iter()
548 .chain(
549 dirty_state_table_ids
550 .into_iter()
551 .map(|table_id| table_id.as_raw_id() as _),
552 )
553 .chain(dirty_associated_source_ids.clone().into_iter())
554 .collect();
555
556 let res = Object::delete_many()
557 .filter(object::Column::Oid.is_in(to_delete_objs))
558 .exec(&txn)
559 .await?;
560 assert!(res.rows_affected > 0);
561
562 txn.commit().await?;
563
564 let object_group = build_object_group_for_delete(
565 to_notify_objs
566 .into_iter()
567 .chain(dirty_internal_table_objs.into_iter())
568 .collect_vec(),
569 );
570
571 let _version = self
572 .notify_frontend(NotificationOperation::Delete, object_group)
573 .await;
574
575 Ok(dirty_associated_source_ids)
576 }
577
578 pub async fn reset_refreshing_tables(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
580 let inner = self.inner.write().await;
581 let txn = inner.db.begin().await?;
582
583 let filter_condition = table::Column::RefreshState.eq(RefreshState::Refreshing);
587
588 let filter_condition = if let Some(database_id) = database_id {
589 filter_condition.and(object::Column::DatabaseId.eq(database_id))
590 } else {
591 filter_condition
592 };
593
594 let table_ids: Vec<TableId> = Table::find()
595 .find_also_related(Object)
596 .filter(filter_condition)
597 .all(&txn)
598 .await
599 .context("reset_refreshing_tables: finding table ids")?
600 .into_iter()
601 .map(|(t, _)| t.table_id)
602 .collect();
603
604 let res = Table::update_many()
605 .col_expr(table::Column::RefreshState, Expr::value(RefreshState::Idle))
606 .filter(table::Column::TableId.is_in(table_ids))
607 .exec(&txn)
608 .await
609 .context("reset_refreshing_tables: update refresh state")?;
610
611 txn.commit().await?;
612
613 tracing::debug!(
614 "reset refreshing tables: {} tables updated",
615 res.rows_affected
616 );
617
618 Ok(())
619 }
620
621 pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
622 let inner = self.inner.write().await;
623 let txn = inner.db.begin().await?;
624 ensure_object_id(ObjectType::Database, comment.database_id as _, &txn).await?;
625 ensure_object_id(ObjectType::Schema, comment.schema_id as _, &txn).await?;
626 let table_obj = Object::find_by_id(comment.table_id as ObjectId)
627 .one(&txn)
628 .await?
629 .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
630
631 let table = if let Some(col_idx) = comment.column_index {
632 let columns: ColumnCatalogArray = Table::find_by_id(TableId::new(comment.table_id))
633 .select_only()
634 .column(table::Column::Columns)
635 .into_tuple()
636 .one(&txn)
637 .await?
638 .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
639 let mut pb_columns = columns.to_protobuf();
640
641 let column = pb_columns
642 .get_mut(col_idx as usize)
643 .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
644 let column_desc = column.column_desc.as_mut().ok_or_else(|| {
645 anyhow!(
646 "column desc at index {} for table id {} not found",
647 col_idx,
648 comment.table_id
649 )
650 })?;
651 column_desc.description = comment.description;
652 table::ActiveModel {
653 table_id: Set(comment.table_id.into()),
654 columns: Set(pb_columns.into()),
655 ..Default::default()
656 }
657 .update(&txn)
658 .await?
659 } else {
660 table::ActiveModel {
661 table_id: Set(comment.table_id.into()),
662 description: Set(comment.description),
663 ..Default::default()
664 }
665 .update(&txn)
666 .await?
667 };
668 txn.commit().await?;
669
670 let version = self
671 .notify_frontend_relation_info(
672 NotificationOperation::Update,
673 PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
674 )
675 .await;
676
677 Ok(version)
678 }
679
680 async fn notify_hummock_dropped_tables(&self, tables: Vec<PbTable>) {
681 if tables.is_empty() {
682 return;
683 }
684 let objects = tables
685 .into_iter()
686 .map(|t| PbObject {
687 object_info: Some(PbObjectInfo::Table(t)),
688 })
689 .collect();
690 let group = NotificationInfo::ObjectGroup(PbObjectGroup { objects });
691 self.env
692 .notification_manager()
693 .notify_hummock(NotificationOperation::Delete, group.clone())
694 .await;
695 self.env
696 .notification_manager()
697 .notify_compactor(NotificationOperation::Delete, group)
698 .await;
699 }
700
701 pub async fn complete_dropped_tables(&self, table_ids: impl IntoIterator<Item = TableId>) {
702 let mut inner = self.inner.write().await;
703 let tables = inner.complete_dropped_tables(table_ids);
704 self.notify_hummock_dropped_tables(tables).await;
705 }
706
707 pub async fn cleanup_dropped_tables(&self) {
708 let mut inner = self.inner.write().await;
709 let tables = inner.dropped_tables.drain().map(|(_, t)| t).collect();
710 self.notify_hummock_dropped_tables(tables).await;
711 }
712
713 pub async fn stats(&self) -> MetaResult<CatalogStats> {
714 let inner = self.inner.read().await;
715
716 let mut table_num_map: HashMap<_, _> = Table::find()
717 .select_only()
718 .column(table::Column::TableType)
719 .column_as(table::Column::TableId.count(), "num")
720 .group_by(table::Column::TableType)
721 .having(table::Column::TableType.ne(TableType::Internal))
722 .into_tuple::<(TableType, i64)>()
723 .all(&inner.db)
724 .await?
725 .into_iter()
726 .map(|(table_type, num)| (table_type, num as u64))
727 .collect();
728
729 let source_num = Source::find().count(&inner.db).await?;
730 let sink_num = Sink::find().count(&inner.db).await?;
731 let function_num = Function::find().count(&inner.db).await?;
732 let streaming_job_num = StreamingJob::find().count(&inner.db).await?;
733
734 let actor_num = {
735 let guard = self.env.shared_actor_info.read_guard();
736 guard
737 .iter_over_fragments()
738 .map(|(_, fragment)| fragment.actors.len() as u64)
739 .sum::<u64>()
740 };
741
742 Ok(CatalogStats {
743 table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
744 mview_num: table_num_map
745 .remove(&TableType::MaterializedView)
746 .unwrap_or(0),
747 index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
748 source_num,
749 sink_num,
750 function_num,
751 streaming_job_num,
752 actor_num,
753 })
754 }
755}
756
/// Object counts produced by `CatalogController::stats`.
pub struct CatalogStats {
    /// Number of tables of type `Table`.
    pub table_num: u64,
    /// Number of tables of type `MaterializedView`.
    pub mview_num: u64,
    /// Number of tables of type `Index`.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs.
    pub streaming_job_num: u64,
    /// Number of actors, summed over all fragments.
    pub actor_num: u64,
}
768
769impl CatalogControllerInner {
770 pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
771 let databases = self.list_databases().await?;
772 let schemas = self.list_schemas().await?;
773 let tables = self.list_tables().await?;
774 let sources = self.list_sources().await?;
775 let sinks = self.list_sinks().await?;
776 let subscriptions = self.list_subscriptions().await?;
777 let indexes = self.list_indexes().await?;
778 let views = self.list_views().await?;
779 let functions = self.list_functions().await?;
780 let connections = self.list_connections().await?;
781 let secrets = self.list_secrets().await?;
782
783 let users = self.list_users().await?;
784
785 Ok((
786 (
787 databases,
788 schemas,
789 tables,
790 sources,
791 sinks,
792 subscriptions,
793 indexes,
794 views,
795 functions,
796 connections,
797 secrets,
798 ),
799 users,
800 ))
801 }
802
803 async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
804 let db_objs = Database::find()
805 .find_also_related(Object)
806 .all(&self.db)
807 .await?;
808 Ok(db_objs
809 .into_iter()
810 .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
811 .collect())
812 }
813
814 async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
815 let schema_objs = Schema::find()
816 .find_also_related(Object)
817 .all(&self.db)
818 .await?;
819
820 Ok(schema_objs
821 .into_iter()
822 .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
823 .collect())
824 }
825
826 async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
827 let mut user_infos: Vec<PbUserInfo> = User::find()
828 .all(&self.db)
829 .await?
830 .into_iter()
831 .map(Into::into)
832 .collect();
833
834 for user_info in &mut user_infos {
835 user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
836 }
837 Ok(user_infos)
838 }
839
840 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
842 let table_objs = Table::find()
843 .find_also_related(Object)
844 .all(&self.db)
845 .await?;
846
847 Ok(table_objs
848 .into_iter()
849 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
850 .collect())
851 }
852
853 async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
855 let table_objs = Table::find()
856 .find_also_related(Object)
857 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
858 .filter(
859 streaming_job::Column::JobStatus.eq(JobStatus::Created).or(
860 table::Column::TableType
861 .eq(TableType::MaterializedView)
862 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
863 ),
864 )
865 .all(&self.db)
866 .await?;
867
868 let job_statuses: HashMap<JobId, JobStatus> = StreamingJob::find()
869 .select_only()
870 .column(streaming_job::Column::JobId)
871 .column(streaming_job::Column::JobStatus)
872 .filter(
873 streaming_job::Column::JobStatus
874 .eq(JobStatus::Created)
875 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
876 )
877 .into_tuple::<(JobId, JobStatus)>()
878 .all(&self.db)
879 .await?
880 .into_iter()
881 .collect();
882
883 let job_ids: HashSet<JobId> = table_objs
884 .iter()
885 .map(|(t, _)| t.table_id.as_job_id())
886 .chain(job_statuses.keys().cloned())
887 .collect();
888
889 let internal_table_objs = Table::find()
890 .find_also_related(Object)
891 .filter(
892 table::Column::TableType
893 .eq(TableType::Internal)
894 .and(table::Column::BelongsToJobId.is_in(job_ids)),
895 )
896 .all(&self.db)
897 .await?;
898
899 Ok(table_objs
900 .into_iter()
901 .chain(internal_table_objs.into_iter())
902 .map(|(table, obj)| {
903 let status: PbStreamJobStatus = if table.table_type == TableType::Internal {
906 (*job_statuses
907 .get(&table.belongs_to_job_id.unwrap())
908 .unwrap_or(&JobStatus::Creating))
909 .into()
910 } else {
911 (*job_statuses
912 .get(&table.table_id.as_job_id())
913 .unwrap_or(&JobStatus::Creating))
914 .into()
915 };
916 let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
917 pb_table.stream_job_status = status.into();
918 pb_table
919 })
920 .collect())
921 }
922
923 async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
925 let mut source_objs = Source::find()
926 .find_also_related(Object)
927 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
928 .filter(
929 streaming_job::Column::JobStatus
930 .is_null()
931 .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
932 )
933 .all(&self.db)
934 .await?;
935
936 let created_table_ids: HashSet<TableId> = Table::find()
938 .select_only()
939 .column(table::Column::TableId)
940 .join(JoinType::InnerJoin, table::Relation::Object1.def())
941 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
942 .filter(
943 table::Column::OptionalAssociatedSourceId
944 .is_not_null()
945 .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
946 )
947 .into_tuple()
948 .all(&self.db)
949 .await?
950 .into_iter()
951 .collect();
952 source_objs.retain_mut(|(source, _)| {
953 source.optional_associated_table_id.is_none()
954 || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
955 });
956
957 Ok(source_objs
958 .into_iter()
959 .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
960 .collect())
961 }
962
963 async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
965 let sink_objs = Sink::find()
966 .find_also_related(Object)
967 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
968 .filter(
969 streaming_job::Column::JobStatus
970 .eq(JobStatus::Created)
971 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
972 )
973 .all(&self.db)
974 .await?;
975
976 let creating_sinks: HashSet<_> = StreamingJob::find()
977 .select_only()
978 .column(streaming_job::Column::JobId)
979 .filter(
980 streaming_job::Column::JobStatus
981 .eq(JobStatus::Creating)
982 .and(
983 streaming_job::Column::JobId
984 .is_in(sink_objs.iter().map(|(sink, _)| sink.sink_id)),
985 ),
986 )
987 .into_tuple::<SinkId>()
988 .all(&self.db)
989 .await?
990 .into_iter()
991 .collect();
992
993 Ok(sink_objs
994 .into_iter()
995 .map(|(sink, obj)| {
996 let is_creating = creating_sinks.contains(&sink.sink_id);
997 let mut pb_sink: PbSink = ObjectModel(sink, obj.unwrap()).into();
998 pb_sink.stream_job_status = if is_creating {
999 PbStreamJobStatus::Creating.into()
1000 } else {
1001 PbStreamJobStatus::Created.into()
1002 };
1003 pb_sink
1004 })
1005 .collect())
1006 }
1007
1008 async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
1010 let subscription_objs = Subscription::find()
1011 .find_also_related(Object)
1012 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
1013 .all(&self.db)
1014 .await?;
1015
1016 Ok(subscription_objs
1017 .into_iter()
1018 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
1019 .collect())
1020 }
1021
1022 async fn list_views(&self) -> MetaResult<Vec<PbView>> {
1023 let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
1024
1025 Ok(view_objs
1026 .into_iter()
1027 .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
1028 .collect())
1029 }
1030
1031 async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
1033 let index_objs = Index::find()
1034 .find_also_related(Object)
1035 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1036 .filter(
1037 streaming_job::Column::JobStatus
1038 .eq(JobStatus::Created)
1039 .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
1040 )
1041 .all(&self.db)
1042 .await?;
1043
1044 let creating_indexes: HashSet<_> = StreamingJob::find()
1045 .select_only()
1046 .column(streaming_job::Column::JobId)
1047 .filter(
1048 streaming_job::Column::JobStatus
1049 .eq(JobStatus::Creating)
1050 .and(
1051 streaming_job::Column::JobId
1052 .is_in(index_objs.iter().map(|(index, _)| index.index_id)),
1053 ),
1054 )
1055 .into_tuple::<IndexId>()
1056 .all(&self.db)
1057 .await?
1058 .into_iter()
1059 .collect();
1060
1061 Ok(index_objs
1062 .into_iter()
1063 .map(|(index, obj)| {
1064 let is_creating = creating_indexes.contains(&index.index_id);
1065 let mut pb_index: PbIndex = ObjectModel(index, obj.unwrap()).into();
1066 pb_index.stream_job_status = if is_creating {
1067 PbStreamJobStatus::Creating.into()
1068 } else {
1069 PbStreamJobStatus::Created.into()
1070 };
1071 pb_index
1072 })
1073 .collect())
1074 }
1075
1076 async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
1077 let conn_objs = Connection::find()
1078 .find_also_related(Object)
1079 .all(&self.db)
1080 .await?;
1081
1082 Ok(conn_objs
1083 .into_iter()
1084 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
1085 .collect())
1086 }
1087
1088 pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
1089 let secret_objs = Secret::find()
1090 .find_also_related(Object)
1091 .all(&self.db)
1092 .await?;
1093 Ok(secret_objs
1094 .into_iter()
1095 .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
1096 .collect())
1097 }
1098
1099 async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
1100 let func_objs = Function::find()
1101 .find_also_related(Object)
1102 .all(&self.db)
1103 .await?;
1104
1105 Ok(func_objs
1106 .into_iter()
1107 .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
1108 .collect())
1109 }
1110
1111 pub(crate) fn register_finish_notifier(
1112 &mut self,
1113 database_id: DatabaseId,
1114 id: JobId,
1115 sender: Sender<Result<NotificationVersion, String>>,
1116 ) {
1117 self.creating_table_finish_notifier
1118 .entry(database_id)
1119 .or_default()
1120 .entry(id)
1121 .or_default()
1122 .push(sender);
1123 }
1124
1125 pub(crate) async fn streaming_job_is_finished(&mut self, id: JobId) -> MetaResult<bool> {
1126 let status = StreamingJob::find()
1127 .select_only()
1128 .column(streaming_job::Column::JobStatus)
1129 .filter(streaming_job::Column::JobId.eq(id))
1130 .into_tuple::<JobStatus>()
1131 .one(&self.db)
1132 .await?;
1133
1134 status
1135 .map(|status| status == JobStatus::Created)
1136 .ok_or_else(|| {
1137 MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
1138 })
1139 }
1140
1141 pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
1142 if let Some(database_id) = database_id {
1143 if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
1144 {
1145 for tx in creating_tables.into_values().flatten() {
1146 let _ = tx.send(Err(err.clone()));
1147 }
1148 }
1149 } else {
1150 for tx in take(&mut self.creating_table_finish_notifier)
1151 .into_values()
1152 .flatten()
1153 .flat_map(|(_, txs)| txs.into_iter())
1154 {
1155 let _ = tx.send(Err(err.clone()));
1156 }
1157 }
1158 }
1159
1160 pub(crate) fn notify_cancelled(&mut self, database_id: DatabaseId, job_id: JobId) {
1161 if let Some(creating_tables) = self.creating_table_finish_notifier.get_mut(&database_id)
1162 && let Some(tx_list) = creating_tables.remove(&job_id)
1163 {
1164 for tx in tx_list {
1165 let _ = tx.send(Err("Cancelled".to_owned()));
1166 }
1167 }
1168 }
1169
1170 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1171 let table_ids: Vec<TableId> = Table::find()
1172 .select_only()
1173 .filter(table::Column::TableType.is_in(vec![
1174 TableType::Table,
1175 TableType::MaterializedView,
1176 TableType::Index,
1177 ]))
1178 .column(table::Column::TableId)
1179 .into_tuple()
1180 .all(&self.db)
1181 .await?;
1182 Ok(table_ids)
1183 }
1184
1185 pub(crate) fn complete_dropped_tables(
1188 &mut self,
1189 table_ids: impl IntoIterator<Item = TableId>,
1190 ) -> Vec<PbTable> {
1191 table_ids
1192 .into_iter()
1193 .filter_map(|table_id| {
1194 self.dropped_tables.remove(&table_id).map_or_else(
1195 || {
1196 tracing::warn!(%table_id, "table not found");
1197 None
1198 },
1199 Some,
1200 )
1201 })
1202 .collect()
1203 }
1204}