1use risingwave_common::catalog::ColumnCatalog;
16use risingwave_meta_model::table::RefreshState;
17
18use super::*;
19use crate::controller::utils::{
20 StreamingJobExtraInfo, get_database_resource_group, get_existing_job_resource_group,
21 get_streaming_job_extra_info as fetch_streaming_job_extra_info, get_table_columns,
22};
23
24impl CatalogController {
25 pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
26 let inner = self.inner.read().await;
27 let (secret, obj) = Secret::find_by_id(secret_id)
28 .find_also_related(Object)
29 .one(&inner.db)
30 .await?
31 .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
32 Ok(ObjectModel(secret, obj.unwrap()).into())
33 }
34
35 pub async fn get_object_database_id(&self, object_id: ObjectId) -> MetaResult<DatabaseId> {
36 let inner = self.inner.read().await;
37 let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
38 .select_only()
39 .select_column(object::Column::DatabaseId)
40 .into_tuple()
41 .one(&inner.db)
42 .await?
43 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
44 Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
45 }
46
47 pub async fn get_connection_by_id(
48 &self,
49 connection_id: ConnectionId,
50 ) -> MetaResult<PbConnection> {
51 let inner = self.inner.read().await;
52 let (conn, obj) = Connection::find_by_id(connection_id)
53 .find_also_related(Object)
54 .one(&inner.db)
55 .await?
56 .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
57
58 Ok(ObjectModel(conn, obj.unwrap()).into())
59 }
60
61 pub async fn get_table_catalog_by_name(
62 &self,
63 database_id: DatabaseId,
64 schema_id: SchemaId,
65 name: &str,
66 ) -> MetaResult<Option<PbTable>> {
67 let inner = self.inner.read().await;
68 let table_obj = Table::find()
69 .find_also_related(Object)
70 .filter(
71 table::Column::Name
72 .eq(name)
73 .and(object::Column::DatabaseId.eq(database_id))
74 .and(object::Column::SchemaId.eq(schema_id)),
75 )
76 .one(&inner.db)
77 .await?;
78 Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
79 }
80
81 pub async fn get_table_by_name(
82 &self,
83 database_name: &str,
84 table_name: &str,
85 ) -> MetaResult<Option<PbTable>> {
86 let inner = self.inner.read().await;
87 let table_obj = Table::find()
88 .find_also_related(Object)
89 .join(JoinType::InnerJoin, object::Relation::Database2.def())
90 .filter(
91 table::Column::Name
92 .eq(table_name)
93 .and(database::Column::Name.eq(database_name)),
94 )
95 .one(&inner.db)
96 .await?;
97 Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
98 }
99
100 pub async fn get_table_associated_source_id(
101 &self,
102 table_id: TableId,
103 ) -> MetaResult<Option<SourceId>> {
104 let inner = self.inner.read().await;
105 Table::find_by_id(table_id)
106 .select_only()
107 .select_column(table::Column::OptionalAssociatedSourceId)
108 .into_tuple()
109 .one(&inner.db)
110 .await?
111 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
112 }
113
114 pub async fn get_table_by_associate_source_id(
115 &self,
116 associated_source_id: SourceId,
117 ) -> MetaResult<PbTable> {
118 let inner = self.inner.read().await;
119 Table::find()
120 .find_also_related(Object)
121 .filter(table::Column::OptionalAssociatedSourceId.eq(associated_source_id))
122 .one(&inner.db)
123 .await?
124 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
125 .ok_or_else(|| {
126 MetaError::catalog_id_not_found("table associated source", associated_source_id)
127 })
128 }
129
130 pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
131 let inner = self.inner.read().await;
132 let table_obj = Table::find_by_id(table_id)
133 .find_also_related(Object)
134 .one(&inner.db)
135 .await?;
136 if let Some((table, obj)) = table_obj {
137 Ok(ObjectModel(table, obj.unwrap()).into())
138 } else {
139 Err(MetaError::catalog_id_not_found("table", table_id))
140 }
141 }
142
143 pub async fn get_user_created_table_by_ids(
144 &self,
145 table_ids: Vec<TableId>,
146 ) -> MetaResult<Vec<PbTable>> {
147 let inner = self.inner.read().await;
148 let table_objs = Table::find()
149 .find_also_related(Object)
150 .filter(
151 table::Column::TableId
152 .is_in(table_ids.clone())
153 .and(table::Column::TableType.eq(TableType::Table)),
154 )
155 .all(&inner.db)
156 .await?;
157 let tables = table_objs
158 .into_iter()
159 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
160 .collect();
161 Ok(tables)
162 }
163
164 pub async fn get_table_by_ids(
165 &self,
166 table_ids: Vec<TableId>,
167 include_dropped_table: bool,
168 ) -> MetaResult<Vec<PbTable>> {
169 let inner = self.inner.read().await;
170 let table_objs = Table::find()
171 .find_also_related(Object)
172 .filter(table::Column::TableId.is_in(table_ids.clone()))
173 .all(&inner.db)
174 .await?;
175 let tables = table_objs
176 .into_iter()
177 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
178 let tables = if include_dropped_table {
179 tables
180 .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
181 if table_ids.contains(id) {
182 Some(t.clone())
183 } else {
184 None
185 }
186 }))
187 .collect()
188 } else {
189 tables.collect()
190 };
191 Ok(tables)
192 }
193
194 pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
195 let inner = self.inner.read().await;
196 Ok(get_table_columns(&inner.db, id)
197 .await?
198 .to_protobuf()
199 .into_iter()
200 .map(|col| col.into())
201 .collect())
202 }
203
204 pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
205 let inner = self.inner.read().await;
206 let sink_objs = Sink::find()
207 .find_also_related(Object)
208 .filter(sink::Column::TargetTable.eq(table_id))
209 .all(&inner.db)
210 .await?;
211 Ok(sink_objs
212 .into_iter()
213 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
214 .collect())
215 }
216
217 pub async fn get_table_refresh_state(
219 &self,
220 table_id: TableId,
221 ) -> MetaResult<Option<RefreshState>> {
222 let inner = self.inner.read().await;
223 let (refresh_state,): (Option<RefreshState>,) = Table::find_by_id(table_id)
224 .select_only()
225 .select_column(table::Column::RefreshState)
226 .into_tuple()
227 .one(&inner.db)
228 .await?
229 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;
230
231 Ok(Some(refresh_state.unwrap_or(RefreshState::Idle)))
233 }
234
235 pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
236 let inner = self.inner.read().await;
237 let sink_objs = Sink::find()
238 .find_also_related(Object)
239 .filter(sink::Column::SinkId.is_in(sink_ids))
240 .all(&inner.db)
241 .await?;
242 Ok(sink_objs
243 .into_iter()
244 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
245 .collect())
246 }
247
248 pub async fn get_sink_auto_refresh_schema_from(
249 &self,
250 table_id: TableId,
251 ) -> MetaResult<Vec<PbSink>> {
252 let inner = self.inner.read().await;
253 let sink_objs = Sink::find()
254 .find_also_related(Object)
255 .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
256 .all(&inner.db)
257 .await?;
258 Ok(sink_objs
259 .into_iter()
260 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
261 .collect())
262 }
263
264 pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
265 let inner = self.inner.read().await;
266 let tables: Vec<I32Array> = Fragment::find()
267 .select_only()
268 .column(fragment::Column::StateTableIds)
269 .filter(fragment::Column::JobId.eq(sink_id))
270 .into_tuple()
271 .all(&inner.db)
272 .await?;
273 Ok(tables
274 .into_iter()
275 .flat_map(|ids| ids.into_inner().into_iter())
276 .collect())
277 }
278
279 pub async fn get_subscription_by_id(
280 &self,
281 subscription_id: SubscriptionId,
282 ) -> MetaResult<PbSubscription> {
283 let inner = self.inner.read().await;
284 let subscription_objs = Subscription::find()
285 .find_also_related(Object)
286 .filter(subscription::Column::SubscriptionId.eq(subscription_id))
287 .all(&inner.db)
288 .await?;
289 let subscription: PbSubscription = subscription_objs
290 .into_iter()
291 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
292 .find_or_first(|_| true)
293 .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
294
295 Ok(subscription)
296 }
297
298 pub async fn get_mv_depended_subscriptions(
299 &self,
300 database_id: Option<DatabaseId>,
301 ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
302 let inner = self.inner.read().await;
303 let select = Subscription::find()
304 .select_only()
305 .select_column(subscription::Column::SubscriptionId)
306 .select_column(subscription::Column::DependentTableId)
307 .select_column(subscription::Column::RetentionSeconds);
308 let select = if let Some(database_id) = database_id {
309 select
310 .join(JoinType::InnerJoin, subscription::Relation::Object.def())
311 .filter(object::Column::DatabaseId.eq(database_id))
312 } else {
313 select
314 };
315 let subscription_objs: Vec<(SubscriptionId, ObjectId, i64)> =
316 select.into_tuple().all(&inner.db).await?;
317 let mut map: HashMap<_, HashMap<_, _>> = HashMap::new();
318 for (subscription_id, dependent_table_id, retention_seconds) in subscription_objs {
320 map.entry(dependent_table_id)
321 .or_default()
322 .insert(subscription_id, retention_seconds as _);
323 }
324 Ok(map)
325 }
326
327 pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
328 let inner = self.inner.read().await;
329 let table_options: Vec<(TableId, Option<i32>)> = Table::find()
330 .select_only()
331 .columns([table::Column::TableId, table::Column::RetentionSeconds])
332 .into_tuple::<(TableId, Option<i32>)>()
333 .all(&inner.db)
334 .await?;
335
336 Ok(table_options
337 .into_iter()
338 .map(|(id, retention_seconds)| {
339 (
340 id,
341 TableOption {
342 retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
343 },
344 )
345 })
346 .collect())
347 }
348
349 pub async fn get_all_streaming_parallelisms(
350 &self,
351 ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
352 let inner = self.inner.read().await;
353
354 let job_parallelisms = StreamingJob::find()
355 .select_only()
356 .columns([
357 streaming_job::Column::JobId,
358 streaming_job::Column::Parallelism,
359 ])
360 .into_tuple::<(ObjectId, StreamingParallelism)>()
361 .all(&inner.db)
362 .await?;
363
364 Ok(job_parallelisms
365 .into_iter()
366 .collect::<HashMap<ObjectId, StreamingParallelism>>())
367 }
368
369 pub async fn get_table_name_type_mapping(
370 &self,
371 ) -> MetaResult<HashMap<TableId, (String, String)>> {
372 let inner = self.inner.read().await;
373 let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
374 .select_only()
375 .columns([
376 table::Column::TableId,
377 table::Column::Name,
378 table::Column::TableType,
379 ])
380 .into_tuple()
381 .all(&inner.db)
382 .await?;
383 Ok(table_name_types
384 .into_iter()
385 .map(|(id, name, table_type)| {
386 (
387 id,
388 (name, PbTableType::from(table_type).as_str_name().to_owned()),
389 )
390 })
391 .collect())
392 }
393
394 pub async fn get_table_by_cdc_table_id(
395 &self,
396 cdc_table_id: &String,
397 ) -> MetaResult<Vec<PbTable>> {
398 let inner = self.inner.read().await;
399 let table_objs = Table::find()
400 .find_also_related(Object)
401 .filter(table::Column::CdcTableId.eq(cdc_table_id))
402 .all(&inner.db)
403 .await?;
404 Ok(table_objs
405 .into_iter()
406 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
407 .collect())
408 }
409
410 pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
411 let inner = self.inner.read().await;
412
413 let mut table_ids: Vec<TableId> = StreamingJob::find()
415 .select_only()
416 .column(streaming_job::Column::JobId)
417 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
418 .into_tuple()
419 .all(&inner.db)
420 .await?;
421
422 let internal_table_ids: Vec<TableId> = Table::find()
424 .select_only()
425 .column(table::Column::TableId)
426 .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
427 .into_tuple()
428 .all(&inner.db)
429 .await?;
430 table_ids.extend(internal_table_ids);
431
432 Ok(table_ids)
433 }
434
435 pub async fn get_versioned_table_schemas(&self) -> MetaResult<HashMap<TableId, Vec<i32>>> {
438 let res = self
439 .list_all_state_tables()
440 .await?
441 .into_iter()
442 .filter_map(|t| {
443 if t.version.is_some() {
444 let ret = (
445 t.id.try_into().unwrap(),
446 t.columns
447 .iter()
448 .map(|c| c.column_desc.as_ref().unwrap().column_id)
449 .collect_vec(),
450 );
451 return Some(ret);
452 }
453 None
454 })
455 .collect();
456 Ok(res)
457 }
458
459 pub async fn get_existing_job_resource_group(
460 &self,
461 streaming_job_id: ObjectId,
462 ) -> MetaResult<String> {
463 let inner = self.inner.read().await;
464 get_existing_job_resource_group(&inner.db, streaming_job_id).await
465 }
466
467 pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
468 let inner = self.inner.read().await;
469 get_database_resource_group(&inner.db, database_id).await
470 }
471
472 pub async fn get_existing_job_resource_groups(
473 &self,
474 streaming_job_ids: Vec<ObjectId>,
475 ) -> MetaResult<HashMap<ObjectId, String>> {
476 let inner = self.inner.read().await;
477 let mut resource_groups = HashMap::new();
478 for job_id in streaming_job_ids {
479 let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
480 resource_groups.insert(job_id, resource_group);
481 }
482
483 Ok(resource_groups)
484 }
485
486 pub async fn get_existing_job_database_resource_group(
487 &self,
488 streaming_job_id: ObjectId,
489 ) -> MetaResult<String> {
490 let inner = self.inner.read().await;
491 let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
492 .select_only()
493 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
494 .column(object::Column::DatabaseId)
495 .into_tuple()
496 .one(&inner.db)
497 .await?
498 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
499
500 get_database_resource_group(&inner.db, database_id).await
501 }
502
503 pub async fn get_job_streaming_parallelisms(
504 &self,
505 streaming_job_id: ObjectId,
506 ) -> MetaResult<StreamingParallelism> {
507 let inner = self.inner.read().await;
508
509 let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
510 .select_only()
511 .column(streaming_job::Column::Parallelism)
512 .into_tuple()
513 .one(&inner.db)
514 .await?
515 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
516
517 Ok(job_parallelism)
518 }
519
520 pub async fn get_fragment_streaming_job_id(
521 &self,
522 fragment_id: FragmentId,
523 ) -> MetaResult<ObjectId> {
524 let inner = self.inner.read().await;
525 let job_id: ObjectId = Fragment::find_by_id(fragment_id)
526 .select_only()
527 .column(fragment::Column::JobId)
528 .into_tuple()
529 .one(&inner.db)
530 .await?
531 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
532 Ok(job_id)
533 }
534
535 pub async fn list_streaming_job_with_database(
536 &self,
537 ) -> MetaResult<HashMap<DatabaseId, Vec<ObjectId>>> {
538 let inner = self.inner.read().await;
539 let database_objects: Vec<(DatabaseId, ObjectId)> = StreamingJob::find()
540 .select_only()
541 .column(object::Column::DatabaseId)
542 .column(streaming_job::Column::JobId)
543 .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
544 .into_tuple()
545 .all(&inner.db)
546 .await?;
547
548 Ok(database_objects.into_iter().into_group_map())
549 }
550
551 pub async fn list_table_objects(
553 &self,
554 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
555 let inner = self.inner.read().await;
556 Ok(Object::find()
557 .select_only()
558 .join(JoinType::InnerJoin, object::Relation::Table.def())
559 .join(JoinType::InnerJoin, object::Relation::Database2.def())
560 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
561 .column(object::Column::Oid)
562 .column(database::Column::Name)
563 .column(schema::Column::Name)
564 .column(table::Column::Name)
565 .column(database::Column::ResourceGroup)
566 .into_tuple()
567 .all(&inner.db)
568 .await?)
569 }
570
571 pub async fn list_source_objects(
573 &self,
574 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
575 let inner = self.inner.read().await;
576 Ok(Object::find()
577 .select_only()
578 .join(JoinType::InnerJoin, object::Relation::Source.def())
579 .join(JoinType::InnerJoin, object::Relation::Database2.def())
580 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
581 .column(object::Column::Oid)
582 .column(database::Column::Name)
583 .column(schema::Column::Name)
584 .column(source::Column::Name)
585 .column(database::Column::ResourceGroup)
586 .into_tuple()
587 .all(&inner.db)
588 .await?)
589 }
590
591 pub async fn list_sink_objects(
593 &self,
594 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
595 let inner = self.inner.read().await;
596 Ok(Object::find()
597 .select_only()
598 .join(JoinType::InnerJoin, object::Relation::Sink.def())
599 .join(JoinType::InnerJoin, object::Relation::Database2.def())
600 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
601 .column(object::Column::Oid)
602 .column(database::Column::Name)
603 .column(schema::Column::Name)
604 .column(sink::Column::Name)
605 .column(database::Column::ResourceGroup)
606 .into_tuple()
607 .all(&inner.db)
608 .await?)
609 }
610
611 pub async fn get_streaming_job_status(
612 &self,
613 streaming_job_id: ObjectId,
614 ) -> MetaResult<JobStatus> {
615 let inner = self.inner.read().await;
616 let status = StreamingJob::find_by_id(streaming_job_id)
617 .select_only()
618 .column(streaming_job::Column::JobStatus)
619 .into_tuple()
620 .one(&inner.db)
621 .await?
622 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
623 Ok(status)
624 }
625
626 pub async fn get_streaming_job_extra_info(
627 &self,
628 job_ids: Vec<ObjectId>,
629 ) -> MetaResult<HashMap<ObjectId, StreamingJobExtraInfo>> {
630 let inner = self.inner.read().await;
631 let txn = inner.db.begin().await?;
632
633 let result = fetch_streaming_job_extra_info(&txn, job_ids).await?;
634 Ok(result)
635 }
636}