risingwave_meta/controller/catalog/
list_op.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::FragmentTypeMask;
16use risingwave_common::id::JobId;
17use risingwave_meta_model::refresh_job::{self, RefreshState};
18use sea_orm::prelude::DateTime;
19
20use super::*;
21use crate::controller::fragment::FragmentTypeMaskExt;
22use crate::controller::utils::load_streaming_jobs_by_ids;
23
24impl CatalogController {
25    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
26        self.inner.read().await.list_time_travel_table_ids().await
27    }
28
29    pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
30        let inner = self.inner.read().await;
31        Ok(RefreshJob::find().all(&inner.db).await?)
32    }
33
34    pub async fn get_refresh_job_state_by_table_id(
35        &self,
36        table_id: TableId,
37    ) -> MetaResult<RefreshState> {
38        let inner = self.inner.read().await;
39        let (refresh_job_state,): (RefreshState,) = RefreshJob::find_by_id(table_id)
40            .select_only()
41            .select_column(refresh_job::Column::CurrentStatus)
42            .into_tuple()
43            .one(&inner.db)
44            .await?
45            .ok_or_else(|| MetaError::catalog_id_not_found("refresh_job", table_id))?;
46        Ok(refresh_job_state)
47    }
48
49    pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
50        let inner = self.inner.read().await;
51        Ok(Table::find()
52            .select_only()
53            .column(table::Column::TableId)
54            .filter(table::Column::Refreshable.eq(true))
55            .into_tuple()
56            .all(&inner.db)
57            .await?)
58    }
59
60    pub async fn list_stream_job_desc_for_telemetry(
61        &self,
62    ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
63        let inner = self.inner.read().await;
64        let info: Vec<(TableId, Option<Property>)> = Table::find()
65            .select_only()
66            .column(table::Column::TableId)
67            .column(source::Column::WithProperties)
68            .join(JoinType::LeftJoin, table::Relation::Source.def())
69            .filter(
70                table::Column::TableType
71                    .eq(TableType::Table)
72                    .or(table::Column::TableType.eq(TableType::MaterializedView)),
73            )
74            .into_tuple()
75            .all(&inner.db)
76            .await?;
77
78        Ok(info
79            .into_iter()
80            .map(|(table_id, properties)| {
81                let connector_info = if let Some(inner_props) = properties {
82                    inner_props
83                        .inner_ref()
84                        .get(UPSTREAM_SOURCE_KEY)
85                        .map(|v| v.to_lowercase())
86                } else {
87                    None
88                };
89                MetaTelemetryJobDesc {
90                    table_id: table_id.as_i32_id(),
91                    connector: connector_info,
92                    optimization: vec![],
93                }
94            })
95            .collect())
96    }
97
98    pub async fn list_background_creating_jobs(
99        &self,
100        include_initial: bool,
101        database_id: Option<DatabaseId>,
102    ) -> MetaResult<HashSet<JobId>> {
103        Ok(self
104            .list_creating_jobs(include_initial, false, database_id)
105            .await?
106            .into_iter()
107            .map(|(job_id, _, _, create_type, _)| {
108                assert_eq!(create_type, CreateType::Background);
109                job_id
110            })
111            .collect())
112    }
113
114    pub async fn list_creating_jobs(
115        &self,
116        include_initial: bool,
117        include_foreground: bool,
118        database_id: Option<DatabaseId>,
119    ) -> MetaResult<Vec<(JobId, String, DateTime, CreateType, bool)>> {
120        let inner = self.inner.read().await;
121        let create_type_cond = if include_foreground {
122            SimpleExpr::from(true)
123        } else {
124            streaming_job::Column::CreateType.eq(CreateType::Background)
125        };
126        let status_cond = if include_initial {
127            streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
128        } else {
129            streaming_job::Column::JobStatus.eq(JobStatus::Creating)
130        };
131        let database_cond = database_id
132            .map(|database_id| object::Column::DatabaseId.eq(database_id))
133            .unwrap_or_else(|| SimpleExpr::from(true));
134        let filter_cond = create_type_cond.and(status_cond).and(database_cond);
135        let object_columns = [object::Column::InitializedAt];
136        let streaming_job_columns = [
137            streaming_job::Column::CreateType,
138            streaming_job::Column::IsServerlessBackfill,
139        ];
140        let mut table_info: Vec<(JobId, String, DateTime, CreateType, bool)> = Table::find()
141            .select_only()
142            .columns([table::Column::TableId, table::Column::Definition])
143            .columns(object_columns)
144            .columns(streaming_job_columns)
145            .join(JoinType::LeftJoin, table::Relation::Object1.def())
146            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
147            .filter(filter_cond.clone())
148            .into_tuple()
149            .all(&inner.db)
150            .await?;
151        let sink_info: Vec<(JobId, String, DateTime, CreateType, bool)> = Sink::find()
152            .select_only()
153            .columns([sink::Column::SinkId, sink::Column::Definition])
154            .columns(object_columns)
155            .columns(streaming_job_columns)
156            .join(JoinType::LeftJoin, sink::Relation::Object.def())
157            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
158            .filter(filter_cond)
159            .into_tuple()
160            .all(&inner.db)
161            .await?;
162
163        table_info.extend(sink_info.into_iter());
164
165        Ok(table_info)
166    }
167
168    pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
169        let inner = self.inner.read().await;
170        inner.list_databases().await
171    }
172
173    pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
174        self.list_object_dependencies(true).await
175    }
176
177    pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
178        self.list_object_dependencies(false).await
179    }
180
181    pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
182        let inner = self.inner.read().await;
183        inner.list_schemas().await
184    }
185
186    /// [`Self::list_tables_by_type`] with all types.
187    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
188        let inner = self.inner.read().await;
189        inner.list_all_state_tables().await
190    }
191
192    pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
193        let inner = self.inner.read().await;
194        let table_ids: Vec<TableId> = Table::find()
195            .select_only()
196            .column(table::Column::TableId)
197            .join(JoinType::InnerJoin, table::Relation::Object1.def())
198            .filter(
199                object::Column::SchemaId
200                    .eq(schema_id)
201                    .and(table::Column::TableType.ne(TableType::Table)),
202            )
203            .into_tuple()
204            .all(&inner.db)
205            .await?;
206        Ok(table_ids)
207    }
208
209    pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
210        let inner = self.inner.read().await;
211        let table_ids: Vec<TableId> = Table::find()
212            .select_only()
213            .column(table::Column::TableId)
214            .join(JoinType::InnerJoin, table::Relation::Object1.def())
215            .filter(
216                object::Column::SchemaId
217                    .eq(schema_id)
218                    .and(table::Column::TableType.eq(TableType::Table)),
219            )
220            .into_tuple()
221            .all(&inner.db)
222            .await?;
223        Ok(table_ids)
224    }
225
226    pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
227        let inner = self.inner.read().await;
228        let view_ids: Vec<ViewId> = View::find()
229            .select_only()
230            .column(view::Column::ViewId)
231            .join(JoinType::InnerJoin, view::Relation::Object.def())
232            .filter(object::Column::SchemaId.eq(schema_id))
233            .into_tuple()
234            .all(&inner.db)
235            .await?;
236        Ok(view_ids)
237    }
238
239    /// Use [`Self::list_all_state_tables`] to get all types.
240    pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
241        let inner = self.inner.read().await;
242        let table_objs = Table::find()
243            .find_also_related(Object)
244            .filter(table::Column::TableType.eq(table_type))
245            .all(&inner.db)
246            .await?;
247        let streaming_jobs = load_streaming_jobs_by_ids(
248            &inner.db,
249            table_objs.iter().map(|(table, _)| table.job_id()),
250        )
251        .await?;
252        Ok(table_objs
253            .into_iter()
254            .map(|(table, obj)| {
255                let job_id = table.job_id();
256                let streaming_job = streaming_jobs.get(&job_id).cloned();
257                ObjectModel(table, obj.unwrap(), streaming_job).into()
258            })
259            .collect())
260    }
261
262    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
263        let inner = self.inner.read().await;
264        inner.list_sources().await
265    }
266
267    // Return a hashmap to distinguish whether each source is shared or not.
268    pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
269        let inner = self.inner.read().await;
270        let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
271            .select_only()
272            .columns([source::Column::SourceId, source::Column::SourceInfo])
273            .into_tuple()
274            .all(&inner.db)
275            .await?;
276
277        Ok(source_ids
278            .into_iter()
279            .map(|(source_id, info)| {
280                (
281                    source_id,
282                    info.map(|info| info.to_protobuf().cdc_source_job)
283                        .unwrap_or(false),
284                )
285            })
286            .collect())
287    }
288
289    pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
290        let inner = self.inner.read().await;
291        let conn_objs = Connection::find()
292            .find_also_related(Object)
293            .all(&inner.db)
294            .await?;
295        Ok(conn_objs
296            .into_iter()
297            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap(), None).into())
298            .collect())
299    }
300
301    pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
302        let inner = self.inner.read().await;
303        let source_ids: Vec<SourceId> = Source::find()
304            .select_only()
305            .column(source::Column::SourceId)
306            .join(JoinType::InnerJoin, source::Relation::Object.def())
307            .filter(object::Column::SchemaId.eq(schema_id))
308            .into_tuple()
309            .all(&inner.db)
310            .await?;
311        Ok(source_ids)
312    }
313
314    pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
315        let inner = self.inner.read().await;
316        inner.list_indexes().await
317    }
318
319    pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
320        let inner = self.inner.read().await;
321        inner.list_sinks().await
322    }
323
324    pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
325        let inner = self.inner.read().await;
326        inner.list_subscriptions().await
327    }
328
329    pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
330        let inner = self.inner.read().await;
331        inner.list_views().await
332    }
333
334    pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
335        let inner = self.inner.read().await;
336        inner.list_users().await
337    }
338
339    pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
340        let inner = self.inner.read().await;
341        inner.list_functions().await
342    }
343
344    /// `Unmigrated` refers to table-fragments that have not yet been migrated to the new plan (for now, this means
345    /// table-fragments that do not use `UpstreamSinkUnion` operator to receive multiple upstream sinks)
346    pub async fn list_unmigrated_tables(&self) -> MetaResult<Vec<PbTable>> {
347        let inner = self.inner.read().await;
348
349        let table_objs = Table::find()
350            .filter(table::Column::TableType.eq(TableType::Table))
351            .find_also_related(Object)
352            .join(JoinType::InnerJoin, object::Relation::Fragment.def())
353            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Mview).and(
354                FragmentTypeMask::disjoint(FragmentTypeFlag::UpstreamSinkUnion),
355            ))
356            .all(&inner.db)
357            .await?;
358        let streaming_jobs = load_streaming_jobs_by_ids(
359            &inner.db,
360            table_objs.iter().map(|(table, _)| table.job_id()),
361        )
362        .await?;
363
364        Ok(table_objs
365            .into_iter()
366            .map(|(table, obj)| {
367                let job_id = table.job_id();
368                let streaming_job = streaming_jobs.get(&job_id).cloned();
369                ObjectModel(table, obj.unwrap(), streaming_job).into()
370            })
371            .collect())
372    }
373
374    pub async fn list_sink_ids(&self, database_id: Option<DatabaseId>) -> MetaResult<Vec<SinkId>> {
375        let inner = self.inner.read().await;
376
377        let mut query = Sink::find().select_only().column(sink::Column::SinkId);
378
379        if let Some(database_id) = database_id {
380            query = query
381                .join(JoinType::InnerJoin, sink::Relation::Object.def())
382                .filter(object::Column::DatabaseId.eq(database_id));
383        }
384
385        let sink_ids: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;
386
387        Ok(sink_ids)
388    }
389}