risingwave_meta/controller/catalog/
list_op.rs1use risingwave_common::catalog::FragmentTypeMask;
16use risingwave_common::id::JobId;
17use risingwave_meta_model::refresh_job::{self, RefreshState};
18use risingwave_pb::meta::ObjectDependency as PbObjectDependency;
19use sea_orm::prelude::DateTime;
20
21use super::*;
22use crate::controller::fragment::FragmentTypeMaskExt;
23use crate::controller::utils::load_streaming_jobs_by_ids;
24
25impl CatalogController {
26 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
27 self.inner.read().await.list_time_travel_table_ids().await
28 }
29
30 pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
31 let inner = self.inner.read().await;
32 Ok(RefreshJob::find().all(&inner.db).await?)
33 }
34
35 pub async fn get_refresh_job_state_by_table_id(
36 &self,
37 table_id: TableId,
38 ) -> MetaResult<RefreshState> {
39 let inner = self.inner.read().await;
40 let (refresh_job_state,): (RefreshState,) = RefreshJob::find_by_id(table_id)
41 .select_only()
42 .select_column(refresh_job::Column::CurrentStatus)
43 .into_tuple()
44 .one(&inner.db)
45 .await?
46 .ok_or_else(|| MetaError::catalog_id_not_found("refresh_job", table_id))?;
47 Ok(refresh_job_state)
48 }
49
50 pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
51 let inner = self.inner.read().await;
52 Ok(Table::find()
53 .select_only()
54 .column(table::Column::TableId)
55 .filter(table::Column::Refreshable.eq(true))
56 .into_tuple()
57 .all(&inner.db)
58 .await?)
59 }
60
61 pub async fn list_stream_job_desc_for_telemetry(
62 &self,
63 ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
64 let inner = self.inner.read().await;
65 let info: Vec<(TableId, Option<Property>)> = Table::find()
66 .select_only()
67 .column(table::Column::TableId)
68 .column(source::Column::WithProperties)
69 .join(JoinType::LeftJoin, table::Relation::Source.def())
70 .filter(
71 table::Column::TableType
72 .eq(TableType::Table)
73 .or(table::Column::TableType.eq(TableType::MaterializedView)),
74 )
75 .into_tuple()
76 .all(&inner.db)
77 .await?;
78
79 Ok(info
80 .into_iter()
81 .map(|(table_id, properties)| {
82 let connector_info = if let Some(inner_props) = properties {
83 inner_props
84 .inner_ref()
85 .get(UPSTREAM_SOURCE_KEY)
86 .map(|v| v.to_lowercase())
87 } else {
88 None
89 };
90 MetaTelemetryJobDesc {
91 table_id,
92 connector: connector_info,
93 optimization: vec![],
94 }
95 })
96 .collect())
97 }
98
99 pub async fn list_background_creating_jobs(
100 &self,
101 include_initial: bool,
102 database_id: Option<DatabaseId>,
103 ) -> MetaResult<HashSet<JobId>> {
104 Ok(self
105 .list_creating_jobs(include_initial, false, database_id)
106 .await?
107 .into_iter()
108 .map(|(job_id, _, _, create_type, _)| {
109 assert_eq!(create_type, CreateType::Background);
110 job_id
111 })
112 .collect())
113 }
114
115 pub async fn list_creating_jobs(
116 &self,
117 include_initial: bool,
118 include_foreground: bool,
119 database_id: Option<DatabaseId>,
120 ) -> MetaResult<Vec<(JobId, String, DateTime, CreateType, bool)>> {
121 let inner = self.inner.read().await;
122 let create_type_cond = if include_foreground {
123 SimpleExpr::from(true)
124 } else {
125 streaming_job::Column::CreateType.eq(CreateType::Background)
126 };
127 let status_cond = if include_initial {
128 streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
129 } else {
130 streaming_job::Column::JobStatus.eq(JobStatus::Creating)
131 };
132 let database_cond = database_id
133 .map(|database_id| object::Column::DatabaseId.eq(database_id))
134 .unwrap_or_else(|| SimpleExpr::from(true));
135 let filter_cond = create_type_cond.and(status_cond).and(database_cond);
136 let object_columns = [object::Column::InitializedAt];
137 let streaming_job_columns = [
138 streaming_job::Column::CreateType,
139 streaming_job::Column::IsServerlessBackfill,
140 ];
141 let mut table_info: Vec<(JobId, String, DateTime, CreateType, bool)> = Table::find()
142 .select_only()
143 .columns([table::Column::TableId, table::Column::Definition])
144 .columns(object_columns)
145 .columns(streaming_job_columns)
146 .join(JoinType::LeftJoin, table::Relation::Object1.def())
147 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
148 .filter(filter_cond.clone())
149 .into_tuple()
150 .all(&inner.db)
151 .await?;
152 let sink_info: Vec<(JobId, String, DateTime, CreateType, bool)> = Sink::find()
153 .select_only()
154 .columns([sink::Column::SinkId, sink::Column::Definition])
155 .columns(object_columns)
156 .columns(streaming_job_columns)
157 .join(JoinType::LeftJoin, sink::Relation::Object.def())
158 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
159 .filter(filter_cond)
160 .into_tuple()
161 .all(&inner.db)
162 .await?;
163
164 table_info.extend(sink_info.into_iter());
165
166 Ok(table_info)
167 }
168
169 pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
170 let inner = self.inner.read().await;
171 inner.list_databases().await
172 }
173
174 pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependency>> {
175 let inner = self.inner.read().await;
176 let txn = inner.db.begin().await?;
177 let dependencies = list_object_dependencies(&txn, true).await?;
178 txn.commit().await?;
179 Ok(dependencies)
180 }
181
182 pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependency>> {
183 let inner = self.inner.read().await;
184 let txn = inner.db.begin().await?;
185 let dependencies = list_object_dependencies(&txn, false).await?;
186 txn.commit().await?;
187 Ok(dependencies)
188 }
189
190 pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
191 let inner = self.inner.read().await;
192 inner.list_schemas().await
193 }
194
195 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
197 let inner = self.inner.read().await;
198 inner.list_all_state_tables().await
199 }
200
201 pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
202 let inner = self.inner.read().await;
203 let table_ids: Vec<TableId> = Table::find()
204 .select_only()
205 .column(table::Column::TableId)
206 .join(JoinType::InnerJoin, table::Relation::Object1.def())
207 .filter(
208 object::Column::SchemaId
209 .eq(schema_id)
210 .and(table::Column::TableType.ne(TableType::Table)),
211 )
212 .into_tuple()
213 .all(&inner.db)
214 .await?;
215 Ok(table_ids)
216 }
217
218 pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
219 let inner = self.inner.read().await;
220 let table_ids: Vec<TableId> = Table::find()
221 .select_only()
222 .column(table::Column::TableId)
223 .join(JoinType::InnerJoin, table::Relation::Object1.def())
224 .filter(
225 object::Column::SchemaId
226 .eq(schema_id)
227 .and(table::Column::TableType.eq(TableType::Table)),
228 )
229 .into_tuple()
230 .all(&inner.db)
231 .await?;
232 Ok(table_ids)
233 }
234
235 pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
236 let inner = self.inner.read().await;
237 let view_ids: Vec<ViewId> = View::find()
238 .select_only()
239 .column(view::Column::ViewId)
240 .join(JoinType::InnerJoin, view::Relation::Object.def())
241 .filter(object::Column::SchemaId.eq(schema_id))
242 .into_tuple()
243 .all(&inner.db)
244 .await?;
245 Ok(view_ids)
246 }
247
248 pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
250 let inner = self.inner.read().await;
251 let table_objs = Table::find()
252 .find_also_related(Object)
253 .filter(table::Column::TableType.eq(table_type))
254 .all(&inner.db)
255 .await?;
256 let streaming_jobs = load_streaming_jobs_by_ids(
257 &inner.db,
258 table_objs.iter().map(|(table, _)| table.job_id()),
259 )
260 .await?;
261 Ok(table_objs
262 .into_iter()
263 .map(|(table, obj)| {
264 let job_id = table.job_id();
265 let streaming_job = streaming_jobs.get(&job_id).cloned();
266 ObjectModel(table, obj.unwrap(), streaming_job).into()
267 })
268 .collect())
269 }
270
271 pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
272 let inner = self.inner.read().await;
273 inner.list_sources().await
274 }
275
276 pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
278 let inner = self.inner.read().await;
279 let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
280 .select_only()
281 .columns([source::Column::SourceId, source::Column::SourceInfo])
282 .into_tuple()
283 .all(&inner.db)
284 .await?;
285
286 Ok(source_ids
287 .into_iter()
288 .map(|(source_id, info)| {
289 (
290 source_id,
291 info.map(|info| info.to_protobuf().cdc_source_job)
292 .unwrap_or(false),
293 )
294 })
295 .collect())
296 }
297
298 pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
299 let inner = self.inner.read().await;
300 let conn_objs = Connection::find()
301 .find_also_related(Object)
302 .all(&inner.db)
303 .await?;
304 Ok(conn_objs
305 .into_iter()
306 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap(), None).into())
307 .collect())
308 }
309
310 pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
311 let inner = self.inner.read().await;
312 let source_ids: Vec<SourceId> = Source::find()
313 .select_only()
314 .column(source::Column::SourceId)
315 .join(JoinType::InnerJoin, source::Relation::Object.def())
316 .filter(object::Column::SchemaId.eq(schema_id))
317 .into_tuple()
318 .all(&inner.db)
319 .await?;
320 Ok(source_ids)
321 }
322
323 pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
324 let inner = self.inner.read().await;
325 inner.list_indexes().await
326 }
327
328 pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
329 let inner = self.inner.read().await;
330 inner.list_sinks().await
331 }
332
333 pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
334 let inner = self.inner.read().await;
335 inner.list_subscriptions().await
336 }
337
338 pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
339 let inner = self.inner.read().await;
340 inner.list_views().await
341 }
342
343 pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
344 let inner = self.inner.read().await;
345 inner.list_users().await
346 }
347
348 pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
349 let inner = self.inner.read().await;
350 inner.list_functions().await
351 }
352
353 pub async fn list_unmigrated_tables(&self) -> MetaResult<Vec<PbTable>> {
356 let inner = self.inner.read().await;
357
358 let table_objs = Table::find()
359 .filter(table::Column::TableType.eq(TableType::Table))
360 .find_also_related(Object)
361 .join(JoinType::InnerJoin, object::Relation::Fragment.def())
362 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Mview).and(
363 FragmentTypeMask::disjoint(FragmentTypeFlag::UpstreamSinkUnion),
364 ))
365 .all(&inner.db)
366 .await?;
367 let streaming_jobs = load_streaming_jobs_by_ids(
368 &inner.db,
369 table_objs.iter().map(|(table, _)| table.job_id()),
370 )
371 .await?;
372
373 Ok(table_objs
374 .into_iter()
375 .map(|(table, obj)| {
376 let job_id = table.job_id();
377 let streaming_job = streaming_jobs.get(&job_id).cloned();
378 ObjectModel(table, obj.unwrap(), streaming_job).into()
379 })
380 .collect())
381 }
382
383 pub async fn list_sink_ids(&self, database_id: Option<DatabaseId>) -> MetaResult<Vec<SinkId>> {
384 let inner = self.inner.read().await;
385
386 let mut query = Sink::find().select_only().column(sink::Column::SinkId);
387
388 if let Some(database_id) = database_id {
389 query = query
390 .join(JoinType::InnerJoin, sink::Relation::Object.def())
391 .filter(object::Column::DatabaseId.eq(database_id));
392 }
393
394 let sink_ids: Vec<SinkId> = query.into_tuple().all(&inner.db).await?;
395
396 Ok(sink_ids)
397 }
398}