risingwave_meta/controller/catalog/
get_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16use crate::controller::utils::{get_database_resource_group, get_existing_job_resource_group};
17
18impl CatalogController {
19    pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
20        let inner = self.inner.read().await;
21        let (secret, obj) = Secret::find_by_id(secret_id)
22            .find_also_related(Object)
23            .one(&inner.db)
24            .await?
25            .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
26        Ok(ObjectModel(secret, obj.unwrap()).into())
27    }
28
29    pub async fn get_object_database_id(&self, object_id: ObjectId) -> MetaResult<DatabaseId> {
30        let inner = self.inner.read().await;
31        let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
32            .select_only()
33            .select_column(object::Column::DatabaseId)
34            .into_tuple()
35            .one(&inner.db)
36            .await?
37            .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
38        Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
39    }
40
41    pub async fn get_connection_by_id(
42        &self,
43        connection_id: ConnectionId,
44    ) -> MetaResult<PbConnection> {
45        let inner = self.inner.read().await;
46        let (conn, obj) = Connection::find_by_id(connection_id)
47            .find_also_related(Object)
48            .one(&inner.db)
49            .await?
50            .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
51
52        Ok(ObjectModel(conn, obj.unwrap()).into())
53    }
54
55    pub async fn get_table_by_name(
56        &self,
57        database_name: &str,
58        table_name: &str,
59    ) -> MetaResult<Option<PbTable>> {
60        let inner = self.inner.read().await;
61        let table_obj = Table::find()
62            .find_also_related(Object)
63            .join(JoinType::InnerJoin, object::Relation::Database2.def())
64            .filter(
65                table::Column::Name
66                    .eq(table_name)
67                    .and(database::Column::Name.eq(database_name)),
68            )
69            .one(&inner.db)
70            .await?;
71        Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
72    }
73
74    pub async fn get_table_associated_source_id(
75        &self,
76        table_id: TableId,
77    ) -> MetaResult<Option<SourceId>> {
78        let inner = self.inner.read().await;
79        Table::find_by_id(table_id)
80            .select_only()
81            .select_column(table::Column::OptionalAssociatedSourceId)
82            .into_tuple()
83            .one(&inner.db)
84            .await?
85            .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
86    }
87
88    pub async fn get_table_by_ids(
89        &self,
90        table_ids: Vec<TableId>,
91        include_dropped_table: bool,
92    ) -> MetaResult<Vec<PbTable>> {
93        let inner = self.inner.read().await;
94        let table_objs = Table::find()
95            .find_also_related(Object)
96            .filter(table::Column::TableId.is_in(table_ids.clone()))
97            .all(&inner.db)
98            .await?;
99        let tables = table_objs
100            .into_iter()
101            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
102        let tables = if include_dropped_table {
103            tables
104                .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
105                    if table_ids.contains(id) {
106                        Some(t.clone())
107                    } else {
108                        None
109                    }
110                }))
111                .collect()
112        } else {
113            tables.collect()
114        };
115        Ok(tables)
116    }
117
118    pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
119        let inner = self.inner.read().await;
120        let sink_objs = Sink::find()
121            .find_also_related(Object)
122            .filter(sink::Column::SinkId.is_in(sink_ids))
123            .all(&inner.db)
124            .await?;
125        Ok(sink_objs
126            .into_iter()
127            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
128            .collect())
129    }
130
131    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
132        let inner = self.inner.read().await;
133        let tables: Vec<I32Array> = Fragment::find()
134            .select_only()
135            .column(fragment::Column::StateTableIds)
136            .filter(fragment::Column::JobId.eq(sink_id))
137            .into_tuple()
138            .all(&inner.db)
139            .await?;
140        Ok(tables
141            .into_iter()
142            .flat_map(|ids| ids.into_inner().into_iter())
143            .collect())
144    }
145
146    pub async fn get_subscription_by_id(
147        &self,
148        subscription_id: SubscriptionId,
149    ) -> MetaResult<PbSubscription> {
150        let inner = self.inner.read().await;
151        let subscription_objs = Subscription::find()
152            .find_also_related(Object)
153            .filter(subscription::Column::SubscriptionId.eq(subscription_id))
154            .all(&inner.db)
155            .await?;
156        let subscription: PbSubscription = subscription_objs
157            .into_iter()
158            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
159            .find_or_first(|_| true)
160            .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
161
162        Ok(subscription)
163    }
164
165    pub async fn get_mv_depended_subscriptions(
166        &self,
167        database_id: Option<DatabaseId>,
168    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
169        let inner = self.inner.read().await;
170        let select = Subscription::find()
171            .select_only()
172            .select_column(subscription::Column::SubscriptionId)
173            .select_column(subscription::Column::DependentTableId)
174            .select_column(subscription::Column::RetentionSeconds)
175            .select_column(object::Column::DatabaseId)
176            .join(JoinType::InnerJoin, subscription::Relation::Object.def());
177        let select = if let Some(database_id) = database_id {
178            select.filter(object::Column::DatabaseId.eq(database_id))
179        } else {
180            select
181        };
182        let subscription_objs: Vec<(SubscriptionId, ObjectId, i64, DatabaseId)> =
183            select.into_tuple().all(&inner.db).await?;
184        let mut map: HashMap<_, HashMap<_, HashMap<_, _>>> = HashMap::new();
185        // Write object at the same time we write subscription, so we must be able to get obj
186        for (subscription_id, dependent_table_id, retention_seconds, database_id) in
187            subscription_objs
188        {
189            map.entry(database_id)
190                .or_default()
191                .entry(dependent_table_id)
192                .or_default()
193                .insert(subscription_id, retention_seconds as _);
194        }
195        Ok(map)
196    }
197
198    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
199        let inner = self.inner.read().await;
200        let table_options: Vec<(TableId, Option<i32>)> = Table::find()
201            .select_only()
202            .columns([table::Column::TableId, table::Column::RetentionSeconds])
203            .into_tuple::<(TableId, Option<i32>)>()
204            .all(&inner.db)
205            .await?;
206
207        Ok(table_options
208            .into_iter()
209            .map(|(id, retention_seconds)| {
210                (
211                    id,
212                    TableOption {
213                        retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
214                    },
215                )
216            })
217            .collect())
218    }
219
220    pub async fn get_all_streaming_parallelisms(
221        &self,
222    ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
223        let inner = self.inner.read().await;
224
225        let job_parallelisms = StreamingJob::find()
226            .select_only()
227            .columns([
228                streaming_job::Column::JobId,
229                streaming_job::Column::Parallelism,
230            ])
231            .into_tuple::<(ObjectId, StreamingParallelism)>()
232            .all(&inner.db)
233            .await?;
234
235        Ok(job_parallelisms
236            .into_iter()
237            .collect::<HashMap<ObjectId, StreamingParallelism>>())
238    }
239
240    pub async fn get_table_name_type_mapping(
241        &self,
242    ) -> MetaResult<HashMap<TableId, (String, String)>> {
243        let inner = self.inner.read().await;
244        let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
245            .select_only()
246            .columns([
247                table::Column::TableId,
248                table::Column::Name,
249                table::Column::TableType,
250            ])
251            .into_tuple()
252            .all(&inner.db)
253            .await?;
254        Ok(table_name_types
255            .into_iter()
256            .map(|(id, name, table_type)| {
257                (
258                    id,
259                    (name, PbTableType::from(table_type).as_str_name().to_owned()),
260                )
261            })
262            .collect())
263    }
264
265    pub async fn get_table_by_cdc_table_id(
266        &self,
267        cdc_table_id: &String,
268    ) -> MetaResult<Vec<PbTable>> {
269        let inner = self.inner.read().await;
270        let table_objs = Table::find()
271            .find_also_related(Object)
272            .filter(table::Column::CdcTableId.eq(cdc_table_id))
273            .all(&inner.db)
274            .await?;
275        Ok(table_objs
276            .into_iter()
277            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
278            .collect())
279    }
280
281    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
282        let inner = self.inner.read().await;
283
284        // created table ids.
285        let mut table_ids: Vec<TableId> = StreamingJob::find()
286            .select_only()
287            .column(streaming_job::Column::JobId)
288            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
289            .into_tuple()
290            .all(&inner.db)
291            .await?;
292
293        // internal table ids.
294        let internal_table_ids: Vec<TableId> = Table::find()
295            .select_only()
296            .column(table::Column::TableId)
297            .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
298            .into_tuple()
299            .all(&inner.db)
300            .await?;
301        table_ids.extend(internal_table_ids);
302
303        Ok(table_ids)
304    }
305
306    /// Returns column ids of versioned tables.
307    /// Being versioned implies using `ColumnAwareSerde`.
308    pub async fn get_versioned_table_schemas(&self) -> MetaResult<HashMap<TableId, Vec<i32>>> {
309        let res = self
310            .list_all_state_tables()
311            .await?
312            .into_iter()
313            .filter_map(|t| {
314                if t.version.is_some() {
315                    let ret = (
316                        t.id.try_into().unwrap(),
317                        t.columns
318                            .iter()
319                            .map(|c| c.column_desc.as_ref().unwrap().column_id)
320                            .collect_vec(),
321                    );
322                    return Some(ret);
323                }
324                None
325            })
326            .collect();
327        Ok(res)
328    }
329
330    pub async fn get_existing_job_resource_group(
331        &self,
332        streaming_job_id: ObjectId,
333    ) -> MetaResult<String> {
334        let inner = self.inner.read().await;
335        get_existing_job_resource_group(&inner.db, streaming_job_id).await
336    }
337
338    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
339        let inner = self.inner.read().await;
340        get_database_resource_group(&inner.db, database_id).await
341    }
342
343    pub async fn get_existing_job_resource_groups(
344        &self,
345        streaming_job_ids: Vec<ObjectId>,
346    ) -> MetaResult<HashMap<ObjectId, String>> {
347        let inner = self.inner.read().await;
348        let mut resource_groups = HashMap::new();
349        for job_id in streaming_job_ids {
350            let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
351            resource_groups.insert(job_id, resource_group);
352        }
353
354        Ok(resource_groups)
355    }
356
357    pub async fn get_existing_job_database_resource_group(
358        &self,
359        streaming_job_id: ObjectId,
360    ) -> MetaResult<String> {
361        let inner = self.inner.read().await;
362        let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
363            .select_only()
364            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
365            .column(object::Column::DatabaseId)
366            .into_tuple()
367            .one(&inner.db)
368            .await?
369            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
370
371        get_database_resource_group(&inner.db, database_id).await
372    }
373
374    pub async fn get_job_streaming_parallelisms(
375        &self,
376        streaming_job_id: ObjectId,
377    ) -> MetaResult<StreamingParallelism> {
378        let inner = self.inner.read().await;
379
380        let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
381            .select_only()
382            .column(streaming_job::Column::Parallelism)
383            .into_tuple()
384            .one(&inner.db)
385            .await?
386            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
387
388        Ok(job_parallelism)
389    }
390
391    pub async fn get_fragment_streaming_job_id(
392        &self,
393        fragment_id: FragmentId,
394    ) -> MetaResult<ObjectId> {
395        let inner = self.inner.read().await;
396        let job_id: ObjectId = Fragment::find_by_id(fragment_id)
397            .select_only()
398            .column(fragment::Column::JobId)
399            .into_tuple()
400            .one(&inner.db)
401            .await?
402            .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
403        Ok(job_id)
404    }
405}