risingwave_meta/controller/catalog/
list_op.rs1use risingwave_common::catalog::FragmentTypeMask;
16use sea_orm::prelude::DateTime;
17
18use super::*;
19use crate::controller::fragment::FragmentTypeMaskExt;
20
21impl CatalogController {
22 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
23 self.inner.read().await.list_time_travel_table_ids().await
24 }
25
26 pub async fn list_stream_job_desc_for_telemetry(
27 &self,
28 ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
29 let inner = self.inner.read().await;
30 let info: Vec<(TableId, Option<Property>)> = Table::find()
31 .select_only()
32 .column(table::Column::TableId)
33 .column(source::Column::WithProperties)
34 .join(JoinType::LeftJoin, table::Relation::Source.def())
35 .filter(
36 table::Column::TableType
37 .eq(TableType::Table)
38 .or(table::Column::TableType.eq(TableType::MaterializedView)),
39 )
40 .into_tuple()
41 .all(&inner.db)
42 .await?;
43
44 Ok(info
45 .into_iter()
46 .map(|(table_id, properties)| {
47 let connector_info = if let Some(inner_props) = properties {
48 inner_props
49 .inner_ref()
50 .get(UPSTREAM_SOURCE_KEY)
51 .map(|v| v.to_lowercase())
52 } else {
53 None
54 };
55 MetaTelemetryJobDesc {
56 table_id,
57 connector: connector_info,
58 optimization: vec![],
59 }
60 })
61 .collect())
62 }
63
64 pub async fn list_background_creating_jobs(
65 &self,
66 include_initial: bool,
67 ) -> MetaResult<Vec<(ObjectId, String, DateTime)>> {
68 let inner = self.inner.read().await;
69 let status_cond = if include_initial {
70 streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
71 } else {
72 streaming_job::Column::JobStatus.eq(JobStatus::Creating)
73 };
74 let mut table_info: Vec<(ObjectId, String, DateTime)> = Table::find()
75 .select_only()
76 .columns([table::Column::TableId, table::Column::Definition])
77 .column(object::Column::InitializedAt)
78 .join(JoinType::LeftJoin, table::Relation::Object1.def())
79 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
80 .filter(
81 streaming_job::Column::CreateType
82 .eq(CreateType::Background)
83 .and(status_cond.clone()),
84 )
85 .into_tuple()
86 .all(&inner.db)
87 .await?;
88 let sink_info: Vec<(ObjectId, String, DateTime)> = Sink::find()
89 .select_only()
90 .columns([sink::Column::SinkId, sink::Column::Definition])
91 .column(object::Column::InitializedAt)
92 .join(JoinType::LeftJoin, sink::Relation::Object.def())
93 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
94 .filter(
95 streaming_job::Column::CreateType
96 .eq(CreateType::Background)
97 .and(status_cond),
98 )
99 .into_tuple()
100 .all(&inner.db)
101 .await?;
102
103 table_info.extend(sink_info.into_iter());
104
105 Ok(table_info)
106 }
107
108 pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
109 let inner = self.inner.read().await;
110 inner.list_databases().await
111 }
112
113 pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
114 self.list_object_dependencies(true).await
115 }
116
117 pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
118 self.list_object_dependencies(false).await
119 }
120
121 pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
122 let inner = self.inner.read().await;
123 inner.list_schemas().await
124 }
125
126 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
128 let inner = self.inner.read().await;
129 inner.list_all_state_tables().await
130 }
131
132 pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
133 let inner = self.inner.read().await;
134 let table_ids: Vec<TableId> = Table::find()
135 .select_only()
136 .column(table::Column::TableId)
137 .join(JoinType::InnerJoin, table::Relation::Object1.def())
138 .filter(
139 object::Column::SchemaId
140 .eq(schema_id)
141 .and(table::Column::TableType.ne(TableType::Table)),
142 )
143 .into_tuple()
144 .all(&inner.db)
145 .await?;
146 Ok(table_ids)
147 }
148
149 pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
150 let inner = self.inner.read().await;
151 let table_ids: Vec<TableId> = Table::find()
152 .select_only()
153 .column(table::Column::TableId)
154 .join(JoinType::InnerJoin, table::Relation::Object1.def())
155 .filter(
156 object::Column::SchemaId
157 .eq(schema_id)
158 .and(table::Column::TableType.eq(TableType::Table)),
159 )
160 .into_tuple()
161 .all(&inner.db)
162 .await?;
163 Ok(table_ids)
164 }
165
166 pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
167 let inner = self.inner.read().await;
168 let view_ids: Vec<ViewId> = View::find()
169 .select_only()
170 .column(view::Column::ViewId)
171 .join(JoinType::InnerJoin, view::Relation::Object.def())
172 .filter(object::Column::SchemaId.eq(schema_id))
173 .into_tuple()
174 .all(&inner.db)
175 .await?;
176 Ok(view_ids)
177 }
178
179 pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
181 let inner = self.inner.read().await;
182 let table_objs = Table::find()
183 .find_also_related(Object)
184 .filter(table::Column::TableType.eq(table_type))
185 .all(&inner.db)
186 .await?;
187 Ok(table_objs
188 .into_iter()
189 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
190 .collect())
191 }
192
193 pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
194 let inner = self.inner.read().await;
195 inner.list_sources().await
196 }
197
198 pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
200 let inner = self.inner.read().await;
201 let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
202 .select_only()
203 .columns([source::Column::SourceId, source::Column::SourceInfo])
204 .into_tuple()
205 .all(&inner.db)
206 .await?;
207
208 Ok(source_ids
209 .into_iter()
210 .map(|(source_id, info)| {
211 (
212 source_id,
213 info.map(|info| info.to_protobuf().cdc_source_job)
214 .unwrap_or(false),
215 )
216 })
217 .collect())
218 }
219
220 pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
221 let inner = self.inner.read().await;
222 let conn_objs = Connection::find()
223 .find_also_related(Object)
224 .all(&inner.db)
225 .await?;
226 Ok(conn_objs
227 .into_iter()
228 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
229 .collect())
230 }
231
232 pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
233 let inner = self.inner.read().await;
234 let source_ids: Vec<SourceId> = Source::find()
235 .select_only()
236 .column(source::Column::SourceId)
237 .join(JoinType::InnerJoin, source::Relation::Object.def())
238 .filter(object::Column::SchemaId.eq(schema_id))
239 .into_tuple()
240 .all(&inner.db)
241 .await?;
242 Ok(source_ids)
243 }
244
245 pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
246 let inner = self.inner.read().await;
247 inner.list_indexes().await
248 }
249
250 pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
251 let inner = self.inner.read().await;
252 inner.list_sinks().await
253 }
254
255 pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
256 let inner = self.inner.read().await;
257 inner.list_subscriptions().await
258 }
259
260 pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
261 let inner = self.inner.read().await;
262 inner.list_views().await
263 }
264
265 pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
266 let inner = self.inner.read().await;
267 inner.list_users().await
268 }
269
270 pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
271 let inner = self.inner.read().await;
272 inner.list_functions().await
273 }
274
275 pub async fn list_unmigrated_tables(&self) -> MetaResult<Vec<PbTable>> {
278 let inner = self.inner.read().await;
279
280 let table_objs = Table::find()
281 .filter(table::Column::TableType.eq(TableType::Table))
282 .find_also_related(Object)
283 .join(JoinType::InnerJoin, object::Relation::Fragment.def())
284 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Mview).and(
285 FragmentTypeMask::disjoint(FragmentTypeFlag::UpstreamSinkUnion),
286 ))
287 .all(&inner.db)
288 .await?;
289
290 Ok(table_objs
291 .into_iter()
292 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
293 .collect())
294 }
295}