risingwave_meta/controller/catalog/
get_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::ColumnCatalog;
16
17use super::*;
18use crate::controller::utils::{
19    get_database_resource_group, get_existing_job_resource_group, get_table_columns,
20};
21
22impl CatalogController {
23    pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
24        let inner = self.inner.read().await;
25        let (secret, obj) = Secret::find_by_id(secret_id)
26            .find_also_related(Object)
27            .one(&inner.db)
28            .await?
29            .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
30        Ok(ObjectModel(secret, obj.unwrap()).into())
31    }
32
33    pub async fn get_object_database_id(&self, object_id: ObjectId) -> MetaResult<DatabaseId> {
34        let inner = self.inner.read().await;
35        let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
36            .select_only()
37            .select_column(object::Column::DatabaseId)
38            .into_tuple()
39            .one(&inner.db)
40            .await?
41            .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
42        Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
43    }
44
45    pub async fn get_connection_by_id(
46        &self,
47        connection_id: ConnectionId,
48    ) -> MetaResult<PbConnection> {
49        let inner = self.inner.read().await;
50        let (conn, obj) = Connection::find_by_id(connection_id)
51            .find_also_related(Object)
52            .one(&inner.db)
53            .await?
54            .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
55
56        Ok(ObjectModel(conn, obj.unwrap()).into())
57    }
58
59    pub async fn get_table_by_name(
60        &self,
61        database_name: &str,
62        table_name: &str,
63    ) -> MetaResult<Option<PbTable>> {
64        let inner = self.inner.read().await;
65        let table_obj = Table::find()
66            .find_also_related(Object)
67            .join(JoinType::InnerJoin, object::Relation::Database2.def())
68            .filter(
69                table::Column::Name
70                    .eq(table_name)
71                    .and(database::Column::Name.eq(database_name)),
72            )
73            .one(&inner.db)
74            .await?;
75        Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
76    }
77
78    pub async fn get_table_associated_source_id(
79        &self,
80        table_id: TableId,
81    ) -> MetaResult<Option<SourceId>> {
82        let inner = self.inner.read().await;
83        Table::find_by_id(table_id)
84            .select_only()
85            .select_column(table::Column::OptionalAssociatedSourceId)
86            .into_tuple()
87            .one(&inner.db)
88            .await?
89            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
90    }
91
92    pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
93        let inner = self.inner.read().await;
94        let table_obj = Table::find_by_id(table_id)
95            .find_also_related(Object)
96            .one(&inner.db)
97            .await?;
98        if let Some((table, obj)) = table_obj {
99            Ok(ObjectModel(table, obj.unwrap()).into())
100        } else {
101            Err(MetaError::catalog_id_not_found("table", table_id))
102        }
103    }
104
105    pub async fn get_table_by_ids(
106        &self,
107        table_ids: Vec<TableId>,
108        include_dropped_table: bool,
109    ) -> MetaResult<Vec<PbTable>> {
110        let inner = self.inner.read().await;
111        let table_objs = Table::find()
112            .find_also_related(Object)
113            .filter(table::Column::TableId.is_in(table_ids.clone()))
114            .all(&inner.db)
115            .await?;
116        let tables = table_objs
117            .into_iter()
118            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
119        let tables = if include_dropped_table {
120            tables
121                .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
122                    if table_ids.contains(id) {
123                        Some(t.clone())
124                    } else {
125                        None
126                    }
127                }))
128                .collect()
129        } else {
130            tables.collect()
131        };
132        Ok(tables)
133    }
134
135    pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
136        let inner = self.inner.read().await;
137        Ok(get_table_columns(&inner.db, id)
138            .await?
139            .to_protobuf()
140            .into_iter()
141            .map(|col| col.into())
142            .collect())
143    }
144
145    pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
146        let inner = self.inner.read().await;
147        let sink_objs = Sink::find()
148            .find_also_related(Object)
149            .filter(sink::Column::SinkId.is_in(sink_ids))
150            .all(&inner.db)
151            .await?;
152        Ok(sink_objs
153            .into_iter()
154            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
155            .collect())
156    }
157
158    pub async fn get_sink_auto_refresh_schema_from(
159        &self,
160        table_id: TableId,
161    ) -> MetaResult<Vec<PbSink>> {
162        let inner = self.inner.read().await;
163        let sink_objs = Sink::find()
164            .find_also_related(Object)
165            .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
166            .all(&inner.db)
167            .await?;
168        Ok(sink_objs
169            .into_iter()
170            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
171            .collect())
172    }
173
174    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
175        let inner = self.inner.read().await;
176        let tables: Vec<I32Array> = Fragment::find()
177            .select_only()
178            .column(fragment::Column::StateTableIds)
179            .filter(fragment::Column::JobId.eq(sink_id))
180            .into_tuple()
181            .all(&inner.db)
182            .await?;
183        Ok(tables
184            .into_iter()
185            .flat_map(|ids| ids.into_inner().into_iter())
186            .collect())
187    }
188
189    pub async fn get_subscription_by_id(
190        &self,
191        subscription_id: SubscriptionId,
192    ) -> MetaResult<PbSubscription> {
193        let inner = self.inner.read().await;
194        let subscription_objs = Subscription::find()
195            .find_also_related(Object)
196            .filter(subscription::Column::SubscriptionId.eq(subscription_id))
197            .all(&inner.db)
198            .await?;
199        let subscription: PbSubscription = subscription_objs
200            .into_iter()
201            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
202            .find_or_first(|_| true)
203            .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
204
205        Ok(subscription)
206    }
207
208    pub async fn get_mv_depended_subscriptions(
209        &self,
210        database_id: Option<DatabaseId>,
211    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
212        let inner = self.inner.read().await;
213        let select = Subscription::find()
214            .select_only()
215            .select_column(subscription::Column::SubscriptionId)
216            .select_column(subscription::Column::DependentTableId)
217            .select_column(subscription::Column::RetentionSeconds)
218            .select_column(object::Column::DatabaseId)
219            .join(JoinType::InnerJoin, subscription::Relation::Object.def());
220        let select = if let Some(database_id) = database_id {
221            select.filter(object::Column::DatabaseId.eq(database_id))
222        } else {
223            select
224        };
225        let subscription_objs: Vec<(SubscriptionId, ObjectId, i64, DatabaseId)> =
226            select.into_tuple().all(&inner.db).await?;
227        let mut map: HashMap<_, HashMap<_, HashMap<_, _>>> = HashMap::new();
228        // Write object at the same time we write subscription, so we must be able to get obj
229        for (subscription_id, dependent_table_id, retention_seconds, database_id) in
230            subscription_objs
231        {
232            map.entry(database_id)
233                .or_default()
234                .entry(dependent_table_id)
235                .or_default()
236                .insert(subscription_id, retention_seconds as _);
237        }
238        Ok(map)
239    }
240
241    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
242        let inner = self.inner.read().await;
243        let table_options: Vec<(TableId, Option<i32>)> = Table::find()
244            .select_only()
245            .columns([table::Column::TableId, table::Column::RetentionSeconds])
246            .into_tuple::<(TableId, Option<i32>)>()
247            .all(&inner.db)
248            .await?;
249
250        Ok(table_options
251            .into_iter()
252            .map(|(id, retention_seconds)| {
253                (
254                    id,
255                    TableOption {
256                        retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
257                    },
258                )
259            })
260            .collect())
261    }
262
263    pub async fn get_all_streaming_parallelisms(
264        &self,
265    ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
266        let inner = self.inner.read().await;
267
268        let job_parallelisms = StreamingJob::find()
269            .select_only()
270            .columns([
271                streaming_job::Column::JobId,
272                streaming_job::Column::Parallelism,
273            ])
274            .into_tuple::<(ObjectId, StreamingParallelism)>()
275            .all(&inner.db)
276            .await?;
277
278        Ok(job_parallelisms
279            .into_iter()
280            .collect::<HashMap<ObjectId, StreamingParallelism>>())
281    }
282
283    pub async fn get_table_name_type_mapping(
284        &self,
285    ) -> MetaResult<HashMap<TableId, (String, String)>> {
286        let inner = self.inner.read().await;
287        let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
288            .select_only()
289            .columns([
290                table::Column::TableId,
291                table::Column::Name,
292                table::Column::TableType,
293            ])
294            .into_tuple()
295            .all(&inner.db)
296            .await?;
297        Ok(table_name_types
298            .into_iter()
299            .map(|(id, name, table_type)| {
300                (
301                    id,
302                    (name, PbTableType::from(table_type).as_str_name().to_owned()),
303                )
304            })
305            .collect())
306    }
307
308    pub async fn get_table_by_cdc_table_id(
309        &self,
310        cdc_table_id: &String,
311    ) -> MetaResult<Vec<PbTable>> {
312        let inner = self.inner.read().await;
313        let table_objs = Table::find()
314            .find_also_related(Object)
315            .filter(table::Column::CdcTableId.eq(cdc_table_id))
316            .all(&inner.db)
317            .await?;
318        Ok(table_objs
319            .into_iter()
320            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
321            .collect())
322    }
323
324    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
325        let inner = self.inner.read().await;
326
327        // created table ids.
328        let mut table_ids: Vec<TableId> = StreamingJob::find()
329            .select_only()
330            .column(streaming_job::Column::JobId)
331            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
332            .into_tuple()
333            .all(&inner.db)
334            .await?;
335
336        // internal table ids.
337        let internal_table_ids: Vec<TableId> = Table::find()
338            .select_only()
339            .column(table::Column::TableId)
340            .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
341            .into_tuple()
342            .all(&inner.db)
343            .await?;
344        table_ids.extend(internal_table_ids);
345
346        Ok(table_ids)
347    }
348
349    /// Returns column ids of versioned tables.
350    /// Being versioned implies using `ColumnAwareSerde`.
351    pub async fn get_versioned_table_schemas(&self) -> MetaResult<HashMap<TableId, Vec<i32>>> {
352        let res = self
353            .list_all_state_tables()
354            .await?
355            .into_iter()
356            .filter_map(|t| {
357                if t.version.is_some() {
358                    let ret = (
359                        t.id.try_into().unwrap(),
360                        t.columns
361                            .iter()
362                            .map(|c| c.column_desc.as_ref().unwrap().column_id)
363                            .collect_vec(),
364                    );
365                    return Some(ret);
366                }
367                None
368            })
369            .collect();
370        Ok(res)
371    }
372
373    pub async fn get_existing_job_resource_group(
374        &self,
375        streaming_job_id: ObjectId,
376    ) -> MetaResult<String> {
377        let inner = self.inner.read().await;
378        get_existing_job_resource_group(&inner.db, streaming_job_id).await
379    }
380
381    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
382        let inner = self.inner.read().await;
383        get_database_resource_group(&inner.db, database_id).await
384    }
385
386    pub async fn get_existing_job_resource_groups(
387        &self,
388        streaming_job_ids: Vec<ObjectId>,
389    ) -> MetaResult<HashMap<ObjectId, String>> {
390        let inner = self.inner.read().await;
391        let mut resource_groups = HashMap::new();
392        for job_id in streaming_job_ids {
393            let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
394            resource_groups.insert(job_id, resource_group);
395        }
396
397        Ok(resource_groups)
398    }
399
400    pub async fn get_existing_job_database_resource_group(
401        &self,
402        streaming_job_id: ObjectId,
403    ) -> MetaResult<String> {
404        let inner = self.inner.read().await;
405        let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
406            .select_only()
407            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
408            .column(object::Column::DatabaseId)
409            .into_tuple()
410            .one(&inner.db)
411            .await?
412            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
413
414        get_database_resource_group(&inner.db, database_id).await
415    }
416
417    pub async fn get_job_streaming_parallelisms(
418        &self,
419        streaming_job_id: ObjectId,
420    ) -> MetaResult<StreamingParallelism> {
421        let inner = self.inner.read().await;
422
423        let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
424            .select_only()
425            .column(streaming_job::Column::Parallelism)
426            .into_tuple()
427            .one(&inner.db)
428            .await?
429            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
430
431        Ok(job_parallelism)
432    }
433
434    pub async fn get_fragment_streaming_job_id(
435        &self,
436        fragment_id: FragmentId,
437    ) -> MetaResult<ObjectId> {
438        let inner = self.inner.read().await;
439        let job_id: ObjectId = Fragment::find_by_id(fragment_id)
440            .select_only()
441            .column(fragment::Column::JobId)
442            .into_tuple()
443            .one(&inner.db)
444            .await?
445            .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
446        Ok(job_id)
447    }
448
449    // Output: Vec<(table id, db name, schema name, table name, resource group)>
450    pub async fn list_table_objects(
451        &self,
452    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
453        let inner = self.inner.read().await;
454        Ok(Object::find()
455            .select_only()
456            .join(JoinType::InnerJoin, object::Relation::Table.def())
457            .join(JoinType::InnerJoin, object::Relation::Database2.def())
458            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
459            .column(object::Column::Oid)
460            .column(database::Column::Name)
461            .column(schema::Column::Name)
462            .column(table::Column::Name)
463            .column(database::Column::ResourceGroup)
464            .into_tuple()
465            .all(&inner.db)
466            .await?)
467    }
468
469    // Output: Vec<(source id, db name, schema name, source name, resource group)>
470    pub async fn list_source_objects(
471        &self,
472    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
473        let inner = self.inner.read().await;
474        Ok(Object::find()
475            .select_only()
476            .join(JoinType::InnerJoin, object::Relation::Source.def())
477            .join(JoinType::InnerJoin, object::Relation::Database2.def())
478            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
479            .column(object::Column::Oid)
480            .column(database::Column::Name)
481            .column(schema::Column::Name)
482            .column(source::Column::Name)
483            .column(database::Column::ResourceGroup)
484            .into_tuple()
485            .all(&inner.db)
486            .await?)
487    }
488
489    // Output: Vec<(sink id, db name, schema name, sink name, resource group)>
490    pub async fn list_sink_objects(
491        &self,
492    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
493        let inner = self.inner.read().await;
494        Ok(Object::find()
495            .select_only()
496            .join(JoinType::InnerJoin, object::Relation::Sink.def())
497            .join(JoinType::InnerJoin, object::Relation::Database2.def())
498            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
499            .column(object::Column::Oid)
500            .column(database::Column::Name)
501            .column(schema::Column::Name)
502            .column(sink::Column::Name)
503            .column(database::Column::ResourceGroup)
504            .into_tuple()
505            .all(&inner.db)
506            .await?)
507    }
508}