risingwave_meta/controller/catalog/
list_op.rs1use sea_orm::prelude::DateTime;
16
17use super::*;
18
19impl CatalogController {
20 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
21 self.inner.read().await.list_time_travel_table_ids().await
22 }
23
24 pub async fn list_stream_job_desc_for_telemetry(
25 &self,
26 ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
27 let inner = self.inner.read().await;
28 let info: Vec<(TableId, Option<Property>)> = Table::find()
29 .select_only()
30 .column(table::Column::TableId)
31 .column(source::Column::WithProperties)
32 .join(JoinType::LeftJoin, table::Relation::Source.def())
33 .filter(
34 table::Column::TableType
35 .eq(TableType::Table)
36 .or(table::Column::TableType.eq(TableType::MaterializedView)),
37 )
38 .into_tuple()
39 .all(&inner.db)
40 .await?;
41
42 Ok(info
43 .into_iter()
44 .map(|(table_id, properties)| {
45 let connector_info = if let Some(inner_props) = properties {
46 inner_props
47 .inner_ref()
48 .get(UPSTREAM_SOURCE_KEY)
49 .map(|v| v.to_lowercase())
50 } else {
51 None
52 };
53 MetaTelemetryJobDesc {
54 table_id,
55 connector: connector_info,
56 optimization: vec![],
57 }
58 })
59 .collect())
60 }
61
62 pub async fn list_background_creating_jobs(
63 &self,
64 include_initial: bool,
65 ) -> MetaResult<Vec<(ObjectId, String, DateTime)>> {
66 let inner = self.inner.read().await;
67 let status_cond = if include_initial {
68 streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
69 } else {
70 streaming_job::Column::JobStatus.eq(JobStatus::Creating)
71 };
72 let mut table_info: Vec<(ObjectId, String, DateTime)> = Table::find()
73 .select_only()
74 .columns([table::Column::TableId, table::Column::Definition])
75 .column(object::Column::InitializedAt)
76 .join(JoinType::LeftJoin, table::Relation::Object1.def())
77 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
78 .filter(
79 table::Column::TableType
80 .eq(TableType::MaterializedView)
81 .and(
82 streaming_job::Column::CreateType
83 .eq(CreateType::Background)
84 .and(status_cond.clone()),
85 ),
86 )
87 .into_tuple()
88 .all(&inner.db)
89 .await?;
90 let sink_info: Vec<(ObjectId, String, DateTime)> = Sink::find()
91 .select_only()
92 .columns([sink::Column::SinkId, sink::Column::Definition])
93 .column(object::Column::InitializedAt)
94 .join(JoinType::LeftJoin, sink::Relation::Object.def())
95 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
96 .filter(
97 streaming_job::Column::CreateType
98 .eq(CreateType::Background)
99 .and(status_cond),
100 )
101 .into_tuple()
102 .all(&inner.db)
103 .await?;
104
105 table_info.extend(sink_info.into_iter());
106
107 Ok(table_info)
108 }
109
110 pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
111 let inner = self.inner.read().await;
112 inner.list_databases().await
113 }
114
115 pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
116 self.list_object_dependencies(true).await
117 }
118
119 pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
120 self.list_object_dependencies(false).await
121 }
122
123 pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
124 let inner = self.inner.read().await;
125 inner.list_schemas().await
126 }
127
128 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
130 let inner = self.inner.read().await;
131 inner.list_all_state_tables().await
132 }
133
134 pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
135 let inner = self.inner.read().await;
136 let table_ids: Vec<TableId> = Table::find()
137 .select_only()
138 .column(table::Column::TableId)
139 .join(JoinType::InnerJoin, table::Relation::Object1.def())
140 .filter(
141 object::Column::SchemaId
142 .eq(schema_id)
143 .and(table::Column::TableType.ne(TableType::Table)),
144 )
145 .into_tuple()
146 .all(&inner.db)
147 .await?;
148 Ok(table_ids)
149 }
150
151 pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
152 let inner = self.inner.read().await;
153 let table_ids: Vec<TableId> = Table::find()
154 .select_only()
155 .column(table::Column::TableId)
156 .join(JoinType::InnerJoin, table::Relation::Object1.def())
157 .filter(
158 object::Column::SchemaId
159 .eq(schema_id)
160 .and(table::Column::TableType.eq(TableType::Table)),
161 )
162 .into_tuple()
163 .all(&inner.db)
164 .await?;
165 Ok(table_ids)
166 }
167
168 pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
169 let inner = self.inner.read().await;
170 let view_ids: Vec<ViewId> = View::find()
171 .select_only()
172 .column(view::Column::ViewId)
173 .join(JoinType::InnerJoin, view::Relation::Object.def())
174 .filter(object::Column::SchemaId.eq(schema_id))
175 .into_tuple()
176 .all(&inner.db)
177 .await?;
178 Ok(view_ids)
179 }
180
181 pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
183 let inner = self.inner.read().await;
184 let table_objs = Table::find()
185 .find_also_related(Object)
186 .filter(table::Column::TableType.eq(table_type))
187 .all(&inner.db)
188 .await?;
189 Ok(table_objs
190 .into_iter()
191 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
192 .collect())
193 }
194
195 pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
196 let inner = self.inner.read().await;
197 inner.list_sources().await
198 }
199
200 pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
202 let inner = self.inner.read().await;
203 let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
204 .select_only()
205 .columns([source::Column::SourceId, source::Column::SourceInfo])
206 .into_tuple()
207 .all(&inner.db)
208 .await?;
209
210 Ok(source_ids
211 .into_iter()
212 .map(|(source_id, info)| {
213 (
214 source_id,
215 info.map(|info| info.to_protobuf().cdc_source_job)
216 .unwrap_or(false),
217 )
218 })
219 .collect())
220 }
221
222 pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
223 let inner = self.inner.read().await;
224 let conn_objs = Connection::find()
225 .find_also_related(Object)
226 .all(&inner.db)
227 .await?;
228 Ok(conn_objs
229 .into_iter()
230 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
231 .collect())
232 }
233
234 pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
235 let inner = self.inner.read().await;
236 let source_ids: Vec<SourceId> = Source::find()
237 .select_only()
238 .column(source::Column::SourceId)
239 .join(JoinType::InnerJoin, source::Relation::Object.def())
240 .filter(object::Column::SchemaId.eq(schema_id))
241 .into_tuple()
242 .all(&inner.db)
243 .await?;
244 Ok(source_ids)
245 }
246
247 pub async fn list_indexes(&self) -> MetaResult<Vec<PbIndex>> {
248 let inner = self.inner.read().await;
249 inner.list_indexes().await
250 }
251
252 pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
253 let inner = self.inner.read().await;
254 inner.list_sinks().await
255 }
256
257 pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
258 let inner = self.inner.read().await;
259 inner.list_subscriptions().await
260 }
261
262 pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
263 let inner = self.inner.read().await;
264 inner.list_views().await
265 }
266
267 pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
268 let inner = self.inner.read().await;
269 inner.list_users().await
270 }
271
272 pub async fn list_functions(&self) -> MetaResult<Vec<PbFunction>> {
273 let inner = self.inner.read().await;
274 inner.list_functions().await
275 }
276}