risingwave_meta/controller/catalog/
get_op.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::catalog::ColumnCatalog;
18use risingwave_common::id::JobId;
19use sea_orm::ConnectionTrait;
20
21use super::*;
22use crate::controller::utils::{
23    StreamingJobExtraInfo, get_database_resource_group, get_existing_job_resource_group,
24    get_streaming_job_extra_info as fetch_streaming_job_extra_info, get_table_columns,
25    load_streaming_jobs_by_ids,
26};
27
28impl CatalogController {
29    pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
30        let inner = self.inner.read().await;
31        let (secret, obj) = Secret::find_by_id(secret_id)
32            .find_also_related(Object)
33            .one(&inner.db)
34            .await?
35            .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
36        Ok(ObjectModel(secret, obj.unwrap(), None).into())
37    }
38
39    pub async fn get_object_database_id(
40        &self,
41        object_id: impl Into<ObjectId>,
42    ) -> MetaResult<DatabaseId> {
43        let object_id = object_id.into();
44        let inner = self.inner.read().await;
45        let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
46            .select_only()
47            .select_column(object::Column::DatabaseId)
48            .into_tuple()
49            .one(&inner.db)
50            .await?
51            .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
52        Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
53    }
54
55    pub async fn get_connection_by_id(
56        &self,
57        connection_id: ConnectionId,
58    ) -> MetaResult<PbConnection> {
59        let inner = self.inner.read().await;
60        let (conn, obj) = Connection::find_by_id(connection_id)
61            .find_also_related(Object)
62            .one(&inner.db)
63            .await?
64            .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
65
66        Ok(ObjectModel(conn, obj.unwrap(), None).into())
67    }
68
69    pub async fn get_table_catalog_by_name(
70        &self,
71        database_id: DatabaseId,
72        schema_id: SchemaId,
73        name: &str,
74    ) -> MetaResult<Option<PbTable>> {
75        let inner = self.inner.read().await;
76        let table_obj = Table::find()
77            .find_also_related(Object)
78            .filter(
79                table::Column::Name
80                    .eq(name)
81                    .and(object::Column::DatabaseId.eq(database_id))
82                    .and(object::Column::SchemaId.eq(schema_id)),
83            )
84            .one(&inner.db)
85            .await?;
86        if let Some((table, obj)) = table_obj {
87            let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
88                .one(&inner.db)
89                .await?;
90            Ok(Some(ObjectModel(table, obj.unwrap(), streaming_job).into()))
91        } else {
92            Ok(None)
93        }
94    }
95
96    pub async fn get_table_by_name(
97        &self,
98        database_name: &str,
99        table_name: &str,
100    ) -> MetaResult<Option<PbTable>> {
101        let inner = self.inner.read().await;
102        let table_obj = Table::find()
103            .find_also_related(Object)
104            .join(JoinType::InnerJoin, object::Relation::Database2.def())
105            .filter(
106                table::Column::Name
107                    .eq(table_name)
108                    .and(database::Column::Name.eq(database_name)),
109            )
110            .one(&inner.db)
111            .await?;
112        if let Some((table, obj)) = table_obj {
113            let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
114                .one(&inner.db)
115                .await?;
116            Ok(Some(ObjectModel(table, obj.unwrap(), streaming_job).into()))
117        } else {
118            Ok(None)
119        }
120    }
121
122    pub async fn get_table_associated_source_id(
123        &self,
124        table_id: TableId,
125    ) -> MetaResult<Option<SourceId>> {
126        let inner = self.inner.read().await;
127        Table::find_by_id(table_id)
128            .select_only()
129            .select_column(table::Column::OptionalAssociatedSourceId)
130            .into_tuple()
131            .one(&inner.db)
132            .await?
133            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
134    }
135
136    pub async fn get_table_by_associate_source_id(
137        &self,
138        associated_source_id: SourceId,
139    ) -> MetaResult<PbTable> {
140        let inner = self.inner.read().await;
141        let table_obj = Table::find()
142            .find_also_related(Object)
143            .filter(table::Column::OptionalAssociatedSourceId.eq(associated_source_id))
144            .one(&inner.db)
145            .await?
146            .ok_or_else(|| {
147                MetaError::catalog_id_not_found("table associated source", associated_source_id)
148            })?;
149        let (table, obj) = table_obj;
150        let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
151            .one(&inner.db)
152            .await?;
153        Ok(ObjectModel(table, obj.unwrap(), streaming_job).into())
154    }
155
156    pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
157        let inner = self.inner.read().await;
158        let table_obj = Table::find_by_id(table_id)
159            .find_also_related(Object)
160            .one(&inner.db)
161            .await?;
162        if let Some((table, obj)) = table_obj {
163            let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
164                .one(&inner.db)
165                .await?;
166            Ok(ObjectModel(table, obj.unwrap(), streaming_job).into())
167        } else {
168            Err(MetaError::catalog_id_not_found("table", table_id))
169        }
170    }
171
172    pub async fn get_user_created_table_by_ids(
173        &self,
174        job_ids: impl Iterator<Item = JobId>,
175    ) -> MetaResult<Vec<PbTable>> {
176        let inner = self.inner.read().await;
177        self.get_user_created_table_by_ids_in_txn(&inner.db, job_ids)
178            .await
179    }
180
181    pub async fn get_user_created_table_by_ids_in_txn<C>(
182        &self,
183        txn: &C,
184        job_ids: impl Iterator<Item = JobId>,
185    ) -> MetaResult<Vec<PbTable>>
186    where
187        C: ConnectionTrait,
188    {
189        let table_objs = Table::find()
190            .find_also_related(Object)
191            .filter(
192                table::Column::TableId
193                    .is_in(job_ids.map(|job_id| job_id.as_mv_table_id()).collect_vec())
194                    .and(table::Column::TableType.eq(TableType::Table)),
195            )
196            .all(txn)
197            .await?;
198        let streaming_jobs = streaming_job::Entity::find()
199            .filter(
200                streaming_job::Column::JobId.is_in(
201                    table_objs
202                        .iter()
203                        .map(|(table, _)| table.job_id())
204                        .collect_vec(),
205                ),
206            )
207            .all(txn)
208            .await?
209            .into_iter()
210            .map(|job| (job.job_id, job))
211            .collect::<HashMap<JobId, streaming_job::Model>>();
212        Ok(table_objs
213            .into_iter()
214            .map(|(table, obj)| {
215                let job_id = table.job_id();
216                let streaming_job = streaming_jobs.get(&job_id).cloned();
217                ObjectModel(table, obj.unwrap(), streaming_job).into()
218            })
219            .collect())
220    }
221
222    pub async fn get_table_by_ids(
223        &self,
224        table_ids: Vec<TableId>,
225        include_dropped_table: bool,
226    ) -> MetaResult<Vec<PbTable>> {
227        let inner = self.inner.read().await;
228        let table_objs = Table::find()
229            .find_also_related(Object)
230            .filter(table::Column::TableId.is_in(table_ids.clone()))
231            .all(&inner.db)
232            .await?;
233        let streaming_jobs = load_streaming_jobs_by_ids(
234            &inner.db,
235            table_objs.iter().map(|(table, _)| table.job_id()),
236        )
237        .await?;
238        let tables = table_objs.into_iter().map(|(table, obj)| {
239            let job_id = table.job_id();
240            let streaming_job = streaming_jobs.get(&job_id).cloned();
241            ObjectModel(table, obj.unwrap(), streaming_job).into()
242        });
243        let tables = if include_dropped_table {
244            tables
245                .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
246                    if table_ids.contains(id) {
247                        Some(t.clone())
248                    } else {
249                        None
250                    }
251                }))
252                .collect()
253        } else {
254            tables.collect()
255        };
256        Ok(tables)
257    }
258
259    pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
260        let inner = self.inner.read().await;
261        Ok(get_table_columns(&inner.db, id)
262            .await?
263            .to_protobuf()
264            .into_iter()
265            .map(|col| col.into())
266            .collect())
267    }
268
269    pub async fn get_sink_by_id(&self, sink_id: SinkId) -> MetaResult<Option<PbSink>> {
270        let inner = self.inner.read().await;
271        let sink_objs = Sink::find_by_id(sink_id)
272            .find_also_related(Object)
273            .one(&inner.db)
274            .await?;
275        if let Some((sink, obj)) = sink_objs {
276            let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
277                .one(&inner.db)
278                .await?;
279            Ok(Some(ObjectModel(sink, obj.unwrap(), streaming_job).into()))
280        } else {
281            Ok(None)
282        }
283    }
284
285    pub async fn get_sink_auto_refresh_schema_from(
286        &self,
287        table_id: TableId,
288    ) -> MetaResult<Vec<PbSink>> {
289        let inner = self.inner.read().await;
290        let sink_objs = Sink::find()
291            .find_also_related(Object)
292            .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
293            .all(&inner.db)
294            .await?;
295        let streaming_jobs = load_streaming_jobs_by_ids(
296            &inner.db,
297            sink_objs.iter().map(|(sink, _)| sink.sink_id.as_job_id()),
298        )
299        .await?;
300        Ok(sink_objs
301            .into_iter()
302            .map(|(sink, obj)| {
303                let streaming_job = streaming_jobs.get(&sink.sink_id.as_job_id()).cloned();
304                ObjectModel(sink, obj.unwrap(), streaming_job).into()
305            })
306            .collect())
307    }
308
309    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
310        let inner = self.inner.read().await;
311        let tables: Vec<I32Array> = Fragment::find()
312            .select_only()
313            .column(fragment::Column::StateTableIds)
314            .filter(fragment::Column::JobId.eq(sink_id))
315            .into_tuple()
316            .all(&inner.db)
317            .await?;
318        Ok(tables
319            .into_iter()
320            .flat_map(|ids| {
321                ids.into_inner()
322                    .into_iter()
323                    .map(|table_id| TableId::new(table_id as _))
324            })
325            .collect())
326    }
327
328    pub async fn get_subscription_by_id(
329        &self,
330        subscription_id: SubscriptionId,
331    ) -> MetaResult<PbSubscription> {
332        let inner = self.inner.read().await;
333        let subscription_objs = Subscription::find()
334            .find_also_related(Object)
335            .filter(subscription::Column::SubscriptionId.eq(subscription_id))
336            .all(&inner.db)
337            .await?;
338        let subscription: PbSubscription = subscription_objs
339            .into_iter()
340            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap(), None).into())
341            .find_or_first(|_| true)
342            .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
343
344        Ok(subscription)
345    }
346
347    pub async fn get_mv_depended_subscriptions(
348        &self,
349        database_id: Option<DatabaseId>,
350    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
351        let inner = self.inner.read().await;
352        let select = Subscription::find()
353            .select_only()
354            .select_column(subscription::Column::SubscriptionId)
355            .select_column(subscription::Column::DependentTableId)
356            .select_column(subscription::Column::RetentionSeconds);
357        let select = if let Some(database_id) = database_id {
358            select
359                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
360                .filter(object::Column::DatabaseId.eq(database_id))
361        } else {
362            select
363        };
364        let subscription_objs: Vec<(SubscriptionId, TableId, i64)> =
365            select.into_tuple().all(&inner.db).await?;
366        let mut map: HashMap<_, HashMap<_, _>> = HashMap::new();
367        // Write object at the same time we write subscription, so we must be able to get obj
368        for (subscription_id, dependent_table_id, retention_seconds) in subscription_objs {
369            map.entry(dependent_table_id)
370                .or_default()
371                .insert(subscription_id, retention_seconds as _);
372        }
373        Ok(map)
374    }
375
376    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
377        let inner = self.inner.read().await;
378        let table_options: Vec<(TableId, Option<i32>)> = Table::find()
379            .select_only()
380            .columns([table::Column::TableId, table::Column::RetentionSeconds])
381            .into_tuple::<(TableId, Option<i32>)>()
382            .all(&inner.db)
383            .await?;
384
385        Ok(table_options
386            .into_iter()
387            .map(|(id, retention_seconds)| {
388                (
389                    id,
390                    TableOption {
391                        retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
392                    },
393                )
394            })
395            .collect())
396    }
397
398    pub async fn get_all_streaming_parallelisms(
399        &self,
400    ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
401        let inner = self.inner.read().await;
402
403        let job_parallelisms = StreamingJob::find()
404            .select_only()
405            .columns([
406                streaming_job::Column::JobId,
407                streaming_job::Column::Parallelism,
408            ])
409            .into_tuple::<(ObjectId, StreamingParallelism)>()
410            .all(&inner.db)
411            .await?;
412
413        Ok(job_parallelisms
414            .into_iter()
415            .collect::<HashMap<ObjectId, StreamingParallelism>>())
416    }
417
418    pub async fn get_table_name_type_mapping(
419        &self,
420    ) -> MetaResult<HashMap<TableId, (String, String)>> {
421        let inner = self.inner.read().await;
422        let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
423            .select_only()
424            .columns([
425                table::Column::TableId,
426                table::Column::Name,
427                table::Column::TableType,
428            ])
429            .into_tuple()
430            .all(&inner.db)
431            .await?;
432        Ok(table_name_types
433            .into_iter()
434            .map(|(id, name, table_type)| {
435                (
436                    id,
437                    (name, PbTableType::from(table_type).as_str_name().to_owned()),
438                )
439            })
440            .collect())
441    }
442
443    pub async fn get_table_by_cdc_table_id(
444        &self,
445        cdc_table_id: &String,
446    ) -> MetaResult<Vec<PbTable>> {
447        let inner = self.inner.read().await;
448        let table_objs = Table::find()
449            .find_also_related(Object)
450            .filter(table::Column::CdcTableId.eq(cdc_table_id))
451            .all(&inner.db)
452            .await?;
453        let streaming_jobs = load_streaming_jobs_by_ids(
454            &inner.db,
455            table_objs.iter().map(|(table, _)| table.job_id()),
456        )
457        .await?;
458        Ok(table_objs
459            .into_iter()
460            .map(|(table, obj)| {
461                let job_id = table.job_id();
462                let streaming_job = streaming_jobs.get(&job_id).cloned();
463                ObjectModel(table, obj.unwrap(), streaming_job).into()
464            })
465            .collect())
466    }
467
468    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
469        let inner = self.inner.read().await;
470
471        // created table ids.
472        let mut table_ids: Vec<TableId> = StreamingJob::find()
473            .select_only()
474            .column(streaming_job::Column::JobId)
475            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
476            .into_tuple()
477            .all(&inner.db)
478            .await?;
479
480        // internal table ids.
481        let internal_table_ids: Vec<TableId> = Table::find()
482            .select_only()
483            .column(table::Column::TableId)
484            .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
485            .into_tuple()
486            .all(&inner.db)
487            .await?;
488        table_ids.extend(internal_table_ids);
489
490        Ok(table_ids)
491    }
492
493    /// Returns column ids of versioned tables.
494    /// Being versioned implies using `ColumnAwareSerde`.
495    pub async fn get_versioned_table_schemas(
496        &self,
497    ) -> MetaResult<HashMap<risingwave_common::catalog::TableId, Vec<i32>>> {
498        let res = self
499            .list_all_state_tables()
500            .await?
501            .into_iter()
502            .filter_map(|t| {
503                if t.version.is_some() {
504                    let ret = (
505                        t.id,
506                        t.columns
507                            .iter()
508                            .map(|c| c.column_desc.as_ref().unwrap().column_id)
509                            .collect_vec(),
510                    );
511                    return Some(ret);
512                }
513                None
514            })
515            .collect();
516        Ok(res)
517    }
518
519    pub async fn get_existing_job_resource_group(
520        &self,
521        streaming_job_id: JobId,
522    ) -> MetaResult<String> {
523        let inner = self.inner.read().await;
524        get_existing_job_resource_group(&inner.db, streaming_job_id).await
525    }
526
527    pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
528        let inner = self.inner.read().await;
529        get_database_resource_group(&inner.db, database_id).await
530    }
531
532    pub async fn get_existing_job_resource_groups(
533        &self,
534        streaming_job_ids: Vec<JobId>,
535    ) -> MetaResult<HashMap<JobId, String>> {
536        let inner = self.inner.read().await;
537        let mut resource_groups = HashMap::new();
538        for job_id in streaming_job_ids {
539            let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
540            resource_groups.insert(job_id, resource_group);
541        }
542
543        Ok(resource_groups)
544    }
545
546    pub async fn get_existing_job_database_resource_group(
547        &self,
548        streaming_job_id: JobId,
549    ) -> MetaResult<String> {
550        let inner = self.inner.read().await;
551        let database_id: DatabaseId = StreamingJob::find_by_id(streaming_job_id)
552            .select_only()
553            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
554            .column(object::Column::DatabaseId)
555            .into_tuple()
556            .one(&inner.db)
557            .await?
558            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
559
560        get_database_resource_group(&inner.db, database_id).await
561    }
562
563    pub async fn get_job_streaming_parallelisms(
564        &self,
565        streaming_job_id: JobId,
566    ) -> MetaResult<StreamingParallelism> {
567        let inner = self.inner.read().await;
568
569        let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
570            .select_only()
571            .column(streaming_job::Column::Parallelism)
572            .into_tuple()
573            .one(&inner.db)
574            .await?
575            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
576
577        Ok(job_parallelism)
578    }
579
580    pub async fn get_job_parallelisms(
581        &self,
582        streaming_job_id: JobId,
583    ) -> MetaResult<(StreamingParallelism, Option<StreamingParallelism>)> {
584        let inner = self.inner.read().await;
585
586        let (job_parallelism, backfill_parallelism): (
587            StreamingParallelism,
588            Option<StreamingParallelism>,
589        ) = StreamingJob::find_by_id(streaming_job_id)
590            .select_only()
591            .column(streaming_job::Column::Parallelism)
592            .column(streaming_job::Column::BackfillParallelism)
593            .into_tuple()
594            .one(&inner.db)
595            .await?
596            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
597
598        Ok((job_parallelism, backfill_parallelism))
599    }
600
601    pub async fn get_fragment_streaming_job_id(
602        &self,
603        fragment_id: FragmentId,
604    ) -> MetaResult<JobId> {
605        let inner = self.inner.read().await;
606        let job_id: JobId = Fragment::find_by_id(fragment_id)
607            .select_only()
608            .column(fragment::Column::JobId)
609            .into_tuple()
610            .one(&inner.db)
611            .await?
612            .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
613        Ok(job_id)
614    }
615
616    pub async fn list_streaming_job_with_database(
617        &self,
618    ) -> MetaResult<HashMap<DatabaseId, Vec<JobId>>> {
619        let inner = self.inner.read().await;
620        let database_objects: Vec<(DatabaseId, JobId)> = StreamingJob::find()
621            .select_only()
622            .column(object::Column::DatabaseId)
623            .column(streaming_job::Column::JobId)
624            .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
625            .into_tuple()
626            .all(&inner.db)
627            .await?;
628
629        Ok(database_objects.into_iter().into_group_map())
630    }
631
632    // Output: Vec<(table id, db name, schema name, table name, resource group)>
633    pub async fn list_table_objects(
634        &self,
635    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
636        let inner = self.inner.read().await;
637        Ok(Object::find()
638            .select_only()
639            .join(JoinType::InnerJoin, object::Relation::Table.def())
640            .join(JoinType::InnerJoin, object::Relation::Database2.def())
641            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
642            .column(object::Column::Oid)
643            .column(database::Column::Name)
644            .column(schema::Column::Name)
645            .column(table::Column::Name)
646            .column(database::Column::ResourceGroup)
647            .into_tuple()
648            .all(&inner.db)
649            .await?)
650    }
651
652    // Output: Vec<(source id, db name, schema name, source name, resource group)>
653    pub async fn list_source_objects(
654        &self,
655    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
656        let inner = self.inner.read().await;
657        Ok(Object::find()
658            .select_only()
659            .join(JoinType::InnerJoin, object::Relation::Source.def())
660            .join(JoinType::InnerJoin, object::Relation::Database2.def())
661            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
662            .column(object::Column::Oid)
663            .column(database::Column::Name)
664            .column(schema::Column::Name)
665            .column(source::Column::Name)
666            .column(database::Column::ResourceGroup)
667            .into_tuple()
668            .all(&inner.db)
669            .await?)
670    }
671
672    // Output: Vec<(sink id, db name, schema name, sink name, resource group)>
673    pub async fn list_sink_objects(
674        &self,
675    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
676        let inner = self.inner.read().await;
677        Ok(Object::find()
678            .select_only()
679            .join(JoinType::InnerJoin, object::Relation::Sink.def())
680            .join(JoinType::InnerJoin, object::Relation::Database2.def())
681            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
682            .column(object::Column::Oid)
683            .column(database::Column::Name)
684            .column(schema::Column::Name)
685            .column(sink::Column::Name)
686            .column(database::Column::ResourceGroup)
687            .into_tuple()
688            .all(&inner.db)
689            .await?)
690    }
691
692    pub async fn get_streaming_job_status(&self, streaming_job_id: JobId) -> MetaResult<JobStatus> {
693        let inner = self.inner.read().await;
694        let status = StreamingJob::find_by_id(streaming_job_id)
695            .select_only()
696            .column(streaming_job::Column::JobStatus)
697            .into_tuple()
698            .one(&inner.db)
699            .await?
700            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
701        Ok(status)
702    }
703
704    pub async fn get_streaming_job_extra_info(
705        &self,
706        job_ids: Vec<JobId>,
707    ) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>> {
708        let inner = self.inner.read().await;
709        let txn = inner.db.begin().await?;
710
711        self.get_streaming_job_extra_info_in_txn(&txn, job_ids)
712            .await
713    }
714
715    pub async fn get_streaming_job_extra_info_in_txn<C>(
716        &self,
717        txn: &C,
718        job_ids: Vec<JobId>,
719    ) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>>
720    where
721        C: ConnectionTrait,
722    {
723        fetch_streaming_job_extra_info(txn, job_ids).await
724    }
725}