risingwave_meta/controller/catalog/
list_op.rs1use risingwave_common::catalog::FragmentTypeMask;
16use risingwave_common::id::JobId;
17use sea_orm::prelude::DateTime;
18
19use super::*;
20use crate::controller::fragment::FragmentTypeMaskExt;
21
22impl CatalogController {
23 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
24 self.inner.read().await.list_time_travel_table_ids().await
25 }
26
27 pub async fn list_stream_job_desc_for_telemetry(
28 &self,
29 ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
30 let inner = self.inner.read().await;
31 let info: Vec<(TableId, Option<Property>)> = Table::find()
32 .select_only()
33 .column(table::Column::TableId)
34 .column(source::Column::WithProperties)
35 .join(JoinType::LeftJoin, table::Relation::Source.def())
36 .filter(
37 table::Column::TableType
38 .eq(TableType::Table)
39 .or(table::Column::TableType.eq(TableType::MaterializedView)),
40 )
41 .into_tuple()
42 .all(&inner.db)
43 .await?;
44
45 Ok(info
46 .into_iter()
47 .map(|(table_id, properties)| {
48 let connector_info = if let Some(inner_props) = properties {
49 inner_props
50 .inner_ref()
51 .get(UPSTREAM_SOURCE_KEY)
52 .map(|v| v.to_lowercase())
53 } else {
54 None
55 };
56 MetaTelemetryJobDesc {
57 table_id: table_id.as_raw_id() as _,
58 connector: connector_info,
59 optimization: vec![],
60 }
61 })
62 .collect())
63 }
64
65 pub async fn list_background_creating_jobs(
66 &self,
67 include_initial: bool,
68 database_id: Option<DatabaseId>,
69 ) -> MetaResult<Vec<(JobId, String, DateTime)>> {
70 let inner = self.inner.read().await;
71 let status_cond = if include_initial {
72 streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
73 } else {
74 streaming_job::Column::JobStatus.eq(JobStatus::Creating)
75 };
76 let mut table_info: Vec<(JobId, String, DateTime)> = Table::find()
77 .select_only()
78 .columns([table::Column::TableId, table::Column::Definition])
79 .column(object::Column::InitializedAt)
80 .join(JoinType::LeftJoin, table::Relation::Object1.def())
81 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
82 .filter(
83 streaming_job::Column::CreateType
84 .eq(CreateType::Background)
85 .and(status_cond.clone())
86 .and(
87 database_id
88 .map(|database_id| object::Column::DatabaseId.eq(database_id))
89 .unwrap_or_else(|| SimpleExpr::from(true)),
90 ),
91 )
92 .into_tuple()
93 .all(&inner.db)
94 .await?;
95 let sink_info: Vec<(JobId, String, DateTime)> = Sink::find()
96 .select_only()
97 .columns([sink::Column::SinkId, sink::Column::Definition])
98 .column(object::Column::InitializedAt)
99 .join(JoinType::LeftJoin, sink::Relation::Object.def())
100 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
101 .filter(
102 streaming_job::Column::CreateType
103 .eq(CreateType::Background)
104 .and(status_cond)
105 .and(
106 database_id
107 .map(|database_id| object::Column::DatabaseId.eq(database_id))
108 .unwrap_or_else(|| SimpleExpr::from(true)),
109 ),
110 )
111 .into_tuple()
112 .all(&inner.db)
113 .await?;
114
115 table_info.extend(sink_info.into_iter());
116
117 Ok(table_info)
118 }
119
120 pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
121 let inner = self.inner.read().await;
122 inner.list_databases().await
123 }
124
125 pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
126 self.list_object_dependencies(true).await
127 }
128
129 pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
130 self.list_object_dependencies(false).await
131 }
132
133 pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
134 let inner = self.inner.read().await;
135 inner.list_schemas().await
136 }
137
138 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
140 let inner = self.inner.read().await;
141 inner.list_all_state_tables().await
142 }
143
144 pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
145 let inner = self.inner.read().await;
146 let table_ids: Vec<TableId> = Table::find()
147 .select_only()
148 .column(table::Column::TableId)
149 .join(JoinType::InnerJoin, table::Relation::Object1.def())
150 .filter(
151 object::Column::SchemaId
152 .eq(schema_id)
153 .and(table::Column::TableType.ne(TableType::Table)),
154 )
155 .into_tuple()
156 .all(&inner.db)
157 .await?;
158 Ok(table_ids)
159 }
160
161 pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
162 let inner = self.inner.read().await;
163 let table_ids: Vec<TableId> = Table::find()
164 .select_only()
165 .column(table::Column::TableId)
166 .join(JoinType::InnerJoin, table::Relation::Object1.def())
167 .filter(
168 object::Column::SchemaId
169 .eq(schema_id)
170 .and(table::Column::TableType.eq(TableType::Table)),
171 )
172 .into_tuple()
173 .all(&inner.db)
174 .await?;
175 Ok(table_ids)
176 }
177
178 pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
179 let inner = self.inner.read().await;
180 let view_ids: Vec<ViewId> = View::find()
181 .select_only()
182 .column(view::Column::ViewId)
183 .join(JoinType::InnerJoin, view::Relation::Object.def())
184 .filter(object::Column::SchemaId.eq(schema_id))
185 .into_tuple()
186 .all(&inner.db)
187 .await?;
188 Ok(view_ids)
189 }
190
191 pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
193 let inner = self.inner.read().await;
194 let table_objs = Table::find()
195 .find_also_related(Object)
196 .filter(table::Column::TableType.eq(table_type))
197 .all(&inner.db)
198 .await?;
199 Ok(table_objs
200 .into_iter()
201 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
202 .collect())
203 }
204
205 pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
206 let inner = self.inner.read().await;
207 inner.list_sources().await
208 }
209
210 pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
212 let inner = self.inner.read().await;
213 let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
214 .select_only()
215 .columns([source::Column::SourceId, source::Column::SourceInfo])
216 .into_tuple()
217 .all(&inner.db)
218 .await?;
219
220 Ok(source_ids
221 .into_iter()
222 .map(|(source_id, info)| {
223 (
224 source_id,
225 info.map(|info| info.to_protobuf().cdc_source_job)
226 .unwrap_or(false),
227 )
228 })
229 .collect())
230 }
231
232 pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
233 let inner = self.inner.read().await;
234 let conn_objs = Connection::find()
235 .find_also_related(Object)
236 .all(&inner.db)
237 .await?;
238 Ok(conn_objs
239 .into_iter()
240 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
241 .collect())
242 }
243
244 pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
245 let inner = self.inner.read().await;
246 let source_ids: Vec<SourceId> = Source::find()
247 .select_only()
248 .column(source::Column::SourceId)
249 .join(JoinType::InnerJoin, source::Relation::Object.def())
250 .filter(object::Column::SchemaId.eq(schema_id))
251 .into_tuple()
252 .all(&inner.db)
253 .await?;
254 Ok(source_ids)
255 }
256
257 pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
258 let inner = self.inner.read().await;
259 inner.list_indexes().await
260 }
261
262 pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
263 let inner = self.inner.read().await;
264 inner.list_sinks().await
265 }
266
267 pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
268 let inner = self.inner.read().await;
269 inner.list_subscriptions().await
270 }
271
272 pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
273 let inner = self.inner.read().await;
274 inner.list_views().await
275 }
276
277 pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
278 let inner = self.inner.read().await;
279 inner.list_users().await
280 }
281
282 pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
283 let inner = self.inner.read().await;
284 inner.list_functions().await
285 }
286
287 pub async fn list_unmigrated_tables(&self) -> MetaResult<Vec<PbTable>> {
290 let inner = self.inner.read().await;
291
292 let table_objs = Table::find()
293 .filter(table::Column::TableType.eq(TableType::Table))
294 .find_also_related(Object)
295 .join(JoinType::InnerJoin, object::Relation::Fragment.def())
296 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Mview).and(
297 FragmentTypeMask::disjoint(FragmentTypeFlag::UpstreamSinkUnion),
298 ))
299 .all(&inner.db)
300 .await?;
301
302 Ok(table_objs
303 .into_iter()
304 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
305 .collect())
306 }
307}