risingwave_meta/controller/catalog/
get_op.rs
1use super::*;
16use crate::controller::utils::{get_database_resource_group, get_existing_job_resource_group};
17
18impl CatalogController {
19 pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
20 let inner = self.inner.read().await;
21 let (secret, obj) = Secret::find_by_id(secret_id)
22 .find_also_related(Object)
23 .one(&inner.db)
24 .await?
25 .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
26 Ok(ObjectModel(secret, obj.unwrap()).into())
27 }
28
29 pub async fn get_object_database_id(&self, object_id: ObjectId) -> MetaResult<DatabaseId> {
30 let inner = self.inner.read().await;
31 let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
32 .select_only()
33 .select_column(object::Column::DatabaseId)
34 .into_tuple()
35 .one(&inner.db)
36 .await?
37 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
38 Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
39 }
40
41 pub async fn get_connection_by_id(
42 &self,
43 connection_id: ConnectionId,
44 ) -> MetaResult<PbConnection> {
45 let inner = self.inner.read().await;
46 let (conn, obj) = Connection::find_by_id(connection_id)
47 .find_also_related(Object)
48 .one(&inner.db)
49 .await?
50 .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
51
52 Ok(ObjectModel(conn, obj.unwrap()).into())
53 }
54
55 pub async fn get_table_by_name(
56 &self,
57 database_name: &str,
58 table_name: &str,
59 ) -> MetaResult<Option<PbTable>> {
60 let inner = self.inner.read().await;
61 let table_obj = Table::find()
62 .find_also_related(Object)
63 .join(JoinType::InnerJoin, object::Relation::Database2.def())
64 .filter(
65 table::Column::Name
66 .eq(table_name)
67 .and(database::Column::Name.eq(database_name)),
68 )
69 .one(&inner.db)
70 .await?;
71 Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
72 }
73
74 pub async fn get_table_associated_source_id(
75 &self,
76 table_id: TableId,
77 ) -> MetaResult<Option<SourceId>> {
78 let inner = self.inner.read().await;
79 Table::find_by_id(table_id)
80 .select_only()
81 .select_column(table::Column::OptionalAssociatedSourceId)
82 .into_tuple()
83 .one(&inner.db)
84 .await?
85 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
86 }
87
88 pub async fn get_table_by_ids(
89 &self,
90 table_ids: Vec<TableId>,
91 include_dropped_table: bool,
92 ) -> MetaResult<Vec<PbTable>> {
93 let inner = self.inner.read().await;
94 let table_objs = Table::find()
95 .find_also_related(Object)
96 .filter(table::Column::TableId.is_in(table_ids.clone()))
97 .all(&inner.db)
98 .await?;
99 let tables = table_objs
100 .into_iter()
101 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
102 let tables = if include_dropped_table {
103 tables
104 .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
105 if table_ids.contains(id) {
106 Some(t.clone())
107 } else {
108 None
109 }
110 }))
111 .collect()
112 } else {
113 tables.collect()
114 };
115 Ok(tables)
116 }
117
118 pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
119 let inner = self.inner.read().await;
120 let sink_objs = Sink::find()
121 .find_also_related(Object)
122 .filter(sink::Column::SinkId.is_in(sink_ids))
123 .all(&inner.db)
124 .await?;
125 Ok(sink_objs
126 .into_iter()
127 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
128 .collect())
129 }
130
131 pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
132 let inner = self.inner.read().await;
133 let tables: Vec<I32Array> = Fragment::find()
134 .select_only()
135 .column(fragment::Column::StateTableIds)
136 .filter(fragment::Column::JobId.eq(sink_id))
137 .into_tuple()
138 .all(&inner.db)
139 .await?;
140 Ok(tables
141 .into_iter()
142 .flat_map(|ids| ids.into_inner().into_iter())
143 .collect())
144 }
145
146 pub async fn get_subscription_by_id(
147 &self,
148 subscription_id: SubscriptionId,
149 ) -> MetaResult<PbSubscription> {
150 let inner = self.inner.read().await;
151 let subscription_objs = Subscription::find()
152 .find_also_related(Object)
153 .filter(subscription::Column::SubscriptionId.eq(subscription_id))
154 .all(&inner.db)
155 .await?;
156 let subscription: PbSubscription = subscription_objs
157 .into_iter()
158 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
159 .find_or_first(|_| true)
160 .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
161
162 Ok(subscription)
163 }
164
165 pub async fn get_mv_depended_subscriptions(
166 &self,
167 database_id: Option<DatabaseId>,
168 ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
169 let inner = self.inner.read().await;
170 let select = Subscription::find()
171 .select_only()
172 .select_column(subscription::Column::SubscriptionId)
173 .select_column(subscription::Column::DependentTableId)
174 .select_column(subscription::Column::RetentionSeconds)
175 .select_column(object::Column::DatabaseId)
176 .join(JoinType::InnerJoin, subscription::Relation::Object.def());
177 let select = if let Some(database_id) = database_id {
178 select.filter(object::Column::DatabaseId.eq(database_id))
179 } else {
180 select
181 };
182 let subscription_objs: Vec<(SubscriptionId, ObjectId, i64, DatabaseId)> =
183 select.into_tuple().all(&inner.db).await?;
184 let mut map: HashMap<_, HashMap<_, HashMap<_, _>>> = HashMap::new();
185 for (subscription_id, dependent_table_id, retention_seconds, database_id) in
187 subscription_objs
188 {
189 map.entry(database_id)
190 .or_default()
191 .entry(dependent_table_id)
192 .or_default()
193 .insert(subscription_id, retention_seconds as _);
194 }
195 Ok(map)
196 }
197
198 pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
199 let inner = self.inner.read().await;
200 let table_options: Vec<(TableId, Option<i32>)> = Table::find()
201 .select_only()
202 .columns([table::Column::TableId, table::Column::RetentionSeconds])
203 .into_tuple::<(TableId, Option<i32>)>()
204 .all(&inner.db)
205 .await?;
206
207 Ok(table_options
208 .into_iter()
209 .map(|(id, retention_seconds)| {
210 (
211 id,
212 TableOption {
213 retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
214 },
215 )
216 })
217 .collect())
218 }
219
220 pub async fn get_all_streaming_parallelisms(
221 &self,
222 ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
223 let inner = self.inner.read().await;
224
225 let job_parallelisms = StreamingJob::find()
226 .select_only()
227 .columns([
228 streaming_job::Column::JobId,
229 streaming_job::Column::Parallelism,
230 ])
231 .into_tuple::<(ObjectId, StreamingParallelism)>()
232 .all(&inner.db)
233 .await?;
234
235 Ok(job_parallelisms
236 .into_iter()
237 .collect::<HashMap<ObjectId, StreamingParallelism>>())
238 }
239
240 pub async fn get_table_name_type_mapping(
241 &self,
242 ) -> MetaResult<HashMap<TableId, (String, String)>> {
243 let inner = self.inner.read().await;
244 let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
245 .select_only()
246 .columns([
247 table::Column::TableId,
248 table::Column::Name,
249 table::Column::TableType,
250 ])
251 .into_tuple()
252 .all(&inner.db)
253 .await?;
254 Ok(table_name_types
255 .into_iter()
256 .map(|(id, name, table_type)| {
257 (
258 id,
259 (name, PbTableType::from(table_type).as_str_name().to_owned()),
260 )
261 })
262 .collect())
263 }
264
265 pub async fn get_table_by_cdc_table_id(
266 &self,
267 cdc_table_id: &String,
268 ) -> MetaResult<Vec<PbTable>> {
269 let inner = self.inner.read().await;
270 let table_objs = Table::find()
271 .find_also_related(Object)
272 .filter(table::Column::CdcTableId.eq(cdc_table_id))
273 .all(&inner.db)
274 .await?;
275 Ok(table_objs
276 .into_iter()
277 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
278 .collect())
279 }
280
281 pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
282 let inner = self.inner.read().await;
283
284 let mut table_ids: Vec<TableId> = StreamingJob::find()
286 .select_only()
287 .column(streaming_job::Column::JobId)
288 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
289 .into_tuple()
290 .all(&inner.db)
291 .await?;
292
293 let internal_table_ids: Vec<TableId> = Table::find()
295 .select_only()
296 .column(table::Column::TableId)
297 .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
298 .into_tuple()
299 .all(&inner.db)
300 .await?;
301 table_ids.extend(internal_table_ids);
302
303 Ok(table_ids)
304 }
305
306 pub async fn get_versioned_table_schemas(&self) -> MetaResult<HashMap<TableId, Vec<i32>>> {
309 let res = self
310 .list_all_state_tables()
311 .await?
312 .into_iter()
313 .filter_map(|t| {
314 if t.version.is_some() {
315 let ret = (
316 t.id.try_into().unwrap(),
317 t.columns
318 .iter()
319 .map(|c| c.column_desc.as_ref().unwrap().column_id)
320 .collect_vec(),
321 );
322 return Some(ret);
323 }
324 None
325 })
326 .collect();
327 Ok(res)
328 }
329
330 pub async fn get_existing_job_resource_group(
331 &self,
332 streaming_job_id: ObjectId,
333 ) -> MetaResult<String> {
334 let inner = self.inner.read().await;
335 get_existing_job_resource_group(&inner.db, streaming_job_id).await
336 }
337
338 pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
339 let inner = self.inner.read().await;
340 get_database_resource_group(&inner.db, database_id).await
341 }
342
343 pub async fn get_existing_job_resource_groups(
344 &self,
345 streaming_job_ids: Vec<ObjectId>,
346 ) -> MetaResult<HashMap<ObjectId, String>> {
347 let inner = self.inner.read().await;
348 let mut resource_groups = HashMap::new();
349 for job_id in streaming_job_ids {
350 let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
351 resource_groups.insert(job_id, resource_group);
352 }
353
354 Ok(resource_groups)
355 }
356
357 pub async fn get_existing_job_database_resource_group(
358 &self,
359 streaming_job_id: ObjectId,
360 ) -> MetaResult<String> {
361 let inner = self.inner.read().await;
362 let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
363 .select_only()
364 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
365 .column(object::Column::DatabaseId)
366 .into_tuple()
367 .one(&inner.db)
368 .await?
369 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
370
371 get_database_resource_group(&inner.db, database_id).await
372 }
373
374 pub async fn get_job_streaming_parallelisms(
375 &self,
376 streaming_job_id: ObjectId,
377 ) -> MetaResult<StreamingParallelism> {
378 let inner = self.inner.read().await;
379
380 let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
381 .select_only()
382 .column(streaming_job::Column::Parallelism)
383 .into_tuple()
384 .one(&inner.db)
385 .await?
386 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
387
388 Ok(job_parallelism)
389 }
390
391 pub async fn get_fragment_streaming_job_id(
392 &self,
393 fragment_id: FragmentId,
394 ) -> MetaResult<ObjectId> {
395 let inner = self.inner.read().await;
396 let job_id: ObjectId = Fragment::find_by_id(fragment_id)
397 .select_only()
398 .column(fragment::Column::JobId)
399 .into_tuple()
400 .one(&inner.db)
401 .await?
402 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
403 Ok(job_id)
404 }
405}