1mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS, TableOption};
31use risingwave_common::current_cluster_version;
32use risingwave_common::secret::LocalSecretManager;
33use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
34use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
35use risingwave_connector::source::cdc::build_cdc_table_id;
36use risingwave_meta_model::object::ObjectType;
37use risingwave_meta_model::prelude::*;
38use risingwave_meta_model::table::TableType;
39use risingwave_meta_model::{
40 ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
41 IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
42 StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
43 connection, database, fragment, function, index, object, object_dependency, schema, secret,
44 sink, source, streaming_job, subscription, table, user_privilege, view,
45};
46use risingwave_pb::catalog::connection::Info as ConnectionInfo;
47use risingwave_pb::catalog::subscription::SubscriptionState;
48use risingwave_pb::catalog::table::PbTableType;
49use risingwave_pb::catalog::{
50 PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
51 PbStreamJobStatus, PbSubscription, PbTable, PbView,
52};
53use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
54use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
55use risingwave_pb::meta::object::PbObjectInfo;
56use risingwave_pb::meta::subscribe_response::{
57 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
58};
59use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
60use risingwave_pb::stream_plan::FragmentTypeFlag;
61use risingwave_pb::stream_plan::stream_node::NodeBody;
62use risingwave_pb::telemetry::PbTelemetryEventStage;
63use risingwave_pb::user::PbUserInfo;
64use sea_orm::ActiveValue::Set;
65use sea_orm::sea_query::{Expr, Query, SimpleExpr};
66use sea_orm::{
67 ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
68 IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
69 SelectColumns, TransactionTrait, Value,
70};
71use tokio::sync::oneshot::Sender;
72use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
73use tracing::info;
74
75use super::utils::{
76 check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
77 rename_relation_refer,
78};
79use crate::controller::ObjectModel;
80use crate::controller::catalog::util::update_internal_tables;
81use crate::controller::utils::*;
82use crate::manager::{
83 IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
84 get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
85};
86use crate::rpc::ddl_controller::DropMode;
87use crate::telemetry::{MetaTelemetryJobDesc, report_event};
88use crate::{MetaError, MetaResult};
89
90pub type Catalog = (
91 Vec<PbDatabase>,
92 Vec<PbSchema>,
93 Vec<PbTable>,
94 Vec<PbSource>,
95 Vec<PbSink>,
96 Vec<PbSubscription>,
97 Vec<PbIndex>,
98 Vec<PbView>,
99 Vec<PbFunction>,
100 Vec<PbConnection>,
101 Vec<PbSecret>,
102);
103
104pub type CatalogControllerRef = Arc<CatalogController>;
105
106pub struct CatalogController {
108 pub(crate) env: MetaSrvEnv,
109 pub(crate) inner: RwLock<CatalogControllerInner>,
110}
111
112#[derive(Clone, Default, Debug)]
113pub struct DropTableConnectorContext {
114 pub(crate) to_change_streaming_job_id: ObjectId,
116 pub(crate) to_remove_state_table_id: TableId,
117 pub(crate) to_remove_source_id: SourceId,
118}
119
120#[derive(Clone, Default, Debug)]
121pub struct ReleaseContext {
122 pub(crate) database_id: DatabaseId,
123 pub(crate) removed_streaming_job_ids: Vec<ObjectId>,
124 pub(crate) removed_state_table_ids: Vec<TableId>,
126
127 pub(crate) removed_secret_ids: Vec<SecretId>,
129 pub(crate) removed_source_ids: Vec<SourceId>,
131 pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
134
135 pub(crate) removed_actors: HashSet<ActorId>,
136 pub(crate) removed_fragments: HashSet<FragmentId>,
137}
138
139impl CatalogController {
140 pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
141 let meta_store = env.meta_store();
142 let catalog_controller = Self {
143 env,
144 inner: RwLock::new(CatalogControllerInner {
145 db: meta_store.conn,
146 creating_table_finish_notifier: HashMap::new(),
147 dropped_tables: HashMap::new(),
148 }),
149 };
150
151 catalog_controller.init().await?;
152 Ok(catalog_controller)
153 }
154
155 pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
158 self.inner.read().await
159 }
160
161 pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
162 self.inner.write().await
163 }
164}
165
166pub struct CatalogControllerInner {
167 pub(crate) db: DatabaseConnection,
168 #[expect(clippy::type_complexity)]
173 pub creating_table_finish_notifier:
174 HashMap<DatabaseId, HashMap<ObjectId, Vec<Sender<Result<NotificationVersion, String>>>>>,
175 pub dropped_tables: HashMap<TableId, PbTable>,
177}
178
179impl CatalogController {
180 pub(crate) async fn notify_frontend(
181 &self,
182 operation: NotificationOperation,
183 info: NotificationInfo,
184 ) -> NotificationVersion {
185 self.env
186 .notification_manager()
187 .notify_frontend(operation, info)
188 .await
189 }
190
191 pub(crate) async fn notify_frontend_relation_info(
192 &self,
193 operation: NotificationOperation,
194 relation_info: PbObjectInfo,
195 ) -> NotificationVersion {
196 self.env
197 .notification_manager()
198 .notify_frontend_object_info(operation, relation_info)
199 .await
200 }
201
202 pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
203 self.env.notification_manager().current_version().await
204 }
205}
206
207impl CatalogController {
208 pub async fn finish_create_subscription_catalog(&self, subscription_id: u32) -> MetaResult<()> {
209 let inner = self.inner.write().await;
210 let txn = inner.db.begin().await?;
211 let job_id = subscription_id as i32;
212
213 let res = Object::update_many()
215 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
216 .col_expr(
217 object::Column::CreatedAtClusterVersion,
218 current_cluster_version().into(),
219 )
220 .filter(object::Column::Oid.eq(job_id))
221 .exec(&txn)
222 .await?;
223 if res.rows_affected == 0 {
224 return Err(MetaError::catalog_id_not_found("subscription", job_id));
225 }
226
227 let job = subscription::ActiveModel {
229 subscription_id: Set(job_id),
230 subscription_state: Set(SubscriptionState::Created.into()),
231 ..Default::default()
232 };
233 job.update(&txn).await?;
234 txn.commit().await?;
235
236 Ok(())
237 }
238
239 pub async fn notify_create_subscription(
240 &self,
241 subscription_id: u32,
242 ) -> MetaResult<NotificationVersion> {
243 let inner = self.inner.read().await;
244 let job_id = subscription_id as i32;
245 let (subscription, obj) = Subscription::find_by_id(job_id)
246 .find_also_related(Object)
247 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
248 .one(&inner.db)
249 .await?
250 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?;
251
252 let version = self
253 .notify_frontend(
254 NotificationOperation::Add,
255 NotificationInfo::ObjectGroup(PbObjectGroup {
256 objects: vec![PbObject {
257 object_info: PbObjectInfo::Subscription(
258 ObjectModel(subscription, obj.unwrap()).into(),
259 )
260 .into(),
261 }],
262 }),
263 )
264 .await;
265 Ok(version)
266 }
267
268 pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
270 let inner = self.inner.read().await;
290 let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
291 .select_only()
292 .column(source::Column::SourceId)
293 .column(source::Column::WithProperties)
294 .column(source::Column::SourceInfo)
295 .into_tuple()
296 .all(&inner.db)
297 .await?;
298 let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
299 .select_only()
300 .column(sink::Column::SinkId)
301 .column(sink::Column::Properties)
302 .column(sink::Column::SinkFormatDesc)
303 .into_tuple()
304 .all(&inner.db)
305 .await?;
306 drop(inner);
307
308 let get_connector_from_property = |property: &Property| -> String {
309 property
310 .0
311 .get(UPSTREAM_SOURCE_KEY)
312 .map(|v| v.to_string())
313 .unwrap_or_default()
314 };
315
316 let source_report: Vec<jsonbb::Value> = source_props_and_info
317 .iter()
318 .map(|(oid, property, info)| {
319 let connector_name = get_connector_from_property(property);
320 let mut format = None;
321 let mut encode = None;
322 if let Some(info) = info {
323 let pb_info = info.to_protobuf();
324 format = Some(pb_info.format().as_str_name());
325 encode = Some(pb_info.row_encode().as_str_name());
326 }
327 jsonbb::json!({
328 oid.to_string(): {
329 "connector": connector_name,
330 "format": format,
331 "encode": encode,
332 },
333 })
334 })
335 .collect_vec();
336
337 let sink_report: Vec<jsonbb::Value> = sink_props_and_info
338 .iter()
339 .map(|(oid, property, info)| {
340 let connector_name = get_connector_from_property(property);
341 let mut format = None;
342 let mut encode = None;
343 if let Some(info) = info {
344 let pb_info = info.to_protobuf();
345 format = Some(pb_info.format().as_str_name());
346 encode = Some(pb_info.encode().as_str_name());
347 }
348 jsonbb::json!({
349 oid.to_string(): {
350 "connector": connector_name,
351 "format": format,
352 "encode": encode,
353 },
354 })
355 })
356 .collect_vec();
357
358 Ok(jsonbb::json!({
359 "source": source_report,
360 "sink": sink_report,
361 }))
362 }
363
364 pub async fn clean_dirty_subscription(
365 &self,
366 database_id: Option<DatabaseId>,
367 ) -> MetaResult<()> {
368 let inner = self.inner.write().await;
369 let txn = inner.db.begin().await?;
370 let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
371 object::Column::Oid.not_in_subquery(
372 Query::select()
373 .column(subscription::Column::SubscriptionId)
374 .from(Subscription)
375 .and_where(
376 subscription::Column::SubscriptionState
377 .eq(SubscriptionState::Created as i32),
378 )
379 .take(),
380 ),
381 );
382
383 let filter_condition = if let Some(database_id) = database_id {
384 filter_condition.and(object::Column::DatabaseId.eq(database_id))
385 } else {
386 filter_condition
387 };
388 Object::delete_many()
389 .filter(filter_condition)
390 .exec(&txn)
391 .await?;
392 txn.commit().await?;
393 Ok(())
395 }
396
397 pub async fn clean_dirty_creating_jobs(
399 &self,
400 database_id: Option<DatabaseId>,
401 ) -> MetaResult<Vec<SourceId>> {
402 let inner = self.inner.write().await;
403 let txn = inner.db.begin().await?;
404
405 let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
406 streaming_job::Column::JobStatus
407 .eq(JobStatus::Creating)
408 .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
409 );
410
411 let filter_condition = if let Some(database_id) = database_id {
412 filter_condition.and(object::Column::DatabaseId.eq(database_id))
413 } else {
414 filter_condition
415 };
416
417 let dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
418 .select_only()
419 .column(streaming_job::Column::JobId)
420 .columns([
421 object::Column::Oid,
422 object::Column::ObjType,
423 object::Column::SchemaId,
424 object::Column::DatabaseId,
425 ])
426 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
427 .filter(filter_condition)
428 .into_partial_model()
429 .all(&txn)
430 .await?;
431
432 let updated_table_ids = Self::clean_dirty_sink_downstreams(&txn).await?;
433 let updated_table_objs = if !updated_table_ids.is_empty() {
434 Table::find()
435 .find_also_related(Object)
436 .filter(table::Column::TableId.is_in(updated_table_ids))
437 .all(&txn)
438 .await?
439 } else {
440 vec![]
441 };
442
443 if dirty_job_objs.is_empty() {
444 if !updated_table_objs.is_empty() {
445 txn.commit().await?;
446
447 self.notify_frontend(
449 NotificationOperation::Update,
450 NotificationInfo::ObjectGroup(PbObjectGroup {
451 objects: updated_table_objs
452 .into_iter()
453 .map(|(t, obj)| PbObject {
454 object_info: PbObjectInfo::Table(
455 ObjectModel(t, obj.unwrap()).into(),
456 )
457 .into(),
458 })
459 .collect(),
460 }),
461 )
462 .await;
463 }
464
465 return Ok(vec![]);
466 }
467
468 self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
469
470 let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();
471
472 let all_dirty_table_ids = dirty_job_objs
475 .iter()
476 .filter(|obj| obj.obj_type == ObjectType::Table)
477 .map(|obj| obj.oid)
478 .collect_vec();
479 let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
480 .select_only()
481 .column(table::Column::TableId)
482 .column(table::Column::TableType)
483 .filter(table::Column::TableId.is_in(all_dirty_table_ids))
484 .into_tuple::<(ObjectId, TableType)>()
485 .all(&txn)
486 .await?
487 .into_iter()
488 .collect();
489
490 let dirty_mview_objs = dirty_job_objs
492 .into_iter()
493 .filter(|obj| {
494 matches!(
495 dirty_table_type_map.get(&obj.oid),
496 Some(TableType::MaterializedView)
497 )
498 })
499 .collect_vec();
500
501 let dirty_associated_source_ids: Vec<SourceId> = Table::find()
504 .select_only()
505 .column(table::Column::OptionalAssociatedSourceId)
506 .filter(
507 table::Column::TableId
508 .is_in(dirty_job_ids.clone())
509 .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
510 )
511 .into_tuple()
512 .all(&txn)
513 .await?;
514
515 let dirty_state_table_ids: Vec<TableId> = Table::find()
516 .select_only()
517 .column(table::Column::TableId)
518 .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
519 .into_tuple()
520 .all(&txn)
521 .await?;
522
523 let dirty_mview_internal_table_objs = Object::find()
524 .select_only()
525 .columns([
526 object::Column::Oid,
527 object::Column::ObjType,
528 object::Column::SchemaId,
529 object::Column::DatabaseId,
530 ])
531 .join(JoinType::InnerJoin, object::Relation::Table.def())
532 .filter(table::Column::BelongsToJobId.is_in(dirty_mview_objs.iter().map(|obj| obj.oid)))
533 .into_partial_model()
534 .all(&txn)
535 .await?;
536
537 let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
538 .clone()
539 .into_iter()
540 .chain(dirty_state_table_ids.into_iter())
541 .chain(dirty_associated_source_ids.clone().into_iter())
542 .collect();
543
544 let res = Object::delete_many()
545 .filter(object::Column::Oid.is_in(to_delete_objs))
546 .exec(&txn)
547 .await?;
548 assert!(res.rows_affected > 0);
549
550 txn.commit().await?;
551
552 let object_group = build_object_group_for_delete(
553 dirty_mview_objs
554 .into_iter()
555 .chain(dirty_mview_internal_table_objs.into_iter())
556 .collect_vec(),
557 );
558
559 let _version = self
560 .notify_frontend(NotificationOperation::Delete, object_group)
561 .await;
562
563 if !updated_table_objs.is_empty() {
565 self.notify_frontend(
566 NotificationOperation::Update,
567 NotificationInfo::ObjectGroup(PbObjectGroup {
568 objects: updated_table_objs
569 .into_iter()
570 .map(|(t, obj)| PbObject {
571 object_info: PbObjectInfo::Table(ObjectModel(t, obj.unwrap()).into())
572 .into(),
573 })
574 .collect(),
575 }),
576 )
577 .await;
578 }
579
580 Ok(dirty_associated_source_ids)
581 }
582
583 pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
584 let inner = self.inner.write().await;
585 let txn = inner.db.begin().await?;
586 ensure_object_id(ObjectType::Database, comment.database_id as _, &txn).await?;
587 ensure_object_id(ObjectType::Schema, comment.schema_id as _, &txn).await?;
588 let table_obj = Object::find_by_id(comment.table_id as ObjectId)
589 .one(&txn)
590 .await?
591 .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
592
593 let table = if let Some(col_idx) = comment.column_index {
594 let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id as TableId)
595 .select_only()
596 .column(table::Column::Columns)
597 .into_tuple()
598 .one(&txn)
599 .await?
600 .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
601 let mut pb_columns = columns.to_protobuf();
602
603 let column = pb_columns
604 .get_mut(col_idx as usize)
605 .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
606 let column_desc = column.column_desc.as_mut().ok_or_else(|| {
607 anyhow!(
608 "column desc at index {} for table id {} not found",
609 col_idx,
610 comment.table_id
611 )
612 })?;
613 column_desc.description = comment.description;
614 table::ActiveModel {
615 table_id: Set(comment.table_id as _),
616 columns: Set(pb_columns.into()),
617 ..Default::default()
618 }
619 .update(&txn)
620 .await?
621 } else {
622 table::ActiveModel {
623 table_id: Set(comment.table_id as _),
624 description: Set(comment.description),
625 ..Default::default()
626 }
627 .update(&txn)
628 .await?
629 };
630 txn.commit().await?;
631
632 let version = self
633 .notify_frontend_relation_info(
634 NotificationOperation::Update,
635 PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
636 )
637 .await;
638
639 Ok(version)
640 }
641
642 pub async fn complete_dropped_tables(
643 &self,
644 table_ids: impl Iterator<Item = TableId>,
645 ) -> Vec<PbTable> {
646 let mut inner = self.inner.write().await;
647 inner.complete_dropped_tables(table_ids)
648 }
649}
650
/// Object counts of the catalog, as computed by `CatalogControllerInner::stats`.
pub struct CatalogStats {
    /// Number of tables of type `Table`.
    pub table_num: u64,
    /// Number of tables of type `MaterializedView`.
    pub mview_num: u64,
    /// Number of tables of type `Index`.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs.
    pub streaming_job_num: u64,
    /// Number of actors.
    pub actor_num: u64,
}
662
663impl CatalogControllerInner {
664 pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
665 let databases = self.list_databases().await?;
666 let schemas = self.list_schemas().await?;
667 let tables = self.list_tables().await?;
668 let sources = self.list_sources().await?;
669 let sinks = self.list_sinks().await?;
670 let subscriptions = self.list_subscriptions().await?;
671 let indexes = self.list_indexes().await?;
672 let views = self.list_views().await?;
673 let functions = self.list_functions().await?;
674 let connections = self.list_connections().await?;
675 let secrets = self.list_secrets().await?;
676
677 let users = self.list_users().await?;
678
679 Ok((
680 (
681 databases,
682 schemas,
683 tables,
684 sources,
685 sinks,
686 subscriptions,
687 indexes,
688 views,
689 functions,
690 connections,
691 secrets,
692 ),
693 users,
694 ))
695 }
696
697 pub async fn stats(&self) -> MetaResult<CatalogStats> {
698 let mut table_num_map: HashMap<_, _> = Table::find()
699 .select_only()
700 .column(table::Column::TableType)
701 .column_as(table::Column::TableId.count(), "num")
702 .group_by(table::Column::TableType)
703 .having(table::Column::TableType.ne(TableType::Internal))
704 .into_tuple::<(TableType, i64)>()
705 .all(&self.db)
706 .await?
707 .into_iter()
708 .map(|(table_type, num)| (table_type, num as u64))
709 .collect();
710
711 let source_num = Source::find().count(&self.db).await?;
712 let sink_num = Sink::find().count(&self.db).await?;
713 let function_num = Function::find().count(&self.db).await?;
714 let streaming_job_num = StreamingJob::find().count(&self.db).await?;
715 let actor_num = Actor::find().count(&self.db).await?;
716
717 Ok(CatalogStats {
718 table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
719 mview_num: table_num_map
720 .remove(&TableType::MaterializedView)
721 .unwrap_or(0),
722 index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
723 source_num,
724 sink_num,
725 function_num,
726 streaming_job_num,
727 actor_num,
728 })
729 }
730
731 async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
732 let db_objs = Database::find()
733 .find_also_related(Object)
734 .all(&self.db)
735 .await?;
736 Ok(db_objs
737 .into_iter()
738 .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
739 .collect())
740 }
741
742 async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
743 let schema_objs = Schema::find()
744 .find_also_related(Object)
745 .all(&self.db)
746 .await?;
747
748 Ok(schema_objs
749 .into_iter()
750 .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
751 .collect())
752 }
753
754 async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
755 let mut user_infos: Vec<PbUserInfo> = User::find()
756 .all(&self.db)
757 .await?
758 .into_iter()
759 .map(Into::into)
760 .collect();
761
762 for user_info in &mut user_infos {
763 user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
764 }
765 Ok(user_infos)
766 }
767
768 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
770 let table_objs = Table::find()
771 .find_also_related(Object)
772 .all(&self.db)
773 .await?;
774
775 Ok(table_objs
776 .into_iter()
777 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
778 .collect())
779 }
780
781 async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
783 let table_objs = Table::find()
784 .find_also_related(Object)
785 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
786 .filter(
787 streaming_job::Column::JobStatus
788 .eq(JobStatus::Created)
789 .or(table::Column::TableType.eq(TableType::MaterializedView)),
790 )
791 .all(&self.db)
792 .await?;
793
794 let created_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
795 .select_only()
796 .column(streaming_job::Column::JobId)
797 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
798 .into_tuple()
799 .all(&self.db)
800 .await?;
801
802 let job_ids: HashSet<ObjectId> = table_objs
803 .iter()
804 .map(|(t, _)| t.table_id)
805 .chain(created_streaming_job_ids.iter().cloned())
806 .collect();
807
808 let internal_table_objs = Table::find()
809 .find_also_related(Object)
810 .filter(
811 table::Column::TableType
812 .eq(TableType::Internal)
813 .and(table::Column::BelongsToJobId.is_in(job_ids)),
814 )
815 .all(&self.db)
816 .await?;
817
818 Ok(table_objs
819 .into_iter()
820 .chain(internal_table_objs.into_iter())
821 .map(|(table, obj)| {
822 let is_created = created_streaming_job_ids.contains(&table.table_id)
824 || (table.table_type == TableType::Internal
825 && created_streaming_job_ids.contains(&table.belongs_to_job_id.unwrap()));
826 let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
827 pb_table.stream_job_status = if is_created {
828 PbStreamJobStatus::Created.into()
829 } else {
830 PbStreamJobStatus::Creating.into()
831 };
832 pb_table
833 })
834 .collect())
835 }
836
837 async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
839 let mut source_objs = Source::find()
840 .find_also_related(Object)
841 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
842 .filter(
843 streaming_job::Column::JobStatus
844 .is_null()
845 .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
846 )
847 .all(&self.db)
848 .await?;
849
850 let created_table_ids: HashSet<TableId> = Table::find()
852 .select_only()
853 .column(table::Column::TableId)
854 .join(JoinType::InnerJoin, table::Relation::Object1.def())
855 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
856 .filter(
857 table::Column::OptionalAssociatedSourceId
858 .is_not_null()
859 .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
860 )
861 .into_tuple()
862 .all(&self.db)
863 .await?
864 .into_iter()
865 .collect();
866 source_objs.retain_mut(|(source, _)| {
867 source.optional_associated_table_id.is_none()
868 || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
869 });
870
871 Ok(source_objs
872 .into_iter()
873 .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
874 .collect())
875 }
876
877 async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
879 let sink_objs = Sink::find()
880 .find_also_related(Object)
881 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
882 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
883 .all(&self.db)
884 .await?;
885
886 Ok(sink_objs
887 .into_iter()
888 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
889 .collect())
890 }
891
892 async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
894 let subscription_objs = Subscription::find()
895 .find_also_related(Object)
896 .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
897 .all(&self.db)
898 .await?;
899
900 Ok(subscription_objs
901 .into_iter()
902 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
903 .collect())
904 }
905
906 async fn list_views(&self) -> MetaResult<Vec<PbView>> {
907 let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
908
909 Ok(view_objs
910 .into_iter()
911 .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
912 .collect())
913 }
914
915 async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
917 let index_objs = Index::find()
918 .find_also_related(Object)
919 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
920 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
921 .all(&self.db)
922 .await?;
923
924 Ok(index_objs
925 .into_iter()
926 .map(|(index, obj)| ObjectModel(index, obj.unwrap()).into())
927 .collect())
928 }
929
930 async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
931 let conn_objs = Connection::find()
932 .find_also_related(Object)
933 .all(&self.db)
934 .await?;
935
936 Ok(conn_objs
937 .into_iter()
938 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
939 .collect())
940 }
941
942 pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
943 let secret_objs = Secret::find()
944 .find_also_related(Object)
945 .all(&self.db)
946 .await?;
947 Ok(secret_objs
948 .into_iter()
949 .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
950 .collect())
951 }
952
953 async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
954 let func_objs = Function::find()
955 .find_also_related(Object)
956 .all(&self.db)
957 .await?;
958
959 Ok(func_objs
960 .into_iter()
961 .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
962 .collect())
963 }
964
965 pub(crate) fn register_finish_notifier(
966 &mut self,
967 database_id: DatabaseId,
968 id: ObjectId,
969 sender: Sender<Result<NotificationVersion, String>>,
970 ) {
971 self.creating_table_finish_notifier
972 .entry(database_id)
973 .or_default()
974 .entry(id)
975 .or_default()
976 .push(sender);
977 }
978
979 pub(crate) async fn streaming_job_is_finished(&mut self, id: i32) -> MetaResult<bool> {
980 let status = StreamingJob::find()
981 .select_only()
982 .column(streaming_job::Column::JobStatus)
983 .filter(streaming_job::Column::JobId.eq(id))
984 .into_tuple::<JobStatus>()
985 .one(&self.db)
986 .await?;
987
988 status
989 .map(|status| status == JobStatus::Created)
990 .ok_or_else(|| {
991 MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
992 })
993 }
994
995 pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
996 if let Some(database_id) = database_id {
997 if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
998 {
999 for tx in creating_tables.into_values().flatten() {
1000 let _ = tx.send(Err(err.clone()));
1001 }
1002 }
1003 } else {
1004 for tx in take(&mut self.creating_table_finish_notifier)
1005 .into_values()
1006 .flatten()
1007 .flat_map(|(_, txs)| txs.into_iter())
1008 {
1009 let _ = tx.send(Err(err.clone()));
1010 }
1011 }
1012 }
1013
1014 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
1015 let table_ids: Vec<TableId> = Table::find()
1016 .select_only()
1017 .filter(table::Column::TableType.is_in(vec![
1018 TableType::Table,
1019 TableType::MaterializedView,
1020 TableType::Index,
1021 ]))
1022 .column(table::Column::TableId)
1023 .into_tuple()
1024 .all(&self.db)
1025 .await?;
1026 Ok(table_ids)
1027 }
1028
1029 pub(crate) fn complete_dropped_tables(
1032 &mut self,
1033 table_ids: impl Iterator<Item = TableId>,
1034 ) -> Vec<PbTable> {
1035 table_ids
1036 .filter_map(|table_id| {
1037 self.dropped_tables.remove(&table_id).map_or_else(
1038 || {
1039 tracing::warn!(table_id, "table not found");
1040 None
1041 },
1042 Some,
1043 )
1044 })
1045 .collect()
1046 }
1047}