risingwave_meta/controller/catalog/
list_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::FragmentTypeMask;
16use risingwave_common::id::JobId;
17use sea_orm::prelude::DateTime;
18
19use super::*;
20use crate::controller::fragment::FragmentTypeMaskExt;
21
22impl CatalogController {
23    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
24        self.inner.read().await.list_time_travel_table_ids().await
25    }
26
27    pub async fn list_stream_job_desc_for_telemetry(
28        &self,
29    ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
30        let inner = self.inner.read().await;
31        let info: Vec<(TableId, Option<Property>)> = Table::find()
32            .select_only()
33            .column(table::Column::TableId)
34            .column(source::Column::WithProperties)
35            .join(JoinType::LeftJoin, table::Relation::Source.def())
36            .filter(
37                table::Column::TableType
38                    .eq(TableType::Table)
39                    .or(table::Column::TableType.eq(TableType::MaterializedView)),
40            )
41            .into_tuple()
42            .all(&inner.db)
43            .await?;
44
45        Ok(info
46            .into_iter()
47            .map(|(table_id, properties)| {
48                let connector_info = if let Some(inner_props) = properties {
49                    inner_props
50                        .inner_ref()
51                        .get(UPSTREAM_SOURCE_KEY)
52                        .map(|v| v.to_lowercase())
53                } else {
54                    None
55                };
56                MetaTelemetryJobDesc {
57                    table_id: table_id.as_raw_id() as _,
58                    connector: connector_info,
59                    optimization: vec![],
60                }
61            })
62            .collect())
63    }
64
65    pub async fn list_background_creating_jobs(
66        &self,
67        include_initial: bool,
68        database_id: Option<DatabaseId>,
69    ) -> MetaResult<Vec<(JobId, String, DateTime)>> {
70        let inner = self.inner.read().await;
71        let status_cond = if include_initial {
72            streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
73        } else {
74            streaming_job::Column::JobStatus.eq(JobStatus::Creating)
75        };
76        let mut table_info: Vec<(JobId, String, DateTime)> = Table::find()
77            .select_only()
78            .columns([table::Column::TableId, table::Column::Definition])
79            .column(object::Column::InitializedAt)
80            .join(JoinType::LeftJoin, table::Relation::Object1.def())
81            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
82            .filter(
83                streaming_job::Column::CreateType
84                    .eq(CreateType::Background)
85                    .and(status_cond.clone())
86                    .and(
87                        database_id
88                            .map(|database_id| object::Column::DatabaseId.eq(database_id))
89                            .unwrap_or_else(|| SimpleExpr::from(true)),
90                    ),
91            )
92            .into_tuple()
93            .all(&inner.db)
94            .await?;
95        let sink_info: Vec<(JobId, String, DateTime)> = Sink::find()
96            .select_only()
97            .columns([sink::Column::SinkId, sink::Column::Definition])
98            .column(object::Column::InitializedAt)
99            .join(JoinType::LeftJoin, sink::Relation::Object.def())
100            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
101            .filter(
102                streaming_job::Column::CreateType
103                    .eq(CreateType::Background)
104                    .and(status_cond)
105                    .and(
106                        database_id
107                            .map(|database_id| object::Column::DatabaseId.eq(database_id))
108                            .unwrap_or_else(|| SimpleExpr::from(true)),
109                    ),
110            )
111            .into_tuple()
112            .all(&inner.db)
113            .await?;
114
115        table_info.extend(sink_info.into_iter());
116
117        Ok(table_info)
118    }
119
120    pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
121        let inner = self.inner.read().await;
122        inner.list_databases().await
123    }
124
125    pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
126        self.list_object_dependencies(true).await
127    }
128
129    pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
130        self.list_object_dependencies(false).await
131    }
132
133    pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
134        let inner = self.inner.read().await;
135        inner.list_schemas().await
136    }
137
138    /// [`Self::list_tables_by_type`] with all types.
139    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
140        let inner = self.inner.read().await;
141        inner.list_all_state_tables().await
142    }
143
144    pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
145        let inner = self.inner.read().await;
146        let table_ids: Vec<TableId> = Table::find()
147            .select_only()
148            .column(table::Column::TableId)
149            .join(JoinType::InnerJoin, table::Relation::Object1.def())
150            .filter(
151                object::Column::SchemaId
152                    .eq(schema_id)
153                    .and(table::Column::TableType.ne(TableType::Table)),
154            )
155            .into_tuple()
156            .all(&inner.db)
157            .await?;
158        Ok(table_ids)
159    }
160
161    pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
162        let inner = self.inner.read().await;
163        let table_ids: Vec<TableId> = Table::find()
164            .select_only()
165            .column(table::Column::TableId)
166            .join(JoinType::InnerJoin, table::Relation::Object1.def())
167            .filter(
168                object::Column::SchemaId
169                    .eq(schema_id)
170                    .and(table::Column::TableType.eq(TableType::Table)),
171            )
172            .into_tuple()
173            .all(&inner.db)
174            .await?;
175        Ok(table_ids)
176    }
177
178    pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
179        let inner = self.inner.read().await;
180        let view_ids: Vec<ViewId> = View::find()
181            .select_only()
182            .column(view::Column::ViewId)
183            .join(JoinType::InnerJoin, view::Relation::Object.def())
184            .filter(object::Column::SchemaId.eq(schema_id))
185            .into_tuple()
186            .all(&inner.db)
187            .await?;
188        Ok(view_ids)
189    }
190
191    /// Use [`Self::list_all_state_tables`] to get all types.
192    pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
193        let inner = self.inner.read().await;
194        let table_objs = Table::find()
195            .find_also_related(Object)
196            .filter(table::Column::TableType.eq(table_type))
197            .all(&inner.db)
198            .await?;
199        Ok(table_objs
200            .into_iter()
201            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
202            .collect())
203    }
204
205    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
206        let inner = self.inner.read().await;
207        inner.list_sources().await
208    }
209
210    // Return a hashmap to distinguish whether each source is shared or not.
211    pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
212        let inner = self.inner.read().await;
213        let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
214            .select_only()
215            .columns([source::Column::SourceId, source::Column::SourceInfo])
216            .into_tuple()
217            .all(&inner.db)
218            .await?;
219
220        Ok(source_ids
221            .into_iter()
222            .map(|(source_id, info)| {
223                (
224                    source_id,
225                    info.map(|info| info.to_protobuf().cdc_source_job)
226                        .unwrap_or(false),
227                )
228            })
229            .collect())
230    }
231
232    pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
233        let inner = self.inner.read().await;
234        let conn_objs = Connection::find()
235            .find_also_related(Object)
236            .all(&inner.db)
237            .await?;
238        Ok(conn_objs
239            .into_iter()
240            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
241            .collect())
242    }
243
244    pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
245        let inner = self.inner.read().await;
246        let source_ids: Vec<SourceId> = Source::find()
247            .select_only()
248            .column(source::Column::SourceId)
249            .join(JoinType::InnerJoin, source::Relation::Object.def())
250            .filter(object::Column::SchemaId.eq(schema_id))
251            .into_tuple()
252            .all(&inner.db)
253            .await?;
254        Ok(source_ids)
255    }
256
257    pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
258        let inner = self.inner.read().await;
259        inner.list_indexes().await
260    }
261
262    pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
263        let inner = self.inner.read().await;
264        inner.list_sinks().await
265    }
266
267    pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
268        let inner = self.inner.read().await;
269        inner.list_subscriptions().await
270    }
271
272    pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
273        let inner = self.inner.read().await;
274        inner.list_views().await
275    }
276
277    pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
278        let inner = self.inner.read().await;
279        inner.list_users().await
280    }
281
282    pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
283        let inner = self.inner.read().await;
284        inner.list_functions().await
285    }
286
287    /// `Unmigrated` refers to table-fragments that have not yet been migrated to the new plan (for now, this means
288    /// table-fragments that do not use `UpstreamSinkUnion` operator to receive multiple upstream sinks)
289    pub async fn list_unmigrated_tables(&self) -> MetaResult<Vec<PbTable>> {
290        let inner = self.inner.read().await;
291
292        let table_objs = Table::find()
293            .filter(table::Column::TableType.eq(TableType::Table))
294            .find_also_related(Object)
295            .join(JoinType::InnerJoin, object::Relation::Fragment.def())
296            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Mview).and(
297                FragmentTypeMask::disjoint(FragmentTypeFlag::UpstreamSinkUnion),
298            ))
299            .all(&inner.db)
300            .await?;
301
302        Ok(table_objs
303            .into_iter()
304            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
305            .collect())
306    }
307}