mod alter_op;
mod create_op;
mod drop_op;
mod get_op;
mod list_op;
mod test;
mod util;

use std::collections::{BTreeSet, HashMap, HashSet};
use std::iter;
use std::mem::take;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{
    DEFAULT_SCHEMA_NAME, FragmentTypeFlag, SYSTEM_SCHEMAS, TableOption,
};
use risingwave_common::current_cluster_version;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::*;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::{
    ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array,
    IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkFormatDesc, SinkId, SourceId,
    StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
    connection, database, fragment, function, index, object, object_dependency, schema, secret,
    sink, source, streaming_job, subscription, table, user_privilege, view,
};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::subscription::SubscriptionState;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::{
    PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
    PbStreamJobStatus, PbSubscription, PbTable, PbView,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::telemetry::PbTelemetryEventStage;
use risingwave_pb::user::PbUserInfo;
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
    IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
    SelectColumns, TransactionTrait, Value,
};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::info;

use super::utils::{
    check_subscription_name_duplicate, get_internal_tables_by_id, rename_relation,
    rename_relation_refer,
};
use crate::controller::ObjectModel;
use crate::controller::catalog::util::update_internal_tables;
use crate::controller::utils::*;
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, MetaSrvEnv, NotificationVersion,
    get_referred_connection_ids_from_source, get_referred_secret_ids_from_source,
};
use crate::rpc::ddl_controller::DropMode;
use crate::telemetry::{MetaTelemetryJobDesc, report_event};
use crate::{MetaError, MetaResult};

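/// A full dump of the catalog: every object kind in its protobuf form, in the order of the
/// tuple fields below.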
pub type Catalog = (
    Vec<PbDatabase>,
    Vec<PbSchema>,
    Vec<PbTable>,
    Vec<PbSource>,
    Vec<PbSink>,
    Vec<PbSubscription>,
    Vec<PbIndex>,
    Vec<PbView>,
    Vec<PbFunction>,
    Vec<PbConnection>,
    Vec<PbSecret>,
);

pub type CatalogControllerRef = Arc<CatalogController>;

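/// Catalog controller backed by the SQL meta store: it persists catalog objects through the
/// relational connection and notifies frontends of catalog changes.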
pub struct CatalogController {
    pub(crate) env: MetaSrvEnv,
    pub(crate) inner: RwLock<CatalogControllerInner>,
}

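/// Context for dropping a table's connector: the streaming job to modify and the state table
/// and source objects to remove.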
#[derive(Clone, Default, Debug)]
pub struct DropTableConnectorContext {
    pub(crate) to_change_streaming_job_id: ObjectId,
    pub(crate) to_remove_state_table_id: TableId,
    pub(crate) to_remove_source_id: SourceId,
}

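/// Objects released by a drop within one database, so the caller can clean up the corresponding
/// streaming jobs, state tables, sources, secrets, fragments and actors.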
#[derive(Clone, Default, Debug)]
pub struct ReleaseContext {
    pub(crate) database_id: DatabaseId,
    pub(crate) removed_streaming_job_ids: Vec<ObjectId>,
    pub(crate) removed_state_table_ids: Vec<TableId>,

    pub(crate) removed_secret_ids: Vec<SecretId>,
    pub(crate) removed_source_ids: Vec<SourceId>,
    pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

    pub(crate) removed_actors: HashSet<ActorId>,
    pub(crate) removed_fragments: HashSet<FragmentId>,
}

impl CatalogController {
    pub async fn new(env: MetaSrvEnv) -> MetaResult<Self> {
        let meta_store = env.meta_store();
        let catalog_controller = Self {
            env,
            inner: RwLock::new(CatalogControllerInner {
                db: meta_store.conn,
                creating_table_finish_notifier: HashMap::new(),
                dropped_tables: HashMap::new(),
            }),
        };

        catalog_controller.init().await?;
        Ok(catalog_controller)
    }

    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, CatalogControllerInner> {
        self.inner.read().await
    }

    pub async fn get_inner_write_guard(&self) -> RwLockWriteGuard<'_, CatalogControllerInner> {
        self.inner.write().await
    }
}

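/// State guarded by the controller's `RwLock`: the meta store connection, notifiers waiting for
/// creating streaming jobs to finish, and tables already dropped from the catalog that are kept
/// until `complete_dropped_tables` collects them.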
pub struct CatalogControllerInner {
    pub(crate) db: DatabaseConnection,
    #[expect(clippy::type_complexity)]
    pub creating_table_finish_notifier:
        HashMap<DatabaseId, HashMap<ObjectId, Vec<Sender<Result<NotificationVersion, String>>>>>,
    pub dropped_tables: HashMap<TableId, PbTable>,
}

impl CatalogController {
    pub(crate) async fn notify_frontend(
        &self,
        operation: NotificationOperation,
        info: NotificationInfo,
    ) -> NotificationVersion {
        self.env
            .notification_manager()
            .notify_frontend(operation, info)
            .await
    }

    pub(crate) async fn notify_frontend_relation_info(
        &self,
        operation: NotificationOperation,
        relation_info: PbObjectInfo,
    ) -> NotificationVersion {
        self.env
            .notification_manager()
            .notify_frontend_object_info(operation, relation_info)
            .await
    }

    pub(crate) async fn current_notification_version(&self) -> NotificationVersion {
        self.env.notification_manager().current_version().await
    }
}

impl CatalogController {
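    /// Finishes creating a subscription: stamps the object's `created_at` fields, marks the
    /// subscription as `Created`, and grants default privileges, all in one transaction.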
    pub async fn finish_create_subscription_catalog(&self, subscription_id: u32) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let job_id = subscription_id as i32;

        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("subscription", job_id));
        }

        let job = subscription::ActiveModel {
            subscription_id: Set(job_id),
            subscription_state: Set(SubscriptionState::Created.into()),
            ..Default::default()
        };
        job.update(&txn).await?;

        let _ = grant_default_privileges_automatically(&txn, job_id).await?;

        txn.commit().await?;

        Ok(())
    }

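    /// Notifies frontends of a newly created subscription and, if any users already hold
    /// privileges on it, also pushes the refreshed user infos. Returns the notification version.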
    pub async fn notify_create_subscription(
        &self,
        subscription_id: u32,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.read().await;
        let job_id = subscription_id as i32;
        let (subscription, obj) = Subscription::find_by_id(job_id)
            .find_also_related(Object)
            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
            .one(&inner.db)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Add,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: PbObjectInfo::Subscription(
                            ObjectModel(subscription, obj.unwrap()).into(),
                        )
                        .into(),
                    }],
                }),
            )
            .await;

        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.eq(subscription_id as ObjectId))
            .into_tuple()
            .all(&inner.db)
            .await?;

        if !updated_user_ids.is_empty() {
            let updated_user_infos = list_user_info_by_ids(updated_user_ids, &inner.db).await?;
            version = self.notify_users_update(updated_user_infos).await;
        }

        Ok(version)
    }

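    /// Collects connector usage over all sources and sinks (connector name plus format/encode
    /// info) as a JSON value.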
    pub async fn get_connector_usage(&self) -> MetaResult<jsonbb::Value> {
        let inner = self.inner.read().await;
        let source_props_and_info: Vec<(i32, Property, Option<StreamSourceInfo>)> = Source::find()
            .select_only()
            .column(source::Column::SourceId)
            .column(source::Column::WithProperties)
            .column(source::Column::SourceInfo)
            .into_tuple()
            .all(&inner.db)
            .await?;
        let sink_props_and_info: Vec<(i32, Property, Option<SinkFormatDesc>)> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .column(sink::Column::Properties)
            .column(sink::Column::SinkFormatDesc)
            .into_tuple()
            .all(&inner.db)
            .await?;
        drop(inner);

        let get_connector_from_property = |property: &Property| -> String {
            property
                .0
                .get(UPSTREAM_SOURCE_KEY)
                .cloned()
                .unwrap_or_default()
        };

        let source_report: Vec<jsonbb::Value> = source_props_and_info
            .iter()
            .map(|(oid, property, info)| {
                let connector_name = get_connector_from_property(property);
                let mut format = None;
                let mut encode = None;
                if let Some(info) = info {
                    let pb_info = info.to_protobuf();
                    format = Some(pb_info.format().as_str_name());
                    encode = Some(pb_info.row_encode().as_str_name());
                }
                jsonbb::json!({
                    oid.to_string(): {
                        "connector": connector_name,
                        "format": format,
                        "encode": encode,
                    },
                })
            })
            .collect_vec();

        let sink_report: Vec<jsonbb::Value> = sink_props_and_info
            .iter()
            .map(|(oid, property, info)| {
                let connector_name = get_connector_from_property(property);
                let mut format = None;
                let mut encode = None;
                if let Some(info) = info {
                    let pb_info = info.to_protobuf();
                    format = Some(pb_info.format().as_str_name());
                    encode = Some(pb_info.encode().as_str_name());
                }
                jsonbb::json!({
                    oid.to_string(): {
                        "connector": connector_name,
                        "format": format,
                        "encode": encode,
                    },
                })
            })
            .collect_vec();

        Ok(jsonbb::json!({
            "source": source_report,
            "sink": sink_report,
        }))
    }

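    /// Deletes subscription objects that never reached the `Created` state, optionally scoped to
    /// a single database.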
    pub async fn clean_dirty_subscription(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let filter_condition = object::Column::ObjType.eq(ObjectType::Subscription).and(
            object::Column::Oid.not_in_subquery(
                Query::select()
                    .column(subscription::Column::SubscriptionId)
                    .from(Subscription)
                    .and_where(
                        subscription::Column::SubscriptionState
                            .eq(SubscriptionState::Created as i32),
                    )
                    .take(),
            ),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        Object::delete_many()
            .filter(filter_condition)
            .exec(&txn)
            .await?;
        txn.commit().await?;
        Ok(())
    }

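    /// Cleans up dirty creating streaming jobs (jobs still `Initial`, or foreground jobs stuck in
    /// `Creating`) together with their state and internal tables, then notifies frontends of the
    /// deletions. Returns the ids of sources associated with the dirty table jobs so the caller
    /// can clean them up as well.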
    pub async fn clean_dirty_creating_jobs(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<Vec<SourceId>> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let filter_condition = streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
            streaming_job::Column::JobStatus
                .eq(JobStatus::Creating)
                .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
        );

        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };

        let dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .filter(filter_condition)
            .into_partial_model()
            .all(&txn)
            .await?;

        let updated_table_ids = Self::clean_dirty_sink_downstreams(&txn).await?;
        let updated_table_objs = if !updated_table_ids.is_empty() {
            Table::find()
                .find_also_related(Object)
                .filter(table::Column::TableId.is_in(updated_table_ids))
                .all(&txn)
                .await?
        } else {
            vec![]
        };

        if dirty_job_objs.is_empty() {
            if !updated_table_objs.is_empty() {
                txn.commit().await?;

                self.notify_frontend(
                    NotificationOperation::Update,
                    NotificationInfo::ObjectGroup(PbObjectGroup {
                        objects: updated_table_objs
                            .into_iter()
                            .map(|(t, obj)| PbObject {
                                object_info: PbObjectInfo::Table(
                                    ObjectModel(t, obj.unwrap()).into(),
                                )
                                .into(),
                            })
                            .collect(),
                    }),
                )
                .await;
            }

            return Ok(vec![]);
        }

        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;

        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();

        let all_dirty_table_ids = dirty_job_objs
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table)
            .map(|obj| obj.oid)
            .collect_vec();
        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .column(table::Column::TableType)
            .filter(table::Column::TableId.is_in(all_dirty_table_ids))
            .into_tuple::<(ObjectId, TableType)>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        let dirty_background_jobs: HashSet<ObjectId> = streaming_job::Entity::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobId
                    .is_in(dirty_job_ids.clone())
                    .and(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .into_tuple()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        let to_notify_objs = dirty_job_objs
            .into_iter()
            .filter(|obj| {
                matches!(
                    dirty_table_type_map.get(&obj.oid),
                    Some(TableType::MaterializedView)
                ) || dirty_background_jobs.contains(&obj.oid)
            })
            .collect_vec();

        let dirty_associated_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(dirty_job_ids.clone())
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        let dirty_state_table_ids: Vec<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;

        let dirty_internal_table_objs = Object::find()
            .select_only()
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, object::Relation::Table.def())
            .filter(table::Column::BelongsToJobId.is_in(to_notify_objs.iter().map(|obj| obj.oid)))
            .into_partial_model()
            .all(&txn)
            .await?;

        let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
            .clone()
            .into_iter()
            .chain(dirty_state_table_ids.into_iter())
            .chain(dirty_associated_source_ids.clone().into_iter())
            .collect();

        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(to_delete_objs))
            .exec(&txn)
            .await?;
        assert!(res.rows_affected > 0);

        txn.commit().await?;

        let object_group = build_object_group_for_delete(
            to_notify_objs
                .into_iter()
                .chain(dirty_internal_table_objs.into_iter())
                .collect_vec(),
        );

        let _version = self
            .notify_frontend(NotificationOperation::Delete, object_group)
            .await;

        if !updated_table_objs.is_empty() {
            self.notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: updated_table_objs
                        .into_iter()
                        .map(|(t, obj)| PbObject {
                            object_info: PbObjectInfo::Table(ObjectModel(t, obj.unwrap()).into())
                                .into(),
                        })
                        .collect(),
                }),
            )
            .await;
        }

        Ok(dirty_associated_source_ids)
    }

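    /// Handles `COMMENT ON`: writes the comment to either the table's description or the targeted
    /// column's description, then notifies frontends of the updated table.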
    pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        ensure_object_id(ObjectType::Database, comment.database_id as _, &txn).await?;
        ensure_object_id(ObjectType::Schema, comment.schema_id as _, &txn).await?;
        let table_obj = Object::find_by_id(comment.table_id as ObjectId)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;

        let table = if let Some(col_idx) = comment.column_index {
            let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id as TableId)
                .select_only()
                .column(table::Column::Columns)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?;
            let mut pb_columns = columns.to_protobuf();

            let column = pb_columns
                .get_mut(col_idx as usize)
                .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?;
            let column_desc = column.column_desc.as_mut().ok_or_else(|| {
                anyhow!(
                    "column desc at index {} for table id {} not found",
                    col_idx,
                    comment.table_id
                )
            })?;
            column_desc.description = comment.description;
            table::ActiveModel {
                table_id: Set(comment.table_id as _),
                columns: Set(pb_columns.into()),
                ..Default::default()
            }
            .update(&txn)
            .await?
        } else {
            table::ActiveModel {
                table_id: Set(comment.table_id as _),
                description: Set(comment.description),
                ..Default::default()
            }
            .update(&txn)
            .await?
        };
        txn.commit().await?;

        let version = self
            .notify_frontend_relation_info(
                NotificationOperation::Update,
                PbObjectInfo::Table(ObjectModel(table, table_obj).into()),
            )
            .await;

        Ok(version)
    }

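    /// Takes the given tables out of the pending `dropped_tables` map; see
    /// `CatalogControllerInner::complete_dropped_tables`.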
    pub async fn complete_dropped_tables(
        &self,
        table_ids: impl Iterator<Item = TableId>,
    ) -> Vec<PbTable> {
        let mut inner = self.inner.write().await;
        inner.complete_dropped_tables(table_ids)
    }
}

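/// Aggregated counts of catalog objects and actors.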
pub struct CatalogStats {
    pub table_num: u64,
    pub mview_num: u64,
    pub index_num: u64,
    pub source_num: u64,
    pub sink_num: u64,
    pub function_num: u64,
    pub streaming_job_num: u64,
    pub actor_num: u64,
}

impl CatalogControllerInner {
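    /// Builds a full catalog snapshot of every object kind, together with all user infos and
    /// their privileges.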
    pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
        let databases = self.list_databases().await?;
        let schemas = self.list_schemas().await?;
        let tables = self.list_tables().await?;
        let sources = self.list_sources().await?;
        let sinks = self.list_sinks().await?;
        let subscriptions = self.list_subscriptions().await?;
        let indexes = self.list_indexes().await?;
        let views = self.list_views().await?;
        let functions = self.list_functions().await?;
        let connections = self.list_connections().await?;
        let secrets = self.list_secrets().await?;

        let users = self.list_users().await?;

        Ok((
            (
                databases,
                schemas,
                tables,
                sources,
                sinks,
                subscriptions,
                indexes,
                views,
                functions,
                connections,
                secrets,
            ),
            users,
        ))
    }

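    /// Computes `CatalogStats`: per-type table counts (internal tables excluded) plus counts of
    /// sources, sinks, functions, streaming jobs and actors.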
    pub async fn stats(&self) -> MetaResult<CatalogStats> {
        let mut table_num_map: HashMap<_, _> = Table::find()
            .select_only()
            .column(table::Column::TableType)
            .column_as(table::Column::TableId.count(), "num")
            .group_by(table::Column::TableType)
            .having(table::Column::TableType.ne(TableType::Internal))
            .into_tuple::<(TableType, i64)>()
            .all(&self.db)
            .await?
            .into_iter()
            .map(|(table_type, num)| (table_type, num as u64))
            .collect();

        let source_num = Source::find().count(&self.db).await?;
        let sink_num = Sink::find().count(&self.db).await?;
        let function_num = Function::find().count(&self.db).await?;
        let streaming_job_num = StreamingJob::find().count(&self.db).await?;
        let actor_num = Actor::find().count(&self.db).await?;

        Ok(CatalogStats {
            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
            mview_num: table_num_map
                .remove(&TableType::MaterializedView)
                .unwrap_or(0),
            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
            source_num,
            sink_num,
            function_num,
            streaming_job_num,
            actor_num,
        })
    }

    async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
        let db_objs = Database::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        Ok(db_objs
            .into_iter()
            .map(|(db, obj)| ObjectModel(db, obj.unwrap()).into())
            .collect())
    }

    async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
        let schema_objs = Schema::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(schema_objs
            .into_iter()
            .map(|(schema, obj)| ObjectModel(schema, obj.unwrap()).into())
            .collect())
    }

    async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
        let mut user_infos: Vec<PbUserInfo> = User::find()
            .all(&self.db)
            .await?
            .into_iter()
            .map(Into::into)
            .collect();

        for user_info in &mut user_infos {
            user_info.grant_privileges = get_user_privilege(user_info.id as _, &self.db).await?;
        }
        Ok(user_infos)
    }

    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
        let table_objs = Table::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(table_objs
            .into_iter()
            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
            .collect())
    }

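    /// Lists user-visible tables: tables of created jobs, materialized views, and tables of
    /// background-created jobs, plus their internal state tables, with `stream_job_status`
    /// filled in from the corresponding streaming job.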
    async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
        let table_objs = Table::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus.eq(JobStatus::Created).or(
                    table::Column::TableType
                        .eq(TableType::MaterializedView)
                        .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
                ),
            )
            .all(&self.db)
            .await?;

        let job_statuses: HashMap<ObjectId, JobStatus> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .column(streaming_job::Column::JobStatus)
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .into_tuple::<(ObjectId, JobStatus)>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        let job_ids: HashSet<ObjectId> = table_objs
            .iter()
            .map(|(t, _)| t.table_id)
            .chain(job_statuses.keys().cloned())
            .collect();

        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(
                table::Column::TableType
                    .eq(TableType::Internal)
                    .and(table::Column::BelongsToJobId.is_in(job_ids)),
            )
            .all(&self.db)
            .await?;

        Ok(table_objs
            .into_iter()
            .chain(internal_table_objs.into_iter())
            .map(|(table, obj)| {
                let status: PbStreamJobStatus = if table.table_type == TableType::Internal {
                    (*job_statuses
                        .get(&table.belongs_to_job_id.unwrap())
                        .unwrap_or(&JobStatus::Creating))
                    .into()
                } else {
                    (*job_statuses
                        .get(&table.table_id)
                        .unwrap_or(&JobStatus::Creating))
                    .into()
                };
                let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
                pb_table.stream_job_status = status.into();
                pb_table
            })
            .collect())
    }

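    /// Lists sources whose owning streaming job (if any) has been created, excluding sources
    /// associated with a table whose job is not yet created.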
    async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        let mut source_objs = Source::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .is_null()
                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
            )
            .all(&self.db)
            .await?;

        let created_table_ids: HashSet<TableId> = Table::find()
            .select_only()
            .column(table::Column::TableId)
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                table::Column::OptionalAssociatedSourceId
                    .is_not_null()
                    .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
            )
            .into_tuple()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();
        source_objs.retain_mut(|(source, _)| {
            source.optional_associated_table_id.is_none()
                || created_table_ids.contains(&source.optional_associated_table_id.unwrap())
        });

        Ok(source_objs
            .into_iter()
            .map(|(source, obj)| ObjectModel(source, obj.unwrap()).into())
            .collect())
    }

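    /// Lists sinks of created or background-created jobs, marking those whose job is still
    /// creating with `PbStreamJobStatus::Creating`.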
    async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
        let sink_objs = Sink::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .or(streaming_job::Column::CreateType.eq(CreateType::Background)),
            )
            .all(&self.db)
            .await?;

        let creating_sinks: HashSet<_> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Creating)
                    .and(
                        streaming_job::Column::JobId
                            .is_in(sink_objs.iter().map(|(sink, _)| sink.sink_id)),
                    ),
            )
            .into_tuple::<SinkId>()
            .all(&self.db)
            .await?
            .into_iter()
            .collect();

        Ok(sink_objs
            .into_iter()
            .map(|(sink, obj)| {
                let is_creating = creating_sinks.contains(&sink.sink_id);
                let mut pb_sink: PbSink = ObjectModel(sink, obj.unwrap()).into();
                pb_sink.stream_job_status = if is_creating {
                    PbStreamJobStatus::Creating.into()
                } else {
                    PbStreamJobStatus::Created.into()
                };
                pb_sink
            })
            .collect())
    }

    async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
        let subscription_objs = Subscription::find()
            .find_also_related(Object)
            .filter(subscription::Column::SubscriptionState.eq(SubscriptionState::Created as i32))
            .all(&self.db)
            .await?;

        Ok(subscription_objs
            .into_iter()
            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
            .collect())
    }

    async fn list_views(&self) -> MetaResult<Vec<PbView>> {
        let view_objs = View::find().find_also_related(Object).all(&self.db).await?;

        Ok(view_objs
            .into_iter()
            .map(|(view, obj)| ObjectModel(view, obj.unwrap()).into())
            .collect())
    }

    async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
        let index_objs = Index::find()
            .find_also_related(Object)
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
            .all(&self.db)
            .await?;

        Ok(index_objs
            .into_iter()
            .map(|(index, obj)| ObjectModel(index, obj.unwrap()).into())
            .collect())
    }

    async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
        let conn_objs = Connection::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(conn_objs
            .into_iter()
            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
            .collect())
    }

    pub async fn list_secrets(&self) -> MetaResult<Vec<PbSecret>> {
        let secret_objs = Secret::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;
        Ok(secret_objs
            .into_iter()
            .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into())
            .collect())
    }

    async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
        let func_objs = Function::find()
            .find_also_related(Object)
            .all(&self.db)
            .await?;

        Ok(func_objs
            .into_iter()
            .map(|(func, obj)| ObjectModel(func, obj.unwrap()).into())
            .collect())
    }

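    /// Registers a oneshot sender to be notified once the creating streaming job `id` in
    /// `database_id` finishes or fails.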
    pub(crate) fn register_finish_notifier(
        &mut self,
        database_id: DatabaseId,
        id: ObjectId,
        sender: Sender<Result<NotificationVersion, String>>,
    ) {
        self.creating_table_finish_notifier
            .entry(database_id)
            .or_default()
            .entry(id)
            .or_default()
            .push(sender);
    }

    pub(crate) async fn streaming_job_is_finished(&mut self, id: i32) -> MetaResult<bool> {
        let status = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobStatus)
            .filter(streaming_job::Column::JobId.eq(id))
            .into_tuple::<JobStatus>()
            .one(&self.db)
            .await?;

        status
            .map(|status| status == JobStatus::Created)
            .ok_or_else(|| {
                MetaError::catalog_id_not_found("streaming job", "may have been cancelled/dropped")
            })
    }

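    /// Fails all pending finish notifiers with `err`, either for a single database or, if
    /// `database_id` is `None`, for all databases.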
    pub(crate) fn notify_finish_failed(&mut self, database_id: Option<DatabaseId>, err: String) {
        if let Some(database_id) = database_id {
            if let Some(creating_tables) = self.creating_table_finish_notifier.remove(&database_id)
            {
                for tx in creating_tables.into_values().flatten() {
                    let _ = tx.send(Err(err.clone()));
                }
            }
        } else {
            for tx in take(&mut self.creating_table_finish_notifier)
                .into_values()
                .flatten()
                .flat_map(|(_, txs)| txs.into_iter())
            {
                let _ = tx.send(Err(err.clone()));
            }
        }
    }

    pub(crate) fn notify_cancelled(&mut self, database_id: DatabaseId, id: ObjectId) {
        if let Some(creating_tables) = self.creating_table_finish_notifier.get_mut(&database_id)
            && let Some(tx_list) = creating_tables.remove(&id)
        {
            for tx in tx_list {
                let _ = tx.send(Err("Cancelled".to_owned()));
            }
        }
    }

    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
        let table_ids: Vec<TableId> = Table::find()
            .select_only()
            .filter(table::Column::TableType.is_in(vec![
                TableType::Table,
                TableType::MaterializedView,
                TableType::Index,
            ]))
            .column(table::Column::TableId)
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(table_ids)
    }

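    /// Removes the given tables from `dropped_tables` and returns them, warning about ids that
    /// are not present.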
    pub(crate) fn complete_dropped_tables(
        &mut self,
        table_ids: impl Iterator<Item = TableId>,
    ) -> Vec<PbTable> {
        table_ids
            .filter_map(|table_id| {
                self.dropped_tables.remove(&table_id).map_or_else(
                    || {
                        tracing::warn!(table_id, "table not found");
                        None
                    },
                    Some,
                )
            })
            .collect()
    }
}