risingwave_meta/controller/catalog/
get_op.rs1use risingwave_common::catalog::ColumnCatalog;
16
17use super::*;
18use crate::controller::utils::{
19 get_database_resource_group, get_existing_job_resource_group, get_table_columns,
20};
21
22impl CatalogController {
23 pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
24 let inner = self.inner.read().await;
25 let (secret, obj) = Secret::find_by_id(secret_id)
26 .find_also_related(Object)
27 .one(&inner.db)
28 .await?
29 .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
30 Ok(ObjectModel(secret, obj.unwrap()).into())
31 }
32
33 pub async fn get_object_database_id(&self, object_id: ObjectId) -> MetaResult<DatabaseId> {
34 let inner = self.inner.read().await;
35 let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
36 .select_only()
37 .select_column(object::Column::DatabaseId)
38 .into_tuple()
39 .one(&inner.db)
40 .await?
41 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
42 Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
43 }
44
45 pub async fn get_connection_by_id(
46 &self,
47 connection_id: ConnectionId,
48 ) -> MetaResult<PbConnection> {
49 let inner = self.inner.read().await;
50 let (conn, obj) = Connection::find_by_id(connection_id)
51 .find_also_related(Object)
52 .one(&inner.db)
53 .await?
54 .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
55
56 Ok(ObjectModel(conn, obj.unwrap()).into())
57 }
58
59 pub async fn get_table_by_name(
60 &self,
61 database_name: &str,
62 table_name: &str,
63 ) -> MetaResult<Option<PbTable>> {
64 let inner = self.inner.read().await;
65 let table_obj = Table::find()
66 .find_also_related(Object)
67 .join(JoinType::InnerJoin, object::Relation::Database2.def())
68 .filter(
69 table::Column::Name
70 .eq(table_name)
71 .and(database::Column::Name.eq(database_name)),
72 )
73 .one(&inner.db)
74 .await?;
75 Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
76 }
77
78 pub async fn get_table_associated_source_id(
79 &self,
80 table_id: TableId,
81 ) -> MetaResult<Option<SourceId>> {
82 let inner = self.inner.read().await;
83 Table::find_by_id(table_id)
84 .select_only()
85 .select_column(table::Column::OptionalAssociatedSourceId)
86 .into_tuple()
87 .one(&inner.db)
88 .await?
89 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
90 }
91
92 pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
93 let inner = self.inner.read().await;
94 let table_obj = Table::find_by_id(table_id)
95 .find_also_related(Object)
96 .one(&inner.db)
97 .await?;
98 if let Some((table, obj)) = table_obj {
99 Ok(ObjectModel(table, obj.unwrap()).into())
100 } else {
101 Err(MetaError::catalog_id_not_found("table", table_id))
102 }
103 }
104
105 pub async fn get_table_by_ids(
106 &self,
107 table_ids: Vec<TableId>,
108 include_dropped_table: bool,
109 ) -> MetaResult<Vec<PbTable>> {
110 let inner = self.inner.read().await;
111 let table_objs = Table::find()
112 .find_also_related(Object)
113 .filter(table::Column::TableId.is_in(table_ids.clone()))
114 .all(&inner.db)
115 .await?;
116 let tables = table_objs
117 .into_iter()
118 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
119 let tables = if include_dropped_table {
120 tables
121 .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
122 if table_ids.contains(id) {
123 Some(t.clone())
124 } else {
125 None
126 }
127 }))
128 .collect()
129 } else {
130 tables.collect()
131 };
132 Ok(tables)
133 }
134
135 pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
136 let inner = self.inner.read().await;
137 Ok(get_table_columns(&inner.db, id)
138 .await?
139 .to_protobuf()
140 .into_iter()
141 .map(|col| col.into())
142 .collect())
143 }
144
145 pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
146 let inner = self.inner.read().await;
147 let sink_objs = Sink::find()
148 .find_also_related(Object)
149 .filter(sink::Column::SinkId.is_in(sink_ids))
150 .all(&inner.db)
151 .await?;
152 Ok(sink_objs
153 .into_iter()
154 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
155 .collect())
156 }
157
158 pub async fn get_sink_auto_refresh_schema_from(
159 &self,
160 table_id: TableId,
161 ) -> MetaResult<Vec<PbSink>> {
162 let inner = self.inner.read().await;
163 let sink_objs = Sink::find()
164 .find_also_related(Object)
165 .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
166 .all(&inner.db)
167 .await?;
168 Ok(sink_objs
169 .into_iter()
170 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
171 .collect())
172 }
173
174 pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
175 let inner = self.inner.read().await;
176 let tables: Vec<I32Array> = Fragment::find()
177 .select_only()
178 .column(fragment::Column::StateTableIds)
179 .filter(fragment::Column::JobId.eq(sink_id))
180 .into_tuple()
181 .all(&inner.db)
182 .await?;
183 Ok(tables
184 .into_iter()
185 .flat_map(|ids| ids.into_inner().into_iter())
186 .collect())
187 }
188
189 pub async fn get_subscription_by_id(
190 &self,
191 subscription_id: SubscriptionId,
192 ) -> MetaResult<PbSubscription> {
193 let inner = self.inner.read().await;
194 let subscription_objs = Subscription::find()
195 .find_also_related(Object)
196 .filter(subscription::Column::SubscriptionId.eq(subscription_id))
197 .all(&inner.db)
198 .await?;
199 let subscription: PbSubscription = subscription_objs
200 .into_iter()
201 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
202 .find_or_first(|_| true)
203 .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
204
205 Ok(subscription)
206 }
207
208 pub async fn get_mv_depended_subscriptions(
209 &self,
210 database_id: Option<DatabaseId>,
211 ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
212 let inner = self.inner.read().await;
213 let select = Subscription::find()
214 .select_only()
215 .select_column(subscription::Column::SubscriptionId)
216 .select_column(subscription::Column::DependentTableId)
217 .select_column(subscription::Column::RetentionSeconds)
218 .select_column(object::Column::DatabaseId)
219 .join(JoinType::InnerJoin, subscription::Relation::Object.def());
220 let select = if let Some(database_id) = database_id {
221 select.filter(object::Column::DatabaseId.eq(database_id))
222 } else {
223 select
224 };
225 let subscription_objs: Vec<(SubscriptionId, ObjectId, i64, DatabaseId)> =
226 select.into_tuple().all(&inner.db).await?;
227 let mut map: HashMap<_, HashMap<_, HashMap<_, _>>> = HashMap::new();
228 for (subscription_id, dependent_table_id, retention_seconds, database_id) in
230 subscription_objs
231 {
232 map.entry(database_id)
233 .or_default()
234 .entry(dependent_table_id)
235 .or_default()
236 .insert(subscription_id, retention_seconds as _);
237 }
238 Ok(map)
239 }
240
241 pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
242 let inner = self.inner.read().await;
243 let table_options: Vec<(TableId, Option<i32>)> = Table::find()
244 .select_only()
245 .columns([table::Column::TableId, table::Column::RetentionSeconds])
246 .into_tuple::<(TableId, Option<i32>)>()
247 .all(&inner.db)
248 .await?;
249
250 Ok(table_options
251 .into_iter()
252 .map(|(id, retention_seconds)| {
253 (
254 id,
255 TableOption {
256 retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
257 },
258 )
259 })
260 .collect())
261 }
262
263 pub async fn get_all_streaming_parallelisms(
264 &self,
265 ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
266 let inner = self.inner.read().await;
267
268 let job_parallelisms = StreamingJob::find()
269 .select_only()
270 .columns([
271 streaming_job::Column::JobId,
272 streaming_job::Column::Parallelism,
273 ])
274 .into_tuple::<(ObjectId, StreamingParallelism)>()
275 .all(&inner.db)
276 .await?;
277
278 Ok(job_parallelisms
279 .into_iter()
280 .collect::<HashMap<ObjectId, StreamingParallelism>>())
281 }
282
283 pub async fn get_table_name_type_mapping(
284 &self,
285 ) -> MetaResult<HashMap<TableId, (String, String)>> {
286 let inner = self.inner.read().await;
287 let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
288 .select_only()
289 .columns([
290 table::Column::TableId,
291 table::Column::Name,
292 table::Column::TableType,
293 ])
294 .into_tuple()
295 .all(&inner.db)
296 .await?;
297 Ok(table_name_types
298 .into_iter()
299 .map(|(id, name, table_type)| {
300 (
301 id,
302 (name, PbTableType::from(table_type).as_str_name().to_owned()),
303 )
304 })
305 .collect())
306 }
307
308 pub async fn get_table_by_cdc_table_id(
309 &self,
310 cdc_table_id: &String,
311 ) -> MetaResult<Vec<PbTable>> {
312 let inner = self.inner.read().await;
313 let table_objs = Table::find()
314 .find_also_related(Object)
315 .filter(table::Column::CdcTableId.eq(cdc_table_id))
316 .all(&inner.db)
317 .await?;
318 Ok(table_objs
319 .into_iter()
320 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
321 .collect())
322 }
323
324 pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
325 let inner = self.inner.read().await;
326
327 let mut table_ids: Vec<TableId> = StreamingJob::find()
329 .select_only()
330 .column(streaming_job::Column::JobId)
331 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
332 .into_tuple()
333 .all(&inner.db)
334 .await?;
335
336 let internal_table_ids: Vec<TableId> = Table::find()
338 .select_only()
339 .column(table::Column::TableId)
340 .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
341 .into_tuple()
342 .all(&inner.db)
343 .await?;
344 table_ids.extend(internal_table_ids);
345
346 Ok(table_ids)
347 }
348
349 pub async fn get_versioned_table_schemas(&self) -> MetaResult<HashMap<TableId, Vec<i32>>> {
352 let res = self
353 .list_all_state_tables()
354 .await?
355 .into_iter()
356 .filter_map(|t| {
357 if t.version.is_some() {
358 let ret = (
359 t.id.try_into().unwrap(),
360 t.columns
361 .iter()
362 .map(|c| c.column_desc.as_ref().unwrap().column_id)
363 .collect_vec(),
364 );
365 return Some(ret);
366 }
367 None
368 })
369 .collect();
370 Ok(res)
371 }
372
373 pub async fn get_existing_job_resource_group(
374 &self,
375 streaming_job_id: ObjectId,
376 ) -> MetaResult<String> {
377 let inner = self.inner.read().await;
378 get_existing_job_resource_group(&inner.db, streaming_job_id).await
379 }
380
381 pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
382 let inner = self.inner.read().await;
383 get_database_resource_group(&inner.db, database_id).await
384 }
385
386 pub async fn get_existing_job_resource_groups(
387 &self,
388 streaming_job_ids: Vec<ObjectId>,
389 ) -> MetaResult<HashMap<ObjectId, String>> {
390 let inner = self.inner.read().await;
391 let mut resource_groups = HashMap::new();
392 for job_id in streaming_job_ids {
393 let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
394 resource_groups.insert(job_id, resource_group);
395 }
396
397 Ok(resource_groups)
398 }
399
400 pub async fn get_existing_job_database_resource_group(
401 &self,
402 streaming_job_id: ObjectId,
403 ) -> MetaResult<String> {
404 let inner = self.inner.read().await;
405 let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
406 .select_only()
407 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
408 .column(object::Column::DatabaseId)
409 .into_tuple()
410 .one(&inner.db)
411 .await?
412 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
413
414 get_database_resource_group(&inner.db, database_id).await
415 }
416
417 pub async fn get_job_streaming_parallelisms(
418 &self,
419 streaming_job_id: ObjectId,
420 ) -> MetaResult<StreamingParallelism> {
421 let inner = self.inner.read().await;
422
423 let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
424 .select_only()
425 .column(streaming_job::Column::Parallelism)
426 .into_tuple()
427 .one(&inner.db)
428 .await?
429 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
430
431 Ok(job_parallelism)
432 }
433
434 pub async fn get_fragment_streaming_job_id(
435 &self,
436 fragment_id: FragmentId,
437 ) -> MetaResult<ObjectId> {
438 let inner = self.inner.read().await;
439 let job_id: ObjectId = Fragment::find_by_id(fragment_id)
440 .select_only()
441 .column(fragment::Column::JobId)
442 .into_tuple()
443 .one(&inner.db)
444 .await?
445 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
446 Ok(job_id)
447 }
448
449 pub async fn list_table_objects(
451 &self,
452 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
453 let inner = self.inner.read().await;
454 Ok(Object::find()
455 .select_only()
456 .join(JoinType::InnerJoin, object::Relation::Table.def())
457 .join(JoinType::InnerJoin, object::Relation::Database2.def())
458 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
459 .column(object::Column::Oid)
460 .column(database::Column::Name)
461 .column(schema::Column::Name)
462 .column(table::Column::Name)
463 .column(database::Column::ResourceGroup)
464 .into_tuple()
465 .all(&inner.db)
466 .await?)
467 }
468
469 pub async fn list_source_objects(
471 &self,
472 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
473 let inner = self.inner.read().await;
474 Ok(Object::find()
475 .select_only()
476 .join(JoinType::InnerJoin, object::Relation::Source.def())
477 .join(JoinType::InnerJoin, object::Relation::Database2.def())
478 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
479 .column(object::Column::Oid)
480 .column(database::Column::Name)
481 .column(schema::Column::Name)
482 .column(source::Column::Name)
483 .column(database::Column::ResourceGroup)
484 .into_tuple()
485 .all(&inner.db)
486 .await?)
487 }
488
489 pub async fn list_sink_objects(
491 &self,
492 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
493 let inner = self.inner.read().await;
494 Ok(Object::find()
495 .select_only()
496 .join(JoinType::InnerJoin, object::Relation::Sink.def())
497 .join(JoinType::InnerJoin, object::Relation::Database2.def())
498 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
499 .column(object::Column::Oid)
500 .column(database::Column::Name)
501 .column(schema::Column::Name)
502 .column(sink::Column::Name)
503 .column(database::Column::ResourceGroup)
504 .into_tuple()
505 .all(&inner.db)
506 .await?)
507 }
508}