risingwave_meta/controller/catalog/
list_op.rs1use super::*;
16
17impl CatalogController {
18 pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
19 self.inner.read().await.list_time_travel_table_ids().await
20 }
21
22 pub async fn list_stream_job_desc_for_telemetry(
23 &self,
24 ) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
25 let inner = self.inner.read().await;
26 let info: Vec<(TableId, Option<Property>)> = Table::find()
27 .select_only()
28 .column(table::Column::TableId)
29 .column(source::Column::WithProperties)
30 .join(JoinType::LeftJoin, table::Relation::Source.def())
31 .filter(
32 table::Column::TableType
33 .eq(TableType::Table)
34 .or(table::Column::TableType.eq(TableType::MaterializedView)),
35 )
36 .into_tuple()
37 .all(&inner.db)
38 .await?;
39
40 Ok(info
41 .into_iter()
42 .map(|(table_id, properties)| {
43 let connector_info = if let Some(inner_props) = properties {
44 inner_props
45 .inner_ref()
46 .get(UPSTREAM_SOURCE_KEY)
47 .map(|v| v.to_lowercase())
48 } else {
49 None
50 };
51 MetaTelemetryJobDesc {
52 table_id,
53 connector: connector_info,
54 optimization: vec![],
55 }
56 })
57 .collect())
58 }
59
60 pub async fn list_background_creating_mviews(
61 &self,
62 include_initial: bool,
63 ) -> MetaResult<Vec<table::Model>> {
64 let inner = self.inner.read().await;
65 let status_cond = if include_initial {
66 streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
67 } else {
68 streaming_job::Column::JobStatus.eq(JobStatus::Creating)
69 };
70 let tables = Table::find()
71 .join(JoinType::LeftJoin, table::Relation::Object1.def())
72 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
73 .filter(
74 table::Column::TableType
75 .eq(TableType::MaterializedView)
76 .and(
77 streaming_job::Column::CreateType
78 .eq(CreateType::Background)
79 .and(status_cond),
80 ),
81 )
82 .all(&inner.db)
83 .await?;
84 Ok(tables)
85 }
86
87 pub async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
88 let inner = self.inner.read().await;
89 inner.list_databases().await
90 }
91
92 pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
93 self.list_object_dependencies(true).await
94 }
95
96 pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
97 self.list_object_dependencies(false).await
98 }
99
100 pub async fn list_schemas(&self) -> MetaResult<Vec<PbSchema>> {
101 let inner = self.inner.read().await;
102 inner.list_schemas().await
103 }
104
105 pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
106 let inner = self.inner.read().await;
107 inner.list_all_state_tables().await
108 }
109
110 pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
111 let inner = self.inner.read().await;
112 let table_ids: Vec<TableId> = Table::find()
113 .select_only()
114 .column(table::Column::TableId)
115 .join(JoinType::InnerJoin, table::Relation::Object1.def())
116 .filter(
117 object::Column::SchemaId
118 .eq(schema_id)
119 .and(table::Column::TableType.ne(TableType::Table)),
120 )
121 .into_tuple()
122 .all(&inner.db)
123 .await?;
124 Ok(table_ids)
125 }
126
127 pub async fn list_dml_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
128 let inner = self.inner.read().await;
129 let table_ids: Vec<TableId> = Table::find()
130 .select_only()
131 .column(table::Column::TableId)
132 .join(JoinType::InnerJoin, table::Relation::Object1.def())
133 .filter(
134 object::Column::SchemaId
135 .eq(schema_id)
136 .and(table::Column::TableType.eq(TableType::Table)),
137 )
138 .into_tuple()
139 .all(&inner.db)
140 .await?;
141 Ok(table_ids)
142 }
143
144 pub async fn list_view_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<ViewId>> {
145 let inner = self.inner.read().await;
146 let view_ids: Vec<ViewId> = View::find()
147 .select_only()
148 .column(view::Column::ViewId)
149 .join(JoinType::InnerJoin, view::Relation::Object.def())
150 .filter(object::Column::SchemaId.eq(schema_id))
151 .into_tuple()
152 .all(&inner.db)
153 .await?;
154 Ok(view_ids)
155 }
156
157 pub async fn list_tables_by_type(&self, table_type: TableType) -> MetaResult<Vec<PbTable>> {
158 let inner = self.inner.read().await;
159 let table_objs = Table::find()
160 .find_also_related(Object)
161 .filter(table::Column::TableType.eq(table_type))
162 .all(&inner.db)
163 .await?;
164 Ok(table_objs
165 .into_iter()
166 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
167 .collect())
168 }
169
170 pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
171 let inner = self.inner.read().await;
172 inner.list_sources().await
173 }
174
175 pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
177 let inner = self.inner.read().await;
178 let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
179 .select_only()
180 .columns([source::Column::SourceId, source::Column::SourceInfo])
181 .into_tuple()
182 .all(&inner.db)
183 .await?;
184
185 Ok(source_ids
186 .into_iter()
187 .map(|(source_id, info)| {
188 (
189 source_id,
190 info.map(|info| info.to_protobuf().cdc_source_job)
191 .unwrap_or(false),
192 )
193 })
194 .collect())
195 }
196
197 pub async fn list_connections(&self) -> MetaResult<Vec<PbConnection>> {
198 let inner = self.inner.read().await;
199 let conn_objs = Connection::find()
200 .find_also_related(Object)
201 .all(&inner.db)
202 .await?;
203 Ok(conn_objs
204 .into_iter()
205 .map(|(conn, obj)| ObjectModel(conn, obj.unwrap()).into())
206 .collect())
207 }
208
209 pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
210 let inner = self.inner.read().await;
211 let source_ids: Vec<SourceId> = Source::find()
212 .select_only()
213 .column(source::Column::SourceId)
214 .join(JoinType::InnerJoin, source::Relation::Object.def())
215 .filter(object::Column::SchemaId.eq(schema_id))
216 .into_tuple()
217 .all(&inner.db)
218 .await?;
219 Ok(source_ids)
220 }
221
222 pub async fn list_sinks(&self) -> MetaResult<Vec<PbSink>> {
223 let inner = self.inner.read().await;
224 inner.list_sinks().await
225 }
226
227 pub async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
228 let inner = self.inner.read().await;
229 inner.list_subscriptions().await
230 }
231
232 pub async fn list_views(&self) -> MetaResult<Vec<PbView>> {
233 let inner = self.inner.read().await;
234 inner.list_views().await
235 }
236
237 pub async fn list_users(&self) -> MetaResult<Vec<PbUserInfo>> {
238 let inner = self.inner.read().await;
239 inner.list_users().await
240 }
241}