risingwave_meta/controller/catalog/
list_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use sea_orm::prelude::DateTime;
16
17use super::*;
18
19impl CatalogController {
20    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
21        self.inner.read().await.list_time_travel_table_ids().await
22    }
23
24    pub async fn list_stream_job_desc_for_telemetry(
25        &self,
26    ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
27        let inner = self.inner.read().await;
28        let info: Vec<(TableId, Option<Property>)> = Table::find()
29            .select_only()
30            .column(table::Column::TableId)
31            .column(source::Column::WithProperties)
32            .join(JoinType::LeftJoin, table::Relation::Source.def())
33            .filter(
34                table::Column::TableType
35                    .eq(TableType::Table)
36                    .or(table::Column::TableType.eq(TableType::MaterializedView)),
37            )
38            .into_tuple()
39            .all(&inner.db)
40            .await?;
41
42        Ok(info
43            .into_iter()
44            .map(|(table_id, properties)| {
45                let connector_info = if let Some(inner_props) = properties {
46                    inner_props
47                        .inner_ref()
48                        .get(UPSTREAM_SOURCE_KEY)
49                        .map(|v| v.to_lowercase())
50                } else {
51                    None
52                };
53                MetaTelemetryJobDesc {
54                    table_id,
55                    connector: connector_info,
56                    optimization: vec![],
57                }
58            })
59            .collect())
60    }
61
62    pub async fn list_background_creating_jobs(
63        &self,
64        include_initial: bool,
65    ) -> MetaResult<Vec<(ObjectId, String, DateTime)>> {
66        let inner = self.inner.read().await;
67        let status_cond = if include_initial {
68            streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
69        } else {
70            streaming_job::Column::JobStatus.eq(JobStatus::Creating)
71        };
72        let mut table_info: Vec<(ObjectId, String, DateTime)> = Table::find()
73            .select_only()
74            .columns([table::Column::TableId, table::Column::Definition])
75            .column(object::Column::InitializedAt)
76            .join(JoinType::LeftJoin, table::Relation::Object1.def())
77            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
78            .filter(
79                table::Column::TableType
80                    .eq(TableType::MaterializedView)
81                    .and(
82                        streaming_job::Column::CreateType
83                            .eq(CreateType::Background)
84                            .and(status_cond.clone()),
85                    ),
86            )
87            .into_tuple()
88            .all(&inner.db)
89            .await?;
90        let sink_info: Vec<(ObjectId, String, DateTime)> = Sink::find()
91            .select_only()
92            .columns([sink::Column::SinkId, sink::Column::Definition])
93            .column(object::Column::InitializedAt)
94            .join(JoinType::LeftJoin, sink::Relation::Object.def())
95            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
96            .filter(
97                streaming_job::Column::CreateType
98                    .eq(CreateType::Background)
99                    .and(status_cond),
100            )
101            .into_tuple()
102            .all(&inner.db)
103            .await?;
104
105        table_info.extend(sink_info.into_iter());
106
107        Ok(table_info)
108    }
109
110    pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
111        let inner = self.inner.read().await;
112        inner.list_databases().await
113    }
114
115    pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
116        self.list_object_dependencies(true).await
117    }
118
119    pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
120        self.list_object_dependencies(false).await
121    }
122
123    pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
124        let inner = self.inner.read().await;
125        inner.list_schemas().await
126    }
127
128    /// [`Self::list_tables_by_type`] with all types.
129    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
130        let inner = self.inner.read().await;
131        inner.list_all_state_tables().await
132    }
133
134    pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
135        let inner = self.inner.read().await;
136        let table_ids: Vec<TableId> = Table::find()
137            .select_only()
138            .column(table::Column::TableId)
139            .join(JoinType::InnerJoin, table::Relation::Object1.def())
140            .filter(
141                object::Column::SchemaId
142                    .eq(schema_id)
143                    .and(table::Column::TableType.ne(TableType::Table)),
144            )
145            .into_tuple()
146            .all(&inner.db)
147            .await?;
148        Ok(table_ids)
149    }
150
151    pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
152        let inner = self.inner.read().await;
153        let table_ids: Vec<TableId> = Table::find()
154            .select_only()
155            .column(table::Column::TableId)
156            .join(JoinType::InnerJoin, table::Relation::Object1.def())
157            .filter(
158                object::Column::SchemaId
159                    .eq(schema_id)
160                    .and(table::Column::TableType.eq(TableType::Table)),
161            )
162            .into_tuple()
163            .all(&inner.db)
164            .await?;
165        Ok(table_ids)
166    }
167
168    pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
169        let inner = self.inner.read().await;
170        let view_ids: Vec<ViewId> = View::find()
171            .select_only()
172            .column(view::Column::ViewId)
173            .join(JoinType::InnerJoin, view::Relation::Object.def())
174            .filter(object::Column::SchemaId.eq(schema_id))
175            .into_tuple()
176            .all(&inner.db)
177            .await?;
178        Ok(view_ids)
179    }
180
181    /// Use [`Self::list_all_state_tables`] to get all types.
182    pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
183        let inner = self.inner.read().await;
184        let table_objs = Table::find()
185            .find_also_related(Object)
186            .filter(table::Column::TableType.eq(table_type))
187            .all(&inner.db)
188            .await?;
189        Ok(table_objs
190            .into_iter()
191            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
192            .collect())
193    }
194
195    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
196        let inner = self.inner.read().await;
197        inner.list_sources().await
198    }
199
200    // Return a hashmap to distinguish whether each source is shared or not.
201    pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
202        let inner = self.inner.read().await;
203        let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
204            .select_only()
205            .columns([source::Column::SourceId, source::Column::SourceInfo])
206            .into_tuple()
207            .all(&inner.db)
208            .await?;
209
210        Ok(source_ids
211            .into_iter()
212            .map(|(source_id, info)| {
213                (
214                    source_id,
215                    info.map(|info| info.to_protobuf().cdc_source_job)
216                        .unwrap_or(false),
217                )
218            })
219            .collect())
220    }
221
222    pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
223        let inner = self.inner.read().await;
224        let conn_objs = Connection::find()
225            .find_also_related(Object)
226            .all(&inner.db)
227            .await?;
228        Ok(conn_objs
229            .into_iter()
230            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
231            .collect())
232    }
233
234    pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
235        let inner = self.inner.read().await;
236        let source_ids: Vec<SourceId> = Source::find()
237            .select_only()
238            .column(source::Column::SourceId)
239            .join(JoinType::InnerJoin, source::Relation::Object.def())
240            .filter(object::Column::SchemaId.eq(schema_id))
241            .into_tuple()
242            .all(&inner.db)
243            .await?;
244        Ok(source_ids)
245    }
246
247    pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
248        let inner = self.inner.read().await;
249        inner.list_indexes().await
250    }
251
252    pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
253        let inner = self.inner.read().await;
254        inner.list_sinks().await
255    }
256
257    pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
258        let inner = self.inner.read().await;
259        inner.list_subscriptions().await
260    }
261
262    pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
263        let inner = self.inner.read().await;
264        inner.list_views().await
265    }
266
267    pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
268        let inner = self.inner.read().await;
269        inner.list_users().await
270    }
271
272    pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
273        let inner = self.inner.read().await;
274        inner.list_functions().await
275    }
276}