risingwave_meta/controller/catalog/
get_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::ColumnCatalog;
16use risingwave_meta_model::table::RefreshState;
17
18use super::*;
19use crate::controller::utils::{
20    get_database_resource_group, get_existing_job_resource_group, get_table_columns,
21};
22
23impl CatalogController {
24    pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
25        let inner = self.inner.read().await;
26        let (secret, obj) = Secret::find_by_id(secret_id)
27            .find_also_related(Object)
28            .one(&inner.db)
29            .await?
30            .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
31        Ok(ObjectModel(secret, obj.unwrap()).into())
32    }
33
34    pub async fn get_object_database_id(&self, object_id: ObjectId) -> MetaResult<DatabaseId> {
35        let inner = self.inner.read().await;
36        let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
37            .select_only()
38            .select_column(object::Column::DatabaseId)
39            .into_tuple()
40            .one(&inner.db)
41            .await?
42            .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
43        Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
44    }
45
46    pub async fn get_connection_by_id(
47        &self,
48        connection_id: ConnectionId,
49    ) -> MetaResult<PbConnection> {
50        let inner = self.inner.read().await;
51        let (conn, obj) = Connection::find_by_id(connection_id)
52            .find_also_related(Object)
53            .one(&inner.db)
54            .await?
55            .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
56
57        Ok(ObjectModel(conn, obj.unwrap()).into())
58    }
59
60    pub async fn get_table_by_name(
61        &self,
62        database_name: &str,
63        table_name: &str,
64    ) -> MetaResult<Option<PbTable>> {
65        let inner = self.inner.read().await;
66        let table_obj = Table::find()
67            .find_also_related(Object)
68            .join(JoinType::InnerJoin, object::Relation::Database2.def())
69            .filter(
70                table::Column::Name
71                    .eq(table_name)
72                    .and(database::Column::Name.eq(database_name)),
73            )
74            .one(&inner.db)
75            .await?;
76        Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
77    }
78
79    pub async fn get_table_associated_source_id(
80        &self,
81        table_id: TableId,
82    ) -> MetaResult<Option<SourceId>> {
83        let inner = self.inner.read().await;
84        Table::find_by_id(table_id)
85            .select_only()
86            .select_column(table::Column::OptionalAssociatedSourceId)
87            .into_tuple()
88            .one(&inner.db)
89            .await?
90            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
91    }
92
93    pub async fn get_table_by_associate_source_id(
94        &self,
95        associated_source_id: SourceId,
96    ) -> MetaResult<PbTable> {
97        let inner = self.inner.read().await;
98        Table::find()
99            .find_also_related(Object)
100            .filter(table::Column::OptionalAssociatedSourceId.eq(associated_source_id))
101            .one(&inner.db)
102            .await?
103            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
104            .ok_or_else(|| {
105                MetaError::catalog_id_not_found("table associated source", associated_source_id)
106            })
107    }
108
109    pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
110        let inner = self.inner.read().await;
111        let table_obj = Table::find_by_id(table_id)
112            .find_also_related(Object)
113            .one(&inner.db)
114            .await?;
115        if let Some((table, obj)) = table_obj {
116            Ok(ObjectModel(table, obj.unwrap()).into())
117        } else {
118            Err(MetaError::catalog_id_not_found("table", table_id))
119        }
120    }
121
122    pub async fn get_user_created_table_by_ids(
123        &self,
124        table_ids: Vec<TableId>,
125    ) -> MetaResult<Vec<PbTable>> {
126        let inner = self.inner.read().await;
127        let table_objs = Table::find()
128            .find_also_related(Object)
129            .filter(
130                table::Column::TableId
131                    .is_in(table_ids.clone())
132                    .and(table::Column::TableType.eq(TableType::Table)),
133            )
134            .all(&inner.db)
135            .await?;
136        let tables = table_objs
137            .into_iter()
138            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
139            .collect();
140        Ok(tables)
141    }
142
143    pub async fn get_table_by_ids(
144        &self,
145        table_ids: Vec<TableId>,
146        include_dropped_table: bool,
147    ) -> MetaResult<Vec<PbTable>> {
148        let inner = self.inner.read().await;
149        let table_objs = Table::find()
150            .find_also_related(Object)
151            .filter(table::Column::TableId.is_in(table_ids.clone()))
152            .all(&inner.db)
153            .await?;
154        let tables = table_objs
155            .into_iter()
156            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
157        let tables = if include_dropped_table {
158            tables
159                .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
160                    if table_ids.contains(id) {
161                        Some(t.clone())
162                    } else {
163                        None
164                    }
165                }))
166                .collect()
167        } else {
168            tables.collect()
169        };
170        Ok(tables)
171    }
172
173    pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
174        let inner = self.inner.read().await;
175        Ok(get_table_columns(&inner.db, id)
176            .await?
177            .to_protobuf()
178            .into_iter()
179            .map(|col| col.into())
180            .collect())
181    }
182
183    pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
184        let inner = self.inner.read().await;
185        let sink_objs = Sink::find()
186            .find_also_related(Object)
187            .filter(sink::Column::TargetTable.eq(table_id))
188            .all(&inner.db)
189            .await?;
190        Ok(sink_objs
191            .into_iter()
192            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
193            .collect())
194    }
195
196    /// Get the refresh state of a table
197    pub async fn get_table_refresh_state(
198        &self,
199        table_id: TableId,
200    ) -> MetaResult<Option<RefreshState>> {
201        let inner = self.inner.read().await;
202        let (refresh_state,): (Option<RefreshState>,) = Table::find_by_id(table_id)
203            .select_only()
204            .select_column(table::Column::RefreshState)
205            .into_tuple()
206            .one(&inner.db)
207            .await?
208            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;
209
210        // Default to IDLE if not set (for backward compatibility)
211        Ok(Some(refresh_state.unwrap_or(RefreshState::Idle)))
212    }
213
214    pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
215        let inner = self.inner.read().await;
216        let sink_objs = Sink::find()
217            .find_also_related(Object)
218            .filter(sink::Column::SinkId.is_in(sink_ids))
219            .all(&inner.db)
220            .await?;
221        Ok(sink_objs
222            .into_iter()
223            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
224            .collect())
225    }
226
227    pub async fn get_sink_auto_refresh_schema_from(
228        &self,
229        table_id: TableId,
230    ) -> MetaResult<Vec<PbSink>> {
231        let inner = self.inner.read().await;
232        let sink_objs = Sink::find()
233            .find_also_related(Object)
234            .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
235            .all(&inner.db)
236            .await?;
237        Ok(sink_objs
238            .into_iter()
239            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
240            .collect())
241    }
242
243    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
244        let inner = self.inner.read().await;
245        let tables: Vec<I32Array> = Fragment::find()
246            .select_only()
247            .column(fragment::Column::StateTableIds)
248            .filter(fragment::Column::JobId.eq(sink_id))
249            .into_tuple()
250            .all(&inner.db)
251            .await?;
252        Ok(tables
253            .into_iter()
254            .flat_map(|ids| ids.into_inner().into_iter())
255            .collect())
256    }
257
258    pub async fn get_subscription_by_id(
259        &self,
260        subscription_id: SubscriptionId,
261    ) -> MetaResult<PbSubscription> {
262        let inner = self.inner.read().await;
263        let subscription_objs = Subscription::find()
264            .find_also_related(Object)
265            .filter(subscription::Column::SubscriptionId.eq(subscription_id))
266            .all(&inner.db)
267            .await?;
268        let subscription: PbSubscription = subscription_objs
269            .into_iter()
270            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
271            .find_or_first(|_| true)
272            .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
273
274        Ok(subscription)
275    }
276
277    pub async fn get_mv_depended_subscriptions(
278        &self,
279        database_id: Option<DatabaseId>,
280    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
281        let inner = self.inner.read().await;
282        let select = Subscription::find()
283            .select_only()
284            .select_column(subscription::Column::SubscriptionId)
285            .select_column(subscription::Column::DependentTableId)
286            .select_column(subscription::Column::RetentionSeconds)
287            .select_column(object::Column::DatabaseId)
288            .join(JoinType::InnerJoin, subscription::Relation::Object.def());
289        let select = if let Some(database_id) = database_id {
290            select.filter(object::Column::DatabaseId.eq(database_id))
291        } else {
292            select
293        };
294        let subscription_objs: Vec<(SubscriptionId, ObjectId, i64, DatabaseId)> =
295            select.into_tuple().all(&inner.db).await?;
296        let mut map: HashMap<_, HashMap<_, HashMap<_, _>>> = HashMap::new();
297        // Write object at the same time we write subscription, so we must be able to get obj
298        for (subscription_id, dependent_table_id, retention_seconds, database_id) in
299            subscription_objs
300        {
301            map.entry(database_id)
302                .or_default()
303                .entry(dependent_table_id)
304                .or_default()
305                .insert(subscription_id, retention_seconds as _);
306        }
307        Ok(map)
308    }
309
310    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
311        let inner = self.inner.read().await;
312        let table_options: Vec<(TableId, Option<i32>)> = Table::find()
313            .select_only()
314            .columns([table::Column::TableId, table::Column::RetentionSeconds])
315            .into_tuple::<(TableId, Option<i32>)>()
316            .all(&inner.db)
317            .await?;
318
319        Ok(table_options
320            .into_iter()
321            .map(|(id, retention_seconds)| {
322                (
323                    id,
324                    TableOption {
325                        retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
326                    },
327                )
328            })
329            .collect())
330    }
331
332    pub async fn get_all_streaming_parallelisms(
333        &self,
334    ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
335        let inner = self.inner.read().await;
336
337        let job_parallelisms = StreamingJob::find()
338            .select_only()
339            .columns([
340                streaming_job::Column::JobId,
341                streaming_job::Column::Parallelism,
342            ])
343            .into_tuple::<(ObjectId, StreamingParallelism)>()
344            .all(&inner.db)
345            .await?;
346
347        Ok(job_parallelisms
348            .into_iter()
349            .collect::<HashMap<ObjectId, StreamingParallelism>>())
350    }
351
352    pub async fn get_table_name_type_mapping(
353        &self,
354    ) -> MetaResult<HashMap<TableId, (String, String)>> {
355        let inner = self.inner.read().await;
356        let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
357            .select_only()
358            .columns([
359                table::Column::TableId,
360                table::Column::Name,
361                table::Column::TableType,
362            ])
363            .into_tuple()
364            .all(&inner.db)
365            .await?;
366        Ok(table_name_types
367            .into_iter()
368            .map(|(id, name, table_type)| {
369                (
370                    id,
371                    (name, PbTableType::from(table_type).as_str_name().to_owned()),
372                )
373            })
374            .collect())
375    }
376
377    pub async fn get_table_by_cdc_table_id(
378        &self,
379        cdc_table_id: &String,
380    ) -> MetaResult<Vec<PbTable>> {
381        let inner = self.inner.read().await;
382        let table_objs = Table::find()
383            .find_also_related(Object)
384            .filter(table::Column::CdcTableId.eq(cdc_table_id))
385            .all(&inner.db)
386            .await?;
387        Ok(table_objs
388            .into_iter()
389            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
390            .collect())
391    }
392
393    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
394        let inner = self.inner.read().await;
395
396        // created table ids.
397        let mut table_ids: Vec<TableId> = StreamingJob::find()
398            .select_only()
399            .column(streaming_job::Column::JobId)
400            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
401            .into_tuple()
402            .all(&inner.db)
403            .await?;
404
405        // internal table ids.
406        let internal_table_ids: Vec<TableId> = Table::find()
407            .select_only()
408            .column(table::Column::TableId)
409            .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
410            .into_tuple()
411            .all(&inner.db)
412            .await?;
413        table_ids.extend(internal_table_ids);
414
415        Ok(table_ids)
416    }
417
418    /// Returns column ids of versioned tables.
419    /// Being versioned implies using `ColumnAwareSerde`.
420    pub async fn get_versioned_table_schemas(&self) -> MetaResult<HashMap<TableId, Vec<i32>>> {
421        let res = self
422            .list_all_state_tables()
423            .await?
424            .into_iter()
425            .filter_map(|t| {
426                if t.version.is_some() {
427                    let ret = (
428                        t.id.try_into().unwrap(),
429                        t.columns
430                            .iter()
431                            .map(|c| c.column_desc.as_ref().unwrap().column_id)
432                            .collect_vec(),
433                    );
434                    return Some(ret);
435                }
436                None
437            })
438            .collect();
439        Ok(res)
440    }
441
442    pub async fn get_existing_job_resource_group(
443        &self,
444        streaming_job_id: ObjectId,
445    ) -> MetaResult<String> {
446        let inner = self.inner.read().await;
447        get_existing_job_resource_group(&inner.db, streaming_job_id).await
448    }
449
450    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
451        let inner = self.inner.read().await;
452        get_database_resource_group(&inner.db, database_id).await
453    }
454
455    pub async fn get_existing_job_resource_groups(
456        &self,
457        streaming_job_ids: Vec<ObjectId>,
458    ) -> MetaResult<HashMap<ObjectId, String>> {
459        let inner = self.inner.read().await;
460        let mut resource_groups = HashMap::new();
461        for job_id in streaming_job_ids {
462            let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
463            resource_groups.insert(job_id, resource_group);
464        }
465
466        Ok(resource_groups)
467    }
468
469    pub async fn get_existing_job_database_resource_group(
470        &self,
471        streaming_job_id: ObjectId,
472    ) -> MetaResult<String> {
473        let inner = self.inner.read().await;
474        let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
475            .select_only()
476            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
477            .column(object::Column::DatabaseId)
478            .into_tuple()
479            .one(&inner.db)
480            .await?
481            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
482
483        get_database_resource_group(&inner.db, database_id).await
484    }
485
486    pub async fn get_job_streaming_parallelisms(
487        &self,
488        streaming_job_id: ObjectId,
489    ) -> MetaResult<StreamingParallelism> {
490        let inner = self.inner.read().await;
491
492        let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
493            .select_only()
494            .column(streaming_job::Column::Parallelism)
495            .into_tuple()
496            .one(&inner.db)
497            .await?
498            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
499
500        Ok(job_parallelism)
501    }
502
503    pub async fn get_fragment_streaming_job_id(
504        &self,
505        fragment_id: FragmentId,
506    ) -> MetaResult<ObjectId> {
507        let inner = self.inner.read().await;
508        let job_id: ObjectId = Fragment::find_by_id(fragment_id)
509            .select_only()
510            .column(fragment::Column::JobId)
511            .into_tuple()
512            .one(&inner.db)
513            .await?
514            .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
515        Ok(job_id)
516    }
517
518    // Output: Vec<(table id, db name, schema name, table name, resource group)>
519    pub async fn list_table_objects(
520        &self,
521    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
522        let inner = self.inner.read().await;
523        Ok(Object::find()
524            .select_only()
525            .join(JoinType::InnerJoin, object::Relation::Table.def())
526            .join(JoinType::InnerJoin, object::Relation::Database2.def())
527            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
528            .column(object::Column::Oid)
529            .column(database::Column::Name)
530            .column(schema::Column::Name)
531            .column(table::Column::Name)
532            .column(database::Column::ResourceGroup)
533            .into_tuple()
534            .all(&inner.db)
535            .await?)
536    }
537
538    // Output: Vec<(source id, db name, schema name, source name, resource group)>
539    pub async fn list_source_objects(
540        &self,
541    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
542        let inner = self.inner.read().await;
543        Ok(Object::find()
544            .select_only()
545            .join(JoinType::InnerJoin, object::Relation::Source.def())
546            .join(JoinType::InnerJoin, object::Relation::Database2.def())
547            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
548            .column(object::Column::Oid)
549            .column(database::Column::Name)
550            .column(schema::Column::Name)
551            .column(source::Column::Name)
552            .column(database::Column::ResourceGroup)
553            .into_tuple()
554            .all(&inner.db)
555            .await?)
556    }
557
558    // Output: Vec<(sink id, db name, schema name, sink name, resource group)>
559    pub async fn list_sink_objects(
560        &self,
561    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
562        let inner = self.inner.read().await;
563        Ok(Object::find()
564            .select_only()
565            .join(JoinType::InnerJoin, object::Relation::Sink.def())
566            .join(JoinType::InnerJoin, object::Relation::Database2.def())
567            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
568            .column(object::Column::Oid)
569            .column(database::Column::Name)
570            .column(schema::Column::Name)
571            .column(sink::Column::Name)
572            .column(database::Column::ResourceGroup)
573            .into_tuple()
574            .all(&inner.db)
575            .await?)
576    }
577
578    pub async fn get_streaming_job_status(
579        &self,
580        streaming_job_id: ObjectId,
581    ) -> MetaResult<JobStatus> {
582        let inner = self.inner.read().await;
583        let status = StreamingJob::find_by_id(streaming_job_id)
584            .select_only()
585            .column(streaming_job::Column::JobStatus)
586            .into_tuple()
587            .one(&inner.db)
588            .await?
589            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
590        Ok(status)
591    }
592}