risingwave_meta/controller/catalog/
list_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::FragmentTypeMask;
16use risingwave_common::id::JobId;
17use risingwave_meta_model::refresh_job::{self, RefreshState};
18use sea_orm::prelude::DateTime;
19
20use super::*;
21use crate::controller::fragment::FragmentTypeMaskExt;
22
23impl CatalogController {
24    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
25        self.inner.read().await.list_time_travel_table_ids().await
26    }
27
28    pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
29        let inner = self.inner.read().await;
30        Ok(RefreshJob::find().all(&inner.db).await?)
31    }
32
33    pub async fn get_refresh_job_state_by_table_id(
34        &self,
35        table_id: TableId,
36    ) -> MetaResult<RefreshState> {
37        let inner = self.inner.read().await;
38        let (refresh_job_state,): (RefreshState,) = RefreshJob::find_by_id(table_id)
39            .select_only()
40            .select_column(refresh_job::Column::CurrentStatus)
41            .into_tuple()
42            .one(&inner.db)
43            .await?
44            .ok_or_else(|| MetaError::catalog_id_not_found("refresh_job", table_id))?;
45        Ok(refresh_job_state)
46    }
47
48    pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
49        let inner = self.inner.read().await;
50        Ok(Table::find()
51            .select_only()
52            .column(table::Column::TableId)
53            .filter(table::Column::Refreshable.eq(true))
54            .into_tuple()
55            .all(&inner.db)
56            .await?)
57    }
58
59    pub async fn list_stream_job_desc_for_telemetry(
60        &self,
61    ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
62        let inner = self.inner.read().await;
63        let info: Vec<(TableId, Option<Property>)> = Table::find()
64            .select_only()
65            .column(table::Column::TableId)
66            .column(source::Column::WithProperties)
67            .join(JoinType::LeftJoin, table::Relation::Source.def())
68            .filter(
69                table::Column::TableType
70                    .eq(TableType::Table)
71                    .or(table::Column::TableType.eq(TableType::MaterializedView)),
72            )
73            .into_tuple()
74            .all(&inner.db)
75            .await?;
76
77        Ok(info
78            .into_iter()
79            .map(|(table_id, properties)| {
80                let connector_info = if let Some(inner_props) = properties {
81                    inner_props
82                        .inner_ref()
83                        .get(UPSTREAM_SOURCE_KEY)
84                        .map(|v| v.to_lowercase())
85                } else {
86                    None
87                };
88                MetaTelemetryJobDesc {
89                    table_id: table_id.as_i32_id(),
90                    connector: connector_info,
91                    optimization: vec![],
92                }
93            })
94            .collect())
95    }
96
97    pub async fn list_background_creating_jobs(
98        &self,
99        include_initial: bool,
100        database_id: Option<DatabaseId>,
101    ) -> MetaResult<Vec<(JobId, String, DateTime)>> {
102        let inner = self.inner.read().await;
103        let status_cond = if include_initial {
104            streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
105        } else {
106            streaming_job::Column::JobStatus.eq(JobStatus::Creating)
107        };
108        let mut table_info: Vec<(JobId, String, DateTime)> = Table::find()
109            .select_only()
110            .columns([table::Column::TableId, table::Column::Definition])
111            .column(object::Column::InitializedAt)
112            .join(JoinType::LeftJoin, table::Relation::Object1.def())
113            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
114            .filter(
115                streaming_job::Column::CreateType
116                    .eq(CreateType::Background)
117                    .and(status_cond.clone())
118                    .and(
119                        database_id
120                            .map(|database_id| object::Column::DatabaseId.eq(database_id))
121                            .unwrap_or_else(|| SimpleExpr::from(true)),
122                    ),
123            )
124            .into_tuple()
125            .all(&inner.db)
126            .await?;
127        let sink_info: Vec<(JobId, String, DateTime)> = Sink::find()
128            .select_only()
129            .columns([sink::Column::SinkId, sink::Column::Definition])
130            .column(object::Column::InitializedAt)
131            .join(JoinType::LeftJoin, sink::Relation::Object.def())
132            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
133            .filter(
134                streaming_job::Column::CreateType
135                    .eq(CreateType::Background)
136                    .and(status_cond)
137                    .and(
138                        database_id
139                            .map(|database_id| object::Column::DatabaseId.eq(database_id))
140                            .unwrap_or_else(|| SimpleExpr::from(true)),
141                    ),
142            )
143            .into_tuple()
144            .all(&inner.db)
145            .await?;
146
147        table_info.extend(sink_info.into_iter());
148
149        Ok(table_info)
150    }
151
152    pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
153        let inner = self.inner.read().await;
154        inner.list_databases().await
155    }
156
157    pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
158        self.list_object_dependencies(true).await
159    }
160
161    pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
162        self.list_object_dependencies(false).await
163    }
164
165    pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
166        let inner = self.inner.read().await;
167        inner.list_schemas().await
168    }
169
170    /// [`Self::list_tables_by_type`] with all types.
171    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
172        let inner = self.inner.read().await;
173        inner.list_all_state_tables().await
174    }
175
176    pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
177        let inner = self.inner.read().await;
178        let table_ids: Vec<TableId> = Table::find()
179            .select_only()
180            .column(table::Column::TableId)
181            .join(JoinType::InnerJoin, table::Relation::Object1.def())
182            .filter(
183                object::Column::SchemaId
184                    .eq(schema_id)
185                    .and(table::Column::TableType.ne(TableType::Table)),
186            )
187            .into_tuple()
188            .all(&inner.db)
189            .await?;
190        Ok(table_ids)
191    }
192
193    pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
194        let inner = self.inner.read().await;
195        let table_ids: Vec<TableId> = Table::find()
196            .select_only()
197            .column(table::Column::TableId)
198            .join(JoinType::InnerJoin, table::Relation::Object1.def())
199            .filter(
200                object::Column::SchemaId
201                    .eq(schema_id)
202                    .and(table::Column::TableType.eq(TableType::Table)),
203            )
204            .into_tuple()
205            .all(&inner.db)
206            .await?;
207        Ok(table_ids)
208    }
209
210    pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
211        let inner = self.inner.read().await;
212        let view_ids: Vec<ViewId> = View::find()
213            .select_only()
214            .column(view::Column::ViewId)
215            .join(JoinType::InnerJoin, view::Relation::Object.def())
216            .filter(object::Column::SchemaId.eq(schema_id))
217            .into_tuple()
218            .all(&inner.db)
219            .await?;
220        Ok(view_ids)
221    }
222
223    /// Use [`Self::list_all_state_tables`] to get all types.
224    pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
225        let inner = self.inner.read().await;
226        let table_objs = Table::find()
227            .find_also_related(Object)
228            .filter(table::Column::TableType.eq(table_type))
229            .all(&inner.db)
230            .await?;
231        Ok(table_objs
232            .into_iter()
233            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
234            .collect())
235    }
236
237    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
238        let inner = self.inner.read().await;
239        inner.list_sources().await
240    }
241
242    // Return a hashmap to distinguish whether each source is shared or not.
243    pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
244        let inner = self.inner.read().await;
245        let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
246            .select_only()
247            .columns([source::Column::SourceId, source::Column::SourceInfo])
248            .into_tuple()
249            .all(&inner.db)
250            .await?;
251
252        Ok(source_ids
253            .into_iter()
254            .map(|(source_id, info)| {
255                (
256                    source_id,
257                    info.map(|info| info.to_protobuf().cdc_source_job)
258                        .unwrap_or(false),
259                )
260            })
261            .collect())
262    }
263
264    pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
265        let inner = self.inner.read().await;
266        let conn_objs = Connection::find()
267            .find_also_related(Object)
268            .all(&inner.db)
269            .await?;
270        Ok(conn_objs
271            .into_iter()
272            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
273            .collect())
274    }
275
276    pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
277        let inner = self.inner.read().await;
278        let source_ids: Vec<SourceId> = Source::find()
279            .select_only()
280            .column(source::Column::SourceId)
281            .join(JoinType::InnerJoin, source::Relation::Object.def())
282            .filter(object::Column::SchemaId.eq(schema_id))
283            .into_tuple()
284            .all(&inner.db)
285            .await?;
286        Ok(source_ids)
287    }
288
289    pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
290        let inner = self.inner.read().await;
291        inner.list_indexes().await
292    }
293
294    pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
295        let inner = self.inner.read().await;
296        inner.list_sinks().await
297    }
298
299    pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
300        let inner = self.inner.read().await;
301        inner.list_subscriptions().await
302    }
303
304    pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
305        let inner = self.inner.read().await;
306        inner.list_views().await
307    }
308
309    pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
310        let inner = self.inner.read().await;
311        inner.list_users().await
312    }
313
314    pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
315        let inner = self.inner.read().await;
316        inner.list_functions().await
317    }
318
319    /// `Unmigrated` refers to table-fragments that have not yet been migrated to the new plan (for now, this means
320    /// table-fragments that do not use `UpstreamSinkUnion` operator to receive multiple upstream sinks)
321    pub async fn list_unmigrated_tables(&self) -> MetaResult<Vec<PbTable>> {
322        let inner = self.inner.read().await;
323
324        let table_objs = Table::find()
325            .filter(table::Column::TableType.eq(TableType::Table))
326            .find_also_related(Object)
327            .join(JoinType::InnerJoin, object::Relation::Fragment.def())
328            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Mview).and(
329                FragmentTypeMask::disjoint(FragmentTypeFlag::UpstreamSinkUnion),
330            ))
331            .all(&inner.db)
332            .await?;
333
334        Ok(table_objs
335            .into_iter()
336            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
337            .collect())
338    }
339
340    pub async fn list_sink_ids(&self, database_id: Option<DatabaseId>) -> MetaResult<Vec<SinkId>> {
341        let inner = self.inner.read().await;
342
343        let mut query = Sink::find().select_only().column(sink::Column::SinkId);
344
345        if let Some(database_id) = database_id {
346            query = query
347                .join(JoinType::InnerJoin, sink::Relation::Object.def())
348                .filter(object::Column::DatabaseId.eq(database_id));
349        }
350
351        let sink_ids: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;
352
353        Ok(sink_ids)
354    }
355}