risingwave_meta/controller/catalog/
list_op.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::FragmentTypeMask;
16use risingwave_common::id::JobId;
17use risingwave_meta_model::refresh_job::{self, RefreshState};
18use risingwave_pb::meta::ObjectDependency as PbObjectDependency;
19use sea_orm::prelude::DateTime;
20
21use super::*;
22use crate::controller::fragment::FragmentTypeMaskExt;
23use crate::controller::utils::load_streaming_jobs_by_ids;
24
25impl CatalogController {
26    pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
27        self.inner.read().await.list_time_travel_table_ids().await
28    }
29
30    pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
31        let inner = self.inner.read().await;
32        Ok(RefreshJob::find().all(&inner.db).await?)
33    }
34
35    pub async fn get_refresh_job_state_by_table_id(
36        &self,
37        table_id: TableId,
38    ) -> MetaResult<RefreshState> {
39        let inner = self.inner.read().await;
40        let (refresh_job_state,): (RefreshState,) = RefreshJob::find_by_id(table_id)
41            .select_only()
42            .select_column(refresh_job::Column::CurrentStatus)
43            .into_tuple()
44            .one(&inner.db)
45            .await?
46            .ok_or_else(|| MetaError::catalog_id_not_found("refresh_job", table_id))?;
47        Ok(refresh_job_state)
48    }
49
50    pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
51        let inner = self.inner.read().await;
52        Ok(Table::find()
53            .select_only()
54            .column(table::Column::TableId)
55            .filter(table::Column::Refreshable.eq(true))
56            .into_tuple()
57            .all(&inner.db)
58            .await?)
59    }
60
61    pub async fn list_stream_job_desc_for_telemetry(
62        &self,
63    ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
64        let inner = self.inner.read().await;
65        let info: Vec<(TableId, Option<Property>)> = Table::find()
66            .select_only()
67            .column(table::Column::TableId)
68            .column(source::Column::WithProperties)
69            .join(JoinType::LeftJoin, table::Relation::Source.def())
70            .filter(
71                table::Column::TableType
72                    .eq(TableType::Table)
73                    .or(table::Column::TableType.eq(TableType::MaterializedView)),
74            )
75            .into_tuple()
76            .all(&inner.db)
77            .await?;
78
79        Ok(info
80            .into_iter()
81            .map(|(table_id, properties)| {
82                let connector_info = if let Some(inner_props) = properties {
83                    inner_props
84                        .inner_ref()
85                        .get(UPSTREAM_SOURCE_KEY)
86                        .map(|v| v.to_lowercase())
87                } else {
88                    None
89                };
90                MetaTelemetryJobDesc {
91                    table_id,
92                    connector: connector_info,
93                    optimization: vec![],
94                }
95            })
96            .collect())
97    }
98
99    pub async fn list_background_creating_jobs(
100        &self,
101        include_initial: bool,
102        database_id: Option<DatabaseId>,
103    ) -> MetaResult<HashSet<JobId>> {
104        Ok(self
105            .list_creating_jobs(include_initial, false, database_id)
106            .await?
107            .into_iter()
108            .map(|(job_id, _, _, create_type, _)| {
109                assert_eq!(create_type, CreateType::Background);
110                job_id
111            })
112            .collect())
113    }
114
115    pub async fn list_creating_jobs(
116        &self,
117        include_initial: bool,
118        include_foreground: bool,
119        database_id: Option<DatabaseId>,
120    ) -> MetaResult<Vec<(JobId, String, DateTime, CreateType, bool)>> {
121        let inner = self.inner.read().await;
122        let create_type_cond = if include_foreground {
123            SimpleExpr::from(true)
124        } else {
125            streaming_job::Column::CreateType.eq(CreateType::Background)
126        };
127        let status_cond = if include_initial {
128            streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
129        } else {
130            streaming_job::Column::JobStatus.eq(JobStatus::Creating)
131        };
132        let database_cond = database_id
133            .map(|database_id| object::Column::DatabaseId.eq(database_id))
134            .unwrap_or_else(|| SimpleExpr::from(true));
135        let filter_cond = create_type_cond.and(status_cond).and(database_cond);
136        let object_columns = [object::Column::InitializedAt];
137        let streaming_job_columns = [
138            streaming_job::Column::CreateType,
139            streaming_job::Column::IsServerlessBackfill,
140        ];
141        let mut table_info: Vec<(JobId, String, DateTime, CreateType, bool)> = Table::find()
142            .select_only()
143            .columns([table::Column::TableId, table::Column::Definition])
144            .columns(object_columns)
145            .columns(streaming_job_columns)
146            .join(JoinType::LeftJoin, table::Relation::Object1.def())
147            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
148            .filter(filter_cond.clone())
149            .into_tuple()
150            .all(&inner.db)
151            .await?;
152        let sink_info: Vec<(JobId, String, DateTime, CreateType, bool)> = Sink::find()
153            .select_only()
154            .columns([sink::Column::SinkId, sink::Column::Definition])
155            .columns(object_columns)
156            .columns(streaming_job_columns)
157            .join(JoinType::LeftJoin, sink::Relation::Object.def())
158            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
159            .filter(filter_cond)
160            .into_tuple()
161            .all(&inner.db)
162            .await?;
163
164        table_info.extend(sink_info.into_iter());
165
166        Ok(table_info)
167    }
168
169    pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
170        let inner = self.inner.read().await;
171        inner.list_databases().await
172    }
173
174    pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependency>> {
175        let inner = self.inner.read().await;
176        let txn = inner.db.begin().await?;
177        let dependencies = list_object_dependencies(&txn, true).await?;
178        txn.commit().await?;
179        Ok(dependencies)
180    }
181
182    pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependency>> {
183        let inner = self.inner.read().await;
184        let txn = inner.db.begin().await?;
185        let dependencies = list_object_dependencies(&txn, false).await?;
186        txn.commit().await?;
187        Ok(dependencies)
188    }
189
190    pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
191        let inner = self.inner.read().await;
192        inner.list_schemas().await
193    }
194
195    /// [`Self::list_tables_by_type`] with all types.
196    pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
197        let inner = self.inner.read().await;
198        inner.list_all_state_tables().await
199    }
200
201    pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
202        let inner = self.inner.read().await;
203        let table_ids: Vec<TableId> = Table::find()
204            .select_only()
205            .column(table::Column::TableId)
206            .join(JoinType::InnerJoin, table::Relation::Object1.def())
207            .filter(
208                object::Column::SchemaId
209                    .eq(schema_id)
210                    .and(table::Column::TableType.ne(TableType::Table)),
211            )
212            .into_tuple()
213            .all(&inner.db)
214            .await?;
215        Ok(table_ids)
216    }
217
218    pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
219        let inner = self.inner.read().await;
220        let table_ids: Vec<TableId> = Table::find()
221            .select_only()
222            .column(table::Column::TableId)
223            .join(JoinType::InnerJoin, table::Relation::Object1.def())
224            .filter(
225                object::Column::SchemaId
226                    .eq(schema_id)
227                    .and(table::Column::TableType.eq(TableType::Table)),
228            )
229            .into_tuple()
230            .all(&inner.db)
231            .await?;
232        Ok(table_ids)
233    }
234
235    pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
236        let inner = self.inner.read().await;
237        let view_ids: Vec<ViewId> = View::find()
238            .select_only()
239            .column(view::Column::ViewId)
240            .join(JoinType::InnerJoin, view::Relation::Object.def())
241            .filter(object::Column::SchemaId.eq(schema_id))
242            .into_tuple()
243            .all(&inner.db)
244            .await?;
245        Ok(view_ids)
246    }
247
248    /// Use [`Self::list_all_state_tables`] to get all types.
249    pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
250        let inner = self.inner.read().await;
251        let table_objs = Table::find()
252            .find_also_related(Object)
253            .filter(table::Column::TableType.eq(table_type))
254            .all(&inner.db)
255            .await?;
256        let streaming_jobs = load_streaming_jobs_by_ids(
257            &inner.db,
258            table_objs.iter().map(|(table, _)| table.job_id()),
259        )
260        .await?;
261        Ok(table_objs
262            .into_iter()
263            .map(|(table, obj)| {
264                let job_id = table.job_id();
265                let streaming_job = streaming_jobs.get(&job_id).cloned();
266                ObjectModel(table, obj.unwrap(), streaming_job).into()
267            })
268            .collect())
269    }
270
271    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
272        let inner = self.inner.read().await;
273        inner.list_sources().await
274    }
275
276    // Return a hashmap to distinguish whether each source is shared or not.
277    pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
278        let inner = self.inner.read().await;
279        let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
280            .select_only()
281            .columns([source::Column::SourceId, source::Column::SourceInfo])
282            .into_tuple()
283            .all(&inner.db)
284            .await?;
285
286        Ok(source_ids
287            .into_iter()
288            .map(|(source_id, info)| {
289                (
290                    source_id,
291                    info.map(|info| info.to_protobuf().cdc_source_job)
292                        .unwrap_or(false),
293                )
294            })
295            .collect())
296    }
297
298    pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
299        let inner = self.inner.read().await;
300        let conn_objs = Connection::find()
301            .find_also_related(Object)
302            .all(&inner.db)
303            .await?;
304        Ok(conn_objs
305            .into_iter()
306            .map(|(conn, obj)| ObjectModel(conn, obj.unwrap(), None).into())
307            .collect())
308    }
309
310    pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
311        let inner = self.inner.read().await;
312        let source_ids: Vec<SourceId> = Source::find()
313            .select_only()
314            .column(source::Column::SourceId)
315            .join(JoinType::InnerJoin, source::Relation::Object.def())
316            .filter(object::Column::SchemaId.eq(schema_id))
317            .into_tuple()
318            .all(&inner.db)
319            .await?;
320        Ok(source_ids)
321    }
322
323    pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
324        let inner = self.inner.read().await;
325        inner.list_indexes().await
326    }
327
328    pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
329        let inner = self.inner.read().await;
330        inner.list_sinks().await
331    }
332
333    pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
334        let inner = self.inner.read().await;
335        inner.list_subscriptions().await
336    }
337
338    pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
339        let inner = self.inner.read().await;
340        inner.list_views().await
341    }
342
343    pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
344        let inner = self.inner.read().await;
345        inner.list_users().await
346    }
347
348    pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
349        let inner = self.inner.read().await;
350        inner.list_functions().await
351    }
352
353    /// `Unmigrated` refers to table-fragments that have not yet been migrated to the new plan (for now, this means
354    /// table-fragments that do not use `UpstreamSinkUnion` operator to receive multiple upstream sinks)
355    pub async fn list_unmigrated_tables(&self) -> MetaResult<Vec<PbTable>> {
356        let inner = self.inner.read().await;
357
358        let table_objs = Table::find()
359            .filter(table::Column::TableType.eq(TableType::Table))
360            .find_also_related(Object)
361            .join(JoinType::InnerJoin, object::Relation::Fragment.def())
362            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Mview).and(
363                FragmentTypeMask::disjoint(FragmentTypeFlag::UpstreamSinkUnion),
364            ))
365            .all(&inner.db)
366            .await?;
367        let streaming_jobs = load_streaming_jobs_by_ids(
368            &inner.db,
369            table_objs.iter().map(|(table, _)| table.job_id()),
370        )
371        .await?;
372
373        Ok(table_objs
374            .into_iter()
375            .map(|(table, obj)| {
376                let job_id = table.job_id();
377                let streaming_job = streaming_jobs.get(&job_id).cloned();
378                ObjectModel(table, obj.unwrap(), streaming_job).into()
379            })
380            .collect())
381    }
382
383    pub async fn list_sink_ids(&self, database_id: Option<DatabaseId>) -> MetaResult<Vec<SinkId>> {
384        let inner = self.inner.read().await;
385
386        let mut query = Sink::find().select_only().column(sink::Column::SinkId);
387
388        if let Some(database_id) = database_id {
389            query = query
390                .join(JoinType::InnerJoin, sink::Relation::Object.def())
391                .filter(object::Column::DatabaseId.eq(database_id));
392        }
393
394        let sink_ids: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;
395
396        Ok(sink_ids)
397    }
398}