risingwave_meta/controller/catalog/
get_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::ColumnCatalog;
16
17use super::*;
18use crate::controller::utils::{
19    get_database_resource_group, get_existing_job_resource_group, get_table_columns,
20};
21
22impl CatalogController {
23    pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
24        let inner = self.inner.read().await;
25        let (secret, obj) = Secret::find_by_id(secret_id)
26            .find_also_related(Object)
27            .one(&inner.db)
28            .await?
29            .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
30        Ok(ObjectModel(secret, obj.unwrap()).into())
31    }
32
33    pub async fn get_object_database_id(&self, object_id: ObjectId) -> MetaResult<DatabaseId> {
34        let inner = self.inner.read().await;
35        let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
36            .select_only()
37            .select_column(object::Column::DatabaseId)
38            .into_tuple()
39            .one(&inner.db)
40            .await?
41            .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
42        Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
43    }
44
45    pub async fn get_connection_by_id(
46        &self,
47        connection_id: ConnectionId,
48    ) -> MetaResult<PbConnection> {
49        let inner = self.inner.read().await;
50        let (conn, obj) = Connection::find_by_id(connection_id)
51            .find_also_related(Object)
52            .one(&inner.db)
53            .await?
54            .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
55
56        Ok(ObjectModel(conn, obj.unwrap()).into())
57    }
58
59    pub async fn get_table_by_name(
60        &self,
61        database_name: &str,
62        table_name: &str,
63    ) -> MetaResult<Option<PbTable>> {
64        let inner = self.inner.read().await;
65        let table_obj = Table::find()
66            .find_also_related(Object)
67            .join(JoinType::InnerJoin, object::Relation::Database2.def())
68            .filter(
69                table::Column::Name
70                    .eq(table_name)
71                    .and(database::Column::Name.eq(database_name)),
72            )
73            .one(&inner.db)
74            .await?;
75        Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
76    }
77
78    pub async fn get_table_associated_source_id(
79        &self,
80        table_id: TableId,
81    ) -> MetaResult<Option<SourceId>> {
82        let inner = self.inner.read().await;
83        Table::find_by_id(table_id)
84            .select_only()
85            .select_column(table::Column::OptionalAssociatedSourceId)
86            .into_tuple()
87            .one(&inner.db)
88            .await?
89            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
90    }
91
92    pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
93        let inner = self.inner.read().await;
94        let table_obj = Table::find_by_id(table_id)
95            .find_also_related(Object)
96            .one(&inner.db)
97            .await?;
98        if let Some((table, obj)) = table_obj {
99            Ok(ObjectModel(table, obj.unwrap()).into())
100        } else {
101            Err(MetaError::catalog_id_not_found("table", table_id))
102        }
103    }
104
105    pub async fn get_user_created_table_by_ids(
106        &self,
107        table_ids: Vec<TableId>,
108    ) -> MetaResult<Vec<PbTable>> {
109        let inner = self.inner.read().await;
110        let table_objs = Table::find()
111            .find_also_related(Object)
112            .filter(
113                table::Column::TableId
114                    .is_in(table_ids.clone())
115                    .and(table::Column::TableType.eq(TableType::Table)),
116            )
117            .all(&inner.db)
118            .await?;
119        let tables = table_objs
120            .into_iter()
121            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
122            .collect();
123        Ok(tables)
124    }
125
126    pub async fn get_table_by_ids(
127        &self,
128        table_ids: Vec<TableId>,
129        include_dropped_table: bool,
130    ) -> MetaResult<Vec<PbTable>> {
131        let inner = self.inner.read().await;
132        let table_objs = Table::find()
133            .find_also_related(Object)
134            .filter(table::Column::TableId.is_in(table_ids.clone()))
135            .all(&inner.db)
136            .await?;
137        let tables = table_objs
138            .into_iter()
139            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
140        let tables = if include_dropped_table {
141            tables
142                .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
143                    if table_ids.contains(id) {
144                        Some(t.clone())
145                    } else {
146                        None
147                    }
148                }))
149                .collect()
150        } else {
151            tables.collect()
152        };
153        Ok(tables)
154    }
155
156    pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
157        let inner = self.inner.read().await;
158        Ok(get_table_columns(&inner.db, id)
159            .await?
160            .to_protobuf()
161            .into_iter()
162            .map(|col| col.into())
163            .collect())
164    }
165
166    pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
167        let inner = self.inner.read().await;
168        let sink_objs = Sink::find()
169            .find_also_related(Object)
170            .filter(sink::Column::TargetTable.eq(table_id))
171            .all(&inner.db)
172            .await?;
173        Ok(sink_objs
174            .into_iter()
175            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
176            .collect())
177    }
178
179    pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
180        let inner = self.inner.read().await;
181        let sink_objs = Sink::find()
182            .find_also_related(Object)
183            .filter(sink::Column::SinkId.is_in(sink_ids))
184            .all(&inner.db)
185            .await?;
186        Ok(sink_objs
187            .into_iter()
188            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
189            .collect())
190    }
191
192    pub async fn get_sink_auto_refresh_schema_from(
193        &self,
194        table_id: TableId,
195    ) -> MetaResult<Vec<PbSink>> {
196        let inner = self.inner.read().await;
197        let sink_objs = Sink::find()
198            .find_also_related(Object)
199            .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
200            .all(&inner.db)
201            .await?;
202        Ok(sink_objs
203            .into_iter()
204            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
205            .collect())
206    }
207
208    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
209        let inner = self.inner.read().await;
210        let tables: Vec<I32Array> = Fragment::find()
211            .select_only()
212            .column(fragment::Column::StateTableIds)
213            .filter(fragment::Column::JobId.eq(sink_id))
214            .into_tuple()
215            .all(&inner.db)
216            .await?;
217        Ok(tables
218            .into_iter()
219            .flat_map(|ids| ids.into_inner().into_iter())
220            .collect())
221    }
222
223    pub async fn get_subscription_by_id(
224        &self,
225        subscription_id: SubscriptionId,
226    ) -> MetaResult<PbSubscription> {
227        let inner = self.inner.read().await;
228        let subscription_objs = Subscription::find()
229            .find_also_related(Object)
230            .filter(subscription::Column::SubscriptionId.eq(subscription_id))
231            .all(&inner.db)
232            .await?;
233        let subscription: PbSubscription = subscription_objs
234            .into_iter()
235            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
236            .find_or_first(|_| true)
237            .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
238
239        Ok(subscription)
240    }
241
242    pub async fn get_mv_depended_subscriptions(
243        &self,
244        database_id: Option<DatabaseId>,
245    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
246        let inner = self.inner.read().await;
247        let select = Subscription::find()
248            .select_only()
249            .select_column(subscription::Column::SubscriptionId)
250            .select_column(subscription::Column::DependentTableId)
251            .select_column(subscription::Column::RetentionSeconds)
252            .select_column(object::Column::DatabaseId)
253            .join(JoinType::InnerJoin, subscription::Relation::Object.def());
254        let select = if let Some(database_id) = database_id {
255            select.filter(object::Column::DatabaseId.eq(database_id))
256        } else {
257            select
258        };
259        let subscription_objs: Vec<(SubscriptionId, ObjectId, i64, DatabaseId)> =
260            select.into_tuple().all(&inner.db).await?;
261        let mut map: HashMap<_, HashMap<_, HashMap<_, _>>> = HashMap::new();
262        // Write object at the same time we write subscription, so we must be able to get obj
263        for (subscription_id, dependent_table_id, retention_seconds, database_id) in
264            subscription_objs
265        {
266            map.entry(database_id)
267                .or_default()
268                .entry(dependent_table_id)
269                .or_default()
270                .insert(subscription_id, retention_seconds as _);
271        }
272        Ok(map)
273    }
274
275    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
276        let inner = self.inner.read().await;
277        let table_options: Vec<(TableId, Option<i32>)> = Table::find()
278            .select_only()
279            .columns([table::Column::TableId, table::Column::RetentionSeconds])
280            .into_tuple::<(TableId, Option<i32>)>()
281            .all(&inner.db)
282            .await?;
283
284        Ok(table_options
285            .into_iter()
286            .map(|(id, retention_seconds)| {
287                (
288                    id,
289                    TableOption {
290                        retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
291                    },
292                )
293            })
294            .collect())
295    }
296
297    pub async fn get_all_streaming_parallelisms(
298        &self,
299    ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
300        let inner = self.inner.read().await;
301
302        let job_parallelisms = StreamingJob::find()
303            .select_only()
304            .columns([
305                streaming_job::Column::JobId,
306                streaming_job::Column::Parallelism,
307            ])
308            .into_tuple::<(ObjectId, StreamingParallelism)>()
309            .all(&inner.db)
310            .await?;
311
312        Ok(job_parallelisms
313            .into_iter()
314            .collect::<HashMap<ObjectId, StreamingParallelism>>())
315    }
316
317    pub async fn get_table_name_type_mapping(
318        &self,
319    ) -> MetaResult<HashMap<TableId, (String, String)>> {
320        let inner = self.inner.read().await;
321        let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
322            .select_only()
323            .columns([
324                table::Column::TableId,
325                table::Column::Name,
326                table::Column::TableType,
327            ])
328            .into_tuple()
329            .all(&inner.db)
330            .await?;
331        Ok(table_name_types
332            .into_iter()
333            .map(|(id, name, table_type)| {
334                (
335                    id,
336                    (name, PbTableType::from(table_type).as_str_name().to_owned()),
337                )
338            })
339            .collect())
340    }
341
342    pub async fn get_table_by_cdc_table_id(
343        &self,
344        cdc_table_id: &String,
345    ) -> MetaResult<Vec<PbTable>> {
346        let inner = self.inner.read().await;
347        let table_objs = Table::find()
348            .find_also_related(Object)
349            .filter(table::Column::CdcTableId.eq(cdc_table_id))
350            .all(&inner.db)
351            .await?;
352        Ok(table_objs
353            .into_iter()
354            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
355            .collect())
356    }
357
358    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
359        let inner = self.inner.read().await;
360
361        // created table ids.
362        let mut table_ids: Vec<TableId> = StreamingJob::find()
363            .select_only()
364            .column(streaming_job::Column::JobId)
365            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
366            .into_tuple()
367            .all(&inner.db)
368            .await?;
369
370        // internal table ids.
371        let internal_table_ids: Vec<TableId> = Table::find()
372            .select_only()
373            .column(table::Column::TableId)
374            .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
375            .into_tuple()
376            .all(&inner.db)
377            .await?;
378        table_ids.extend(internal_table_ids);
379
380        Ok(table_ids)
381    }
382
383    /// Returns column ids of versioned tables.
384    /// Being versioned implies using `ColumnAwareSerde`.
385    pub async fn get_versioned_table_schemas(&self) -> MetaResult<HashMap<TableId, Vec<i32>>> {
386        let res = self
387            .list_all_state_tables()
388            .await?
389            .into_iter()
390            .filter_map(|t| {
391                if t.version.is_some() {
392                    let ret = (
393                        t.id.try_into().unwrap(),
394                        t.columns
395                            .iter()
396                            .map(|c| c.column_desc.as_ref().unwrap().column_id)
397                            .collect_vec(),
398                    );
399                    return Some(ret);
400                }
401                None
402            })
403            .collect();
404        Ok(res)
405    }
406
407    pub async fn get_existing_job_resource_group(
408        &self,
409        streaming_job_id: ObjectId,
410    ) -> MetaResult<String> {
411        let inner = self.inner.read().await;
412        get_existing_job_resource_group(&inner.db, streaming_job_id).await
413    }
414
415    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
416        let inner = self.inner.read().await;
417        get_database_resource_group(&inner.db, database_id).await
418    }
419
420    pub async fn get_existing_job_resource_groups(
421        &self,
422        streaming_job_ids: Vec<ObjectId>,
423    ) -> MetaResult<HashMap<ObjectId, String>> {
424        let inner = self.inner.read().await;
425        let mut resource_groups = HashMap::new();
426        for job_id in streaming_job_ids {
427            let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
428            resource_groups.insert(job_id, resource_group);
429        }
430
431        Ok(resource_groups)
432    }
433
434    pub async fn get_existing_job_database_resource_group(
435        &self,
436        streaming_job_id: ObjectId,
437    ) -> MetaResult<String> {
438        let inner = self.inner.read().await;
439        let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
440            .select_only()
441            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
442            .column(object::Column::DatabaseId)
443            .into_tuple()
444            .one(&inner.db)
445            .await?
446            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
447
448        get_database_resource_group(&inner.db, database_id).await
449    }
450
451    pub async fn get_job_streaming_parallelisms(
452        &self,
453        streaming_job_id: ObjectId,
454    ) -> MetaResult<StreamingParallelism> {
455        let inner = self.inner.read().await;
456
457        let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
458            .select_only()
459            .column(streaming_job::Column::Parallelism)
460            .into_tuple()
461            .one(&inner.db)
462            .await?
463            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
464
465        Ok(job_parallelism)
466    }
467
468    pub async fn get_fragment_streaming_job_id(
469        &self,
470        fragment_id: FragmentId,
471    ) -> MetaResult<ObjectId> {
472        let inner = self.inner.read().await;
473        let job_id: ObjectId = Fragment::find_by_id(fragment_id)
474            .select_only()
475            .column(fragment::Column::JobId)
476            .into_tuple()
477            .one(&inner.db)
478            .await?
479            .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
480        Ok(job_id)
481    }
482
483    // Output: Vec<(table id, db name, schema name, table name, resource group)>
484    pub async fn list_table_objects(
485        &self,
486    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
487        let inner = self.inner.read().await;
488        Ok(Object::find()
489            .select_only()
490            .join(JoinType::InnerJoin, object::Relation::Table.def())
491            .join(JoinType::InnerJoin, object::Relation::Database2.def())
492            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
493            .column(object::Column::Oid)
494            .column(database::Column::Name)
495            .column(schema::Column::Name)
496            .column(table::Column::Name)
497            .column(database::Column::ResourceGroup)
498            .into_tuple()
499            .all(&inner.db)
500            .await?)
501    }
502
503    // Output: Vec<(source id, db name, schema name, source name, resource group)>
504    pub async fn list_source_objects(
505        &self,
506    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
507        let inner = self.inner.read().await;
508        Ok(Object::find()
509            .select_only()
510            .join(JoinType::InnerJoin, object::Relation::Source.def())
511            .join(JoinType::InnerJoin, object::Relation::Database2.def())
512            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
513            .column(object::Column::Oid)
514            .column(database::Column::Name)
515            .column(schema::Column::Name)
516            .column(source::Column::Name)
517            .column(database::Column::ResourceGroup)
518            .into_tuple()
519            .all(&inner.db)
520            .await?)
521    }
522
523    // Output: Vec<(sink id, db name, schema name, sink name, resource group)>
524    pub async fn list_sink_objects(
525        &self,
526    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
527        let inner = self.inner.read().await;
528        Ok(Object::find()
529            .select_only()
530            .join(JoinType::InnerJoin, object::Relation::Sink.def())
531            .join(JoinType::InnerJoin, object::Relation::Database2.def())
532            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
533            .column(object::Column::Oid)
534            .column(database::Column::Name)
535            .column(schema::Column::Name)
536            .column(sink::Column::Name)
537            .column(database::Column::ResourceGroup)
538            .into_tuple()
539            .all(&inner.db)
540            .await?)
541    }
542
543    pub async fn get_streaming_job_status(
544        &self,
545        streaming_job_id: ObjectId,
546    ) -> MetaResult<JobStatus> {
547        let inner = self.inner.read().await;
548        let status = StreamingJob::find_by_id(streaming_job_id)
549            .select_only()
550            .column(streaming_job::Column::JobStatus)
551            .into_tuple()
552            .one(&inner.db)
553            .await?
554            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
555        Ok(status)
556    }
557}