// risingwave_meta/controller/catalog/get_op.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use risingwave_common::catalog::ColumnCatalog;
use risingwave_meta_model::table::RefreshState;

use super::*;
use crate::controller::utils::{
    StreamingJobExtraInfo, get_database_resource_group, get_existing_job_resource_group,
    get_streaming_job_extra_info as fetch_streaming_job_extra_info, get_table_columns,
};

24impl CatalogController {
25    pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
26        let inner = self.inner.read().await;
27        let (secret, obj) = Secret::find_by_id(secret_id)
28            .find_also_related(Object)
29            .one(&inner.db)
30            .await?
31            .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
32        Ok(ObjectModel(secret, obj.unwrap()).into())
33    }
34
35    pub async fn get_object_database_id(&self, object_id: ObjectId) -> MetaResult<DatabaseId> {
36        let inner = self.inner.read().await;
37        let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
38            .select_only()
39            .select_column(object::Column::DatabaseId)
40            .into_tuple()
41            .one(&inner.db)
42            .await?
43            .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
44        Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
45    }
46
47    pub async fn get_connection_by_id(
48        &self,
49        connection_id: ConnectionId,
50    ) -> MetaResult<PbConnection> {
51        let inner = self.inner.read().await;
52        let (conn, obj) = Connection::find_by_id(connection_id)
53            .find_also_related(Object)
54            .one(&inner.db)
55            .await?
56            .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
57
58        Ok(ObjectModel(conn, obj.unwrap()).into())
59    }
60
61    pub async fn get_table_catalog_by_name(
62        &self,
63        database_id: DatabaseId,
64        schema_id: SchemaId,
65        name: &str,
66    ) -> MetaResult<Option<PbTable>> {
67        let inner = self.inner.read().await;
68        let table_obj = Table::find()
69            .find_also_related(Object)
70            .filter(
71                table::Column::Name
72                    .eq(name)
73                    .and(object::Column::DatabaseId.eq(database_id))
74                    .and(object::Column::SchemaId.eq(schema_id)),
75            )
76            .one(&inner.db)
77            .await?;
78        Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
79    }
80
81    pub async fn get_table_by_name(
82        &self,
83        database_name: &str,
84        table_name: &str,
85    ) -> MetaResult<Option<PbTable>> {
86        let inner = self.inner.read().await;
87        let table_obj = Table::find()
88            .find_also_related(Object)
89            .join(JoinType::InnerJoin, object::Relation::Database2.def())
90            .filter(
91                table::Column::Name
92                    .eq(table_name)
93                    .and(database::Column::Name.eq(database_name)),
94            )
95            .one(&inner.db)
96            .await?;
97        Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
98    }
99
100    pub async fn get_table_associated_source_id(
101        &self,
102        table_id: TableId,
103    ) -> MetaResult<Option<SourceId>> {
104        let inner = self.inner.read().await;
105        Table::find_by_id(table_id)
106            .select_only()
107            .select_column(table::Column::OptionalAssociatedSourceId)
108            .into_tuple()
109            .one(&inner.db)
110            .await?
111            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
112    }
113
114    pub async fn get_table_by_associate_source_id(
115        &self,
116        associated_source_id: SourceId,
117    ) -> MetaResult<PbTable> {
118        let inner = self.inner.read().await;
119        Table::find()
120            .find_also_related(Object)
121            .filter(table::Column::OptionalAssociatedSourceId.eq(associated_source_id))
122            .one(&inner.db)
123            .await?
124            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
125            .ok_or_else(|| {
126                MetaError::catalog_id_not_found("table associated source", associated_source_id)
127            })
128    }
129
130    pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
131        let inner = self.inner.read().await;
132        let table_obj = Table::find_by_id(table_id)
133            .find_also_related(Object)
134            .one(&inner.db)
135            .await?;
136        if let Some((table, obj)) = table_obj {
137            Ok(ObjectModel(table, obj.unwrap()).into())
138        } else {
139            Err(MetaError::catalog_id_not_found("table", table_id))
140        }
141    }
142
143    pub async fn get_user_created_table_by_ids(
144        &self,
145        table_ids: Vec<TableId>,
146    ) -> MetaResult<Vec<PbTable>> {
147        let inner = self.inner.read().await;
148        let table_objs = Table::find()
149            .find_also_related(Object)
150            .filter(
151                table::Column::TableId
152                    .is_in(table_ids.clone())
153                    .and(table::Column::TableType.eq(TableType::Table)),
154            )
155            .all(&inner.db)
156            .await?;
157        let tables = table_objs
158            .into_iter()
159            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
160            .collect();
161        Ok(tables)
162    }
163
164    pub async fn get_table_by_ids(
165        &self,
166        table_ids: Vec<TableId>,
167        include_dropped_table: bool,
168    ) -> MetaResult<Vec<PbTable>> {
169        let inner = self.inner.read().await;
170        let table_objs = Table::find()
171            .find_also_related(Object)
172            .filter(table::Column::TableId.is_in(table_ids.clone()))
173            .all(&inner.db)
174            .await?;
175        let tables = table_objs
176            .into_iter()
177            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
178        let tables = if include_dropped_table {
179            tables
180                .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
181                    if table_ids.contains(id) {
182                        Some(t.clone())
183                    } else {
184                        None
185                    }
186                }))
187                .collect()
188        } else {
189            tables.collect()
190        };
191        Ok(tables)
192    }
193
194    pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
195        let inner = self.inner.read().await;
196        Ok(get_table_columns(&inner.db, id)
197            .await?
198            .to_protobuf()
199            .into_iter()
200            .map(|col| col.into())
201            .collect())
202    }
203
204    pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
205        let inner = self.inner.read().await;
206        let sink_objs = Sink::find()
207            .find_also_related(Object)
208            .filter(sink::Column::TargetTable.eq(table_id))
209            .all(&inner.db)
210            .await?;
211        Ok(sink_objs
212            .into_iter()
213            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
214            .collect())
215    }
216
217    /// Get the refresh state of a table
218    pub async fn get_table_refresh_state(
219        &self,
220        table_id: TableId,
221    ) -> MetaResult<Option<RefreshState>> {
222        let inner = self.inner.read().await;
223        let (refresh_state,): (Option<RefreshState>,) = Table::find_by_id(table_id)
224            .select_only()
225            .select_column(table::Column::RefreshState)
226            .into_tuple()
227            .one(&inner.db)
228            .await?
229            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;
230
231        // Default to IDLE if not set (for backward compatibility)
232        Ok(Some(refresh_state.unwrap_or(RefreshState::Idle)))
233    }
234
235    pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
236        let inner = self.inner.read().await;
237        let sink_objs = Sink::find()
238            .find_also_related(Object)
239            .filter(sink::Column::SinkId.is_in(sink_ids))
240            .all(&inner.db)
241            .await?;
242        Ok(sink_objs
243            .into_iter()
244            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
245            .collect())
246    }
247
248    pub async fn get_sink_auto_refresh_schema_from(
249        &self,
250        table_id: TableId,
251    ) -> MetaResult<Vec<PbSink>> {
252        let inner = self.inner.read().await;
253        let sink_objs = Sink::find()
254            .find_also_related(Object)
255            .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
256            .all(&inner.db)
257            .await?;
258        Ok(sink_objs
259            .into_iter()
260            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
261            .collect())
262    }
263
264    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
265        let inner = self.inner.read().await;
266        let tables: Vec<I32Array> = Fragment::find()
267            .select_only()
268            .column(fragment::Column::StateTableIds)
269            .filter(fragment::Column::JobId.eq(sink_id))
270            .into_tuple()
271            .all(&inner.db)
272            .await?;
273        Ok(tables
274            .into_iter()
275            .flat_map(|ids| ids.into_inner().into_iter())
276            .collect())
277    }
278
279    pub async fn get_subscription_by_id(
280        &self,
281        subscription_id: SubscriptionId,
282    ) -> MetaResult<PbSubscription> {
283        let inner = self.inner.read().await;
284        let subscription_objs = Subscription::find()
285            .find_also_related(Object)
286            .filter(subscription::Column::SubscriptionId.eq(subscription_id))
287            .all(&inner.db)
288            .await?;
289        let subscription: PbSubscription = subscription_objs
290            .into_iter()
291            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
292            .find_or_first(|_| true)
293            .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
294
295        Ok(subscription)
296    }
297
298    pub async fn get_mv_depended_subscriptions(
299        &self,
300        database_id: Option<DatabaseId>,
301    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
302        let inner = self.inner.read().await;
303        let select = Subscription::find()
304            .select_only()
305            .select_column(subscription::Column::SubscriptionId)
306            .select_column(subscription::Column::DependentTableId)
307            .select_column(subscription::Column::RetentionSeconds);
308        let select = if let Some(database_id) = database_id {
309            select
310                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
311                .filter(object::Column::DatabaseId.eq(database_id))
312        } else {
313            select
314        };
315        let subscription_objs: Vec<(SubscriptionId, ObjectId, i64)> =
316            select.into_tuple().all(&inner.db).await?;
317        let mut map: HashMap<_, HashMap<_, _>> = HashMap::new();
318        // Write object at the same time we write subscription, so we must be able to get obj
319        for (subscription_id, dependent_table_id, retention_seconds) in subscription_objs {
320            map.entry(dependent_table_id)
321                .or_default()
322                .insert(subscription_id, retention_seconds as _);
323        }
324        Ok(map)
325    }
326
327    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
328        let inner = self.inner.read().await;
329        let table_options: Vec<(TableId, Option<i32>)> = Table::find()
330            .select_only()
331            .columns([table::Column::TableId, table::Column::RetentionSeconds])
332            .into_tuple::<(TableId, Option<i32>)>()
333            .all(&inner.db)
334            .await?;
335
336        Ok(table_options
337            .into_iter()
338            .map(|(id, retention_seconds)| {
339                (
340                    id,
341                    TableOption {
342                        retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
343                    },
344                )
345            })
346            .collect())
347    }
348
349    pub async fn get_all_streaming_parallelisms(
350        &self,
351    ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
352        let inner = self.inner.read().await;
353
354        let job_parallelisms = StreamingJob::find()
355            .select_only()
356            .columns([
357                streaming_job::Column::JobId,
358                streaming_job::Column::Parallelism,
359            ])
360            .into_tuple::<(ObjectId, StreamingParallelism)>()
361            .all(&inner.db)
362            .await?;
363
364        Ok(job_parallelisms
365            .into_iter()
366            .collect::<HashMap<ObjectId, StreamingParallelism>>())
367    }
368
369    pub async fn get_table_name_type_mapping(
370        &self,
371    ) -> MetaResult<HashMap<TableId, (String, String)>> {
372        let inner = self.inner.read().await;
373        let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
374            .select_only()
375            .columns([
376                table::Column::TableId,
377                table::Column::Name,
378                table::Column::TableType,
379            ])
380            .into_tuple()
381            .all(&inner.db)
382            .await?;
383        Ok(table_name_types
384            .into_iter()
385            .map(|(id, name, table_type)| {
386                (
387                    id,
388                    (name, PbTableType::from(table_type).as_str_name().to_owned()),
389                )
390            })
391            .collect())
392    }
393
394    pub async fn get_table_by_cdc_table_id(
395        &self,
396        cdc_table_id: &String,
397    ) -> MetaResult<Vec<PbTable>> {
398        let inner = self.inner.read().await;
399        let table_objs = Table::find()
400            .find_also_related(Object)
401            .filter(table::Column::CdcTableId.eq(cdc_table_id))
402            .all(&inner.db)
403            .await?;
404        Ok(table_objs
405            .into_iter()
406            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
407            .collect())
408    }
409
410    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
411        let inner = self.inner.read().await;
412
413        // created table ids.
414        let mut table_ids: Vec<TableId> = StreamingJob::find()
415            .select_only()
416            .column(streaming_job::Column::JobId)
417            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
418            .into_tuple()
419            .all(&inner.db)
420            .await?;
421
422        // internal table ids.
423        let internal_table_ids: Vec<TableId> = Table::find()
424            .select_only()
425            .column(table::Column::TableId)
426            .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
427            .into_tuple()
428            .all(&inner.db)
429            .await?;
430        table_ids.extend(internal_table_ids);
431
432        Ok(table_ids)
433    }
434
435    /// Returns column ids of versioned tables.
436    /// Being versioned implies using `ColumnAwareSerde`.
437    pub async fn get_versioned_table_schemas(&self) -> MetaResult<HashMap<TableId, Vec<i32>>> {
438        let res = self
439            .list_all_state_tables()
440            .await?
441            .into_iter()
442            .filter_map(|t| {
443                if t.version.is_some() {
444                    let ret = (
445                        t.id.try_into().unwrap(),
446                        t.columns
447                            .iter()
448                            .map(|c| c.column_desc.as_ref().unwrap().column_id)
449                            .collect_vec(),
450                    );
451                    return Some(ret);
452                }
453                None
454            })
455            .collect();
456        Ok(res)
457    }
458
459    pub async fn get_existing_job_resource_group(
460        &self,
461        streaming_job_id: ObjectId,
462    ) -> MetaResult<String> {
463        let inner = self.inner.read().await;
464        get_existing_job_resource_group(&inner.db, streaming_job_id).await
465    }
466
467    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
468        let inner = self.inner.read().await;
469        get_database_resource_group(&inner.db, database_id).await
470    }
471
472    pub async fn get_existing_job_resource_groups(
473        &self,
474        streaming_job_ids: Vec<ObjectId>,
475    ) -> MetaResult<HashMap<ObjectId, String>> {
476        let inner = self.inner.read().await;
477        let mut resource_groups = HashMap::new();
478        for job_id in streaming_job_ids {
479            let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
480            resource_groups.insert(job_id, resource_group);
481        }
482
483        Ok(resource_groups)
484    }
485
486    pub async fn get_existing_job_database_resource_group(
487        &self,
488        streaming_job_id: ObjectId,
489    ) -> MetaResult<String> {
490        let inner = self.inner.read().await;
491        let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
492            .select_only()
493            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
494            .column(object::Column::DatabaseId)
495            .into_tuple()
496            .one(&inner.db)
497            .await?
498            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
499
500        get_database_resource_group(&inner.db, database_id).await
501    }
502
503    pub async fn get_job_streaming_parallelisms(
504        &self,
505        streaming_job_id: ObjectId,
506    ) -> MetaResult<StreamingParallelism> {
507        let inner = self.inner.read().await;
508
509        let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
510            .select_only()
511            .column(streaming_job::Column::Parallelism)
512            .into_tuple()
513            .one(&inner.db)
514            .await?
515            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
516
517        Ok(job_parallelism)
518    }
519
520    pub async fn get_fragment_streaming_job_id(
521        &self,
522        fragment_id: FragmentId,
523    ) -> MetaResult<ObjectId> {
524        let inner = self.inner.read().await;
525        let job_id: ObjectId = Fragment::find_by_id(fragment_id)
526            .select_only()
527            .column(fragment::Column::JobId)
528            .into_tuple()
529            .one(&inner.db)
530            .await?
531            .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
532        Ok(job_id)
533    }
534
535    pub async fn list_streaming_job_with_database(
536        &self,
537    ) -> MetaResult<HashMap<DatabaseId, Vec<ObjectId>>> {
538        let inner = self.inner.read().await;
539        let database_objects: Vec<(DatabaseId, ObjectId)> = StreamingJob::find()
540            .select_only()
541            .column(object::Column::DatabaseId)
542            .column(streaming_job::Column::JobId)
543            .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
544            .into_tuple()
545            .all(&inner.db)
546            .await?;
547
548        Ok(database_objects.into_iter().into_group_map())
549    }
550
551    // Output: Vec<(table id, db name, schema name, table name, resource group)>
552    pub async fn list_table_objects(
553        &self,
554    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
555        let inner = self.inner.read().await;
556        Ok(Object::find()
557            .select_only()
558            .join(JoinType::InnerJoin, object::Relation::Table.def())
559            .join(JoinType::InnerJoin, object::Relation::Database2.def())
560            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
561            .column(object::Column::Oid)
562            .column(database::Column::Name)
563            .column(schema::Column::Name)
564            .column(table::Column::Name)
565            .column(database::Column::ResourceGroup)
566            .into_tuple()
567            .all(&inner.db)
568            .await?)
569    }
570
571    // Output: Vec<(source id, db name, schema name, source name, resource group)>
572    pub async fn list_source_objects(
573        &self,
574    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
575        let inner = self.inner.read().await;
576        Ok(Object::find()
577            .select_only()
578            .join(JoinType::InnerJoin, object::Relation::Source.def())
579            .join(JoinType::InnerJoin, object::Relation::Database2.def())
580            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
581            .column(object::Column::Oid)
582            .column(database::Column::Name)
583            .column(schema::Column::Name)
584            .column(source::Column::Name)
585            .column(database::Column::ResourceGroup)
586            .into_tuple()
587            .all(&inner.db)
588            .await?)
589    }
590
591    // Output: Vec<(sink id, db name, schema name, sink name, resource group)>
592    pub async fn list_sink_objects(
593        &self,
594    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
595        let inner = self.inner.read().await;
596        Ok(Object::find()
597            .select_only()
598            .join(JoinType::InnerJoin, object::Relation::Sink.def())
599            .join(JoinType::InnerJoin, object::Relation::Database2.def())
600            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
601            .column(object::Column::Oid)
602            .column(database::Column::Name)
603            .column(schema::Column::Name)
604            .column(sink::Column::Name)
605            .column(database::Column::ResourceGroup)
606            .into_tuple()
607            .all(&inner.db)
608            .await?)
609    }
610
611    pub async fn get_streaming_job_status(
612        &self,
613        streaming_job_id: ObjectId,
614    ) -> MetaResult<JobStatus> {
615        let inner = self.inner.read().await;
616        let status = StreamingJob::find_by_id(streaming_job_id)
617            .select_only()
618            .column(streaming_job::Column::JobStatus)
619            .into_tuple()
620            .one(&inner.db)
621            .await?
622            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
623        Ok(status)
624    }
625
626    pub async fn get_streaming_job_extra_info(
627        &self,
628        job_ids: Vec<ObjectId>,
629    ) -> MetaResult<HashMap<ObjectId, StreamingJobExtraInfo>> {
630        let inner = self.inner.read().await;
631        let txn = inner.db.begin().await?;
632
633        let result = fetch_streaming_job_extra_info(&txn, job_ids).await?;
634        Ok(result)
635    }
636}