risingwave_meta/controller/catalog/
list_op.rs1use risingwave_common::catalog::FragmentTypeMask;
16use risingwave_common::id::JobId;
17use risingwave_meta_model::refresh_job::{self, RefreshState};
18use sea_orm::prelude::DateTime;
19
20use super::*;
21use crate::controller::fragment::FragmentTypeMaskExt;
22
23impl CatalogController {
24 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
25 self.inner.read().await.list_time_travel_table_ids().await
26 }
27
28 pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
29 let inner = self.inner.read().await;
30 Ok(RefreshJob::find().all(&inner.db).await?)
31 }
32
33 pub async fn get_refresh_job_state_by_table_id(
34 &self,
35 table_id: TableId,
36 ) -> MetaResult<RefreshState> {
37 let inner = self.inner.read().await;
38 let (refresh_job_state,): (RefreshState,) = RefreshJob::find_by_id(table_id)
39 .select_only()
40 .select_column(refresh_job::Column::CurrentStatus)
41 .into_tuple()
42 .one(&inner.db)
43 .await?
44 .ok_or_else(|| MetaError::catalog_id_not_found("refresh_job", table_id))?;
45 Ok(refresh_job_state)
46 }
47
48 pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
49 let inner = self.inner.read().await;
50 Ok(Table::find()
51 .select_only()
52 .column(table::Column::TableId)
53 .filter(table::Column::Refreshable.eq(true))
54 .into_tuple()
55 .all(&inner.db)
56 .await?)
57 }
58
59 pub async fn list_stream_job_desc_for_telemetry(
60 &self,
61 ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
62 let inner = self.inner.read().await;
63 let info: Vec<(TableId, Option<Property>)> = Table::find()
64 .select_only()
65 .column(table::Column::TableId)
66 .column(source::Column::WithProperties)
67 .join(JoinType::LeftJoin, table::Relation::Source.def())
68 .filter(
69 table::Column::TableType
70 .eq(TableType::Table)
71 .or(table::Column::TableType.eq(TableType::MaterializedView)),
72 )
73 .into_tuple()
74 .all(&inner.db)
75 .await?;
76
77 Ok(info
78 .into_iter()
79 .map(|(table_id, properties)| {
80 let connector_info = if let Some(inner_props) = properties {
81 inner_props
82 .inner_ref()
83 .get(UPSTREAM_SOURCE_KEY)
84 .map(|v| v.to_lowercase())
85 } else {
86 None
87 };
88 MetaTelemetryJobDesc {
89 table_id: table_id.as_i32_id(),
90 connector: connector_info,
91 optimization: vec![],
92 }
93 })
94 .collect())
95 }
96
97 pub async fn list_background_creating_jobs(
98 &self,
99 include_initial: bool,
100 database_id: Option<DatabaseId>,
101 ) -> MetaResult<Vec<(JobId, String, DateTime)>> {
102 let inner = self.inner.read().await;
103 let status_cond = if include_initial {
104 streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
105 } else {
106 streaming_job::Column::JobStatus.eq(JobStatus::Creating)
107 };
108 let mut table_info: Vec<(JobId, String, DateTime)> = Table::find()
109 .select_only()
110 .columns([table::Column::TableId, table::Column::Definition])
111 .column(object::Column::InitializedAt)
112 .join(JoinType::LeftJoin, table::Relation::Object1.def())
113 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
114 .filter(
115 streaming_job::Column::CreateType
116 .eq(CreateType::Background)
117 .and(status_cond.clone())
118 .and(
119 database_id
120 .map(|database_id| object::Column::DatabaseId.eq(database_id))
121 .unwrap_or_else(|| SimpleExpr::from(true)),
122 ),
123 )
124 .into_tuple()
125 .all(&inner.db)
126 .await?;
127 let sink_info: Vec<(JobId, String, DateTime)> = Sink::find()
128 .select_only()
129 .columns([sink::Column::SinkId, sink::Column::Definition])
130 .column(object::Column::InitializedAt)
131 .join(JoinType::LeftJoin, sink::Relation::Object.def())
132 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
133 .filter(
134 streaming_job::Column::CreateType
135 .eq(CreateType::Background)
136 .and(status_cond)
137 .and(
138 database_id
139 .map(|database_id| object::Column::DatabaseId.eq(database_id))
140 .unwrap_or_else(|| SimpleExpr::from(true)),
141 ),
142 )
143 .into_tuple()
144 .all(&inner.db)
145 .await?;
146
147 table_info.extend(sink_info.into_iter());
148
149 Ok(table_info)
150 }
151
152 pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
153 let inner = self.inner.read().await;
154 inner.list_databases().await
155 }
156
157 pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
158 self.list_object_dependencies(true).await
159 }
160
161 pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
162 self.list_object_dependencies(false).await
163 }
164
165 pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
166 let inner = self.inner.read().await;
167 inner.list_schemas().await
168 }
169
170 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
172 let inner = self.inner.read().await;
173 inner.list_all_state_tables().await
174 }
175
176 pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
177 let inner = self.inner.read().await;
178 let table_ids: Vec<TableId> = Table::find()
179 .select_only()
180 .column(table::Column::TableId)
181 .join(JoinType::InnerJoin, table::Relation::Object1.def())
182 .filter(
183 object::Column::SchemaId
184 .eq(schema_id)
185 .and(table::Column::TableType.ne(TableType::Table)),
186 )
187 .into_tuple()
188 .all(&inner.db)
189 .await?;
190 Ok(table_ids)
191 }
192
193 pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
194 let inner = self.inner.read().await;
195 let table_ids: Vec<TableId> = Table::find()
196 .select_only()
197 .column(table::Column::TableId)
198 .join(JoinType::InnerJoin, table::Relation::Object1.def())
199 .filter(
200 object::Column::SchemaId
201 .eq(schema_id)
202 .and(table::Column::TableType.eq(TableType::Table)),
203 )
204 .into_tuple()
205 .all(&inner.db)
206 .await?;
207 Ok(table_ids)
208 }
209
210 pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
211 let inner = self.inner.read().await;
212 let view_ids: Vec<ViewId> = View::find()
213 .select_only()
214 .column(view::Column::ViewId)
215 .join(JoinType::InnerJoin, view::Relation::Object.def())
216 .filter(object::Column::SchemaId.eq(schema_id))
217 .into_tuple()
218 .all(&inner.db)
219 .await?;
220 Ok(view_ids)
221 }
222
223 pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
225 let inner = self.inner.read().await;
226 let table_objs = Table::find()
227 .find_also_related(Object)
228 .filter(table::Column::TableType.eq(table_type))
229 .all(&inner.db)
230 .await?;
231 Ok(table_objs
232 .into_iter()
233 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
234 .collect())
235 }
236
237 pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
238 let inner = self.inner.read().await;
239 inner.list_sources().await
240 }
241
242 pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
244 let inner = self.inner.read().await;
245 let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
246 .select_only()
247 .columns([source::Column::SourceId, source::Column::SourceInfo])
248 .into_tuple()
249 .all(&inner.db)
250 .await?;
251
252 Ok(source_ids
253 .into_iter()
254 .map(|(source_id, info)| {
255 (
256 source_id,
257 info.map(|info| info.to_protobuf().cdc_source_job)
258 .unwrap_or(false),
259 )
260 })
261 .collect())
262 }
263
264 pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
265 let inner = self.inner.read().await;
266 let conn_objs = Connection::find()
267 .find_also_related(Object)
268 .all(&inner.db)
269 .await?;
270 Ok(conn_objs
271 .into_iter()
272 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
273 .collect())
274 }
275
276 pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
277 let inner = self.inner.read().await;
278 let source_ids: Vec<SourceId> = Source::find()
279 .select_only()
280 .column(source::Column::SourceId)
281 .join(JoinType::InnerJoin, source::Relation::Object.def())
282 .filter(object::Column::SchemaId.eq(schema_id))
283 .into_tuple()
284 .all(&inner.db)
285 .await?;
286 Ok(source_ids)
287 }
288
289 pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
290 let inner = self.inner.read().await;
291 inner.list_indexes().await
292 }
293
294 pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
295 let inner = self.inner.read().await;
296 inner.list_sinks().await
297 }
298
299 pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
300 let inner = self.inner.read().await;
301 inner.list_subscriptions().await
302 }
303
304 pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
305 let inner = self.inner.read().await;
306 inner.list_views().await
307 }
308
309 pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
310 let inner = self.inner.read().await;
311 inner.list_users().await
312 }
313
314 pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
315 let inner = self.inner.read().await;
316 inner.list_functions().await
317 }
318
319 pub async fn list_unmigrated_tables(&self) -> MetaResult<Vec<PbTable>> {
322 let inner = self.inner.read().await;
323
324 let table_objs = Table::find()
325 .filter(table::Column::TableType.eq(TableType::Table))
326 .find_also_related(Object)
327 .join(JoinType::InnerJoin, object::Relation::Fragment.def())
328 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Mview).and(
329 FragmentTypeMask::disjoint(FragmentTypeFlag::UpstreamSinkUnion),
330 ))
331 .all(&inner.db)
332 .await?;
333
334 Ok(table_objs
335 .into_iter()
336 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
337 .collect())
338 }
339
340 pub async fn list_sink_ids(&self, database_id: Option<DatabaseId>) -> MetaResult<Vec<SinkId>> {
341 let inner = self.inner.read().await;
342
343 let mut query = Sink::find().select_only().column(sink::Column::SinkId);
344
345 if let Some(database_id) = database_id {
346 query = query
347 .join(JoinType::InnerJoin, sink::Relation::Object.def())
348 .filter(object::Column::DatabaseId.eq(database_id));
349 }
350
351 let sink_ids: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;
352
353 Ok(sink_ids)
354 }
355}