risingwave_meta/controller/catalog/
list_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::FragmentTypeMask;
16use sea_orm::prelude::DateTime;
17
18use super::*;
19use crate::controller::fragment::FragmentTypeMaskExt;
20
21impl CatalogController {
22    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
23        self.inner.read().await.list_time_travel_table_ids().await
24    }
25
26    pub async fn list_stream_job_desc_for_telemetry(
27        &self,
28    ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
29        let inner = self.inner.read().await;
30        let info: Vec<(TableId, Option<Property>)> = Table::find()
31            .select_only()
32            .column(table::Column::TableId)
33            .column(source::Column::WithProperties)
34            .join(JoinType::LeftJoin, table::Relation::Source.def())
35            .filter(
36                table::Column::TableType
37                    .eq(TableType::Table)
38                    .or(table::Column::TableType.eq(TableType::MaterializedView)),
39            )
40            .into_tuple()
41            .all(&inner.db)
42            .await?;
43
44        Ok(info
45            .into_iter()
46            .map(|(table_id, properties)| {
47                let connector_info = if let Some(inner_props) = properties {
48                    inner_props
49                        .inner_ref()
50                        .get(UPSTREAM_SOURCE_KEY)
51                        .map(|v| v.to_lowercase())
52                } else {
53                    None
54                };
55                MetaTelemetryJobDesc {
56                    table_id,
57                    connector: connector_info,
58                    optimization: vec![],
59                }
60            })
61            .collect())
62    }
63
64    pub async fn list_background_creating_jobs(
65        &self,
66        include_initial: bool,
67    ) -> MetaResult<Vec<(ObjectId, String, DateTime)>> {
68        let inner = self.inner.read().await;
69        let status_cond = if include_initial {
70            streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
71        } else {
72            streaming_job::Column::JobStatus.eq(JobStatus::Creating)
73        };
74        let mut table_info: Vec<(ObjectId, String, DateTime)> = Table::find()
75            .select_only()
76            .columns([table::Column::TableId, table::Column::Definition])
77            .column(object::Column::InitializedAt)
78            .join(JoinType::LeftJoin, table::Relation::Object1.def())
79            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
80            .filter(
81                streaming_job::Column::CreateType
82                    .eq(CreateType::Background)
83                    .and(status_cond.clone()),
84            )
85            .into_tuple()
86            .all(&inner.db)
87            .await?;
88        let sink_info: Vec<(ObjectId, String, DateTime)> = Sink::find()
89            .select_only()
90            .columns([sink::Column::SinkId, sink::Column::Definition])
91            .column(object::Column::InitializedAt)
92            .join(JoinType::LeftJoin, sink::Relation::Object.def())
93            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
94            .filter(
95                streaming_job::Column::CreateType
96                    .eq(CreateType::Background)
97                    .and(status_cond),
98            )
99            .into_tuple()
100            .all(&inner.db)
101            .await?;
102
103        table_info.extend(sink_info.into_iter());
104
105        Ok(table_info)
106    }
107
108    pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
109        let inner = self.inner.read().await;
110        inner.list_databases().await
111    }
112
113    pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
114        self.list_object_dependencies(true).await
115    }
116
117    pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
118        self.list_object_dependencies(false).await
119    }
120
121    pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
122        let inner = self.inner.read().await;
123        inner.list_schemas().await
124    }
125
126    /// [`Self::list_tables_by_type`] with all types.
127    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
128        let inner = self.inner.read().await;
129        inner.list_all_state_tables().await
130    }
131
132    pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
133        let inner = self.inner.read().await;
134        let table_ids: Vec<TableId> = Table::find()
135            .select_only()
136            .column(table::Column::TableId)
137            .join(JoinType::InnerJoin, table::Relation::Object1.def())
138            .filter(
139                object::Column::SchemaId
140                    .eq(schema_id)
141                    .and(table::Column::TableType.ne(TableType::Table)),
142            )
143            .into_tuple()
144            .all(&inner.db)
145            .await?;
146        Ok(table_ids)
147    }
148
149    pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
150        let inner = self.inner.read().await;
151        let table_ids: Vec<TableId> = Table::find()
152            .select_only()
153            .column(table::Column::TableId)
154            .join(JoinType::InnerJoin, table::Relation::Object1.def())
155            .filter(
156                object::Column::SchemaId
157                    .eq(schema_id)
158                    .and(table::Column::TableType.eq(TableType::Table)),
159            )
160            .into_tuple()
161            .all(&inner.db)
162            .await?;
163        Ok(table_ids)
164    }
165
166    pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
167        let inner = self.inner.read().await;
168        let view_ids: Vec<ViewId> = View::find()
169            .select_only()
170            .column(view::Column::ViewId)
171            .join(JoinType::InnerJoin, view::Relation::Object.def())
172            .filter(object::Column::SchemaId.eq(schema_id))
173            .into_tuple()
174            .all(&inner.db)
175            .await?;
176        Ok(view_ids)
177    }
178
179    /// Use [`Self::list_all_state_tables`] to get all types.
180    pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
181        let inner = self.inner.read().await;
182        let table_objs = Table::find()
183            .find_also_related(Object)
184            .filter(table::Column::TableType.eq(table_type))
185            .all(&inner.db)
186            .await?;
187        Ok(table_objs
188            .into_iter()
189            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
190            .collect())
191    }
192
193    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
194        let inner = self.inner.read().await;
195        inner.list_sources().await
196    }
197
198    // Return a hashmap to distinguish whether each source is shared or not.
199    pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
200        let inner = self.inner.read().await;
201        let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
202            .select_only()
203            .columns([source::Column::SourceId, source::Column::SourceInfo])
204            .into_tuple()
205            .all(&inner.db)
206            .await?;
207
208        Ok(source_ids
209            .into_iter()
210            .map(|(source_id, info)| {
211                (
212                    source_id,
213                    info.map(|info| info.to_protobuf().cdc_source_job)
214                        .unwrap_or(false),
215                )
216            })
217            .collect())
218    }
219
220    pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
221        let inner = self.inner.read().await;
222        let conn_objs = Connection::find()
223            .find_also_related(Object)
224            .all(&inner.db)
225            .await?;
226        Ok(conn_objs
227            .into_iter()
228            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
229            .collect())
230    }
231
232    pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
233        let inner = self.inner.read().await;
234        let source_ids: Vec<SourceId> = Source::find()
235            .select_only()
236            .column(source::Column::SourceId)
237            .join(JoinType::InnerJoin, source::Relation::Object.def())
238            .filter(object::Column::SchemaId.eq(schema_id))
239            .into_tuple()
240            .all(&inner.db)
241            .await?;
242        Ok(source_ids)
243    }
244
245    pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
246        let inner = self.inner.read().await;
247        inner.list_indexes().await
248    }
249
250    pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
251        let inner = self.inner.read().await;
252        inner.list_sinks().await
253    }
254
255    pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
256        let inner = self.inner.read().await;
257        inner.list_subscriptions().await
258    }
259
260    pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
261        let inner = self.inner.read().await;
262        inner.list_views().await
263    }
264
265    pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
266        let inner = self.inner.read().await;
267        inner.list_users().await
268    }
269
270    pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
271        let inner = self.inner.read().await;
272        inner.list_functions().await
273    }
274
275    /// `Unmigrated` refers to table-fragments that have not yet been migrated to the new plan (for now, this means
276    /// table-fragments that do not use `UpstreamSinkUnion` operator to receive multiple upstream sinks)
277    pub async fn list_unmigrated_tables(&self) -> MetaResult<Vec<PbTable>> {
278        let inner = self.inner.read().await;
279
280        let table_objs = Table::find()
281            .filter(table::Column::TableType.eq(TableType::Table))
282            .find_also_related(Object)
283            .join(JoinType::InnerJoin, object::Relation::Fragment.def())
284            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Mview).and(
285                FragmentTypeMask::disjoint(FragmentTypeFlag::UpstreamSinkUnion),
286            ))
287            .all(&inner.db)
288            .await?;
289
290        Ok(table_objs
291            .into_iter()
292            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
293            .collect())
294    }
295}