1use risingwave_common::catalog::ColumnCatalog;
16use risingwave_meta_model::table::RefreshState;
17
18use super::*;
19use crate::controller::utils::{
20 get_database_resource_group, get_existing_job_resource_group, get_table_columns,
21};
22
23impl CatalogController {
24 pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
25 let inner = self.inner.read().await;
26 let (secret, obj) = Secret::find_by_id(secret_id)
27 .find_also_related(Object)
28 .one(&inner.db)
29 .await?
30 .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
31 Ok(ObjectModel(secret, obj.unwrap()).into())
32 }
33
34 pub async fn get_object_database_id(&self, object_id: ObjectId) -> MetaResult<DatabaseId> {
35 let inner = self.inner.read().await;
36 let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
37 .select_only()
38 .select_column(object::Column::DatabaseId)
39 .into_tuple()
40 .one(&inner.db)
41 .await?
42 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
43 Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
44 }
45
46 pub async fn get_connection_by_id(
47 &self,
48 connection_id: ConnectionId,
49 ) -> MetaResult<PbConnection> {
50 let inner = self.inner.read().await;
51 let (conn, obj) = Connection::find_by_id(connection_id)
52 .find_also_related(Object)
53 .one(&inner.db)
54 .await?
55 .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
56
57 Ok(ObjectModel(conn, obj.unwrap()).into())
58 }
59
60 pub async fn get_table_by_name(
61 &self,
62 database_name: &str,
63 table_name: &str,
64 ) -> MetaResult<Option<PbTable>> {
65 let inner = self.inner.read().await;
66 let table_obj = Table::find()
67 .find_also_related(Object)
68 .join(JoinType::InnerJoin, object::Relation::Database2.def())
69 .filter(
70 table::Column::Name
71 .eq(table_name)
72 .and(database::Column::Name.eq(database_name)),
73 )
74 .one(&inner.db)
75 .await?;
76 Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
77 }
78
79 pub async fn get_table_associated_source_id(
80 &self,
81 table_id: TableId,
82 ) -> MetaResult<Option<SourceId>> {
83 let inner = self.inner.read().await;
84 Table::find_by_id(table_id)
85 .select_only()
86 .select_column(table::Column::OptionalAssociatedSourceId)
87 .into_tuple()
88 .one(&inner.db)
89 .await?
90 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
91 }
92
93 pub async fn get_table_by_associate_source_id(
94 &self,
95 associated_source_id: SourceId,
96 ) -> MetaResult<PbTable> {
97 let inner = self.inner.read().await;
98 Table::find()
99 .find_also_related(Object)
100 .filter(table::Column::OptionalAssociatedSourceId.eq(associated_source_id))
101 .one(&inner.db)
102 .await?
103 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
104 .ok_or_else(|| {
105 MetaError::catalog_id_not_found("table associated source", associated_source_id)
106 })
107 }
108
109 pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
110 let inner = self.inner.read().await;
111 let table_obj = Table::find_by_id(table_id)
112 .find_also_related(Object)
113 .one(&inner.db)
114 .await?;
115 if let Some((table, obj)) = table_obj {
116 Ok(ObjectModel(table, obj.unwrap()).into())
117 } else {
118 Err(MetaError::catalog_id_not_found("table", table_id))
119 }
120 }
121
122 pub async fn get_user_created_table_by_ids(
123 &self,
124 table_ids: Vec<TableId>,
125 ) -> MetaResult<Vec<PbTable>> {
126 let inner = self.inner.read().await;
127 let table_objs = Table::find()
128 .find_also_related(Object)
129 .filter(
130 table::Column::TableId
131 .is_in(table_ids.clone())
132 .and(table::Column::TableType.eq(TableType::Table)),
133 )
134 .all(&inner.db)
135 .await?;
136 let tables = table_objs
137 .into_iter()
138 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
139 .collect();
140 Ok(tables)
141 }
142
143 pub async fn get_table_by_ids(
144 &self,
145 table_ids: Vec<TableId>,
146 include_dropped_table: bool,
147 ) -> MetaResult<Vec<PbTable>> {
148 let inner = self.inner.read().await;
149 let table_objs = Table::find()
150 .find_also_related(Object)
151 .filter(table::Column::TableId.is_in(table_ids.clone()))
152 .all(&inner.db)
153 .await?;
154 let tables = table_objs
155 .into_iter()
156 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
157 let tables = if include_dropped_table {
158 tables
159 .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
160 if table_ids.contains(id) {
161 Some(t.clone())
162 } else {
163 None
164 }
165 }))
166 .collect()
167 } else {
168 tables.collect()
169 };
170 Ok(tables)
171 }
172
173 pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
174 let inner = self.inner.read().await;
175 Ok(get_table_columns(&inner.db, id)
176 .await?
177 .to_protobuf()
178 .into_iter()
179 .map(|col| col.into())
180 .collect())
181 }
182
183 pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
184 let inner = self.inner.read().await;
185 let sink_objs = Sink::find()
186 .find_also_related(Object)
187 .filter(sink::Column::TargetTable.eq(table_id))
188 .all(&inner.db)
189 .await?;
190 Ok(sink_objs
191 .into_iter()
192 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
193 .collect())
194 }
195
196 pub async fn get_table_refresh_state(
198 &self,
199 table_id: TableId,
200 ) -> MetaResult<Option<RefreshState>> {
201 let inner = self.inner.read().await;
202 let (refresh_state,): (Option<RefreshState>,) = Table::find_by_id(table_id)
203 .select_only()
204 .select_column(table::Column::RefreshState)
205 .into_tuple()
206 .one(&inner.db)
207 .await?
208 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;
209
210 Ok(Some(refresh_state.unwrap_or(RefreshState::Idle)))
212 }
213
214 pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
215 let inner = self.inner.read().await;
216 let sink_objs = Sink::find()
217 .find_also_related(Object)
218 .filter(sink::Column::SinkId.is_in(sink_ids))
219 .all(&inner.db)
220 .await?;
221 Ok(sink_objs
222 .into_iter()
223 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
224 .collect())
225 }
226
227 pub async fn get_sink_auto_refresh_schema_from(
228 &self,
229 table_id: TableId,
230 ) -> MetaResult<Vec<PbSink>> {
231 let inner = self.inner.read().await;
232 let sink_objs = Sink::find()
233 .find_also_related(Object)
234 .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
235 .all(&inner.db)
236 .await?;
237 Ok(sink_objs
238 .into_iter()
239 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
240 .collect())
241 }
242
243 pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
244 let inner = self.inner.read().await;
245 let tables: Vec<I32Array> = Fragment::find()
246 .select_only()
247 .column(fragment::Column::StateTableIds)
248 .filter(fragment::Column::JobId.eq(sink_id))
249 .into_tuple()
250 .all(&inner.db)
251 .await?;
252 Ok(tables
253 .into_iter()
254 .flat_map(|ids| ids.into_inner().into_iter())
255 .collect())
256 }
257
258 pub async fn get_subscription_by_id(
259 &self,
260 subscription_id: SubscriptionId,
261 ) -> MetaResult<PbSubscription> {
262 let inner = self.inner.read().await;
263 let subscription_objs = Subscription::find()
264 .find_also_related(Object)
265 .filter(subscription::Column::SubscriptionId.eq(subscription_id))
266 .all(&inner.db)
267 .await?;
268 let subscription: PbSubscription = subscription_objs
269 .into_iter()
270 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
271 .find_or_first(|_| true)
272 .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
273
274 Ok(subscription)
275 }
276
277 pub async fn get_mv_depended_subscriptions(
278 &self,
279 database_id: Option<DatabaseId>,
280 ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
281 let inner = self.inner.read().await;
282 let select = Subscription::find()
283 .select_only()
284 .select_column(subscription::Column::SubscriptionId)
285 .select_column(subscription::Column::DependentTableId)
286 .select_column(subscription::Column::RetentionSeconds)
287 .select_column(object::Column::DatabaseId)
288 .join(JoinType::InnerJoin, subscription::Relation::Object.def());
289 let select = if let Some(database_id) = database_id {
290 select.filter(object::Column::DatabaseId.eq(database_id))
291 } else {
292 select
293 };
294 let subscription_objs: Vec<(SubscriptionId, ObjectId, i64, DatabaseId)> =
295 select.into_tuple().all(&inner.db).await?;
296 let mut map: HashMap<_, HashMap<_, HashMap<_, _>>> = HashMap::new();
297 for (subscription_id, dependent_table_id, retention_seconds, database_id) in
299 subscription_objs
300 {
301 map.entry(database_id)
302 .or_default()
303 .entry(dependent_table_id)
304 .or_default()
305 .insert(subscription_id, retention_seconds as _);
306 }
307 Ok(map)
308 }
309
310 pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
311 let inner = self.inner.read().await;
312 let table_options: Vec<(TableId, Option<i32>)> = Table::find()
313 .select_only()
314 .columns([table::Column::TableId, table::Column::RetentionSeconds])
315 .into_tuple::<(TableId, Option<i32>)>()
316 .all(&inner.db)
317 .await?;
318
319 Ok(table_options
320 .into_iter()
321 .map(|(id, retention_seconds)| {
322 (
323 id,
324 TableOption {
325 retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
326 },
327 )
328 })
329 .collect())
330 }
331
332 pub async fn get_all_streaming_parallelisms(
333 &self,
334 ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
335 let inner = self.inner.read().await;
336
337 let job_parallelisms = StreamingJob::find()
338 .select_only()
339 .columns([
340 streaming_job::Column::JobId,
341 streaming_job::Column::Parallelism,
342 ])
343 .into_tuple::<(ObjectId, StreamingParallelism)>()
344 .all(&inner.db)
345 .await?;
346
347 Ok(job_parallelisms
348 .into_iter()
349 .collect::<HashMap<ObjectId, StreamingParallelism>>())
350 }
351
352 pub async fn get_table_name_type_mapping(
353 &self,
354 ) -> MetaResult<HashMap<TableId, (String, String)>> {
355 let inner = self.inner.read().await;
356 let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
357 .select_only()
358 .columns([
359 table::Column::TableId,
360 table::Column::Name,
361 table::Column::TableType,
362 ])
363 .into_tuple()
364 .all(&inner.db)
365 .await?;
366 Ok(table_name_types
367 .into_iter()
368 .map(|(id, name, table_type)| {
369 (
370 id,
371 (name, PbTableType::from(table_type).as_str_name().to_owned()),
372 )
373 })
374 .collect())
375 }
376
377 pub async fn get_table_by_cdc_table_id(
378 &self,
379 cdc_table_id: &String,
380 ) -> MetaResult<Vec<PbTable>> {
381 let inner = self.inner.read().await;
382 let table_objs = Table::find()
383 .find_also_related(Object)
384 .filter(table::Column::CdcTableId.eq(cdc_table_id))
385 .all(&inner.db)
386 .await?;
387 Ok(table_objs
388 .into_iter()
389 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
390 .collect())
391 }
392
393 pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
394 let inner = self.inner.read().await;
395
396 let mut table_ids: Vec<TableId> = StreamingJob::find()
398 .select_only()
399 .column(streaming_job::Column::JobId)
400 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
401 .into_tuple()
402 .all(&inner.db)
403 .await?;
404
405 let internal_table_ids: Vec<TableId> = Table::find()
407 .select_only()
408 .column(table::Column::TableId)
409 .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
410 .into_tuple()
411 .all(&inner.db)
412 .await?;
413 table_ids.extend(internal_table_ids);
414
415 Ok(table_ids)
416 }
417
418 pub async fn get_versioned_table_schemas(&self) -> MetaResult<HashMap<TableId, Vec<i32>>> {
421 let res = self
422 .list_all_state_tables()
423 .await?
424 .into_iter()
425 .filter_map(|t| {
426 if t.version.is_some() {
427 let ret = (
428 t.id.try_into().unwrap(),
429 t.columns
430 .iter()
431 .map(|c| c.column_desc.as_ref().unwrap().column_id)
432 .collect_vec(),
433 );
434 return Some(ret);
435 }
436 None
437 })
438 .collect();
439 Ok(res)
440 }
441
442 pub async fn get_existing_job_resource_group(
443 &self,
444 streaming_job_id: ObjectId,
445 ) -> MetaResult<String> {
446 let inner = self.inner.read().await;
447 get_existing_job_resource_group(&inner.db, streaming_job_id).await
448 }
449
450 pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
451 let inner = self.inner.read().await;
452 get_database_resource_group(&inner.db, database_id).await
453 }
454
455 pub async fn get_existing_job_resource_groups(
456 &self,
457 streaming_job_ids: Vec<ObjectId>,
458 ) -> MetaResult<HashMap<ObjectId, String>> {
459 let inner = self.inner.read().await;
460 let mut resource_groups = HashMap::new();
461 for job_id in streaming_job_ids {
462 let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
463 resource_groups.insert(job_id, resource_group);
464 }
465
466 Ok(resource_groups)
467 }
468
469 pub async fn get_existing_job_database_resource_group(
470 &self,
471 streaming_job_id: ObjectId,
472 ) -> MetaResult<String> {
473 let inner = self.inner.read().await;
474 let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
475 .select_only()
476 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
477 .column(object::Column::DatabaseId)
478 .into_tuple()
479 .one(&inner.db)
480 .await?
481 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
482
483 get_database_resource_group(&inner.db, database_id).await
484 }
485
486 pub async fn get_job_streaming_parallelisms(
487 &self,
488 streaming_job_id: ObjectId,
489 ) -> MetaResult<StreamingParallelism> {
490 let inner = self.inner.read().await;
491
492 let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
493 .select_only()
494 .column(streaming_job::Column::Parallelism)
495 .into_tuple()
496 .one(&inner.db)
497 .await?
498 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
499
500 Ok(job_parallelism)
501 }
502
503 pub async fn get_fragment_streaming_job_id(
504 &self,
505 fragment_id: FragmentId,
506 ) -> MetaResult<ObjectId> {
507 let inner = self.inner.read().await;
508 let job_id: ObjectId = Fragment::find_by_id(fragment_id)
509 .select_only()
510 .column(fragment::Column::JobId)
511 .into_tuple()
512 .one(&inner.db)
513 .await?
514 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
515 Ok(job_id)
516 }
517
518 pub async fn list_table_objects(
520 &self,
521 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
522 let inner = self.inner.read().await;
523 Ok(Object::find()
524 .select_only()
525 .join(JoinType::InnerJoin, object::Relation::Table.def())
526 .join(JoinType::InnerJoin, object::Relation::Database2.def())
527 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
528 .column(object::Column::Oid)
529 .column(database::Column::Name)
530 .column(schema::Column::Name)
531 .column(table::Column::Name)
532 .column(database::Column::ResourceGroup)
533 .into_tuple()
534 .all(&inner.db)
535 .await?)
536 }
537
538 pub async fn list_source_objects(
540 &self,
541 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
542 let inner = self.inner.read().await;
543 Ok(Object::find()
544 .select_only()
545 .join(JoinType::InnerJoin, object::Relation::Source.def())
546 .join(JoinType::InnerJoin, object::Relation::Database2.def())
547 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
548 .column(object::Column::Oid)
549 .column(database::Column::Name)
550 .column(schema::Column::Name)
551 .column(source::Column::Name)
552 .column(database::Column::ResourceGroup)
553 .into_tuple()
554 .all(&inner.db)
555 .await?)
556 }
557
558 pub async fn list_sink_objects(
560 &self,
561 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
562 let inner = self.inner.read().await;
563 Ok(Object::find()
564 .select_only()
565 .join(JoinType::InnerJoin, object::Relation::Sink.def())
566 .join(JoinType::InnerJoin, object::Relation::Database2.def())
567 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
568 .column(object::Column::Oid)
569 .column(database::Column::Name)
570 .column(schema::Column::Name)
571 .column(sink::Column::Name)
572 .column(database::Column::ResourceGroup)
573 .into_tuple()
574 .all(&inner.db)
575 .await?)
576 }
577
578 pub async fn get_streaming_job_status(
579 &self,
580 streaming_job_id: ObjectId,
581 ) -> MetaResult<JobStatus> {
582 let inner = self.inner.read().await;
583 let status = StreamingJob::find_by_id(streaming_job_id)
584 .select_only()
585 .column(streaming_job::Column::JobStatus)
586 .into_tuple()
587 .one(&inner.db)
588 .await?
589 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
590 Ok(status)
591 }
592}