risingwave_meta/controller/catalog/
list_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16
17impl CatalogController {
18    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
19        self.inner.read().await.list_time_travel_table_ids().await
20    }
21
22    pub async fn list_stream_job_desc_for_telemetry(
23        &self,
24    ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
25        let inner = self.inner.read().await;
26        let info: Vec<(TableId, Option<Property>)> = Table::find()
27            .select_only()
28            .column(table::Column::TableId)
29            .column(source::Column::WithProperties)
30            .join(JoinType::LeftJoin, table::Relation::Source.def())
31            .filter(
32                table::Column::TableType
33                    .eq(TableType::Table)
34                    .or(table::Column::TableType.eq(TableType::MaterializedView)),
35            )
36            .into_tuple()
37            .all(&inner.db)
38            .await?;
39
40        Ok(info
41            .into_iter()
42            .map(|(table_id, properties)| {
43                let connector_info = if let Some(inner_props) = properties {
44                    inner_props
45                        .inner_ref()
46                        .get(UPSTREAM_SOURCE_KEY)
47                        .map(|v| v.to_lowercase())
48                } else {
49                    None
50                };
51                MetaTelemetryJobDesc {
52                    table_id,
53                    connector: connector_info,
54                    optimization: vec![],
55                }
56            })
57            .collect())
58    }
59
60    pub async fn list_background_creating_mviews(
61        &self,
62        include_initial: bool,
63    ) -> MetaResult<Vec<table::Model>> {
64        let inner = self.inner.read().await;
65        let status_cond = if include_initial {
66            streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
67        } else {
68            streaming_job::Column::JobStatus.eq(JobStatus::Creating)
69        };
70        let tables = Table::find()
71            .join(JoinType::LeftJoin, table::Relation::Object1.def())
72            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
73            .filter(
74                table::Column::TableType
75                    .eq(TableType::MaterializedView)
76                    .and(
77                        streaming_job::Column::CreateType
78                            .eq(CreateType::Background)
79                            .and(status_cond),
80                    ),
81            )
82            .all(&inner.db)
83            .await?;
84        Ok(tables)
85    }
86
87    pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
88        let inner = self.inner.read().await;
89        inner.list_databases().await
90    }
91
92    pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
93        self.list_object_dependencies(true).await
94    }
95
96    pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
97        self.list_object_dependencies(false).await
98    }
99
100    pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
101        let inner = self.inner.read().await;
102        inner.list_schemas().await
103    }
104
105    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
106        let inner = self.inner.read().await;
107        inner.list_all_state_tables().await
108    }
109
110    pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
111        let inner = self.inner.read().await;
112        let table_ids: Vec<TableId> = Table::find()
113            .select_only()
114            .column(table::Column::TableId)
115            .join(JoinType::InnerJoin, table::Relation::Object1.def())
116            .filter(
117                object::Column::SchemaId
118                    .eq(schema_id)
119                    .and(table::Column::TableType.ne(TableType::Table)),
120            )
121            .into_tuple()
122            .all(&inner.db)
123            .await?;
124        Ok(table_ids)
125    }
126
127    pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
128        let inner = self.inner.read().await;
129        let table_ids: Vec<TableId> = Table::find()
130            .select_only()
131            .column(table::Column::TableId)
132            .join(JoinType::InnerJoin, table::Relation::Object1.def())
133            .filter(
134                object::Column::SchemaId
135                    .eq(schema_id)
136                    .and(table::Column::TableType.eq(TableType::Table)),
137            )
138            .into_tuple()
139            .all(&inner.db)
140            .await?;
141        Ok(table_ids)
142    }
143
144    pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
145        let inner = self.inner.read().await;
146        let view_ids: Vec<ViewId> = View::find()
147            .select_only()
148            .column(view::Column::ViewId)
149            .join(JoinType::InnerJoin, view::Relation::Object.def())
150            .filter(object::Column::SchemaId.eq(schema_id))
151            .into_tuple()
152            .all(&inner.db)
153            .await?;
154        Ok(view_ids)
155    }
156
157    pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
158        let inner = self.inner.read().await;
159        let table_objs = Table::find()
160            .find_also_related(Object)
161            .filter(table::Column::TableType.eq(table_type))
162            .all(&inner.db)
163            .await?;
164        Ok(table_objs
165            .into_iter()
166            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
167            .collect())
168    }
169
170    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
171        let inner = self.inner.read().await;
172        inner.list_sources().await
173    }
174
175    // Return a hashmap to distinguish whether each source is shared or not.
176    pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
177        let inner = self.inner.read().await;
178        let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
179            .select_only()
180            .columns([source::Column::SourceId, source::Column::SourceInfo])
181            .into_tuple()
182            .all(&inner.db)
183            .await?;
184
185        Ok(source_ids
186            .into_iter()
187            .map(|(source_id, info)| {
188                (
189                    source_id,
190                    info.map(|info| info.to_protobuf().cdc_source_job)
191                        .unwrap_or(false),
192                )
193            })
194            .collect())
195    }
196
197    pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
198        let inner = self.inner.read().await;
199        let conn_objs = Connection::find()
200            .find_also_related(Object)
201            .all(&inner.db)
202            .await?;
203        Ok(conn_objs
204            .into_iter()
205            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
206            .collect())
207    }
208
209    pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
210        let inner = self.inner.read().await;
211        let source_ids: Vec<SourceId> = Source::find()
212            .select_only()
213            .column(source::Column::SourceId)
214            .join(JoinType::InnerJoin, source::Relation::Object.def())
215            .filter(object::Column::SchemaId.eq(schema_id))
216            .into_tuple()
217            .all(&inner.db)
218            .await?;
219        Ok(source_ids)
220    }
221
222    pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
223        let inner = self.inner.read().await;
224        inner.list_sinks().await
225    }
226
227    pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
228        let inner = self.inner.read().await;
229        inner.list_subscriptions().await
230    }
231
232    pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
233        let inner = self.inner.read().await;
234        inner.list_views().await
235    }
236
237    pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
238        let inner = self.inner.read().await;
239        inner.list_users().await
240    }
241}