risingwave_meta/controller/catalog/
get_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::ColumnCatalog;
16use risingwave_common::id::JobId;
17use risingwave_meta_model::table::RefreshState;
18
19use super::*;
20use crate::controller::utils::{
21    StreamingJobExtraInfo, get_database_resource_group, get_existing_job_resource_group,
22    get_streaming_job_extra_info as fetch_streaming_job_extra_info, get_table_columns,
23};
24
25impl CatalogController {
26    pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
27        let inner = self.inner.read().await;
28        let (secret, obj) = Secret::find_by_id(secret_id)
29            .find_also_related(Object)
30            .one(&inner.db)
31            .await?
32            .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
33        Ok(ObjectModel(secret, obj.unwrap()).into())
34    }
35
36    pub async fn get_object_database_id(&self, object_id: ObjectId) -> MetaResult<DatabaseId> {
37        let inner = self.inner.read().await;
38        let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
39            .select_only()
40            .select_column(object::Column::DatabaseId)
41            .into_tuple()
42            .one(&inner.db)
43            .await?
44            .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
45        Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
46    }
47
48    pub async fn get_connection_by_id(
49        &self,
50        connection_id: ConnectionId,
51    ) -> MetaResult<PbConnection> {
52        let inner = self.inner.read().await;
53        let (conn, obj) = Connection::find_by_id(connection_id)
54            .find_also_related(Object)
55            .one(&inner.db)
56            .await?
57            .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
58
59        Ok(ObjectModel(conn, obj.unwrap()).into())
60    }
61
62    pub async fn get_table_catalog_by_name(
63        &self,
64        database_id: DatabaseId,
65        schema_id: SchemaId,
66        name: &str,
67    ) -> MetaResult<Option<PbTable>> {
68        let inner = self.inner.read().await;
69        let table_obj = Table::find()
70            .find_also_related(Object)
71            .filter(
72                table::Column::Name
73                    .eq(name)
74                    .and(object::Column::DatabaseId.eq(database_id))
75                    .and(object::Column::SchemaId.eq(schema_id)),
76            )
77            .one(&inner.db)
78            .await?;
79        Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
80    }
81
82    pub async fn get_table_by_name(
83        &self,
84        database_name: &str,
85        table_name: &str,
86    ) -> MetaResult<Option<PbTable>> {
87        let inner = self.inner.read().await;
88        let table_obj = Table::find()
89            .find_also_related(Object)
90            .join(JoinType::InnerJoin, object::Relation::Database2.def())
91            .filter(
92                table::Column::Name
93                    .eq(table_name)
94                    .and(database::Column::Name.eq(database_name)),
95            )
96            .one(&inner.db)
97            .await?;
98        Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
99    }
100
101    pub async fn get_table_associated_source_id(
102        &self,
103        table_id: TableId,
104    ) -> MetaResult<Option<SourceId>> {
105        let inner = self.inner.read().await;
106        Table::find_by_id(table_id)
107            .select_only()
108            .select_column(table::Column::OptionalAssociatedSourceId)
109            .into_tuple()
110            .one(&inner.db)
111            .await?
112            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
113    }
114
115    pub async fn get_table_by_associate_source_id(
116        &self,
117        associated_source_id: SourceId,
118    ) -> MetaResult<PbTable> {
119        let inner = self.inner.read().await;
120        Table::find()
121            .find_also_related(Object)
122            .filter(table::Column::OptionalAssociatedSourceId.eq(associated_source_id))
123            .one(&inner.db)
124            .await?
125            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
126            .ok_or_else(|| {
127                MetaError::catalog_id_not_found("table associated source", associated_source_id)
128            })
129    }
130
131    pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
132        let inner = self.inner.read().await;
133        let table_obj = Table::find_by_id(table_id)
134            .find_also_related(Object)
135            .one(&inner.db)
136            .await?;
137        if let Some((table, obj)) = table_obj {
138            Ok(ObjectModel(table, obj.unwrap()).into())
139        } else {
140            Err(MetaError::catalog_id_not_found("table", table_id))
141        }
142    }
143
144    pub async fn get_user_created_table_by_ids(
145        &self,
146        job_ids: impl Iterator<Item = JobId>,
147    ) -> MetaResult<Vec<PbTable>> {
148        let inner = self.inner.read().await;
149        let table_objs = Table::find()
150            .find_also_related(Object)
151            .filter(
152                table::Column::TableId
153                    .is_in(job_ids.map(|job_id| job_id.as_mv_table_id()).collect_vec())
154                    .and(table::Column::TableType.eq(TableType::Table)),
155            )
156            .all(&inner.db)
157            .await?;
158        let tables = table_objs
159            .into_iter()
160            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
161            .collect();
162        Ok(tables)
163    }
164
165    pub async fn get_table_by_ids(
166        &self,
167        table_ids: Vec<TableId>,
168        include_dropped_table: bool,
169    ) -> MetaResult<Vec<PbTable>> {
170        let inner = self.inner.read().await;
171        let table_objs = Table::find()
172            .find_also_related(Object)
173            .filter(table::Column::TableId.is_in(table_ids.clone()))
174            .all(&inner.db)
175            .await?;
176        let tables = table_objs
177            .into_iter()
178            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
179        let tables = if include_dropped_table {
180            tables
181                .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
182                    if table_ids.contains(id) {
183                        Some(t.clone())
184                    } else {
185                        None
186                    }
187                }))
188                .collect()
189        } else {
190            tables.collect()
191        };
192        Ok(tables)
193    }
194
195    pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
196        let inner = self.inner.read().await;
197        Ok(get_table_columns(&inner.db, id)
198            .await?
199            .to_protobuf()
200            .into_iter()
201            .map(|col| col.into())
202            .collect())
203    }
204
205    pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
206        let inner = self.inner.read().await;
207        let sink_objs = Sink::find()
208            .find_also_related(Object)
209            .filter(sink::Column::TargetTable.eq(table_id))
210            .all(&inner.db)
211            .await?;
212        Ok(sink_objs
213            .into_iter()
214            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
215            .collect())
216    }
217
218    /// Get the refresh state of a table
219    pub async fn get_table_refresh_state(
220        &self,
221        table_id: TableId,
222    ) -> MetaResult<Option<RefreshState>> {
223        let inner = self.inner.read().await;
224        let (refresh_state,): (Option<RefreshState>,) = Table::find_by_id(table_id)
225            .select_only()
226            .select_column(table::Column::RefreshState)
227            .into_tuple()
228            .one(&inner.db)
229            .await?
230            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;
231
232        // Default to IDLE if not set (for backward compatibility)
233        Ok(Some(refresh_state.unwrap_or(RefreshState::Idle)))
234    }
235
236    pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
237        let inner = self.inner.read().await;
238        let sink_objs = Sink::find()
239            .find_also_related(Object)
240            .filter(sink::Column::SinkId.is_in(sink_ids))
241            .all(&inner.db)
242            .await?;
243        Ok(sink_objs
244            .into_iter()
245            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
246            .collect())
247    }
248
249    pub async fn get_sink_auto_refresh_schema_from(
250        &self,
251        table_id: TableId,
252    ) -> MetaResult<Vec<PbSink>> {
253        let inner = self.inner.read().await;
254        let sink_objs = Sink::find()
255            .find_also_related(Object)
256            .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
257            .all(&inner.db)
258            .await?;
259        Ok(sink_objs
260            .into_iter()
261            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
262            .collect())
263    }
264
265    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
266        let inner = self.inner.read().await;
267        let tables: Vec<I32Array> = Fragment::find()
268            .select_only()
269            .column(fragment::Column::StateTableIds)
270            .filter(fragment::Column::JobId.eq(sink_id))
271            .into_tuple()
272            .all(&inner.db)
273            .await?;
274        Ok(tables
275            .into_iter()
276            .flat_map(|ids| {
277                ids.into_inner()
278                    .into_iter()
279                    .map(|table_id| TableId::new(table_id as _))
280            })
281            .collect())
282    }
283
284    pub async fn get_subscription_by_id(
285        &self,
286        subscription_id: SubscriptionId,
287    ) -> MetaResult<PbSubscription> {
288        let inner = self.inner.read().await;
289        let subscription_objs = Subscription::find()
290            .find_also_related(Object)
291            .filter(subscription::Column::SubscriptionId.eq(subscription_id))
292            .all(&inner.db)
293            .await?;
294        let subscription: PbSubscription = subscription_objs
295            .into_iter()
296            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
297            .find_or_first(|_| true)
298            .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
299
300        Ok(subscription)
301    }
302
303    pub async fn get_mv_depended_subscriptions(
304        &self,
305        database_id: Option<DatabaseId>,
306    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
307        let inner = self.inner.read().await;
308        let select = Subscription::find()
309            .select_only()
310            .select_column(subscription::Column::SubscriptionId)
311            .select_column(subscription::Column::DependentTableId)
312            .select_column(subscription::Column::RetentionSeconds);
313        let select = if let Some(database_id) = database_id {
314            select
315                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
316                .filter(object::Column::DatabaseId.eq(database_id))
317        } else {
318            select
319        };
320        let subscription_objs: Vec<(SubscriptionId, TableId, i64)> =
321            select.into_tuple().all(&inner.db).await?;
322        let mut map: HashMap<_, HashMap<_, _>> = HashMap::new();
323        // Write object at the same time we write subscription, so we must be able to get obj
324        for (subscription_id, dependent_table_id, retention_seconds) in subscription_objs {
325            map.entry(dependent_table_id)
326                .or_default()
327                .insert(subscription_id, retention_seconds as _);
328        }
329        Ok(map)
330    }
331
332    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
333        let inner = self.inner.read().await;
334        let table_options: Vec<(TableId, Option<i32>)> = Table::find()
335            .select_only()
336            .columns([table::Column::TableId, table::Column::RetentionSeconds])
337            .into_tuple::<(TableId, Option<i32>)>()
338            .all(&inner.db)
339            .await?;
340
341        Ok(table_options
342            .into_iter()
343            .map(|(id, retention_seconds)| {
344                (
345                    id,
346                    TableOption {
347                        retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
348                    },
349                )
350            })
351            .collect())
352    }
353
354    pub async fn get_all_streaming_parallelisms(
355        &self,
356    ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
357        let inner = self.inner.read().await;
358
359        let job_parallelisms = StreamingJob::find()
360            .select_only()
361            .columns([
362                streaming_job::Column::JobId,
363                streaming_job::Column::Parallelism,
364            ])
365            .into_tuple::<(ObjectId, StreamingParallelism)>()
366            .all(&inner.db)
367            .await?;
368
369        Ok(job_parallelisms
370            .into_iter()
371            .collect::<HashMap<ObjectId, StreamingParallelism>>())
372    }
373
374    pub async fn get_table_name_type_mapping(
375        &self,
376    ) -> MetaResult<HashMap<TableId, (String, String)>> {
377        let inner = self.inner.read().await;
378        let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
379            .select_only()
380            .columns([
381                table::Column::TableId,
382                table::Column::Name,
383                table::Column::TableType,
384            ])
385            .into_tuple()
386            .all(&inner.db)
387            .await?;
388        Ok(table_name_types
389            .into_iter()
390            .map(|(id, name, table_type)| {
391                (
392                    id,
393                    (name, PbTableType::from(table_type).as_str_name().to_owned()),
394                )
395            })
396            .collect())
397    }
398
399    pub async fn get_table_by_cdc_table_id(
400        &self,
401        cdc_table_id: &String,
402    ) -> MetaResult<Vec<PbTable>> {
403        let inner = self.inner.read().await;
404        let table_objs = Table::find()
405            .find_also_related(Object)
406            .filter(table::Column::CdcTableId.eq(cdc_table_id))
407            .all(&inner.db)
408            .await?;
409        Ok(table_objs
410            .into_iter()
411            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
412            .collect())
413    }
414
415    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
416        let inner = self.inner.read().await;
417
418        // created table ids.
419        let mut table_ids: Vec<TableId> = StreamingJob::find()
420            .select_only()
421            .column(streaming_job::Column::JobId)
422            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
423            .into_tuple()
424            .all(&inner.db)
425            .await?;
426
427        // internal table ids.
428        let internal_table_ids: Vec<TableId> = Table::find()
429            .select_only()
430            .column(table::Column::TableId)
431            .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
432            .into_tuple()
433            .all(&inner.db)
434            .await?;
435        table_ids.extend(internal_table_ids);
436
437        Ok(table_ids)
438    }
439
440    /// Returns column ids of versioned tables.
441    /// Being versioned implies using `ColumnAwareSerde`.
442    pub async fn get_versioned_table_schemas(
443        &self,
444    ) -> MetaResult<HashMap<risingwave_common::catalog::TableId, Vec<i32>>> {
445        let res = self
446            .list_all_state_tables()
447            .await?
448            .into_iter()
449            .filter_map(|t| {
450                if t.version.is_some() {
451                    let ret = (
452                        t.id.into(),
453                        t.columns
454                            .iter()
455                            .map(|c| c.column_desc.as_ref().unwrap().column_id)
456                            .collect_vec(),
457                    );
458                    return Some(ret);
459                }
460                None
461            })
462            .collect();
463        Ok(res)
464    }
465
466    pub async fn get_existing_job_resource_group(
467        &self,
468        streaming_job_id: JobId,
469    ) -> MetaResult<String> {
470        let inner = self.inner.read().await;
471        get_existing_job_resource_group(&inner.db, streaming_job_id).await
472    }
473
474    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
475        let inner = self.inner.read().await;
476        get_database_resource_group(&inner.db, database_id).await
477    }
478
479    pub async fn get_existing_job_resource_groups(
480        &self,
481        streaming_job_ids: Vec<JobId>,
482    ) -> MetaResult<HashMap<JobId, String>> {
483        let inner = self.inner.read().await;
484        let mut resource_groups = HashMap::new();
485        for job_id in streaming_job_ids {
486            let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
487            resource_groups.insert(job_id, resource_group);
488        }
489
490        Ok(resource_groups)
491    }
492
493    pub async fn get_existing_job_database_resource_group(
494        &self,
495        streaming_job_id: JobId,
496    ) -> MetaResult<String> {
497        let inner = self.inner.read().await;
498        let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
499            .select_only()
500            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
501            .column(object::Column::DatabaseId)
502            .into_tuple()
503            .one(&inner.db)
504            .await?
505            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
506
507        get_database_resource_group(&inner.db, database_id).await
508    }
509
510    pub async fn get_job_streaming_parallelisms(
511        &self,
512        streaming_job_id: JobId,
513    ) -> MetaResult<StreamingParallelism> {
514        let inner = self.inner.read().await;
515
516        let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
517            .select_only()
518            .column(streaming_job::Column::Parallelism)
519            .into_tuple()
520            .one(&inner.db)
521            .await?
522            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
523
524        Ok(job_parallelism)
525    }
526
527    pub async fn get_fragment_streaming_job_id(
528        &self,
529        fragment_id: FragmentId,
530    ) -> MetaResult<ObjectId> {
531        let inner = self.inner.read().await;
532        let job_id: ObjectId = Fragment::find_by_id(fragment_id)
533            .select_only()
534            .column(fragment::Column::JobId)
535            .into_tuple()
536            .one(&inner.db)
537            .await?
538            .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
539        Ok(job_id)
540    }
541
542    pub async fn list_streaming_job_with_database(
543        &self,
544    ) -> MetaResult<HashMap<DatabaseId, Vec<JobId>>> {
545        let inner = self.inner.read().await;
546        let database_objects: Vec<(DatabaseId, JobId)> = StreamingJob::find()
547            .select_only()
548            .column(object::Column::DatabaseId)
549            .column(streaming_job::Column::JobId)
550            .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
551            .into_tuple()
552            .all(&inner.db)
553            .await?;
554
555        Ok(database_objects.into_iter().into_group_map())
556    }
557
558    // Output: Vec<(table id, db name, schema name, table name, resource group)>
559    pub async fn list_table_objects(
560        &self,
561    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
562        let inner = self.inner.read().await;
563        Ok(Object::find()
564            .select_only()
565            .join(JoinType::InnerJoin, object::Relation::Table.def())
566            .join(JoinType::InnerJoin, object::Relation::Database2.def())
567            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
568            .column(object::Column::Oid)
569            .column(database::Column::Name)
570            .column(schema::Column::Name)
571            .column(table::Column::Name)
572            .column(database::Column::ResourceGroup)
573            .into_tuple()
574            .all(&inner.db)
575            .await?)
576    }
577
578    // Output: Vec<(source id, db name, schema name, source name, resource group)>
579    pub async fn list_source_objects(
580        &self,
581    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
582        let inner = self.inner.read().await;
583        Ok(Object::find()
584            .select_only()
585            .join(JoinType::InnerJoin, object::Relation::Source.def())
586            .join(JoinType::InnerJoin, object::Relation::Database2.def())
587            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
588            .column(object::Column::Oid)
589            .column(database::Column::Name)
590            .column(schema::Column::Name)
591            .column(source::Column::Name)
592            .column(database::Column::ResourceGroup)
593            .into_tuple()
594            .all(&inner.db)
595            .await?)
596    }
597
598    // Output: Vec<(sink id, db name, schema name, sink name, resource group)>
599    pub async fn list_sink_objects(
600        &self,
601    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
602        let inner = self.inner.read().await;
603        Ok(Object::find()
604            .select_only()
605            .join(JoinType::InnerJoin, object::Relation::Sink.def())
606            .join(JoinType::InnerJoin, object::Relation::Database2.def())
607            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
608            .column(object::Column::Oid)
609            .column(database::Column::Name)
610            .column(schema::Column::Name)
611            .column(sink::Column::Name)
612            .column(database::Column::ResourceGroup)
613            .into_tuple()
614            .all(&inner.db)
615            .await?)
616    }
617
618    pub async fn get_streaming_job_status(&self, streaming_job_id: JobId) -> MetaResult<JobStatus> {
619        let inner = self.inner.read().await;
620        let status = StreamingJob::find_by_id(streaming_job_id)
621            .select_only()
622            .column(streaming_job::Column::JobStatus)
623            .into_tuple()
624            .one(&inner.db)
625            .await?
626            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
627        Ok(status)
628    }
629
630    pub async fn get_streaming_job_extra_info(
631        &self,
632        job_ids: Vec<JobId>,
633    ) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>> {
634        let inner = self.inner.read().await;
635        let txn = inner.db.begin().await?;
636
637        let result = fetch_streaming_job_extra_info(&txn, job_ids).await?;
638        Ok(result)
639    }
640}