1use risingwave_common::catalog::ColumnCatalog;
16use risingwave_common::id::JobId;
17use risingwave_meta_model::table::RefreshState;
18
19use super::*;
20use crate::controller::utils::{
21 StreamingJobExtraInfo, get_database_resource_group, get_existing_job_resource_group,
22 get_streaming_job_extra_info as fetch_streaming_job_extra_info, get_table_columns,
23};
24
25impl CatalogController {
26 pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
27 let inner = self.inner.read().await;
28 let (secret, obj) = Secret::find_by_id(secret_id)
29 .find_also_related(Object)
30 .one(&inner.db)
31 .await?
32 .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
33 Ok(ObjectModel(secret, obj.unwrap()).into())
34 }
35
36 pub async fn get_object_database_id(
37 &self,
38 object_id: impl Into<ObjectId>,
39 ) -> MetaResult<DatabaseId> {
40 let object_id = object_id.into();
41 let inner = self.inner.read().await;
42 let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
43 .select_only()
44 .select_column(object::Column::DatabaseId)
45 .into_tuple()
46 .one(&inner.db)
47 .await?
48 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
49 Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
50 }
51
52 pub async fn get_connection_by_id(
53 &self,
54 connection_id: ConnectionId,
55 ) -> MetaResult<PbConnection> {
56 let inner = self.inner.read().await;
57 let (conn, obj) = Connection::find_by_id(connection_id)
58 .find_also_related(Object)
59 .one(&inner.db)
60 .await?
61 .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
62
63 Ok(ObjectModel(conn, obj.unwrap()).into())
64 }
65
66 pub async fn get_table_catalog_by_name(
67 &self,
68 database_id: DatabaseId,
69 schema_id: SchemaId,
70 name: &str,
71 ) -> MetaResult<Option<PbTable>> {
72 let inner = self.inner.read().await;
73 let table_obj = Table::find()
74 .find_also_related(Object)
75 .filter(
76 table::Column::Name
77 .eq(name)
78 .and(object::Column::DatabaseId.eq(database_id))
79 .and(object::Column::SchemaId.eq(schema_id)),
80 )
81 .one(&inner.db)
82 .await?;
83 Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
84 }
85
86 pub async fn get_table_by_name(
87 &self,
88 database_name: &str,
89 table_name: &str,
90 ) -> MetaResult<Option<PbTable>> {
91 let inner = self.inner.read().await;
92 let table_obj = Table::find()
93 .find_also_related(Object)
94 .join(JoinType::InnerJoin, object::Relation::Database2.def())
95 .filter(
96 table::Column::Name
97 .eq(table_name)
98 .and(database::Column::Name.eq(database_name)),
99 )
100 .one(&inner.db)
101 .await?;
102 Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
103 }
104
105 pub async fn get_table_associated_source_id(
106 &self,
107 table_id: TableId,
108 ) -> MetaResult<Option<SourceId>> {
109 let inner = self.inner.read().await;
110 Table::find_by_id(table_id)
111 .select_only()
112 .select_column(table::Column::OptionalAssociatedSourceId)
113 .into_tuple()
114 .one(&inner.db)
115 .await?
116 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
117 }
118
119 pub async fn get_table_by_associate_source_id(
120 &self,
121 associated_source_id: SourceId,
122 ) -> MetaResult<PbTable> {
123 let inner = self.inner.read().await;
124 Table::find()
125 .find_also_related(Object)
126 .filter(table::Column::OptionalAssociatedSourceId.eq(associated_source_id))
127 .one(&inner.db)
128 .await?
129 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
130 .ok_or_else(|| {
131 MetaError::catalog_id_not_found("table associated source", associated_source_id)
132 })
133 }
134
135 pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
136 let inner = self.inner.read().await;
137 let table_obj = Table::find_by_id(table_id)
138 .find_also_related(Object)
139 .one(&inner.db)
140 .await?;
141 if let Some((table, obj)) = table_obj {
142 Ok(ObjectModel(table, obj.unwrap()).into())
143 } else {
144 Err(MetaError::catalog_id_not_found("table", table_id))
145 }
146 }
147
148 pub async fn get_user_created_table_by_ids(
149 &self,
150 job_ids: impl Iterator<Item = JobId>,
151 ) -> MetaResult<Vec<PbTable>> {
152 let inner = self.inner.read().await;
153 let table_objs = Table::find()
154 .find_also_related(Object)
155 .filter(
156 table::Column::TableId
157 .is_in(job_ids.map(|job_id| job_id.as_mv_table_id()).collect_vec())
158 .and(table::Column::TableType.eq(TableType::Table)),
159 )
160 .all(&inner.db)
161 .await?;
162 let tables = table_objs
163 .into_iter()
164 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
165 .collect();
166 Ok(tables)
167 }
168
169 pub async fn get_table_by_ids(
170 &self,
171 table_ids: Vec<TableId>,
172 include_dropped_table: bool,
173 ) -> MetaResult<Vec<PbTable>> {
174 let inner = self.inner.read().await;
175 let table_objs = Table::find()
176 .find_also_related(Object)
177 .filter(table::Column::TableId.is_in(table_ids.clone()))
178 .all(&inner.db)
179 .await?;
180 let tables = table_objs
181 .into_iter()
182 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
183 let tables = if include_dropped_table {
184 tables
185 .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
186 if table_ids.contains(id) {
187 Some(t.clone())
188 } else {
189 None
190 }
191 }))
192 .collect()
193 } else {
194 tables.collect()
195 };
196 Ok(tables)
197 }
198
199 pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
200 let inner = self.inner.read().await;
201 Ok(get_table_columns(&inner.db, id)
202 .await?
203 .to_protobuf()
204 .into_iter()
205 .map(|col| col.into())
206 .collect())
207 }
208
209 pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
210 let inner = self.inner.read().await;
211 let sink_objs = Sink::find()
212 .find_also_related(Object)
213 .filter(sink::Column::TargetTable.eq(table_id))
214 .all(&inner.db)
215 .await?;
216 Ok(sink_objs
217 .into_iter()
218 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
219 .collect())
220 }
221
222 pub async fn get_table_refresh_state(
224 &self,
225 table_id: TableId,
226 ) -> MetaResult<Option<RefreshState>> {
227 let inner = self.inner.read().await;
228 let (refresh_state,): (Option<RefreshState>,) = Table::find_by_id(table_id)
229 .select_only()
230 .select_column(table::Column::RefreshState)
231 .into_tuple()
232 .one(&inner.db)
233 .await?
234 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;
235
236 Ok(Some(refresh_state.unwrap_or(RefreshState::Idle)))
238 }
239
240 pub async fn get_sink_by_id(&self, sink_id: SinkId) -> MetaResult<Option<PbSink>> {
241 let inner = self.inner.read().await;
242 let sink_objs = Sink::find_by_id(sink_id)
243 .find_also_related(Object)
244 .one(&inner.db)
245 .await?;
246 Ok(sink_objs.map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into()))
247 }
248
249 pub async fn get_sink_auto_refresh_schema_from(
250 &self,
251 table_id: TableId,
252 ) -> MetaResult<Vec<PbSink>> {
253 let inner = self.inner.read().await;
254 let sink_objs = Sink::find()
255 .find_also_related(Object)
256 .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
257 .all(&inner.db)
258 .await?;
259 Ok(sink_objs
260 .into_iter()
261 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
262 .collect())
263 }
264
265 pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
266 let inner = self.inner.read().await;
267 let tables: Vec<I32Array> = Fragment::find()
268 .select_only()
269 .column(fragment::Column::StateTableIds)
270 .filter(fragment::Column::JobId.eq(sink_id))
271 .into_tuple()
272 .all(&inner.db)
273 .await?;
274 Ok(tables
275 .into_iter()
276 .flat_map(|ids| {
277 ids.into_inner()
278 .into_iter()
279 .map(|table_id| TableId::new(table_id as _))
280 })
281 .collect())
282 }
283
284 pub async fn get_subscription_by_id(
285 &self,
286 subscription_id: SubscriptionId,
287 ) -> MetaResult<PbSubscription> {
288 let inner = self.inner.read().await;
289 let subscription_objs = Subscription::find()
290 .find_also_related(Object)
291 .filter(subscription::Column::SubscriptionId.eq(subscription_id))
292 .all(&inner.db)
293 .await?;
294 let subscription: PbSubscription = subscription_objs
295 .into_iter()
296 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
297 .find_or_first(|_| true)
298 .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
299
300 Ok(subscription)
301 }
302
303 pub async fn get_mv_depended_subscriptions(
304 &self,
305 database_id: Option<DatabaseId>,
306 ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
307 let inner = self.inner.read().await;
308 let select = Subscription::find()
309 .select_only()
310 .select_column(subscription::Column::SubscriptionId)
311 .select_column(subscription::Column::DependentTableId)
312 .select_column(subscription::Column::RetentionSeconds);
313 let select = if let Some(database_id) = database_id {
314 select
315 .join(JoinType::InnerJoin, subscription::Relation::Object.def())
316 .filter(object::Column::DatabaseId.eq(database_id))
317 } else {
318 select
319 };
320 let subscription_objs: Vec<(SubscriptionId, TableId, i64)> =
321 select.into_tuple().all(&inner.db).await?;
322 let mut map: HashMap<_, HashMap<_, _>> = HashMap::new();
323 for (subscription_id, dependent_table_id, retention_seconds) in subscription_objs {
325 map.entry(dependent_table_id)
326 .or_default()
327 .insert(subscription_id, retention_seconds as _);
328 }
329 Ok(map)
330 }
331
332 pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
333 let inner = self.inner.read().await;
334 let table_options: Vec<(TableId, Option<i32>)> = Table::find()
335 .select_only()
336 .columns([table::Column::TableId, table::Column::RetentionSeconds])
337 .into_tuple::<(TableId, Option<i32>)>()
338 .all(&inner.db)
339 .await?;
340
341 Ok(table_options
342 .into_iter()
343 .map(|(id, retention_seconds)| {
344 (
345 id,
346 TableOption {
347 retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
348 },
349 )
350 })
351 .collect())
352 }
353
354 pub async fn get_all_streaming_parallelisms(
355 &self,
356 ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
357 let inner = self.inner.read().await;
358
359 let job_parallelisms = StreamingJob::find()
360 .select_only()
361 .columns([
362 streaming_job::Column::JobId,
363 streaming_job::Column::Parallelism,
364 ])
365 .into_tuple::<(ObjectId, StreamingParallelism)>()
366 .all(&inner.db)
367 .await?;
368
369 Ok(job_parallelisms
370 .into_iter()
371 .collect::<HashMap<ObjectId, StreamingParallelism>>())
372 }
373
374 pub async fn get_table_name_type_mapping(
375 &self,
376 ) -> MetaResult<HashMap<TableId, (String, String)>> {
377 let inner = self.inner.read().await;
378 let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
379 .select_only()
380 .columns([
381 table::Column::TableId,
382 table::Column::Name,
383 table::Column::TableType,
384 ])
385 .into_tuple()
386 .all(&inner.db)
387 .await?;
388 Ok(table_name_types
389 .into_iter()
390 .map(|(id, name, table_type)| {
391 (
392 id,
393 (name, PbTableType::from(table_type).as_str_name().to_owned()),
394 )
395 })
396 .collect())
397 }
398
399 pub async fn get_table_by_cdc_table_id(
400 &self,
401 cdc_table_id: &String,
402 ) -> MetaResult<Vec<PbTable>> {
403 let inner = self.inner.read().await;
404 let table_objs = Table::find()
405 .find_also_related(Object)
406 .filter(table::Column::CdcTableId.eq(cdc_table_id))
407 .all(&inner.db)
408 .await?;
409 Ok(table_objs
410 .into_iter()
411 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
412 .collect())
413 }
414
415 pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
416 let inner = self.inner.read().await;
417
418 let mut table_ids: Vec<TableId> = StreamingJob::find()
420 .select_only()
421 .column(streaming_job::Column::JobId)
422 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
423 .into_tuple()
424 .all(&inner.db)
425 .await?;
426
427 let internal_table_ids: Vec<TableId> = Table::find()
429 .select_only()
430 .column(table::Column::TableId)
431 .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
432 .into_tuple()
433 .all(&inner.db)
434 .await?;
435 table_ids.extend(internal_table_ids);
436
437 Ok(table_ids)
438 }
439
440 pub async fn get_versioned_table_schemas(
443 &self,
444 ) -> MetaResult<HashMap<risingwave_common::catalog::TableId, Vec<i32>>> {
445 let res = self
446 .list_all_state_tables()
447 .await?
448 .into_iter()
449 .filter_map(|t| {
450 if t.version.is_some() {
451 let ret = (
452 t.id,
453 t.columns
454 .iter()
455 .map(|c| c.column_desc.as_ref().unwrap().column_id)
456 .collect_vec(),
457 );
458 return Some(ret);
459 }
460 None
461 })
462 .collect();
463 Ok(res)
464 }
465
466 pub async fn get_existing_job_resource_group(
467 &self,
468 streaming_job_id: JobId,
469 ) -> MetaResult<String> {
470 let inner = self.inner.read().await;
471 get_existing_job_resource_group(&inner.db, streaming_job_id).await
472 }
473
474 pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
475 let inner = self.inner.read().await;
476 get_database_resource_group(&inner.db, database_id).await
477 }
478
479 pub async fn get_existing_job_resource_groups(
480 &self,
481 streaming_job_ids: Vec<JobId>,
482 ) -> MetaResult<HashMap<JobId, String>> {
483 let inner = self.inner.read().await;
484 let mut resource_groups = HashMap::new();
485 for job_id in streaming_job_ids {
486 let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
487 resource_groups.insert(job_id, resource_group);
488 }
489
490 Ok(resource_groups)
491 }
492
493 pub async fn get_existing_job_database_resource_group(
494 &self,
495 streaming_job_id: JobId,
496 ) -> MetaResult<String> {
497 let inner = self.inner.read().await;
498 let database_id: DatabaseId = StreamingJob::find_by_id(streaming_job_id)
499 .select_only()
500 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
501 .column(object::Column::DatabaseId)
502 .into_tuple()
503 .one(&inner.db)
504 .await?
505 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
506
507 get_database_resource_group(&inner.db, database_id).await
508 }
509
510 pub async fn get_job_streaming_parallelisms(
511 &self,
512 streaming_job_id: JobId,
513 ) -> MetaResult<StreamingParallelism> {
514 let inner = self.inner.read().await;
515
516 let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
517 .select_only()
518 .column(streaming_job::Column::Parallelism)
519 .into_tuple()
520 .one(&inner.db)
521 .await?
522 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
523
524 Ok(job_parallelism)
525 }
526
527 pub async fn get_fragment_streaming_job_id(
528 &self,
529 fragment_id: FragmentId,
530 ) -> MetaResult<JobId> {
531 let inner = self.inner.read().await;
532 let job_id: JobId = Fragment::find_by_id(fragment_id)
533 .select_only()
534 .column(fragment::Column::JobId)
535 .into_tuple()
536 .one(&inner.db)
537 .await?
538 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
539 Ok(job_id)
540 }
541
542 pub async fn list_streaming_job_with_database(
543 &self,
544 ) -> MetaResult<HashMap<DatabaseId, Vec<JobId>>> {
545 let inner = self.inner.read().await;
546 let database_objects: Vec<(DatabaseId, JobId)> = StreamingJob::find()
547 .select_only()
548 .column(object::Column::DatabaseId)
549 .column(streaming_job::Column::JobId)
550 .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
551 .into_tuple()
552 .all(&inner.db)
553 .await?;
554
555 Ok(database_objects.into_iter().into_group_map())
556 }
557
558 pub async fn list_table_objects(
560 &self,
561 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
562 let inner = self.inner.read().await;
563 Ok(Object::find()
564 .select_only()
565 .join(JoinType::InnerJoin, object::Relation::Table.def())
566 .join(JoinType::InnerJoin, object::Relation::Database2.def())
567 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
568 .column(object::Column::Oid)
569 .column(database::Column::Name)
570 .column(schema::Column::Name)
571 .column(table::Column::Name)
572 .column(database::Column::ResourceGroup)
573 .into_tuple()
574 .all(&inner.db)
575 .await?)
576 }
577
578 pub async fn list_source_objects(
580 &self,
581 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
582 let inner = self.inner.read().await;
583 Ok(Object::find()
584 .select_only()
585 .join(JoinType::InnerJoin, object::Relation::Source.def())
586 .join(JoinType::InnerJoin, object::Relation::Database2.def())
587 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
588 .column(object::Column::Oid)
589 .column(database::Column::Name)
590 .column(schema::Column::Name)
591 .column(source::Column::Name)
592 .column(database::Column::ResourceGroup)
593 .into_tuple()
594 .all(&inner.db)
595 .await?)
596 }
597
598 pub async fn list_sink_objects(
600 &self,
601 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
602 let inner = self.inner.read().await;
603 Ok(Object::find()
604 .select_only()
605 .join(JoinType::InnerJoin, object::Relation::Sink.def())
606 .join(JoinType::InnerJoin, object::Relation::Database2.def())
607 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
608 .column(object::Column::Oid)
609 .column(database::Column::Name)
610 .column(schema::Column::Name)
611 .column(sink::Column::Name)
612 .column(database::Column::ResourceGroup)
613 .into_tuple()
614 .all(&inner.db)
615 .await?)
616 }
617
618 pub async fn get_streaming_job_status(&self, streaming_job_id: JobId) -> MetaResult<JobStatus> {
619 let inner = self.inner.read().await;
620 let status = StreamingJob::find_by_id(streaming_job_id)
621 .select_only()
622 .column(streaming_job::Column::JobStatus)
623 .into_tuple()
624 .one(&inner.db)
625 .await?
626 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
627 Ok(status)
628 }
629
630 pub async fn get_streaming_job_extra_info(
631 &self,
632 job_ids: Vec<JobId>,
633 ) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>> {
634 let inner = self.inner.read().await;
635 let txn = inner.db.begin().await?;
636
637 let result = fetch_streaming_job_extra_info(&txn, job_ids).await?;
638 Ok(result)
639 }
640}