// risingwave_meta/controller/catalog/get_op.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::catalog::ColumnCatalog;
16use risingwave_common::id::JobId;
17use risingwave_meta_model::table::RefreshState;
18
19use super::*;
20use crate::controller::utils::{
21    StreamingJobExtraInfo, get_database_resource_group, get_existing_job_resource_group,
22    get_streaming_job_extra_info as fetch_streaming_job_extra_info, get_table_columns,
23};
24
25impl CatalogController {
26    pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
27        let inner = self.inner.read().await;
28        let (secret, obj) = Secret::find_by_id(secret_id)
29            .find_also_related(Object)
30            .one(&inner.db)
31            .await?
32            .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
33        Ok(ObjectModel(secret, obj.unwrap()).into())
34    }
35
36    pub async fn get_object_database_id(
37        &self,
38        object_id: impl Into<ObjectId>,
39    ) -> MetaResult<DatabaseId> {
40        let object_id = object_id.into();
41        let inner = self.inner.read().await;
42        let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
43            .select_only()
44            .select_column(object::Column::DatabaseId)
45            .into_tuple()
46            .one(&inner.db)
47            .await?
48            .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
49        Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
50    }
51
52    pub async fn get_connection_by_id(
53        &self,
54        connection_id: ConnectionId,
55    ) -> MetaResult<PbConnection> {
56        let inner = self.inner.read().await;
57        let (conn, obj) = Connection::find_by_id(connection_id)
58            .find_also_related(Object)
59            .one(&inner.db)
60            .await?
61            .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
62
63        Ok(ObjectModel(conn, obj.unwrap()).into())
64    }
65
66    pub async fn get_table_catalog_by_name(
67        &self,
68        database_id: DatabaseId,
69        schema_id: SchemaId,
70        name: &str,
71    ) -> MetaResult<Option<PbTable>> {
72        let inner = self.inner.read().await;
73        let table_obj = Table::find()
74            .find_also_related(Object)
75            .filter(
76                table::Column::Name
77                    .eq(name)
78                    .and(object::Column::DatabaseId.eq(database_id))
79                    .and(object::Column::SchemaId.eq(schema_id)),
80            )
81            .one(&inner.db)
82            .await?;
83        Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
84    }
85
86    pub async fn get_table_by_name(
87        &self,
88        database_name: &str,
89        table_name: &str,
90    ) -> MetaResult<Option<PbTable>> {
91        let inner = self.inner.read().await;
92        let table_obj = Table::find()
93            .find_also_related(Object)
94            .join(JoinType::InnerJoin, object::Relation::Database2.def())
95            .filter(
96                table::Column::Name
97                    .eq(table_name)
98                    .and(database::Column::Name.eq(database_name)),
99            )
100            .one(&inner.db)
101            .await?;
102        Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
103    }
104
105    pub async fn get_table_associated_source_id(
106        &self,
107        table_id: TableId,
108    ) -> MetaResult<Option<SourceId>> {
109        let inner = self.inner.read().await;
110        Table::find_by_id(table_id)
111            .select_only()
112            .select_column(table::Column::OptionalAssociatedSourceId)
113            .into_tuple()
114            .one(&inner.db)
115            .await?
116            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
117    }
118
119    pub async fn get_table_by_associate_source_id(
120        &self,
121        associated_source_id: SourceId,
122    ) -> MetaResult<PbTable> {
123        let inner = self.inner.read().await;
124        Table::find()
125            .find_also_related(Object)
126            .filter(table::Column::OptionalAssociatedSourceId.eq(associated_source_id))
127            .one(&inner.db)
128            .await?
129            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
130            .ok_or_else(|| {
131                MetaError::catalog_id_not_found("table associated source", associated_source_id)
132            })
133    }
134
135    pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
136        let inner = self.inner.read().await;
137        let table_obj = Table::find_by_id(table_id)
138            .find_also_related(Object)
139            .one(&inner.db)
140            .await?;
141        if let Some((table, obj)) = table_obj {
142            Ok(ObjectModel(table, obj.unwrap()).into())
143        } else {
144            Err(MetaError::catalog_id_not_found("table", table_id))
145        }
146    }
147
148    pub async fn get_user_created_table_by_ids(
149        &self,
150        job_ids: impl Iterator<Item = JobId>,
151    ) -> MetaResult<Vec<PbTable>> {
152        let inner = self.inner.read().await;
153        let table_objs = Table::find()
154            .find_also_related(Object)
155            .filter(
156                table::Column::TableId
157                    .is_in(job_ids.map(|job_id| job_id.as_mv_table_id()).collect_vec())
158                    .and(table::Column::TableType.eq(TableType::Table)),
159            )
160            .all(&inner.db)
161            .await?;
162        let tables = table_objs
163            .into_iter()
164            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
165            .collect();
166        Ok(tables)
167    }
168
169    pub async fn get_table_by_ids(
170        &self,
171        table_ids: Vec<TableId>,
172        include_dropped_table: bool,
173    ) -> MetaResult<Vec<PbTable>> {
174        let inner = self.inner.read().await;
175        let table_objs = Table::find()
176            .find_also_related(Object)
177            .filter(table::Column::TableId.is_in(table_ids.clone()))
178            .all(&inner.db)
179            .await?;
180        let tables = table_objs
181            .into_iter()
182            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
183        let tables = if include_dropped_table {
184            tables
185                .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
186                    if table_ids.contains(id) {
187                        Some(t.clone())
188                    } else {
189                        None
190                    }
191                }))
192                .collect()
193        } else {
194            tables.collect()
195        };
196        Ok(tables)
197    }
198
199    pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
200        let inner = self.inner.read().await;
201        Ok(get_table_columns(&inner.db, id)
202            .await?
203            .to_protobuf()
204            .into_iter()
205            .map(|col| col.into())
206            .collect())
207    }
208
209    pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
210        let inner = self.inner.read().await;
211        let sink_objs = Sink::find()
212            .find_also_related(Object)
213            .filter(sink::Column::TargetTable.eq(table_id))
214            .all(&inner.db)
215            .await?;
216        Ok(sink_objs
217            .into_iter()
218            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
219            .collect())
220    }
221
222    /// Get the refresh state of a table
223    pub async fn get_table_refresh_state(
224        &self,
225        table_id: TableId,
226    ) -> MetaResult<Option<RefreshState>> {
227        let inner = self.inner.read().await;
228        let (refresh_state,): (Option<RefreshState>,) = Table::find_by_id(table_id)
229            .select_only()
230            .select_column(table::Column::RefreshState)
231            .into_tuple()
232            .one(&inner.db)
233            .await?
234            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;
235
236        // Default to IDLE if not set (for backward compatibility)
237        Ok(Some(refresh_state.unwrap_or(RefreshState::Idle)))
238    }
239
240    pub async fn get_sink_by_id(&self, sink_id: SinkId) -> MetaResult<Option<PbSink>> {
241        let inner = self.inner.read().await;
242        let sink_objs = Sink::find_by_id(sink_id)
243            .find_also_related(Object)
244            .one(&inner.db)
245            .await?;
246        Ok(sink_objs.map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into()))
247    }
248
249    pub async fn get_sink_auto_refresh_schema_from(
250        &self,
251        table_id: TableId,
252    ) -> MetaResult<Vec<PbSink>> {
253        let inner = self.inner.read().await;
254        let sink_objs = Sink::find()
255            .find_also_related(Object)
256            .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
257            .all(&inner.db)
258            .await?;
259        Ok(sink_objs
260            .into_iter()
261            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
262            .collect())
263    }
264
265    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
266        let inner = self.inner.read().await;
267        let tables: Vec<I32Array> = Fragment::find()
268            .select_only()
269            .column(fragment::Column::StateTableIds)
270            .filter(fragment::Column::JobId.eq(sink_id))
271            .into_tuple()
272            .all(&inner.db)
273            .await?;
274        Ok(tables
275            .into_iter()
276            .flat_map(|ids| {
277                ids.into_inner()
278                    .into_iter()
279                    .map(|table_id| TableId::new(table_id as _))
280            })
281            .collect())
282    }
283
284    pub async fn get_subscription_by_id(
285        &self,
286        subscription_id: SubscriptionId,
287    ) -> MetaResult<PbSubscription> {
288        let inner = self.inner.read().await;
289        let subscription_objs = Subscription::find()
290            .find_also_related(Object)
291            .filter(subscription::Column::SubscriptionId.eq(subscription_id))
292            .all(&inner.db)
293            .await?;
294        let subscription: PbSubscription = subscription_objs
295            .into_iter()
296            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
297            .find_or_first(|_| true)
298            .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
299
300        Ok(subscription)
301    }
302
303    pub async fn get_mv_depended_subscriptions(
304        &self,
305        database_id: Option<DatabaseId>,
306    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
307        let inner = self.inner.read().await;
308        let select = Subscription::find()
309            .select_only()
310            .select_column(subscription::Column::SubscriptionId)
311            .select_column(subscription::Column::DependentTableId)
312            .select_column(subscription::Column::RetentionSeconds);
313        let select = if let Some(database_id) = database_id {
314            select
315                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
316                .filter(object::Column::DatabaseId.eq(database_id))
317        } else {
318            select
319        };
320        let subscription_objs: Vec<(SubscriptionId, TableId, i64)> =
321            select.into_tuple().all(&inner.db).await?;
322        let mut map: HashMap<_, HashMap<_, _>> = HashMap::new();
323        // Write object at the same time we write subscription, so we must be able to get obj
324        for (subscription_id, dependent_table_id, retention_seconds) in subscription_objs {
325            map.entry(dependent_table_id)
326                .or_default()
327                .insert(subscription_id, retention_seconds as _);
328        }
329        Ok(map)
330    }
331
332    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
333        let inner = self.inner.read().await;
334        let table_options: Vec<(TableId, Option<i32>)> = Table::find()
335            .select_only()
336            .columns([table::Column::TableId, table::Column::RetentionSeconds])
337            .into_tuple::<(TableId, Option<i32>)>()
338            .all(&inner.db)
339            .await?;
340
341        Ok(table_options
342            .into_iter()
343            .map(|(id, retention_seconds)| {
344                (
345                    id,
346                    TableOption {
347                        retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
348                    },
349                )
350            })
351            .collect())
352    }
353
354    pub async fn get_all_streaming_parallelisms(
355        &self,
356    ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
357        let inner = self.inner.read().await;
358
359        let job_parallelisms = StreamingJob::find()
360            .select_only()
361            .columns([
362                streaming_job::Column::JobId,
363                streaming_job::Column::Parallelism,
364            ])
365            .into_tuple::<(ObjectId, StreamingParallelism)>()
366            .all(&inner.db)
367            .await?;
368
369        Ok(job_parallelisms
370            .into_iter()
371            .collect::<HashMap<ObjectId, StreamingParallelism>>())
372    }
373
374    pub async fn get_table_name_type_mapping(
375        &self,
376    ) -> MetaResult<HashMap<TableId, (String, String)>> {
377        let inner = self.inner.read().await;
378        let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
379            .select_only()
380            .columns([
381                table::Column::TableId,
382                table::Column::Name,
383                table::Column::TableType,
384            ])
385            .into_tuple()
386            .all(&inner.db)
387            .await?;
388        Ok(table_name_types
389            .into_iter()
390            .map(|(id, name, table_type)| {
391                (
392                    id,
393                    (name, PbTableType::from(table_type).as_str_name().to_owned()),
394                )
395            })
396            .collect())
397    }
398
399    pub async fn get_table_by_cdc_table_id(
400        &self,
401        cdc_table_id: &String,
402    ) -> MetaResult<Vec<PbTable>> {
403        let inner = self.inner.read().await;
404        let table_objs = Table::find()
405            .find_also_related(Object)
406            .filter(table::Column::CdcTableId.eq(cdc_table_id))
407            .all(&inner.db)
408            .await?;
409        Ok(table_objs
410            .into_iter()
411            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
412            .collect())
413    }
414
415    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
416        let inner = self.inner.read().await;
417
418        // created table ids.
419        let mut table_ids: Vec<TableId> = StreamingJob::find()
420            .select_only()
421            .column(streaming_job::Column::JobId)
422            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
423            .into_tuple()
424            .all(&inner.db)
425            .await?;
426
427        // internal table ids.
428        let internal_table_ids: Vec<TableId> = Table::find()
429            .select_only()
430            .column(table::Column::TableId)
431            .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
432            .into_tuple()
433            .all(&inner.db)
434            .await?;
435        table_ids.extend(internal_table_ids);
436
437        Ok(table_ids)
438    }
439
440    /// Returns column ids of versioned tables.
441    /// Being versioned implies using `ColumnAwareSerde`.
442    pub async fn get_versioned_table_schemas(
443        &self,
444    ) -> MetaResult<HashMap<risingwave_common::catalog::TableId, Vec<i32>>> {
445        let res = self
446            .list_all_state_tables()
447            .await?
448            .into_iter()
449            .filter_map(|t| {
450                if t.version.is_some() {
451                    let ret = (
452                        t.id,
453                        t.columns
454                            .iter()
455                            .map(|c| c.column_desc.as_ref().unwrap().column_id)
456                            .collect_vec(),
457                    );
458                    return Some(ret);
459                }
460                None
461            })
462            .collect();
463        Ok(res)
464    }
465
466    pub async fn get_existing_job_resource_group(
467        &self,
468        streaming_job_id: JobId,
469    ) -> MetaResult<String> {
470        let inner = self.inner.read().await;
471        get_existing_job_resource_group(&inner.db, streaming_job_id).await
472    }
473
474    pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
475        let inner = self.inner.read().await;
476        get_database_resource_group(&inner.db, database_id).await
477    }
478
479    pub async fn get_existing_job_resource_groups(
480        &self,
481        streaming_job_ids: Vec<JobId>,
482    ) -> MetaResult<HashMap<JobId, String>> {
483        let inner = self.inner.read().await;
484        let mut resource_groups = HashMap::new();
485        for job_id in streaming_job_ids {
486            let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
487            resource_groups.insert(job_id, resource_group);
488        }
489
490        Ok(resource_groups)
491    }
492
493    pub async fn get_existing_job_database_resource_group(
494        &self,
495        streaming_job_id: JobId,
496    ) -> MetaResult<String> {
497        let inner = self.inner.read().await;
498        let database_id: DatabaseId = StreamingJob::find_by_id(streaming_job_id)
499            .select_only()
500            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
501            .column(object::Column::DatabaseId)
502            .into_tuple()
503            .one(&inner.db)
504            .await?
505            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
506
507        get_database_resource_group(&inner.db, database_id).await
508    }
509
510    pub async fn get_job_streaming_parallelisms(
511        &self,
512        streaming_job_id: JobId,
513    ) -> MetaResult<StreamingParallelism> {
514        let inner = self.inner.read().await;
515
516        let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
517            .select_only()
518            .column(streaming_job::Column::Parallelism)
519            .into_tuple()
520            .one(&inner.db)
521            .await?
522            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
523
524        Ok(job_parallelism)
525    }
526
527    pub async fn get_fragment_streaming_job_id(
528        &self,
529        fragment_id: FragmentId,
530    ) -> MetaResult<JobId> {
531        let inner = self.inner.read().await;
532        let job_id: JobId = Fragment::find_by_id(fragment_id)
533            .select_only()
534            .column(fragment::Column::JobId)
535            .into_tuple()
536            .one(&inner.db)
537            .await?
538            .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
539        Ok(job_id)
540    }
541
542    pub async fn list_streaming_job_with_database(
543        &self,
544    ) -> MetaResult<HashMap<DatabaseId, Vec<JobId>>> {
545        let inner = self.inner.read().await;
546        let database_objects: Vec<(DatabaseId, JobId)> = StreamingJob::find()
547            .select_only()
548            .column(object::Column::DatabaseId)
549            .column(streaming_job::Column::JobId)
550            .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
551            .into_tuple()
552            .all(&inner.db)
553            .await?;
554
555        Ok(database_objects.into_iter().into_group_map())
556    }
557
558    // Output: Vec<(table id, db name, schema name, table name, resource group)>
559    pub async fn list_table_objects(
560        &self,
561    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
562        let inner = self.inner.read().await;
563        Ok(Object::find()
564            .select_only()
565            .join(JoinType::InnerJoin, object::Relation::Table.def())
566            .join(JoinType::InnerJoin, object::Relation::Database2.def())
567            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
568            .column(object::Column::Oid)
569            .column(database::Column::Name)
570            .column(schema::Column::Name)
571            .column(table::Column::Name)
572            .column(database::Column::ResourceGroup)
573            .into_tuple()
574            .all(&inner.db)
575            .await?)
576    }
577
578    // Output: Vec<(source id, db name, schema name, source name, resource group)>
579    pub async fn list_source_objects(
580        &self,
581    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
582        let inner = self.inner.read().await;
583        Ok(Object::find()
584            .select_only()
585            .join(JoinType::InnerJoin, object::Relation::Source.def())
586            .join(JoinType::InnerJoin, object::Relation::Database2.def())
587            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
588            .column(object::Column::Oid)
589            .column(database::Column::Name)
590            .column(schema::Column::Name)
591            .column(source::Column::Name)
592            .column(database::Column::ResourceGroup)
593            .into_tuple()
594            .all(&inner.db)
595            .await?)
596    }
597
598    // Output: Vec<(sink id, db name, schema name, sink name, resource group)>
599    pub async fn list_sink_objects(
600        &self,
601    ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
602        let inner = self.inner.read().await;
603        Ok(Object::find()
604            .select_only()
605            .join(JoinType::InnerJoin, object::Relation::Sink.def())
606            .join(JoinType::InnerJoin, object::Relation::Database2.def())
607            .join(JoinType::InnerJoin, object::Relation::Schema2.def())
608            .column(object::Column::Oid)
609            .column(database::Column::Name)
610            .column(schema::Column::Name)
611            .column(sink::Column::Name)
612            .column(database::Column::ResourceGroup)
613            .into_tuple()
614            .all(&inner.db)
615            .await?)
616    }
617
618    pub async fn get_streaming_job_status(&self, streaming_job_id: JobId) -> MetaResult<JobStatus> {
619        let inner = self.inner.read().await;
620        let status = StreamingJob::find_by_id(streaming_job_id)
621            .select_only()
622            .column(streaming_job::Column::JobStatus)
623            .into_tuple()
624            .one(&inner.db)
625            .await?
626            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
627        Ok(status)
628    }
629
630    pub async fn get_streaming_job_extra_info(
631        &self,
632        job_ids: Vec<JobId>,
633    ) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>> {
634        let inner = self.inner.read().await;
635        let txn = inner.db.begin().await?;
636
637        let result = fetch_streaming_job_extra_info(&txn, job_ids).await?;
638        Ok(result)
639    }
640}