1mod alter_op;
16mod create_op;
17mod drop_op;
18mod get_op;
19mod list_op;
20mod test;
21mod util;
22
23use std::collections::{BTreeSet, HashMap, HashSet};
24use std::iter;
25use std::mem::take;
26use std::sync::Arc;
27
28use anyhow::anyhow;
29use itertools::Itertools;
30use risingwave_common::catalog::{DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS, TableOption};
31use risingwave_common::current_cluster_version;
32use risingwave_common::secret::LocalSecretManager;
33use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
34use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
35use risingwave_connector::source::cdc::build_cdc_table_id;
36use risingwave_meta_model::object::ObjectType;
37use risingwave_meta_model::prelude::*;
38use risingwave_meta_model::table::TableType;
39use risingwave_meta_model::{
40 ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
41 IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
42 StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
43 connection, database, fragment, function, index, object, object_dependency, schema, secret,
44 sink, source, streaming_job, subscription, table, user_privilege, view,
45};
46use risingwave_pb::catalog::connection::Info as ConnectionInfo;
47use risingwave_pb::catalog::subscription::SubscriptionState;
48use risingwave_pb::catalog::table::PbTableType;
49use risingwave_pb::catalog::{
50 PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
51 PbStreamJobStatus, PbSubscription, PbTable, PbView,
52};
53use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
54use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
55use risingwave_pb::meta::object::PbObjectInfo;
56use risingwave_pb::meta::subscribe_response::{
57 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
58};
59use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
60use risingwave_pb::stream_plan::FragmentTypeFlag;
61use risingwave_pb::stream_plan::stream_node::NodeBody;
62use risingwave_pb::telemetry::PbTelemetryEventStage;
63use risingwave_pb::user::PbUserInfo;
64use sea_orm::ActiveValue::Set;
65use sea_orm::sea_query::{Expr, Query, SimpleExpr};
66use sea_orm::{
67 ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
68 IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
69 SelectColumns, TransactionTrait, Value,
70};
71use tokio::sync::oneshot::Sender;
72use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
73use tracing::info;
74
75use super::utils::{
76 check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
77 rename_relation_refer,
78};
79use crate::controller::ObjectModel;
80use crate::controller::catalog::util::update_internal_tables;
81use crate::controller::utils::*;
82use crate::manager::{
83 IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
84 get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
85};
86use crate::rpc::ddl_controller::DropMode;
87use crate::telemetry::{MetaTelemetryJobDesc, report_event};
88use crate::{MetaError, MetaResult};
89
/// Full catalog contents as one tuple of protobuf object lists, in this order:
/// databases, schemas, tables, sources, sinks, subscriptions, indexes, views,
/// functions, connections, secrets.
///
/// `CatalogControllerInner::snapshot` builds a value of this type and returns
/// it together with the list of users.
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);
103
/// Shared, reference-counted handle to a [`CatalogController`].
pub type CatalogControllerRef = Arc<CatalogController>;
105
/// Controller for catalog metadata: pairs the meta service environment with
/// the lock-protected [`CatalogControllerInner`] state.
pub struct CatalogController {
    /// Meta service environment; in this file it is used to obtain the meta
    /// store connection and the notification manager.
    pub(crate) env: MetaSrvEnv,
    /// Database connection plus in-memory bookkeeping, guarded by an async
    /// read/write lock.
    pub(crate) inner: RwLock<CatalogControllerInner>,
}
111
/// IDs describing the objects touched when a connector is dropped from a table.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names and types only; no producer or consumer of this struct is visible in
/// this part of the file — confirm against the callers.
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    /// Presumably the streaming job that has to be changed.
    pub(crate) to_change_streaming_job_id: ObjectId,
    /// Presumably the state table that has to be removed.
    pub(crate) to_remove_state_table_id: TableId,
    /// Presumably the source that has to be removed.
    pub(crate) to_remove_source_id: SourceId,
}
119
/// Collection of IDs of objects that were removed and whose resources still
/// have to be released.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names and types only; this struct is neither built nor consumed in this
/// part of the file — confirm against the callers.
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    /// Presumably the database the removed objects belonged to.
    pub(crate) database_id: DatabaseId,
    /// IDs of the removed streaming jobs.
    pub(crate) removed_streaming_job_ids: Vec<ObjectId>,
    /// IDs of the removed state tables.
    pub(crate) removed_state_table_ids: Vec<TableId>,

    /// IDs of the removed secrets.
    pub(crate) removed_secret_ids: Vec<SecretId>,
    /// IDs of the removed sources.
    pub(crate) removed_source_ids: Vec<SourceId>,
    /// Removed fragments grouped by source ID (ordered set per source).
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    /// IDs of the removed actors.
    pub(crate) removed_actors: HashSet<ActorId>,
    /// IDs of the removed fragments.
    pub(crate) removed_fragments: HashSet<FragmentId>,
}
138
139impl CatalogController {
140 pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
141 let meta_store = env.meta_store();
142 let catalog_controller = Self {
143 env,
144 inner: RwLock::new(CatalogControllerInner {
145 db: meta_store.conn,
146 creating_table_finish_notifier: HashMap::new(),
147 dropped_tables: HashMap::new(),
148 }),
149 };
150
151 catalog_controller.init().await?;
152 Ok(catalog_controller)
153 }
154
155 pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
158 self.inner.read().await
159 }
160
161 pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
162 self.inner.write().await
163 }
164}
165
/// State held inside the lock of `CatalogController::inner`.
pub struct CatalogControllerInner {
    /// Connection to the meta store database; the queries of this type run on it.
    pub(crate) db: DatabaseConnection,
    /// Pending completion notifiers, keyed first by database ID and then by
    /// object ID. Senders are appended by `register_finish_notifier`, and
    /// `notify_finish_failed` drains them with an `Err(String)`.
    ///
    /// NOTE(review): the success path that sends `Ok(version)` is not visible
    /// in this part of the file.
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<ObjectId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    /// Tables kept in memory by table ID until `complete_dropped_tables`
    /// removes and returns them.
    ///
    /// NOTE(review): where entries are inserted is not visible in this part of
    /// the file.
    pub dropped_tables: HashMap<TableId, PbTable>,
}
178
179impl CatalogController {
180 pub(crate) async fn notify_frontend(
181 &self,
182 operation: NotificationOperation,
183 info: NotificationInfo,
184 ) -> NotificationVersion {
185 self.env
186 .notification_manager()
187 .notify_frontend(operation, info)
188 .await
189 }
190
191 pub(crate) async fn notify_frontend_relation_info(
192 &self,
193 operation: NotificationOperation,
194 relation_info: PbObjectInfo,
195 ) -> NotificationVersion {
196 self.env
197 .notification_manager()
198 .notify_frontend_object_info(operation, relation_info)
199 .await
200 }
201
202 pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
203 self.env.notification_manager().current_version().await
204 }
205}
206
207impl CatalogController {
208 pub async fn finish_create_subscription_catalog(&self, subscription_id: u32) -> MetaResult<()> {
209 let inner = self.inner.write().await;
210 let txn = inner.db.begin().await?;
211 let job_id = subscription_id as i32;
212
213 let res = Object::update_many()
215 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
216 .col_expr(
217 object::Column::CreatedAtClusterVersion,
218 current_cluster_version().into(),
219 )
220 .filter(object::Column::Oid.eq(job_id))
221 .exec(&txn)
222 .await?;
223 if res.rows_affected == 0 {
224 return Err(MetaError::catalog_id_not_found("subscription", job_id));
225 }
226
227 let job = subscription::ActiveModel {
229 subscription_id: Set(job_id),
230 subscription_state: Set(SubscriptionState::Created.into()),
231 ..Default::default()
232 };
233 job.update(&txn).await?;
234 txn.commit().await?;
235
236 Ok(())
237 }
238
239 pub async fn notify_create_subscription(
240 &self,
241 subscription_id: u32,
242 ) -> MetaResult<NotificationVersion> {
243 let inner = self.inner.read().await;
244 let job_id = subscription_id as i32;
245 let (subscription, obj) = Subscription::find_by_id(job_id)
246 .find_also_related(Object)
247 .one(&inner.db)
248 .await?
249 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?;
250
251 let version = self
252 .notify_frontend(
253 Operation::Add,
254 Info::ObjectGroup(PbObjectGroup {
255 objects: vec![PbObject {
256 object_info: PbObjectInfo::Subscription(
257 ObjectModel(subscription, obj.unwrap()).into(),
258 )
259 .into(),
260 }],
261 }),
262 )
263 .await;
264 Ok(version)
265 }
266
267 pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
269 let inner = self.inner.read().await;
289 let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
290 .select_only()
291 .column(source::Column::SourceId)
292 .column(source::Column::WithProperties)
293 .column(source::Column::SourceInfo)
294 .into_tuple()
295 .all(&inner.db)
296 .await?;
297 let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
298 .select_only()
299 .column(sink::Column::SinkId)
300 .column(sink::Column::Properties)
301 .column(sink::Column::SinkFormatDesc)
302 .into_tuple()
303 .all(&inner.db)
304 .await?;
305 drop(inner);
306
307 let get_connector_from_property = |property: &Property| -> String {
308 property
309 .0
310 .get(UPSTREAM_SOURCE_KEY)
311 .map(|v| v.to_string())
312 .unwrap_or_default()
313 };
314
315 let source_report: Vec<jsonbb::Value> = source_props_and_info
316 .iter()
317 .map(|(oid, property, info)| {
318 let connector_name = get_connector_from_property(property);
319 let mut format = None;
320 let mut encode = None;
321 if let Some(info) = info {
322 let pb_info = info.to_protobuf();
323 format = Some(pb_info.format().as_str_name());
324 encode = Some(pb_info.row_encode().as_str_name());
325 }
326 jsonbb::json!({
327 oid.to_string(): {
328 "connector": connector_name,
329 "format": format,
330 "encode": encode,
331 },
332 })
333 })
334 .collect_vec();
335
336 let sink_report: Vec<jsonbb::Value> = sink_props_and_info
337 .iter()
338 .map(|(oid, property, info)| {
339 let connector_name = get_connector_from_property(property);
340 let mut format = None;
341 let mut encode = None;
342 if let Some(info) = info {
343 let pb_info = info.to_protobuf();
344 format = Some(pb_info.format().as_str_name());
345 encode = Some(pb_info.encode().as_str_name());
346 }
347 jsonbb::json!({
348 oid.to_string(): {
349 "connector": connector_name,
350 "format": format,
351 "encode": encode,
352 },
353 })
354 })
355 .collect_vec();
356
357 Ok(jsonbb::json!({
358 "source": source_report,
359 "sink": sink_report,
360 }))
361 }
362
363 pub async fn clean_dirty_subscription(
364 &self,
365 database_id: Option<DatabaseId>,
366 ) -> MetaResult<()> {
367 let inner = self.inner.write().await;
368 let txn = inner.db.begin().await?;
369 let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
370 object::Column::Oid.not_in_subquery(
371 Query::select()
372 .column(subscription::Column::SubscriptionId)
373 .from(Subscription)
374 .and_where(
375 subscription::Column::SubscriptionState
376 .eq(SubscriptionState::Created as i32),
377 )
378 .take(),
379 ),
380 );
381
382 let filter_condition = if let Some(database_id) = database_id {
383 filter_condition.and(object::Column::DatabaseId.eq(database_id))
384 } else {
385 filter_condition
386 };
387
388 Object::delete_many()
389 .filter(filter_condition)
390 .exec(&txn)
391 .await?;
392 txn.commit().await?;
393 Ok(())
394 }
395
    /// Removes the catalog objects of "dirty" streaming jobs, optionally
    /// restricted to `database_id`.
    ///
    /// A job is dirty when its status is `Initial`, or when its status is
    /// `Creating` and its create type is `Foreground`. For those jobs this
    /// deletes, in one transaction, the job objects themselves, the tables
    /// that belong to them, and the sources associated with the dirty tables.
    /// After the commit the frontend is notified about the deletion of the
    /// dirty materialized views and their internal tables.
    ///
    /// Returns the IDs of the sources associated with the dirty tables, or an
    /// empty list when there is no dirty job.
    ///
    /// # Errors
    /// Propagates database errors and errors from the helper calls.
    ///
    /// # Panics
    /// Panics if dirty jobs were found but the delete affected no rows.
    pub async fn clean_dirty_creating_jobs(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<Vec<SourceId>> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Dirty = `Initial`, or `Creating` in foreground mode.
        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
            streaming_job::Column::JobStatus
                .eq(JobStatus::Creating)
                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
        );

        // Narrow the search to a single database when one was requested.
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };

        let dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .filter(filter_condition)
            .into_partial_model()
            .all(&txn)
            .await?;

        // Helper defined outside this part of the file; its boolean result
        // decides below whether a transaction without dirty jobs still has to
        // be committed.
        let changed = Self::clean_dirty_sink_downstreams(&txn).await?;

        if dirty_job_objs.is_empty() {
            if changed {
                txn.commit().await?;
            }

            return Ok(vec![]);
        }

        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;

        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();

        // Look up the table type of every dirty job whose object type is `Table`.
        let all_dirty_table_ids = dirty_job_objs
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table)
            .map(|obj| obj.oid)
            .collect_vec();
        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .column(table::Column::TableType)
            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
            .into_tuple::<(ObjectId, TableType)>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        // Only dirty materialized views are reported to the frontend below.
        let dirty_mview_objs = dirty_job_objs
            .into_iter()
            .filter(|obj| {
                matches!(
                    dirty_table_type_map.get(&obj.oid),
                    Some(TableType::MaterializedView)
                )
            })
            .collect_vec();

        // Sources associated with the dirty tables; these are deleted too and
        // are the return value of this function.
        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(dirty_job_ids.clone())
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        // Tables that belong to any dirty job.
        let dirty_state_table_ids: Vec<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;

        // Objects of the tables that belong to the dirty materialized views;
        // fetched before the delete because they are needed for the
        // notification afterwards.
        let dirty_mview_internal_table_objs = Object::find()
            .select_only()
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, object::Relation::Table.def())
            .filter(table::Column::BelongsToJobId.is_in(dirty_mview_objs.iter().map(|obj| obj.oid)))
            .into_partial_model()
            .all(&txn)
            .await?;

        // Union of everything to delete: jobs, their tables, associated sources.
        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
            .clone()
            .into_iter()
            .chain(dirty_state_table_ids.into_iter())
            .chain(dirty_associated_source_ids.clone().into_iter())
            .collect();

        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(to_delete_objs))
            .exec(&txn)
            .await?;
        // At least one dirty job exists at this point, so rows must be deleted.
        assert!(res.rows_affected > 0);

        txn.commit().await?;

        // Notify only after the commit succeeded.
        let object_group = build_object_group_for_delete(
            dirty_mview_objs
                .into_iter()
                .chain(dirty_mview_internal_table_objs.into_iter())
                .collect_vec(),
        );

        let _version = self
            .notify_frontend(NotificationOperation::Delete, object_group)
            .await;

        Ok(dirty_associated_source_ids)
    }
538
539 pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
540 let inner = self.inner.write().await;
541 let txn = inner.db.begin().await?;
542 ensure_object_id(ObjectType::Database, comment.database_id as _, &txn).await?;
543 ensure_object_id(ObjectType::Schema, comment.schema_id as _, &txn).await?;
544 let table_obj = Object::find_by_id(comment.table_id as ObjectId)
545 .one(&txn)
546 .await?
547 .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
548
549 let table = if let Some(col_idx) = comment.column_index {
550 let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id as TableId)
551 .select_only()
552 .column(table::Column::Columns)
553 .into_tuple()
554 .one(&txn)
555 .await?
556 .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
557 let mut pb_columns = columns.to_protobuf();
558
559 let column = pb_columns
560 .get_mut(col_idx as usize)
561 .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
562 let column_desc = column.column_desc.as_mut().ok_or_else(|| {
563 anyhow!(
564 "column desc at index {} for table id {} not found",
565 col_idx,
566 comment.table_id
567 )
568 })?;
569 column_desc.description = comment.description;
570 table::ActiveModel {
571 table_id: Set(comment.table_id as _),
572 columns: Set(pb_columns.into()),
573 ..Default::default()
574 }
575 .update(&txn)
576 .await?
577 } else {
578 table::ActiveModel {
579 table_id: Set(comment.table_id as _),
580 description: Set(comment.description),
581 ..Default::default()
582 }
583 .update(&txn)
584 .await?
585 };
586 txn.commit().await?;
587
588 let version = self
589 .notify_frontend_relation_info(
590 NotificationOperation::Update,
591 PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
592 )
593 .await;
594
595 Ok(version)
596 }
597
598 pub async fn complete_dropped_tables(
599 &self,
600 table_ids: impl Iterator<Item = TableId>,
601 ) -> Vec<PbTable> {
602 let mut inner = self.inner.write().await;
603 inner.complete_dropped_tables(table_ids)
604 }
605}
606
/// Object counts of the catalog, as produced by `CatalogControllerInner::stats`.
pub struct CatalogStats {
    /// Number of tables whose table type is `Table`.
    pub table_num: u64,
    /// Number of tables whose table type is `MaterializedView`.
    pub mview_num: u64,
    /// Number of tables whose table type is `Index`.
    pub index_num: u64,
    /// Number of sources.
    pub source_num: u64,
    /// Number of sinks.
    pub sink_num: u64,
    /// Number of functions.
    pub function_num: u64,
    /// Number of streaming jobs.
    pub streaming_job_num: u64,
    /// Number of actors.
    pub actor_num: u64,
}
618
619impl CatalogControllerInner {
620 pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
621 let databases = self.list_databases().await?;
622 let schemas = self.list_schemas().await?;
623 let tables = self.list_tables().await?;
624 let sources = self.list_sources().await?;
625 let sinks = self.list_sinks().await?;
626 let subscriptions = self.list_subscriptions().await?;
627 let indexes = self.list_indexes().await?;
628 let views = self.list_views().await?;
629 let functions = self.list_functions().await?;
630 let connections = self.list_connections().await?;
631 let secrets = self.list_secrets().await?;
632
633 let users = self.list_users().await?;
634
635 Ok((
636 (
637 databases,
638 schemas,
639 tables,
640 sources,
641 sinks,
642 subscriptions,
643 indexes,
644 views,
645 functions,
646 connections,
647 secrets,
648 ),
649 users,
650 ))
651 }
652
653 pub async fn stats(&self) -> MetaResult<CatalogStats> {
654 let mut table_num_map: HashMap<_, _> = Table::find()
655 .select_only()
656 .column(table::Column::TableType)
657 .column_as(table::Column::TableId.count(), "num")
658 .group_by(table::Column::TableType)
659 .having(table::Column::TableType.ne(TableType::Internal))
660 .into_tuple::<(TableType, i64)>()
661 .all(&self.db)
662 .await?
663 .into_iter()
664 .map(|(table_type, num)| (table_type, num as u64))
665 .collect();
666
667 let source_num = Source::find().count(&self.db).await?;
668 let sink_num = Sink::find().count(&self.db).await?;
669 let function_num = Function::find().count(&self.db).await?;
670 let streaming_job_num = StreamingJob::find().count(&self.db).await?;
671 let actor_num = Actor::find().count(&self.db).await?;
672
673 Ok(CatalogStats {
674 table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
675 mview_num: table_num_map
676 .remove(&TableType::MaterializedView)
677 .unwrap_or(0),
678 index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
679 source_num,
680 sink_num,
681 function_num,
682 streaming_job_num,
683 actor_num,
684 })
685 }
686
687 async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
688 let db_objs = Database::find()
689 .find_also_related(Object)
690 .all(&self.db)
691 .await?;
692 Ok(db_objs
693 .into_iter()
694 .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
695 .collect())
696 }
697
698 async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
699 let schema_objs = Schema::find()
700 .find_also_related(Object)
701 .all(&self.db)
702 .await?;
703
704 Ok(schema_objs
705 .into_iter()
706 .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
707 .collect())
708 }
709
710 async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
711 let mut user_infos: Vec<PbUserInfo> = User::find()
712 .all(&self.db)
713 .await?
714 .into_iter()
715 .map(Into::into)
716 .collect();
717
718 for user_info in &mut user_infos {
719 user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
720 }
721 Ok(user_infos)
722 }
723
724 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
726 let table_objs = Table::find()
727 .find_also_related(Object)
728 .all(&self.db)
729 .await?;
730
731 Ok(table_objs
732 .into_iter()
733 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
734 .collect())
735 }
736
737 async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
739 let table_objs = Table::find()
740 .find_also_related(Object)
741 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
742 .filter(
743 streaming_job::Column::JobStatus
744 .eq(JobStatus::Created)
745 .or(table::Column::TableType.eq(TableType::MaterializedView)),
746 )
747 .all(&self.db)
748 .await?;
749
750 let created_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
751 .select_only()
752 .column(streaming_job::Column::JobId)
753 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
754 .into_tuple()
755 .all(&self.db)
756 .await?;
757
758 let job_ids: HashSet<ObjectId> = table_objs
759 .iter()
760 .map(|(t, _)| t.table_id)
761 .chain(created_streaming_job_ids.iter().cloned())
762 .collect();
763
764 let internal_table_objs = Table::find()
765 .find_also_related(Object)
766 .filter(
767 table::Column::TableType
768 .eq(TableType::Internal)
769 .and(table::Column::BelongsToJobId.is_in(job_ids)),
770 )
771 .all(&self.db)
772 .await?;
773
774 Ok(table_objs
775 .into_iter()
776 .chain(internal_table_objs.into_iter())
777 .map(|(table, obj)| {
778 let is_created = created_streaming_job_ids.contains(&table.table_id)
780 || (table.table_type == TableType::Internal
781 && created_streaming_job_ids.contains(&table.belongs_to_job_id.unwrap()));
782 let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
783 pb_table.stream_job_status = if is_created {
784 PbStreamJobStatus::Created.into()
785 } else {
786 PbStreamJobStatus::Creating.into()
787 };
788 pb_table
789 })
790 .collect())
791 }
792
793 async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
795 let mut source_objs = Source::find()
796 .find_also_related(Object)
797 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
798 .filter(
799 streaming_job::Column::JobStatus
800 .is_null()
801 .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
802 )
803 .all(&self.db)
804 .await?;
805
806 let created_table_ids: HashSet<TableId> = Table::find()
808 .select_only()
809 .column(table::Column::TableId)
810 .join(JoinType::InnerJoin, table::Relation::Object1.def())
811 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
812 .filter(
813 table::Column::OptionalAssociatedSourceId
814 .is_not_null()
815 .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
816 )
817 .into_tuple()
818 .all(&self.db)
819 .await?
820 .into_iter()
821 .collect();
822 source_objs.retain_mut(|(source, _)| {
823 source.optional_associated_table_id.is_none()
824 || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
825 });
826
827 Ok(source_objs
828 .into_iter()
829 .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
830 .collect())
831 }
832
833 async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
835 let sink_objs = Sink::find()
836 .find_also_related(Object)
837 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
838 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
839 .all(&self.db)
840 .await?;
841
842 Ok(sink_objs
843 .into_iter()
844 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
845 .collect())
846 }
847
848 async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
850 let subscription_objs = Subscription::find()
851 .find_also_related(Object)
852 .all(&self.db)
853 .await?;
854
855 Ok(subscription_objs
856 .into_iter()
857 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
858 .collect())
859 }
860
861 async fn list_views(&self) -> MetaResult<Vec<PbView>> {
862 let view_objs = View::find().find_also_related(Object).all(&self.db).await?;
863
864 Ok(view_objs
865 .into_iter()
866 .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
867 .collect())
868 }
869
870 async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
872 let index_objs = Index::find()
873 .find_also_related(Object)
874 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
875 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
876 .all(&self.db)
877 .await?;
878
879 Ok(index_objs
880 .into_iter()
881 .map(|(index, obj)| ObjectModel(index, obj.unwrap()).into())
882 .collect())
883 }
884
885 async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
886 let conn_objs = Connection::find()
887 .find_also_related(Object)
888 .all(&self.db)
889 .await?;
890
891 Ok(conn_objs
892 .into_iter()
893 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
894 .collect())
895 }
896
897 pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
898 let secret_objs = Secret::find()
899 .find_also_related(Object)
900 .all(&self.db)
901 .await?;
902 Ok(secret_objs
903 .into_iter()
904 .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
905 .collect())
906 }
907
908 async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
909 let func_objs = Function::find()
910 .find_also_related(Object)
911 .all(&self.db)
912 .await?;
913
914 Ok(func_objs
915 .into_iter()
916 .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
917 .collect())
918 }
919
920 pub(crate) fn register_finish_notifier(
921 &mut self,
922 database_id: DatabaseId,
923 id: ObjectId,
924 sender: Sender<Result<NotificationVersion, String>>,
925 ) {
926 self.creating_table_finish_notifier
927 .entry(database_id)
928 .or_default()
929 .entry(id)
930 .or_default()
931 .push(sender);
932 }
933
934 pub(crate) async fn streaming_job_is_finished(&mut self, id: i32) -> MetaResult<bool> {
935 let status = StreamingJob::find()
936 .select_only()
937 .column(streaming_job::Column::JobStatus)
938 .filter(streaming_job::Column::JobId.eq(id))
939 .into_tuple::<JobStatus>()
940 .one(&self.db)
941 .await?;
942
943 status
944 .map(|status| status == JobStatus::Created)
945 .ok_or_else(|| {
946 MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
947 })
948 }
949
950 pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
951 if let Some(database_id) = database_id {
952 if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
953 {
954 for tx in creating_tables.into_values().flatten() {
955 let _ = tx.send(Err(err.clone()));
956 }
957 }
958 } else {
959 for tx in take(&mut self.creating_table_finish_notifier)
960 .into_values()
961 .flatten()
962 .flat_map(|(_, txs)| txs.into_iter())
963 {
964 let _ = tx.send(Err(err.clone()));
965 }
966 }
967 }
968
969 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
970 let table_ids: Vec<TableId> = Table::find()
971 .select_only()
972 .filter(table::Column::TableType.is_in(vec![
973 TableType::Table,
974 TableType::MaterializedView,
975 TableType::Index,
976 ]))
977 .column(table::Column::TableId)
978 .into_tuple()
979 .all(&self.db)
980 .await?;
981 Ok(table_ids)
982 }
983
984 pub(crate) fn complete_dropped_tables(
987 &mut self,
988 table_ids: impl Iterator<Item = TableId>,
989 ) -> Vec<PbTable> {
990 table_ids
991 .filter_map(|table_id| {
992 self.dropped_tables.remove(&table_id).map_or_else(
993 || {
994 tracing::warn!(table_id, "table not found");
995 None
996 },
997 Some,
998 )
999 })
1000 .collect()
1001 }
1002}