1use risingwave_common::catalog::ColumnCatalog;
16use risingwave_common::id::JobId;
17
18use super::*;
19use crate::controller::utils::{
20 StreamingJobExtraInfo, get_database_resource_group, get_existing_job_resource_group,
21 get_streaming_job_extra_info as fetch_streaming_job_extra_info, get_table_columns,
22};
23
24impl CatalogController {
25 pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
26 let inner = self.inner.read().await;
27 let (secret, obj) = Secret::find_by_id(secret_id)
28 .find_also_related(Object)
29 .one(&inner.db)
30 .await?
31 .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
32 Ok(ObjectModel(secret, obj.unwrap()).into())
33 }
34
35 pub async fn get_object_database_id(
36 &self,
37 object_id: impl Into<ObjectId>,
38 ) -> MetaResult<DatabaseId> {
39 let object_id = object_id.into();
40 let inner = self.inner.read().await;
41 let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
42 .select_only()
43 .select_column(object::Column::DatabaseId)
44 .into_tuple()
45 .one(&inner.db)
46 .await?
47 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
48 Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
49 }
50
51 pub async fn get_connection_by_id(
52 &self,
53 connection_id: ConnectionId,
54 ) -> MetaResult<PbConnection> {
55 let inner = self.inner.read().await;
56 let (conn, obj) = Connection::find_by_id(connection_id)
57 .find_also_related(Object)
58 .one(&inner.db)
59 .await?
60 .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
61
62 Ok(ObjectModel(conn, obj.unwrap()).into())
63 }
64
65 pub async fn get_table_catalog_by_name(
66 &self,
67 database_id: DatabaseId,
68 schema_id: SchemaId,
69 name: &str,
70 ) -> MetaResult<Option<PbTable>> {
71 let inner = self.inner.read().await;
72 let table_obj = Table::find()
73 .find_also_related(Object)
74 .filter(
75 table::Column::Name
76 .eq(name)
77 .and(object::Column::DatabaseId.eq(database_id))
78 .and(object::Column::SchemaId.eq(schema_id)),
79 )
80 .one(&inner.db)
81 .await?;
82 Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
83 }
84
85 pub async fn get_table_by_name(
86 &self,
87 database_name: &str,
88 table_name: &str,
89 ) -> MetaResult<Option<PbTable>> {
90 let inner = self.inner.read().await;
91 let table_obj = Table::find()
92 .find_also_related(Object)
93 .join(JoinType::InnerJoin, object::Relation::Database2.def())
94 .filter(
95 table::Column::Name
96 .eq(table_name)
97 .and(database::Column::Name.eq(database_name)),
98 )
99 .one(&inner.db)
100 .await?;
101 Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
102 }
103
104 pub async fn get_table_associated_source_id(
105 &self,
106 table_id: TableId,
107 ) -> MetaResult<Option<SourceId>> {
108 let inner = self.inner.read().await;
109 Table::find_by_id(table_id)
110 .select_only()
111 .select_column(table::Column::OptionalAssociatedSourceId)
112 .into_tuple()
113 .one(&inner.db)
114 .await?
115 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
116 }
117
118 pub async fn get_table_by_associate_source_id(
119 &self,
120 associated_source_id: SourceId,
121 ) -> MetaResult<PbTable> {
122 let inner = self.inner.read().await;
123 Table::find()
124 .find_also_related(Object)
125 .filter(table::Column::OptionalAssociatedSourceId.eq(associated_source_id))
126 .one(&inner.db)
127 .await?
128 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
129 .ok_or_else(|| {
130 MetaError::catalog_id_not_found("table associated source", associated_source_id)
131 })
132 }
133
134 pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
135 let inner = self.inner.read().await;
136 let table_obj = Table::find_by_id(table_id)
137 .find_also_related(Object)
138 .one(&inner.db)
139 .await?;
140 if let Some((table, obj)) = table_obj {
141 Ok(ObjectModel(table, obj.unwrap()).into())
142 } else {
143 Err(MetaError::catalog_id_not_found("table", table_id))
144 }
145 }
146
147 pub async fn get_user_created_table_by_ids(
148 &self,
149 job_ids: impl Iterator<Item = JobId>,
150 ) -> MetaResult<Vec<PbTable>> {
151 let inner = self.inner.read().await;
152 let table_objs = Table::find()
153 .find_also_related(Object)
154 .filter(
155 table::Column::TableId
156 .is_in(job_ids.map(|job_id| job_id.as_mv_table_id()).collect_vec())
157 .and(table::Column::TableType.eq(TableType::Table)),
158 )
159 .all(&inner.db)
160 .await?;
161 let tables = table_objs
162 .into_iter()
163 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
164 .collect();
165 Ok(tables)
166 }
167
168 pub async fn get_table_by_ids(
169 &self,
170 table_ids: Vec<TableId>,
171 include_dropped_table: bool,
172 ) -> MetaResult<Vec<PbTable>> {
173 let inner = self.inner.read().await;
174 let table_objs = Table::find()
175 .find_also_related(Object)
176 .filter(table::Column::TableId.is_in(table_ids.clone()))
177 .all(&inner.db)
178 .await?;
179 let tables = table_objs
180 .into_iter()
181 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
182 let tables = if include_dropped_table {
183 tables
184 .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
185 if table_ids.contains(id) {
186 Some(t.clone())
187 } else {
188 None
189 }
190 }))
191 .collect()
192 } else {
193 tables.collect()
194 };
195 Ok(tables)
196 }
197
198 pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
199 let inner = self.inner.read().await;
200 Ok(get_table_columns(&inner.db, id)
201 .await?
202 .to_protobuf()
203 .into_iter()
204 .map(|col| col.into())
205 .collect())
206 }
207
208 pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
209 let inner = self.inner.read().await;
210 let sink_objs = Sink::find()
211 .find_also_related(Object)
212 .filter(sink::Column::TargetTable.eq(table_id))
213 .all(&inner.db)
214 .await?;
215 Ok(sink_objs
216 .into_iter()
217 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
218 .collect())
219 }
220
221 pub async fn get_sink_by_id(&self, sink_id: SinkId) -> MetaResult<Option<PbSink>> {
222 let inner = self.inner.read().await;
223 let sink_objs = Sink::find_by_id(sink_id)
224 .find_also_related(Object)
225 .one(&inner.db)
226 .await?;
227 Ok(sink_objs.map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into()))
228 }
229
230 pub async fn get_sink_auto_refresh_schema_from(
231 &self,
232 table_id: TableId,
233 ) -> MetaResult<Vec<PbSink>> {
234 let inner = self.inner.read().await;
235 let sink_objs = Sink::find()
236 .find_also_related(Object)
237 .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
238 .all(&inner.db)
239 .await?;
240 Ok(sink_objs
241 .into_iter()
242 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
243 .collect())
244 }
245
246 pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
247 let inner = self.inner.read().await;
248 let tables: Vec<I32Array> = Fragment::find()
249 .select_only()
250 .column(fragment::Column::StateTableIds)
251 .filter(fragment::Column::JobId.eq(sink_id))
252 .into_tuple()
253 .all(&inner.db)
254 .await?;
255 Ok(tables
256 .into_iter()
257 .flat_map(|ids| {
258 ids.into_inner()
259 .into_iter()
260 .map(|table_id| TableId::new(table_id as _))
261 })
262 .collect())
263 }
264
265 pub async fn get_subscription_by_id(
266 &self,
267 subscription_id: SubscriptionId,
268 ) -> MetaResult<PbSubscription> {
269 let inner = self.inner.read().await;
270 let subscription_objs = Subscription::find()
271 .find_also_related(Object)
272 .filter(subscription::Column::SubscriptionId.eq(subscription_id))
273 .all(&inner.db)
274 .await?;
275 let subscription: PbSubscription = subscription_objs
276 .into_iter()
277 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
278 .find_or_first(|_| true)
279 .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
280
281 Ok(subscription)
282 }
283
284 pub async fn get_mv_depended_subscriptions(
285 &self,
286 database_id: Option<DatabaseId>,
287 ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
288 let inner = self.inner.read().await;
289 let select = Subscription::find()
290 .select_only()
291 .select_column(subscription::Column::SubscriptionId)
292 .select_column(subscription::Column::DependentTableId)
293 .select_column(subscription::Column::RetentionSeconds);
294 let select = if let Some(database_id) = database_id {
295 select
296 .join(JoinType::InnerJoin, subscription::Relation::Object.def())
297 .filter(object::Column::DatabaseId.eq(database_id))
298 } else {
299 select
300 };
301 let subscription_objs: Vec<(SubscriptionId, TableId, i64)> =
302 select.into_tuple().all(&inner.db).await?;
303 let mut map: HashMap<_, HashMap<_, _>> = HashMap::new();
304 for (subscription_id, dependent_table_id, retention_seconds) in subscription_objs {
306 map.entry(dependent_table_id)
307 .or_default()
308 .insert(subscription_id, retention_seconds as _);
309 }
310 Ok(map)
311 }
312
313 pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
314 let inner = self.inner.read().await;
315 let table_options: Vec<(TableId, Option<i32>)> = Table::find()
316 .select_only()
317 .columns([table::Column::TableId, table::Column::RetentionSeconds])
318 .into_tuple::<(TableId, Option<i32>)>()
319 .all(&inner.db)
320 .await?;
321
322 Ok(table_options
323 .into_iter()
324 .map(|(id, retention_seconds)| {
325 (
326 id,
327 TableOption {
328 retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
329 },
330 )
331 })
332 .collect())
333 }
334
335 pub async fn get_all_streaming_parallelisms(
336 &self,
337 ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
338 let inner = self.inner.read().await;
339
340 let job_parallelisms = StreamingJob::find()
341 .select_only()
342 .columns([
343 streaming_job::Column::JobId,
344 streaming_job::Column::Parallelism,
345 ])
346 .into_tuple::<(ObjectId, StreamingParallelism)>()
347 .all(&inner.db)
348 .await?;
349
350 Ok(job_parallelisms
351 .into_iter()
352 .collect::<HashMap<ObjectId, StreamingParallelism>>())
353 }
354
355 pub async fn get_table_name_type_mapping(
356 &self,
357 ) -> MetaResult<HashMap<TableId, (String, String)>> {
358 let inner = self.inner.read().await;
359 let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
360 .select_only()
361 .columns([
362 table::Column::TableId,
363 table::Column::Name,
364 table::Column::TableType,
365 ])
366 .into_tuple()
367 .all(&inner.db)
368 .await?;
369 Ok(table_name_types
370 .into_iter()
371 .map(|(id, name, table_type)| {
372 (
373 id,
374 (name, PbTableType::from(table_type).as_str_name().to_owned()),
375 )
376 })
377 .collect())
378 }
379
380 pub async fn get_table_by_cdc_table_id(
381 &self,
382 cdc_table_id: &String,
383 ) -> MetaResult<Vec<PbTable>> {
384 let inner = self.inner.read().await;
385 let table_objs = Table::find()
386 .find_also_related(Object)
387 .filter(table::Column::CdcTableId.eq(cdc_table_id))
388 .all(&inner.db)
389 .await?;
390 Ok(table_objs
391 .into_iter()
392 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
393 .collect())
394 }
395
396 pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
397 let inner = self.inner.read().await;
398
399 let mut table_ids: Vec<TableId> = StreamingJob::find()
401 .select_only()
402 .column(streaming_job::Column::JobId)
403 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
404 .into_tuple()
405 .all(&inner.db)
406 .await?;
407
408 let internal_table_ids: Vec<TableId> = Table::find()
410 .select_only()
411 .column(table::Column::TableId)
412 .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
413 .into_tuple()
414 .all(&inner.db)
415 .await?;
416 table_ids.extend(internal_table_ids);
417
418 Ok(table_ids)
419 }
420
421 pub async fn get_versioned_table_schemas(
424 &self,
425 ) -> MetaResult<HashMap<risingwave_common::catalog::TableId, Vec<i32>>> {
426 let res = self
427 .list_all_state_tables()
428 .await?
429 .into_iter()
430 .filter_map(|t| {
431 if t.version.is_some() {
432 let ret = (
433 t.id,
434 t.columns
435 .iter()
436 .map(|c| c.column_desc.as_ref().unwrap().column_id)
437 .collect_vec(),
438 );
439 return Some(ret);
440 }
441 None
442 })
443 .collect();
444 Ok(res)
445 }
446
447 pub async fn get_existing_job_resource_group(
448 &self,
449 streaming_job_id: JobId,
450 ) -> MetaResult<String> {
451 let inner = self.inner.read().await;
452 get_existing_job_resource_group(&inner.db, streaming_job_id).await
453 }
454
455 pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
456 let inner = self.inner.read().await;
457 get_database_resource_group(&inner.db, database_id).await
458 }
459
460 pub async fn get_existing_job_resource_groups(
461 &self,
462 streaming_job_ids: Vec<JobId>,
463 ) -> MetaResult<HashMap<JobId, String>> {
464 let inner = self.inner.read().await;
465 let mut resource_groups = HashMap::new();
466 for job_id in streaming_job_ids {
467 let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
468 resource_groups.insert(job_id, resource_group);
469 }
470
471 Ok(resource_groups)
472 }
473
474 pub async fn get_existing_job_database_resource_group(
475 &self,
476 streaming_job_id: JobId,
477 ) -> MetaResult<String> {
478 let inner = self.inner.read().await;
479 let database_id: DatabaseId = StreamingJob::find_by_id(streaming_job_id)
480 .select_only()
481 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
482 .column(object::Column::DatabaseId)
483 .into_tuple()
484 .one(&inner.db)
485 .await?
486 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
487
488 get_database_resource_group(&inner.db, database_id).await
489 }
490
491 pub async fn get_job_streaming_parallelisms(
492 &self,
493 streaming_job_id: JobId,
494 ) -> MetaResult<StreamingParallelism> {
495 let inner = self.inner.read().await;
496
497 let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
498 .select_only()
499 .column(streaming_job::Column::Parallelism)
500 .into_tuple()
501 .one(&inner.db)
502 .await?
503 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
504
505 Ok(job_parallelism)
506 }
507
508 pub async fn get_fragment_streaming_job_id(
509 &self,
510 fragment_id: FragmentId,
511 ) -> MetaResult<JobId> {
512 let inner = self.inner.read().await;
513 let job_id: JobId = Fragment::find_by_id(fragment_id)
514 .select_only()
515 .column(fragment::Column::JobId)
516 .into_tuple()
517 .one(&inner.db)
518 .await?
519 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
520 Ok(job_id)
521 }
522
523 pub async fn list_streaming_job_with_database(
524 &self,
525 ) -> MetaResult<HashMap<DatabaseId, Vec<JobId>>> {
526 let inner = self.inner.read().await;
527 let database_objects: Vec<(DatabaseId, JobId)> = StreamingJob::find()
528 .select_only()
529 .column(object::Column::DatabaseId)
530 .column(streaming_job::Column::JobId)
531 .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
532 .into_tuple()
533 .all(&inner.db)
534 .await?;
535
536 Ok(database_objects.into_iter().into_group_map())
537 }
538
539 pub async fn list_table_objects(
541 &self,
542 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
543 let inner = self.inner.read().await;
544 Ok(Object::find()
545 .select_only()
546 .join(JoinType::InnerJoin, object::Relation::Table.def())
547 .join(JoinType::InnerJoin, object::Relation::Database2.def())
548 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
549 .column(object::Column::Oid)
550 .column(database::Column::Name)
551 .column(schema::Column::Name)
552 .column(table::Column::Name)
553 .column(database::Column::ResourceGroup)
554 .into_tuple()
555 .all(&inner.db)
556 .await?)
557 }
558
559 pub async fn list_source_objects(
561 &self,
562 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
563 let inner = self.inner.read().await;
564 Ok(Object::find()
565 .select_only()
566 .join(JoinType::InnerJoin, object::Relation::Source.def())
567 .join(JoinType::InnerJoin, object::Relation::Database2.def())
568 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
569 .column(object::Column::Oid)
570 .column(database::Column::Name)
571 .column(schema::Column::Name)
572 .column(source::Column::Name)
573 .column(database::Column::ResourceGroup)
574 .into_tuple()
575 .all(&inner.db)
576 .await?)
577 }
578
579 pub async fn list_sink_objects(
581 &self,
582 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
583 let inner = self.inner.read().await;
584 Ok(Object::find()
585 .select_only()
586 .join(JoinType::InnerJoin, object::Relation::Sink.def())
587 .join(JoinType::InnerJoin, object::Relation::Database2.def())
588 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
589 .column(object::Column::Oid)
590 .column(database::Column::Name)
591 .column(schema::Column::Name)
592 .column(sink::Column::Name)
593 .column(database::Column::ResourceGroup)
594 .into_tuple()
595 .all(&inner.db)
596 .await?)
597 }
598
599 pub async fn get_streaming_job_status(&self, streaming_job_id: JobId) -> MetaResult<JobStatus> {
600 let inner = self.inner.read().await;
601 let status = StreamingJob::find_by_id(streaming_job_id)
602 .select_only()
603 .column(streaming_job::Column::JobStatus)
604 .into_tuple()
605 .one(&inner.db)
606 .await?
607 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
608 Ok(status)
609 }
610
611 pub async fn get_streaming_job_extra_info(
612 &self,
613 job_ids: Vec<JobId>,
614 ) -> MetaResult<HashMap<JobId, StreamingJobExtraInfo>> {
615 let inner = self.inner.read().await;
616 let txn = inner.db.begin().await?;
617
618 let result = fetch_streaming_job_extra_info(&txn, job_ids).await?;
619 Ok(result)
620 }
621}