risingwave_meta/controller/catalog/
list_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use sea_orm::prelude::DateTime;
16
17use super::*;
18
19impl CatalogController {
20    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
21        self.inner.read().await.list_time_travel_table_ids().await
22    }
23
24    pub async fn list_stream_job_desc_for_telemetry(
25        &self,
26    ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
27        let inner = self.inner.read().await;
28        let info: Vec<(TableId, Option<Property>)> = Table::find()
29            .select_only()
30            .column(table::Column::TableId)
31            .column(source::Column::WithProperties)
32            .join(JoinType::LeftJoin, table::Relation::Source.def())
33            .filter(
34                table::Column::TableType
35                    .eq(TableType::Table)
36                    .or(table::Column::TableType.eq(TableType::MaterializedView)),
37            )
38            .into_tuple()
39            .all(&inner.db)
40            .await?;
41
42        Ok(info
43            .into_iter()
44            .map(|(table_id, properties)| {
45                let connector_info = if let Some(inner_props) = properties {
46                    inner_props
47                        .inner_ref()
48                        .get(UPSTREAM_SOURCE_KEY)
49                        .map(|v| v.to_lowercase())
50                } else {
51                    None
52                };
53                MetaTelemetryJobDesc {
54                    table_id,
55                    connector: connector_info,
56                    optimization: vec![],
57                }
58            })
59            .collect())
60    }
61
62    pub async fn list_background_creating_jobs(
63        &self,
64        include_initial: bool,
65    ) -> MetaResult<Vec<(ObjectId, String, DateTime)>> {
66        let inner = self.inner.read().await;
67        let status_cond = if include_initial {
68            streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
69        } else {
70            streaming_job::Column::JobStatus.eq(JobStatus::Creating)
71        };
72        let mut table_info: Vec<(ObjectId, String, DateTime)> = Table::find()
73            .select_only()
74            .columns([table::Column::TableId, table::Column::Definition])
75            .column(object::Column::InitializedAt)
76            .join(JoinType::LeftJoin, table::Relation::Object1.def())
77            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
78            .filter(
79                streaming_job::Column::CreateType
80                    .eq(CreateType::Background)
81                    .and(status_cond.clone()),
82            )
83            .into_tuple()
84            .all(&inner.db)
85            .await?;
86        let sink_info: Vec<(ObjectId, String, DateTime)> = Sink::find()
87            .select_only()
88            .columns([sink::Column::SinkId, sink::Column::Definition])
89            .column(object::Column::InitializedAt)
90            .join(JoinType::LeftJoin, sink::Relation::Object.def())
91            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
92            .filter(
93                streaming_job::Column::CreateType
94                    .eq(CreateType::Background)
95                    .and(status_cond),
96            )
97            .into_tuple()
98            .all(&inner.db)
99            .await?;
100
101        table_info.extend(sink_info.into_iter());
102
103        Ok(table_info)
104    }
105
106    pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
107        let inner = self.inner.read().await;
108        inner.list_databases().await
109    }
110
111    pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
112        self.list_object_dependencies(true).await
113    }
114
115    pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
116        self.list_object_dependencies(false).await
117    }
118
119    pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
120        let inner = self.inner.read().await;
121        inner.list_schemas().await
122    }
123
124    /// [`Self::list_tables_by_type`] with all types.
125    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
126        let inner = self.inner.read().await;
127        inner.list_all_state_tables().await
128    }
129
130    pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
131        let inner = self.inner.read().await;
132        let table_ids: Vec<TableId> = Table::find()
133            .select_only()
134            .column(table::Column::TableId)
135            .join(JoinType::InnerJoin, table::Relation::Object1.def())
136            .filter(
137                object::Column::SchemaId
138                    .eq(schema_id)
139                    .and(table::Column::TableType.ne(TableType::Table)),
140            )
141            .into_tuple()
142            .all(&inner.db)
143            .await?;
144        Ok(table_ids)
145    }
146
147    pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
148        let inner = self.inner.read().await;
149        let table_ids: Vec<TableId> = Table::find()
150            .select_only()
151            .column(table::Column::TableId)
152            .join(JoinType::InnerJoin, table::Relation::Object1.def())
153            .filter(
154                object::Column::SchemaId
155                    .eq(schema_id)
156                    .and(table::Column::TableType.eq(TableType::Table)),
157            )
158            .into_tuple()
159            .all(&inner.db)
160            .await?;
161        Ok(table_ids)
162    }
163
164    pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
165        let inner = self.inner.read().await;
166        let view_ids: Vec<ViewId> = View::find()
167            .select_only()
168            .column(view::Column::ViewId)
169            .join(JoinType::InnerJoin, view::Relation::Object.def())
170            .filter(object::Column::SchemaId.eq(schema_id))
171            .into_tuple()
172            .all(&inner.db)
173            .await?;
174        Ok(view_ids)
175    }
176
177    /// Use [`Self::list_all_state_tables`] to get all types.
178    pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
179        let inner = self.inner.read().await;
180        let table_objs = Table::find()
181            .find_also_related(Object)
182            .filter(table::Column::TableType.eq(table_type))
183            .all(&inner.db)
184            .await?;
185        Ok(table_objs
186            .into_iter()
187            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
188            .collect())
189    }
190
191    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
192        let inner = self.inner.read().await;
193        inner.list_sources().await
194    }
195
196    // Return a hashmap to distinguish whether each source is shared or not.
197    pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
198        let inner = self.inner.read().await;
199        let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
200            .select_only()
201            .columns([source::Column::SourceId, source::Column::SourceInfo])
202            .into_tuple()
203            .all(&inner.db)
204            .await?;
205
206        Ok(source_ids
207            .into_iter()
208            .map(|(source_id, info)| {
209                (
210                    source_id,
211                    info.map(|info| info.to_protobuf().cdc_source_job)
212                        .unwrap_or(false),
213                )
214            })
215            .collect())
216    }
217
218    pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
219        let inner = self.inner.read().await;
220        let conn_objs = Connection::find()
221            .find_also_related(Object)
222            .all(&inner.db)
223            .await?;
224        Ok(conn_objs
225            .into_iter()
226            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
227            .collect())
228    }
229
230    pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
231        let inner = self.inner.read().await;
232        let source_ids: Vec<SourceId> = Source::find()
233            .select_only()
234            .column(source::Column::SourceId)
235            .join(JoinType::InnerJoin, source::Relation::Object.def())
236            .filter(object::Column::SchemaId.eq(schema_id))
237            .into_tuple()
238            .all(&inner.db)
239            .await?;
240        Ok(source_ids)
241    }
242
243    pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
244        let inner = self.inner.read().await;
245        inner.list_indexes().await
246    }
247
248    pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
249        let inner = self.inner.read().await;
250        inner.list_sinks().await
251    }
252
253    pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
254        let inner = self.inner.read().await;
255        inner.list_subscriptions().await
256    }
257
258    pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
259        let inner = self.inner.read().await;
260        inner.list_views().await
261    }
262
263    pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
264        let inner = self.inner.read().await;
265        inner.list_users().await
266    }
267
268    pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
269        let inner = self.inner.read().await;
270        inner.list_functions().await
271    }
272}