1use std::collections::HashMap;
16
17use risingwave_common::catalog::ColumnCatalog;
18use risingwave_common::id::JobId;
19use sea_orm::ConnectionTrait;
20
21use super::*;
22use crate::controller::utils::{
23 StreamingJobExtraInfo, get_database_resource_group, get_existing_job_resource_group,
24 get_streaming_job_extra_info as fetch_streaming_job_extra_info, get_table_columns,
25 load_streaming_jobs_by_ids,
26};
27
28impl CatalogController {
29 pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
30 let inner = self.inner.read().await;
31 let (secret, obj) = Secret::find_by_id(secret_id)
32 .find_also_related(Object)
33 .one(&inner.db)
34 .await?
35 .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
36 Ok(ObjectModel(secret, obj.unwrap(), None).into())
37 }
38
39 pub async fn get_object_database_id(
40 &self,
41 object_id: impl Into<ObjectId>,
42 ) -> MetaResult<DatabaseId> {
43 let object_id = object_id.into();
44 let inner = self.inner.read().await;
45 let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
46 .select_only()
47 .select_column(object::Column::DatabaseId)
48 .into_tuple()
49 .one(&inner.db)
50 .await?
51 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
52 Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
53 }
54
55 pub async fn get_connection_by_id(
56 &self,
57 connection_id: ConnectionId,
58 ) -> MetaResult<PbConnection> {
59 let inner = self.inner.read().await;
60 let (conn, obj) = Connection::find_by_id(connection_id)
61 .find_also_related(Object)
62 .one(&inner.db)
63 .await?
64 .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
65
66 Ok(ObjectModel(conn, obj.unwrap(), None).into())
67 }
68
69 pub async fn get_table_catalog_by_name(
70 &self,
71 database_id: DatabaseId,
72 schema_id: SchemaId,
73 name: &str,
74 ) -> MetaResult<Option<PbTable>> {
75 let inner = self.inner.read().await;
76 let table_obj = Table::find()
77 .find_also_related(Object)
78 .filter(
79 table::Column::Name
80 .eq(name)
81 .and(object::Column::DatabaseId.eq(database_id))
82 .and(object::Column::SchemaId.eq(schema_id)),
83 )
84 .one(&inner.db)
85 .await?;
86 if let Some((table, obj)) = table_obj {
87 let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
88 .one(&inner.db)
89 .await?;
90 Ok(Some(ObjectModel(table, obj.unwrap(), streaming_job).into()))
91 } else {
92 Ok(None)
93 }
94 }
95
96 pub async fn get_table_by_name(
97 &self,
98 database_name: &str,
99 table_name: &str,
100 ) -> MetaResult<Option<PbTable>> {
101 let inner = self.inner.read().await;
102 let table_obj = Table::find()
103 .find_also_related(Object)
104 .join(JoinType::InnerJoin, object::Relation::Database2.def())
105 .filter(
106 table::Column::Name
107 .eq(table_name)
108 .and(database::Column::Name.eq(database_name)),
109 )
110 .one(&inner.db)
111 .await?;
112 if let Some((table, obj)) = table_obj {
113 let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
114 .one(&inner.db)
115 .await?;
116 Ok(Some(ObjectModel(table, obj.unwrap(), streaming_job).into()))
117 } else {
118 Ok(None)
119 }
120 }
121
122 pub async fn get_table_associated_source_id(
123 &self,
124 table_id: TableId,
125 ) -> MetaResult<Option<SourceId>> {
126 let inner = self.inner.read().await;
127 Table::find_by_id(table_id)
128 .select_only()
129 .select_column(table::Column::OptionalAssociatedSourceId)
130 .into_tuple()
131 .one(&inner.db)
132 .await?
133 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
134 }
135
136 pub async fn get_table_by_associate_source_id(
137 &self,
138 associated_source_id: SourceId,
139 ) -> MetaResult<PbTable> {
140 let inner = self.inner.read().await;
141 let table_obj = Table::find()
142 .find_also_related(Object)
143 .filter(table::Column::OptionalAssociatedSourceId.eq(associated_source_id))
144 .one(&inner.db)
145 .await?
146 .ok_or_else(|| {
147 MetaError::catalog_id_not_found("table associated source", associated_source_id)
148 })?;
149 let (table, obj) = table_obj;
150 let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
151 .one(&inner.db)
152 .await?;
153 Ok(ObjectModel(table, obj.unwrap(), streaming_job).into())
154 }
155
156 pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
157 let inner = self.inner.read().await;
158 let table_obj = Table::find_by_id(table_id)
159 .find_also_related(Object)
160 .one(&inner.db)
161 .await?;
162 if let Some((table, obj)) = table_obj {
163 let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
164 .one(&inner.db)
165 .await?;
166 Ok(ObjectModel(table, obj.unwrap(), streaming_job).into())
167 } else {
168 Err(MetaError::catalog_id_not_found("table", table_id))
169 }
170 }
171
172 pub async fn get_user_created_table_by_ids(
173 &self,
174 job_ids: impl Iterator<Item = JobId>,
175 ) -> MetaResult<Vec<PbTable>> {
176 let inner = self.inner.read().await;
177 self.get_user_created_table_by_ids_in_txn(&inner.db, job_ids)
178 .await
179 }
180
181 pub async fn get_user_created_table_by_ids_in_txn<C>(
182 &self,
183 txn: &C,
184 job_ids: impl Iterator<Item = JobId>,
185 ) -> MetaResult<Vec<PbTable>>
186 where
187 C: ConnectionTrait,
188 {
189 let table_objs = Table::find()
190 .find_also_related(Object)
191 .filter(
192 table::Column::TableId
193 .is_in(job_ids.map(|job_id| job_id.as_mv_table_id()).collect_vec())
194 .and(table::Column::TableType.eq(TableType::Table)),
195 )
196 .all(txn)
197 .await?;
198 let streaming_jobs = streaming_job::Entity::find()
199 .filter(
200 streaming_job::Column::JobId.is_in(
201 table_objs
202 .iter()
203 .map(|(table, _)| table.job_id())
204 .collect_vec(),
205 ),
206 )
207 .all(txn)
208 .await?
209 .into_iter()
210 .map(|job| (job.job_id, job))
211 .collect::<HashMap<JobId, streaming_job::Model>>();
212 Ok(table_objs
213 .into_iter()
214 .map(|(table, obj)| {
215 let job_id = table.job_id();
216 let streaming_job = streaming_jobs.get(&job_id).cloned();
217 ObjectModel(table, obj.unwrap(), streaming_job).into()
218 })
219 .collect())
220 }
221
222 pub async fn get_table_by_ids(
223 &self,
224 table_ids: Vec<TableId>,
225 include_dropped_table: bool,
226 ) -> MetaResult<Vec<PbTable>> {
227 let inner = self.inner.read().await;
228 let table_objs = Table::find()
229 .find_also_related(Object)
230 .filter(table::Column::TableId.is_in(table_ids.clone()))
231 .all(&inner.db)
232 .await?;
233 let streaming_jobs = load_streaming_jobs_by_ids(
234 &inner.db,
235 table_objs.iter().map(|(table, _)| table.job_id()),
236 )
237 .await?;
238 let tables = table_objs.into_iter().map(|(table, obj)| {
239 let job_id = table.job_id();
240 let streaming_job = streaming_jobs.get(&job_id).cloned();
241 ObjectModel(table, obj.unwrap(), streaming_job).into()
242 });
243 let tables = if include_dropped_table {
244 tables
245 .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
246 if table_ids.contains(id) {
247 Some(t.clone())
248 } else {
249 None
250 }
251 }))
252 .collect()
253 } else {
254 tables.collect()
255 };
256 Ok(tables)
257 }
258
259 pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
260 let inner = self.inner.read().await;
261 Ok(get_table_columns(&inner.db, id)
262 .await?
263 .to_protobuf()
264 .into_iter()
265 .map(|col| col.into())
266 .collect())
267 }
268
269 pub async fn get_sink_by_id(&self, sink_id: SinkId) -> MetaResult<Option<PbSink>> {
270 let inner = self.inner.read().await;
271 let sink_objs = Sink::find_by_id(sink_id)
272 .find_also_related(Object)
273 .one(&inner.db)
274 .await?;
275 if let Some((sink, obj)) = sink_objs {
276 let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
277 .one(&inner.db)
278 .await?;
279 Ok(Some(ObjectModel(sink, obj.unwrap(), streaming_job).into()))
280 } else {
281 Ok(None)
282 }
283 }
284
285 pub async fn get_sink_auto_refresh_schema_from(
286 &self,
287 table_id: TableId,
288 ) -> MetaResult<Vec<PbSink>> {
289 let inner = self.inner.read().await;
290 let sink_objs = Sink::find()
291 .find_also_related(Object)
292 .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
293 .all(&inner.db)
294 .await?;
295 let streaming_jobs = load_streaming_jobs_by_ids(
296 &inner.db,
297 sink_objs.iter().map(|(sink, _)| sink.sink_id.as_job_id()),
298 )
299 .await?;
300 Ok(sink_objs
301 .into_iter()
302 .map(|(sink, obj)| {
303 let streaming_job = streaming_jobs.get(&sink.sink_id.as_job_id()).cloned();
304 ObjectModel(sink, obj.unwrap(), streaming_job).into()
305 })
306 .collect())
307 }
308
309 pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
310 let inner = self.inner.read().await;
311 let tables: Vec<I32Array> = Fragment::find()
312 .select_only()
313 .column(fragment::Column::StateTableIds)
314 .filter(fragment::Column::JobId.eq(sink_id))
315 .into_tuple()
316 .all(&inner.db)
317 .await?;
318 Ok(tables
319 .into_iter()
320 .flat_map(|ids| {
321 ids.into_inner()
322 .into_iter()
323 .map(|table_id| TableId::new(table_id as _))
324 })
325 .collect())
326 }
327
328 pub async fn get_subscription_by_id(
329 &self,
330 subscription_id: SubscriptionId,
331 ) -> MetaResult<PbSubscription> {
332 let inner = self.inner.read().await;
333 let subscription_objs = Subscription::find()
334 .find_also_related(Object)
335 .filter(subscription::Column::SubscriptionId.eq(subscription_id))
336 .all(&inner.db)
337 .await?;
338 let subscription: PbSubscription = subscription_objs
339 .into_iter()
340 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap(), None).into())
341 .find_or_first(|_| true)
342 .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
343
344 Ok(subscription)
345 }
346
347 pub async fn get_mv_depended_subscriptions(
348 &self,
349 database_id: Option<DatabaseId>,
350 ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
351 let inner = self.inner.read().await;
352 let select = Subscription::find()
353 .select_only()
354 .select_column(subscription::Column::SubscriptionId)
355 .select_column(subscription::Column::DependentTableId)
356 .select_column(subscription::Column::RetentionSeconds);
357 let select = if let Some(database_id) = database_id {
358 select
359 .join(JoinType::InnerJoin, subscription::Relation::Object.def())
360 .filter(object::Column::DatabaseId.eq(database_id))
361 } else {
362 select
363 };
364 let subscription_objs: Vec<(SubscriptionId, TableId, i64)> =
365 select.into_tuple().all(&inner.db).await?;
366 let mut map: HashMap<_, HashMap<_, _>> = HashMap::new();
367 for (subscription_id, dependent_table_id, retention_seconds) in subscription_objs {
369 map.entry(dependent_table_id)
370 .or_default()
371 .insert(subscription_id, retention_seconds as _);
372 }
373 Ok(map)
374 }
375
376 pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
377 let inner = self.inner.read().await;
378 let table_options: Vec<(TableId, Option<i32>)> = Table::find()
379 .select_only()
380 .columns([table::Column::TableId, table::Column::RetentionSeconds])
381 .into_tuple::<(TableId, Option<i32>)>()
382 .all(&inner.db)
383 .await?;
384
385 Ok(table_options
386 .into_iter()
387 .map(|(id, retention_seconds)| {
388 (
389 id,
390 TableOption {
391 retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
392 },
393 )
394 })
395 .collect())
396 }
397
398 pub async fn get_all_streaming_parallelisms(
399 &self,
400 ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
401 let inner = self.inner.read().await;
402
403 let job_parallelisms = StreamingJob::find()
404 .select_only()
405 .columns([
406 streaming_job::Column::JobId,
407 streaming_job::Column::Parallelism,
408 ])
409 .into_tuple::<(ObjectId, StreamingParallelism)>()
410 .all(&inner.db)
411 .await?;
412
413 Ok(job_parallelisms
414 .into_iter()
415 .collect::<HashMap<ObjectId, StreamingParallelism>>())
416 }
417
418 pub async fn get_table_name_type_mapping(
419 &self,
420 ) -> MetaResult<HashMap<TableId, (String, String)>> {
421 let inner = self.inner.read().await;
422 let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
423 .select_only()
424 .columns([
425 table::Column::TableId,
426 table::Column::Name,
427 table::Column::TableType,
428 ])
429 .into_tuple()
430 .all(&inner.db)
431 .await?;
432 Ok(table_name_types
433 .into_iter()
434 .map(|(id, name, table_type)| {
435 (
436 id,
437 (name, PbTableType::from(table_type).as_str_name().to_owned()),
438 )
439 })
440 .collect())
441 }
442
443 pub async fn get_table_by_cdc_table_id(
444 &self,
445 cdc_table_id: &String,
446 ) -> MetaResult<Vec<PbTable>> {
447 let inner = self.inner.read().await;
448 let table_objs = Table::find()
449 .find_also_related(Object)
450 .filter(table::Column::CdcTableId.eq(cdc_table_id))
451 .all(&inner.db)
452 .await?;
453 let streaming_jobs = load_streaming_jobs_by_ids(
454 &inner.db,
455 table_objs.iter().map(|(table, _)| table.job_id()),
456 )
457 .await?;
458 Ok(table_objs
459 .into_iter()
460 .map(|(table, obj)| {
461 let job_id = table.job_id();
462 let streaming_job = streaming_jobs.get(&job_id).cloned();
463 ObjectModel(table, obj.unwrap(), streaming_job).into()
464 })
465 .collect())
466 }
467
468 pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
469 let inner = self.inner.read().await;
470
471 let mut table_ids: Vec<TableId> = StreamingJob::find()
473 .select_only()
474 .column(streaming_job::Column::JobId)
475 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
476 .into_tuple()
477 .all(&inner.db)
478 .await?;
479
480 let internal_table_ids: Vec<TableId> = Table::find()
482 .select_only()
483 .column(table::Column::TableId)
484 .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
485 .into_tuple()
486 .all(&inner.db)
487 .await?;
488 table_ids.extend(internal_table_ids);
489
490 Ok(table_ids)
491 }
492
493 pub async fn get_versioned_table_schemas(
496 &self,
497 ) -> MetaResult<HashMap<risingwave_common::catalog::TableId, Vec<i32>>> {
498 let res = self
499 .list_all_state_tables()
500 .await?
501 .into_iter()
502 .filter_map(|t| {
503 if t.version.is_some() {
504 let ret = (
505 t.id,
506 t.columns
507 .iter()
508 .map(|c| c.column_desc.as_ref().unwrap().column_id)
509 .collect_vec(),
510 );
511 return Some(ret);
512 }
513 None
514 })
515 .collect();
516 Ok(res)
517 }
518
519 pub async fn get_existing_job_resource_group(
520 &self,
521 streaming_job_id: JobId,
522 ) -> MetaResult<String> {
523 let inner = self.inner.read().await;
524 get_existing_job_resource_group(&inner.db, streaming_job_id).await
525 }
526
527 pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
528 let inner = self.inner.read().await;
529 get_database_resource_group(&inner.db, database_id).await
530 }
531
532 pub async fn get_existing_job_resource_groups(
533 &self,
534 streaming_job_ids: Vec<JobId>,
535 ) -> MetaResult<HashMap<JobId, String>> {
536 let inner = self.inner.read().await;
537 let mut resource_groups = HashMap::new();
538 for job_id in streaming_job_ids {
539 let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
540 resource_groups.insert(job_id, resource_group);
541 }
542
543 Ok(resource_groups)
544 }
545
546 pub async fn get_existing_job_database_resource_group(
547 &self,
548 streaming_job_id: JobId,
549 ) -> MetaResult<String> {
550 let inner = self.inner.read().await;
551 let database_id: DatabaseId = StreamingJob::find_by_id(streaming_job_id)
552 .select_only()
553 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
554 .column(object::Column::DatabaseId)
555 .into_tuple()
556 .one(&inner.db)
557 .await?
558 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
559
560 get_database_resource_group(&inner.db, database_id).await
561 }
562
563 pub async fn get_job_streaming_parallelisms(
564 &self,
565 streaming_job_id: JobId,
566 ) -> MetaResult<StreamingParallelism> {
567 let inner = self.inner.read().await;
568
569 let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
570 .select_only()
571 .column(streaming_job::Column::Parallelism)
572 .into_tuple()
573 .one(&inner.db)
574 .await?
575 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
576
577 Ok(job_parallelism)
578 }
579
580 pub async fn get_job_parallelisms(
581 &self,
582 streaming_job_id: JobId,
583 ) -> MetaResult<(StreamingParallelism, Option<StreamingParallelism>)> {
584 let inner = self.inner.read().await;
585
586 let (job_parallelism, backfill_parallelism): (
587 StreamingParallelism,
588 Option<StreamingParallelism>,
589 ) = StreamingJob::find_by_id(streaming_job_id)
590 .select_only()
591 .column(streaming_job::Column::Parallelism)
592 .column(streaming_job::Column::BackfillParallelism)
593 .into_tuple()
594 .one(&inner.db)
595 .await?
596 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
597
598 Ok((job_parallelism, backfill_parallelism))
599 }
600
601 pub async fn get_fragment_streaming_job_id(
602 &self,
603 fragment_id: FragmentId,
604 ) -> MetaResult<JobId> {
605 let inner = self.inner.read().await;
606 let job_id: JobId = Fragment::find_by_id(fragment_id)
607 .select_only()
608 .column(fragment::Column::JobId)
609 .into_tuple()
610 .one(&inner.db)
611 .await?
612 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
613 Ok(job_id)
614 }
615
616 pub async fn list_streaming_job_with_database(
617 &self,
618 ) -> MetaResult<HashMap<DatabaseId, Vec<JobId>>> {
619 let inner = self.inner.read().await;
620 let database_objects: Vec<(DatabaseId, JobId)> = StreamingJob::find()
621 .select_only()
622 .column(object::Column::DatabaseId)
623 .column(streaming_job::Column::JobId)
624 .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
625 .into_tuple()
626 .all(&inner.db)
627 .await?;
628
629 Ok(database_objects.into_iter().into_group_map())
630 }
631
632 pub async fn list_table_objects(
634 &self,
635 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
636 let inner = self.inner.read().await;
637 Ok(Object::find()
638 .select_only()
639 .join(JoinType::InnerJoin, object::Relation::Table.def())
640 .join(JoinType::InnerJoin, object::Relation::Database2.def())
641 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
642 .column(object::Column::Oid)
643 .column(database::Column::Name)
644 .column(schema::Column::Name)
645 .column(table::Column::Name)
646 .column(database::Column::ResourceGroup)
647 .into_tuple()
648 .all(&inner.db)
649 .await?)
650 }
651
652 pub async fn list_source_objects(
654 &self,
655 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
656 let inner = self.inner.read().await;
657 Ok(Object::find()
658 .select_only()
659 .join(JoinType::InnerJoin, object::Relation::Source.def())
660 .join(JoinType::InnerJoin, object::Relation::Database2.def())
661 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
662 .column(object::Column::Oid)
663 .column(database::Column::Name)
664 .column(schema::Column::Name)
665 .column(source::Column::Name)
666 .column(database::Column::ResourceGroup)
667 .into_tuple()
668 .all(&inner.db)
669 .await?)
670 }
671
672 pub async fn list_sink_objects(
674 &self,
675 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
676 let inner = self.inner.read().await;
677 Ok(Object::find()
678 .select_only()
679 .join(JoinType::InnerJoin, object::Relation::Sink.def())
680 .join(JoinType::InnerJoin, object::Relation::Database2.def())
681 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
682 .column(object::Column::Oid)
683 .column(database::Column::Name)
684 .column(schema::Column::Name)
685 .column(sink::Column::Name)
686 .column(database::Column::ResourceGroup)
687 .into_tuple()
688 .all(&inner.db)
689 .await?)
690 }
691
692 pub async fn get_streaming_job_status(&self, streaming_job_id: JobId) -> MetaResult<JobStatus> {
693 let inner = self.inner.read().await;
694 let status = StreamingJob::find_by_id(streaming_job_id)
695 .select_only()
696 .column(streaming_job::Column::JobStatus)
697 .into_tuple()
698 .one(&inner.db)
699 .await?
700 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
701 Ok(status)
702 }
703
704 pub async fn get_streaming_job_extra_info(
705 &self,
706 job_ids: Vec<JobId>,
707 ) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>> {
708 let inner = self.inner.read().await;
709 let txn = inner.db.begin().await?;
710
711 self.get_streaming_job_extra_info_in_txn(&txn, job_ids)
712 .await
713 }
714
715 pub async fn get_streaming_job_extra_info_in_txn<C>(
716 &self,
717 txn: &C,
718 job_ids: Vec<JobId>,
719 ) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
720 where
721 C: ConnectionTrait,
722 {
723 fetch_streaming_job_extra_info(txn, job_ids).await
724 }
725}