risingwave_meta/controller/catalog/
list_op.rs1use risingwave_common::catalog::FragmentTypeMask;
16use risingwave_common::id::JobId;
17use risingwave_meta_model::refresh_job::{self, RefreshState};
18use sea_orm::prelude::DateTime;
19
20use super::*;
21use crate::controller::fragment::FragmentTypeMaskExt;
22use crate::controller::utils::load_streaming_jobs_by_ids;
23
24impl CatalogController {
25 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
26 self.inner.read().await.list_time_travel_table_ids().await
27 }
28
29 pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
30 let inner = self.inner.read().await;
31 Ok(RefreshJob::find().all(&inner.db).await?)
32 }
33
34 pub async fn get_refresh_job_state_by_table_id(
35 &self,
36 table_id: TableId,
37 ) -> MetaResult<RefreshState> {
38 let inner = self.inner.read().await;
39 let (refresh_job_state,): (RefreshState,) = RefreshJob::find_by_id(table_id)
40 .select_only()
41 .select_column(refresh_job::Column::CurrentStatus)
42 .into_tuple()
43 .one(&inner.db)
44 .await?
45 .ok_or_else(|| MetaError::catalog_id_not_found("refresh_job", table_id))?;
46 Ok(refresh_job_state)
47 }
48
49 pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
50 let inner = self.inner.read().await;
51 Ok(Table::find()
52 .select_only()
53 .column(table::Column::TableId)
54 .filter(table::Column::Refreshable.eq(true))
55 .into_tuple()
56 .all(&inner.db)
57 .await?)
58 }
59
60 pub async fn list_stream_job_desc_for_telemetry(
61 &self,
62 ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
63 let inner = self.inner.read().await;
64 let info: Vec<(TableId, Option<Property>)> = Table::find()
65 .select_only()
66 .column(table::Column::TableId)
67 .column(source::Column::WithProperties)
68 .join(JoinType::LeftJoin, table::Relation::Source.def())
69 .filter(
70 table::Column::TableType
71 .eq(TableType::Table)
72 .or(table::Column::TableType.eq(TableType::MaterializedView)),
73 )
74 .into_tuple()
75 .all(&inner.db)
76 .await?;
77
78 Ok(info
79 .into_iter()
80 .map(|(table_id, properties)| {
81 let connector_info = if let Some(inner_props) = properties {
82 inner_props
83 .inner_ref()
84 .get(UPSTREAM_SOURCE_KEY)
85 .map(|v| v.to_lowercase())
86 } else {
87 None
88 };
89 MetaTelemetryJobDesc {
90 table_id: table_id.as_i32_id(),
91 connector: connector_info,
92 optimization: vec![],
93 }
94 })
95 .collect())
96 }
97
98 pub async fn list_background_creating_jobs(
99 &self,
100 include_initial: bool,
101 database_id: Option<DatabaseId>,
102 ) -> MetaResult<HashSet<JobId>> {
103 Ok(self
104 .list_creating_jobs(include_initial, false, database_id)
105 .await?
106 .into_iter()
107 .map(|(job_id, _, _, create_type, _)| {
108 assert_eq!(create_type, CreateType::Background);
109 job_id
110 })
111 .collect())
112 }
113
114 pub async fn list_creating_jobs(
115 &self,
116 include_initial: bool,
117 include_foreground: bool,
118 database_id: Option<DatabaseId>,
119 ) -> MetaResult<Vec<(JobId, String, DateTime, CreateType, bool)>> {
120 let inner = self.inner.read().await;
121 let create_type_cond = if include_foreground {
122 SimpleExpr::from(true)
123 } else {
124 streaming_job::Column::CreateType.eq(CreateType::Background)
125 };
126 let status_cond = if include_initial {
127 streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
128 } else {
129 streaming_job::Column::JobStatus.eq(JobStatus::Creating)
130 };
131 let database_cond = database_id
132 .map(|database_id| object::Column::DatabaseId.eq(database_id))
133 .unwrap_or_else(|| SimpleExpr::from(true));
134 let filter_cond = create_type_cond.and(status_cond).and(database_cond);
135 let object_columns = [object::Column::InitializedAt];
136 let streaming_job_columns = [
137 streaming_job::Column::CreateType,
138 streaming_job::Column::IsServerlessBackfill,
139 ];
140 let mut table_info: Vec<(JobId, String, DateTime, CreateType, bool)> = Table::find()
141 .select_only()
142 .columns([table::Column::TableId, table::Column::Definition])
143 .columns(object_columns)
144 .columns(streaming_job_columns)
145 .join(JoinType::LeftJoin, table::Relation::Object1.def())
146 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
147 .filter(filter_cond.clone())
148 .into_tuple()
149 .all(&inner.db)
150 .await?;
151 let sink_info: Vec<(JobId, String, DateTime, CreateType, bool)> = Sink::find()
152 .select_only()
153 .columns([sink::Column::SinkId, sink::Column::Definition])
154 .columns(object_columns)
155 .columns(streaming_job_columns)
156 .join(JoinType::LeftJoin, sink::Relation::Object.def())
157 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
158 .filter(filter_cond)
159 .into_tuple()
160 .all(&inner.db)
161 .await?;
162
163 table_info.extend(sink_info.into_iter());
164
165 Ok(table_info)
166 }
167
168 pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
169 let inner = self.inner.read().await;
170 inner.list_databases().await
171 }
172
173 pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
174 self.list_object_dependencies(true).await
175 }
176
177 pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
178 self.list_object_dependencies(false).await
179 }
180
181 pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
182 let inner = self.inner.read().await;
183 inner.list_schemas().await
184 }
185
186 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
188 let inner = self.inner.read().await;
189 inner.list_all_state_tables().await
190 }
191
192 pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
193 let inner = self.inner.read().await;
194 let table_ids: Vec<TableId> = Table::find()
195 .select_only()
196 .column(table::Column::TableId)
197 .join(JoinType::InnerJoin, table::Relation::Object1.def())
198 .filter(
199 object::Column::SchemaId
200 .eq(schema_id)
201 .and(table::Column::TableType.ne(TableType::Table)),
202 )
203 .into_tuple()
204 .all(&inner.db)
205 .await?;
206 Ok(table_ids)
207 }
208
209 pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
210 let inner = self.inner.read().await;
211 let table_ids: Vec<TableId> = Table::find()
212 .select_only()
213 .column(table::Column::TableId)
214 .join(JoinType::InnerJoin, table::Relation::Object1.def())
215 .filter(
216 object::Column::SchemaId
217 .eq(schema_id)
218 .and(table::Column::TableType.eq(TableType::Table)),
219 )
220 .into_tuple()
221 .all(&inner.db)
222 .await?;
223 Ok(table_ids)
224 }
225
226 pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
227 let inner = self.inner.read().await;
228 let view_ids: Vec<ViewId> = View::find()
229 .select_only()
230 .column(view::Column::ViewId)
231 .join(JoinType::InnerJoin, view::Relation::Object.def())
232 .filter(object::Column::SchemaId.eq(schema_id))
233 .into_tuple()
234 .all(&inner.db)
235 .await?;
236 Ok(view_ids)
237 }
238
239 pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
241 let inner = self.inner.read().await;
242 let table_objs = Table::find()
243 .find_also_related(Object)
244 .filter(table::Column::TableType.eq(table_type))
245 .all(&inner.db)
246 .await?;
247 let streaming_jobs = load_streaming_jobs_by_ids(
248 &inner.db,
249 table_objs.iter().map(|(table, _)| table.job_id()),
250 )
251 .await?;
252 Ok(table_objs
253 .into_iter()
254 .map(|(table, obj)| {
255 let job_id = table.job_id();
256 let streaming_job = streaming_jobs.get(&job_id).cloned();
257 ObjectModel(table, obj.unwrap(), streaming_job).into()
258 })
259 .collect())
260 }
261
262 pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
263 let inner = self.inner.read().await;
264 inner.list_sources().await
265 }
266
267 pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
269 let inner = self.inner.read().await;
270 let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
271 .select_only()
272 .columns([source::Column::SourceId, source::Column::SourceInfo])
273 .into_tuple()
274 .all(&inner.db)
275 .await?;
276
277 Ok(source_ids
278 .into_iter()
279 .map(|(source_id, info)| {
280 (
281 source_id,
282 info.map(|info| info.to_protobuf().cdc_source_job)
283 .unwrap_or(false),
284 )
285 })
286 .collect())
287 }
288
289 pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
290 let inner = self.inner.read().await;
291 let conn_objs = Connection::find()
292 .find_also_related(Object)
293 .all(&inner.db)
294 .await?;
295 Ok(conn_objs
296 .into_iter()
297 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap(), None).into())
298 .collect())
299 }
300
301 pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
302 let inner = self.inner.read().await;
303 let source_ids: Vec<SourceId> = Source::find()
304 .select_only()
305 .column(source::Column::SourceId)
306 .join(JoinType::InnerJoin, source::Relation::Object.def())
307 .filter(object::Column::SchemaId.eq(schema_id))
308 .into_tuple()
309 .all(&inner.db)
310 .await?;
311 Ok(source_ids)
312 }
313
314 pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
315 let inner = self.inner.read().await;
316 inner.list_indexes().await
317 }
318
319 pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
320 let inner = self.inner.read().await;
321 inner.list_sinks().await
322 }
323
324 pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
325 let inner = self.inner.read().await;
326 inner.list_subscriptions().await
327 }
328
329 pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
330 let inner = self.inner.read().await;
331 inner.list_views().await
332 }
333
334 pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
335 let inner = self.inner.read().await;
336 inner.list_users().await
337 }
338
339 pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
340 let inner = self.inner.read().await;
341 inner.list_functions().await
342 }
343
344 pub async fn list_unmigrated_tables(&self) -> MetaResult<Vec<PbTable>> {
347 let inner = self.inner.read().await;
348
349 let table_objs = Table::find()
350 .filter(table::Column::TableType.eq(TableType::Table))
351 .find_also_related(Object)
352 .join(JoinType::InnerJoin, object::Relation::Fragment.def())
353 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Mview).and(
354 FragmentTypeMask::disjoint(FragmentTypeFlag::UpstreamSinkUnion),
355 ))
356 .all(&inner.db)
357 .await?;
358 let streaming_jobs = load_streaming_jobs_by_ids(
359 &inner.db,
360 table_objs.iter().map(|(table, _)| table.job_id()),
361 )
362 .await?;
363
364 Ok(table_objs
365 .into_iter()
366 .map(|(table, obj)| {
367 let job_id = table.job_id();
368 let streaming_job = streaming_jobs.get(&job_id).cloned();
369 ObjectModel(table, obj.unwrap(), streaming_job).into()
370 })
371 .collect())
372 }
373
374 pub async fn list_sink_ids(&self, database_id: Option<DatabaseId>) -> MetaResult<Vec<SinkId>> {
375 let inner = self.inner.read().await;
376
377 let mut query = Sink::find().select_only().column(sink::Column::SinkId);
378
379 if let Some(database_id) = database_id {
380 query = query
381 .join(JoinType::InnerJoin, sink::Relation::Object.def())
382 .filter(object::Column::DatabaseId.eq(database_id));
383 }
384
385 let sink_ids: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;
386
387 Ok(sink_ids)
388 }
389}