risingwave_meta/controller/catalog/
get_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::ColumnCatalog;
16use risingwave_common::id::JobId;
17
18use super::*;
19use crate::controller::utils::{
20    StreamingJobExtraInfo, get_database_resource_group, get_existing_job_resource_group,
21    get_streaming_job_extra_info as fetch_streaming_job_extra_info, get_table_columns,
22};
23
24impl CatalogController {
25    pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
26        let inner = self.inner.read().await;
27        let (secret, obj) = Secret::find_by_id(secret_id)
28            .find_also_related(Object)
29            .one(&inner.db)
30            .await?
31            .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
32        Ok(ObjectModel(secret, obj.unwrap()).into())
33    }
34
35    pub async fn get_object_database_id(
36        &self,
37        object_id: impl Into<ObjectId>,
38    ) -> MetaResult<DatabaseId> {
39        let object_id = object_id.into();
40        let inner = self.inner.read().await;
41        let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
42            .select_only()
43            .select_column(object::Column::DatabaseId)
44            .into_tuple()
45            .one(&inner.db)
46            .await?
47            .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
48        Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
49    }
50
51    pub async fn get_connection_by_id(
52        &self,
53        connection_id: ConnectionId,
54    ) -> MetaResult<PbConnection> {
55        let inner = self.inner.read().await;
56        let (conn, obj) = Connection::find_by_id(connection_id)
57            .find_also_related(Object)
58            .one(&inner.db)
59            .await?
60            .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
61
62        Ok(ObjectModel(conn, obj.unwrap()).into())
63    }
64
65    pub async fn get_table_catalog_by_name(
66        &self,
67        database_id: DatabaseId,
68        schema_id: SchemaId,
69        name: &str,
70    ) -> MetaResult<Option<PbTable>> {
71        let inner = self.inner.read().await;
72        let table_obj = Table::find()
73            .find_also_related(Object)
74            .filter(
75                table::Column::Name
76                    .eq(name)
77                    .and(object::Column::DatabaseId.eq(database_id))
78                    .and(object::Column::SchemaId.eq(schema_id)),
79            )
80            .one(&inner.db)
81            .await?;
82        Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
83    }
84
85    pub async fn get_table_by_name(
86        &self,
87        database_name: &str,
88        table_name: &str,
89    ) -> MetaResult<Option<PbTable>> {
90        let inner = self.inner.read().await;
91        let table_obj = Table::find()
92            .find_also_related(Object)
93            .join(JoinType::InnerJoin, object::Relation::Database2.def())
94            .filter(
95                table::Column::Name
96                    .eq(table_name)
97                    .and(database::Column::Name.eq(database_name)),
98            )
99            .one(&inner.db)
100            .await?;
101        Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
102    }
103
104    pub async fn get_table_associated_source_id(
105        &self,
106        table_id: TableId,
107    ) -> MetaResult<Option<SourceId>> {
108        let inner = self.inner.read().await;
109        Table::find_by_id(table_id)
110            .select_only()
111            .select_column(table::Column::OptionalAssociatedSourceId)
112            .into_tuple()
113            .one(&inner.db)
114            .await?
115            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
116    }
117
118    pub async fn get_table_by_associate_source_id(
119        &self,
120        associated_source_id: SourceId,
121    ) -> MetaResult<PbTable> {
122        let inner = self.inner.read().await;
123        Table::find()
124            .find_also_related(Object)
125            .filter(table::Column::OptionalAssociatedSourceId.eq(associated_source_id))
126            .one(&inner.db)
127            .await?
128            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
129            .ok_or_else(|| {
130                MetaError::catalog_id_not_found("table associated source", associated_source_id)
131            })
132    }
133
134    pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
135        let inner = self.inner.read().await;
136        let table_obj = Table::find_by_id(table_id)
137            .find_also_related(Object)
138            .one(&inner.db)
139            .await?;
140        if let Some((table, obj)) = table_obj {
141            Ok(ObjectModel(table, obj.unwrap()).into())
142        } else {
143            Err(MetaError::catalog_id_not_found("table", table_id))
144        }
145    }
146
147    pub async fn get_user_created_table_by_ids(
148        &self,
149        job_ids: impl Iterator<Item = JobId>,
150    ) -> MetaResult<Vec<PbTable>> {
151        let inner = self.inner.read().await;
152        let table_objs = Table::find()
153            .find_also_related(Object)
154            .filter(
155                table::Column::TableId
156                    .is_in(job_ids.map(|job_id| job_id.as_mv_table_id()).collect_vec())
157                    .and(table::Column::TableType.eq(TableType::Table)),
158            )
159            .all(&inner.db)
160            .await?;
161        let tables = table_objs
162            .into_iter()
163            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
164            .collect();
165        Ok(tables)
166    }
167
168    pub async fn get_table_by_ids(
169        &self,
170        table_ids: Vec<TableId>,
171        include_dropped_table: bool,
172    ) -> MetaResult<Vec<PbTable>> {
173        let inner = self.inner.read().await;
174        let table_objs = Table::find()
175            .find_also_related(Object)
176            .filter(table::Column::TableId.is_in(table_ids.clone()))
177            .all(&inner.db)
178            .await?;
179        let tables = table_objs
180            .into_iter()
181            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
182        let tables = if include_dropped_table {
183            tables
184                .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
185                    if table_ids.contains(id) {
186                        Some(t.clone())
187                    } else {
188                        None
189                    }
190                }))
191                .collect()
192        } else {
193            tables.collect()
194        };
195        Ok(tables)
196    }
197
198    pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
199        let inner = self.inner.read().await;
200        Ok(get_table_columns(&inner.db, id)
201            .await?
202            .to_protobuf()
203            .into_iter()
204            .map(|col| col.into())
205            .collect())
206    }
207
208    pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
209        let inner = self.inner.read().await;
210        let sink_objs = Sink::find()
211            .find_also_related(Object)
212            .filter(sink::Column::TargetTable.eq(table_id))
213            .all(&inner.db)
214            .await?;
215        Ok(sink_objs
216            .into_iter()
217            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
218            .collect())
219    }
220
221    pub async fn get_sink_by_id(&self, sink_id: SinkId) -> MetaResult<Option<PbSink>> {
222        let inner = self.inner.read().await;
223        let sink_objs = Sink::find_by_id(sink_id)
224            .find_also_related(Object)
225            .one(&inner.db)
226            .await?;
227        Ok(sink_objs.map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into()))
228    }
229
230    pub async fn get_sink_auto_refresh_schema_from(
231        &self,
232        table_id: TableId,
233    ) -> MetaResult<Vec<PbSink>> {
234        let inner = self.inner.read().await;
235        let sink_objs = Sink::find()
236            .find_also_related(Object)
237            .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
238            .all(&inner.db)
239            .await?;
240        Ok(sink_objs
241            .into_iter()
242            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
243            .collect())
244    }
245
246    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
247        let inner = self.inner.read().await;
248        let tables: Vec<I32Array> = Fragment::find()
249            .select_only()
250            .column(fragment::Column::StateTableIds)
251            .filter(fragment::Column::JobId.eq(sink_id))
252            .into_tuple()
253            .all(&inner.db)
254            .await?;
255        Ok(tables
256            .into_iter()
257            .flat_map(|ids| {
258                ids.into_inner()
259                    .into_iter()
260                    .map(|table_id| TableId::new(table_id as _))
261            })
262            .collect())
263    }
264
265    pub async fn get_subscription_by_id(
266        &self,
267        subscription_id: SubscriptionId,
268    ) -> MetaResult<PbSubscription> {
269        let inner = self.inner.read().await;
270        let subscription_objs = Subscription::find()
271            .find_also_related(Object)
272            .filter(subscription::Column::SubscriptionId.eq(subscription_id))
273            .all(&inner.db)
274            .await?;
275        let subscription: PbSubscription = subscription_objs
276            .into_iter()
277            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
278            .find_or_first(|_| true)
279            .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
280
281        Ok(subscription)
282    }
283
284    pub async fn get_mv_depended_subscriptions(
285        &self,
286        database_id: Option<DatabaseId>,
287    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
288        let inner = self.inner.read().await;
289        let select = Subscription::find()
290            .select_only()
291            .select_column(subscription::Column::SubscriptionId)
292            .select_column(subscription::Column::DependentTableId)
293            .select_column(subscription::Column::RetentionSeconds);
294        let select = if let Some(database_id) = database_id {
295            select
296                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
297                .filter(object::Column::DatabaseId.eq(database_id))
298        } else {
299            select
300        };
301        let subscription_objs: Vec<(SubscriptionId, TableId, i64)> =
302            select.into_tuple().all(&inner.db).await?;
303        let mut map: HashMap<_, HashMap<_, _>> = HashMap::new();
304        // Write object at the same time we write subscription, so we must be able to get obj
305        for (subscription_id, dependent_table_id, retention_seconds) in subscription_objs {
306            map.entry(dependent_table_id)
307                .or_default()
308                .insert(subscription_id, retention_seconds as _);
309        }
310        Ok(map)
311    }
312
313    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
314        let inner = self.inner.read().await;
315        let table_options: Vec<(TableId, Option<i32>)> = Table::find()
316            .select_only()
317            .columns([table::Column::TableId, table::Column::RetentionSeconds])
318            .into_tuple::<(TableId, Option<i32>)>()
319            .all(&inner.db)
320            .await?;
321
322        Ok(table_options
323            .into_iter()
324            .map(|(id, retention_seconds)| {
325                (
326                    id,
327                    TableOption {
328                        retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
329                    },
330                )
331            })
332            .collect())
333    }
334
335    pub async fn get_all_streaming_parallelisms(
336        &self,
337    ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
338        let inner = self.inner.read().await;
339
340        let job_parallelisms = StreamingJob::find()
341            .select_only()
342            .columns([
343                streaming_job::Column::JobId,
344                streaming_job::Column::Parallelism,
345            ])
346            .into_tuple::<(ObjectId, StreamingParallelism)>()
347            .all(&inner.db)
348            .await?;
349
350        Ok(job_parallelisms
351            .into_iter()
352            .collect::<HashMap<ObjectId, StreamingParallelism>>())
353    }
354
355    pub async fn get_table_name_type_mapping(
356        &self,
357    ) -> MetaResult<HashMap<TableId, (String, String)>> {
358        let inner = self.inner.read().await;
359        let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
360            .select_only()
361            .columns([
362                table::Column::TableId,
363                table::Column::Name,
364                table::Column::TableType,
365            ])
366            .into_tuple()
367            .all(&inner.db)
368            .await?;
369        Ok(table_name_types
370            .into_iter()
371            .map(|(id, name, table_type)| {
372                (
373                    id,
374                    (name, PbTableType::from(table_type).as_str_name().to_owned()),
375                )
376            })
377            .collect())
378    }
379
380    pub async fn get_table_by_cdc_table_id(
381        &self,
382        cdc_table_id: &String,
383    ) -> MetaResult<Vec<PbTable>> {
384        let inner = self.inner.read().await;
385        let table_objs = Table::find()
386            .find_also_related(Object)
387            .filter(table::Column::CdcTableId.eq(cdc_table_id))
388            .all(&inner.db)
389            .await?;
390        Ok(table_objs
391            .into_iter()
392            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
393            .collect())
394    }
395
396    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
397        let inner = self.inner.read().await;
398
399        // created table ids.
400        let mut table_ids: Vec<TableId> = StreamingJob::find()
401            .select_only()
402            .column(streaming_job::Column::JobId)
403            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
404            .into_tuple()
405            .all(&inner.db)
406            .await?;
407
408        // internal table ids.
409        let internal_table_ids: Vec<TableId> = Table::find()
410            .select_only()
411            .column(table::Column::TableId)
412            .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
413            .into_tuple()
414            .all(&inner.db)
415            .await?;
416        table_ids.extend(internal_table_ids);
417
418        Ok(table_ids)
419    }
420
421    /// Returns column ids of versioned tables.
422    /// Being versioned implies using `ColumnAwareSerde`.
423    pub async fn get_versioned_table_schemas(
424        &self,
425    ) -> MetaResult<HashMap<risingwave_common::catalog::TableId, Vec<i32>>> {
426        let res = self
427            .list_all_state_tables()
428            .await?
429            .into_iter()
430            .filter_map(|t| {
431                if t.version.is_some() {
432                    let ret = (
433                        t.id,
434                        t.columns
435                            .iter()
436                            .map(|c| c.column_desc.as_ref().unwrap().column_id)
437                            .collect_vec(),
438                    );
439                    return Some(ret);
440                }
441                None
442            })
443            .collect();
444        Ok(res)
445    }
446
447    pub async fn get_existing_job_resource_group(
448        &self,
449        streaming_job_id: JobId,
450    ) -> MetaResult<String> {
451        let inner = self.inner.read().await;
452        get_existing_job_resource_group(&inner.db, streaming_job_id).await
453    }
454
455    pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
456        let inner = self.inner.read().await;
457        get_database_resource_group(&inner.db, database_id).await
458    }
459
460    pub async fn get_existing_job_resource_groups(
461        &self,
462        streaming_job_ids: Vec<JobId>,
463    ) -> MetaResult<HashMap<JobId, String>> {
464        let inner = self.inner.read().await;
465        let mut resource_groups = HashMap::new();
466        for job_id in streaming_job_ids {
467            let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
468            resource_groups.insert(job_id, resource_group);
469        }
470
471        Ok(resource_groups)
472    }
473
474    pub async fn get_existing_job_database_resource_group(
475        &self,
476        streaming_job_id: JobId,
477    ) -> MetaResult<String> {
478        let inner = self.inner.read().await;
479        let database_id: DatabaseId = StreamingJob::find_by_id(streaming_job_id)
480            .select_only()
481            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
482            .column(object::Column::DatabaseId)
483            .into_tuple()
484            .one(&inner.db)
485            .await?
486            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
487
488        get_database_resource_group(&inner.db, database_id).await
489    }
490
491    pub async fn get_job_streaming_parallelisms(
492        &self,
493        streaming_job_id: JobId,
494    ) -> MetaResult<StreamingParallelism> {
495        let inner = self.inner.read().await;
496
497        let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
498            .select_only()
499            .column(streaming_job::Column::Parallelism)
500            .into_tuple()
501            .one(&inner.db)
502            .await?
503            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
504
505        Ok(job_parallelism)
506    }
507
508    pub async fn get_fragment_streaming_job_id(
509        &self,
510        fragment_id: FragmentId,
511    ) -> MetaResult<JobId> {
512        let inner = self.inner.read().await;
513        let job_id: JobId = Fragment::find_by_id(fragment_id)
514            .select_only()
515            .column(fragment::Column::JobId)
516            .into_tuple()
517            .one(&inner.db)
518            .await?
519            .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
520        Ok(job_id)
521    }
522
523    pub async fn list_streaming_job_with_database(
524        &self,
525    ) -> MetaResult<HashMap<DatabaseId, Vec<JobId>>> {
526        let inner = self.inner.read().await;
527        let database_objects: Vec<(DatabaseId, JobId)> = StreamingJob::find()
528            .select_only()
529            .column(object::Column::DatabaseId)
530            .column(streaming_job::Column::JobId)
531            .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
532            .into_tuple()
533            .all(&inner.db)
534            .await?;
535
536        Ok(database_objects.into_iter().into_group_map())
537    }
538
539    // Output: Vec<(table id, db name, schema name, table name, resource group)>
540    pub async fn list_table_objects(
541        &self,
542    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
543        let inner = self.inner.read().await;
544        Ok(Object::find()
545            .select_only()
546            .join(JoinType::InnerJoin, object::Relation::Table.def())
547            .join(JoinType::InnerJoin, object::Relation::Database2.def())
548            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
549            .column(object::Column::Oid)
550            .column(database::Column::Name)
551            .column(schema::Column::Name)
552            .column(table::Column::Name)
553            .column(database::Column::ResourceGroup)
554            .into_tuple()
555            .all(&inner.db)
556            .await?)
557    }
558
559    // Output: Vec<(source id, db name, schema name, source name, resource group)>
560    pub async fn list_source_objects(
561        &self,
562    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
563        let inner = self.inner.read().await;
564        Ok(Object::find()
565            .select_only()
566            .join(JoinType::InnerJoin, object::Relation::Source.def())
567            .join(JoinType::InnerJoin, object::Relation::Database2.def())
568            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
569            .column(object::Column::Oid)
570            .column(database::Column::Name)
571            .column(schema::Column::Name)
572            .column(source::Column::Name)
573            .column(database::Column::ResourceGroup)
574            .into_tuple()
575            .all(&inner.db)
576            .await?)
577    }
578
579    // Output: Vec<(sink id, db name, schema name, sink name, resource group)>
580    pub async fn list_sink_objects(
581        &self,
582    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
583        let inner = self.inner.read().await;
584        Ok(Object::find()
585            .select_only()
586            .join(JoinType::InnerJoin, object::Relation::Sink.def())
587            .join(JoinType::InnerJoin, object::Relation::Database2.def())
588            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
589            .column(object::Column::Oid)
590            .column(database::Column::Name)
591            .column(schema::Column::Name)
592            .column(sink::Column::Name)
593            .column(database::Column::ResourceGroup)
594            .into_tuple()
595            .all(&inner.db)
596            .await?)
597    }
598
599    pub async fn get_streaming_job_status(&self, streaming_job_id: JobId) -> MetaResult<JobStatus> {
600        let inner = self.inner.read().await;
601        let status = StreamingJob::find_by_id(streaming_job_id)
602            .select_only()
603            .column(streaming_job::Column::JobStatus)
604            .into_tuple()
605            .one(&inner.db)
606            .await?
607            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
608        Ok(status)
609    }
610
611    pub async fn get_streaming_job_extra_info(
612        &self,
613        job_ids: Vec<JobId>,
614    ) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>> {
615        let inner = self.inner.read().await;
616        let txn = inner.db.begin().await?;
617
618        let result = fetch_streaming_job_extra_info(&txn, job_ids).await?;
619        Ok(result)
620    }
621}