risingwave_meta/controller/catalog/
list_op.rs1use sea_orm::prelude::DateTime;
16
17use super::*;
18
19impl CatalogController {
20 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
21 self.inner.read().await.list_time_travel_table_ids().await
22 }
23
24 pub async fn list_stream_job_desc_for_telemetry(
25 &self,
26 ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
27 let inner = self.inner.read().await;
28 let info: Vec<(TableId, Option<Property>)> = Table::find()
29 .select_only()
30 .column(table::Column::TableId)
31 .column(source::Column::WithProperties)
32 .join(JoinType::LeftJoin, table::Relation::Source.def())
33 .filter(
34 table::Column::TableType
35 .eq(TableType::Table)
36 .or(table::Column::TableType.eq(TableType::MaterializedView)),
37 )
38 .into_tuple()
39 .all(&inner.db)
40 .await?;
41
42 Ok(info
43 .into_iter()
44 .map(|(table_id, properties)| {
45 let connector_info = if let Some(inner_props) = properties {
46 inner_props
47 .inner_ref()
48 .get(UPSTREAM_SOURCE_KEY)
49 .map(|v| v.to_lowercase())
50 } else {
51 None
52 };
53 MetaTelemetryJobDesc {
54 table_id,
55 connector: connector_info,
56 optimization: vec![],
57 }
58 })
59 .collect())
60 }
61
62 pub async fn list_background_creating_jobs(
63 &self,
64 include_initial: bool,
65 ) -> MetaResult<Vec<(ObjectId, String, DateTime)>> {
66 let inner = self.inner.read().await;
67 let status_cond = if include_initial {
68 streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
69 } else {
70 streaming_job::Column::JobStatus.eq(JobStatus::Creating)
71 };
72 let mut table_info: Vec<(ObjectId, String, DateTime)> = Table::find()
73 .select_only()
74 .columns([table::Column::TableId, table::Column::Definition])
75 .column(object::Column::InitializedAt)
76 .join(JoinType::LeftJoin, table::Relation::Object1.def())
77 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
78 .filter(
79 streaming_job::Column::CreateType
80 .eq(CreateType::Background)
81 .and(status_cond.clone()),
82 )
83 .into_tuple()
84 .all(&inner.db)
85 .await?;
86 let sink_info: Vec<(ObjectId, String, DateTime)> = Sink::find()
87 .select_only()
88 .columns([sink::Column::SinkId, sink::Column::Definition])
89 .column(object::Column::InitializedAt)
90 .join(JoinType::LeftJoin, sink::Relation::Object.def())
91 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
92 .filter(
93 streaming_job::Column::CreateType
94 .eq(CreateType::Background)
95 .and(status_cond),
96 )
97 .into_tuple()
98 .all(&inner.db)
99 .await?;
100
101 table_info.extend(sink_info.into_iter());
102
103 Ok(table_info)
104 }
105
106 pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
107 let inner = self.inner.read().await;
108 inner.list_databases().await
109 }
110
111 pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
112 self.list_object_dependencies(true).await
113 }
114
115 pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
116 self.list_object_dependencies(false).await
117 }
118
119 pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
120 let inner = self.inner.read().await;
121 inner.list_schemas().await
122 }
123
124 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
126 let inner = self.inner.read().await;
127 inner.list_all_state_tables().await
128 }
129
130 pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
131 let inner = self.inner.read().await;
132 let table_ids: Vec<TableId> = Table::find()
133 .select_only()
134 .column(table::Column::TableId)
135 .join(JoinType::InnerJoin, table::Relation::Object1.def())
136 .filter(
137 object::Column::SchemaId
138 .eq(schema_id)
139 .and(table::Column::TableType.ne(TableType::Table)),
140 )
141 .into_tuple()
142 .all(&inner.db)
143 .await?;
144 Ok(table_ids)
145 }
146
147 pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
148 let inner = self.inner.read().await;
149 let table_ids: Vec<TableId> = Table::find()
150 .select_only()
151 .column(table::Column::TableId)
152 .join(JoinType::InnerJoin, table::Relation::Object1.def())
153 .filter(
154 object::Column::SchemaId
155 .eq(schema_id)
156 .and(table::Column::TableType.eq(TableType::Table)),
157 )
158 .into_tuple()
159 .all(&inner.db)
160 .await?;
161 Ok(table_ids)
162 }
163
164 pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
165 let inner = self.inner.read().await;
166 let view_ids: Vec<ViewId> = View::find()
167 .select_only()
168 .column(view::Column::ViewId)
169 .join(JoinType::InnerJoin, view::Relation::Object.def())
170 .filter(object::Column::SchemaId.eq(schema_id))
171 .into_tuple()
172 .all(&inner.db)
173 .await?;
174 Ok(view_ids)
175 }
176
177 pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
179 let inner = self.inner.read().await;
180 let table_objs = Table::find()
181 .find_also_related(Object)
182 .filter(table::Column::TableType.eq(table_type))
183 .all(&inner.db)
184 .await?;
185 Ok(table_objs
186 .into_iter()
187 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
188 .collect())
189 }
190
191 pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
192 let inner = self.inner.read().await;
193 inner.list_sources().await
194 }
195
196 pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
198 let inner = self.inner.read().await;
199 let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
200 .select_only()
201 .columns([source::Column::SourceId, source::Column::SourceInfo])
202 .into_tuple()
203 .all(&inner.db)
204 .await?;
205
206 Ok(source_ids
207 .into_iter()
208 .map(|(source_id, info)| {
209 (
210 source_id,
211 info.map(|info| info.to_protobuf().cdc_source_job)
212 .unwrap_or(false),
213 )
214 })
215 .collect())
216 }
217
218 pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
219 let inner = self.inner.read().await;
220 let conn_objs = Connection::find()
221 .find_also_related(Object)
222 .all(&inner.db)
223 .await?;
224 Ok(conn_objs
225 .into_iter()
226 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
227 .collect())
228 }
229
230 pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
231 let inner = self.inner.read().await;
232 let source_ids: Vec<SourceId> = Source::find()
233 .select_only()
234 .column(source::Column::SourceId)
235 .join(JoinType::InnerJoin, source::Relation::Object.def())
236 .filter(object::Column::SchemaId.eq(schema_id))
237 .into_tuple()
238 .all(&inner.db)
239 .await?;
240 Ok(source_ids)
241 }
242
243 pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
244 let inner = self.inner.read().await;
245 inner.list_indexes().await
246 }
247
248 pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
249 let inner = self.inner.read().await;
250 inner.list_sinks().await
251 }
252
253 pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
254 let inner = self.inner.read().await;
255 inner.list_subscriptions().await
256 }
257
258 pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
259 let inner = self.inner.read().await;
260 inner.list_views().await
261 }
262
263 pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
264 let inner = self.inner.read().await;
265 inner.list_users().await
266 }
267
268 pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
269 let inner = self.inner.read().await;
270 inner.list_functions().await
271 }
272}