risingwave_meta/controller/catalog/
get_op.rs1use risingwave_common::catalog::ColumnCatalog;
16
17use super::*;
18use crate::controller::utils::{
19 get_database_resource_group, get_existing_job_resource_group, get_table_columns,
20};
21
22impl CatalogController {
23 pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult<PbSecret> {
24 let inner = self.inner.read().await;
25 let (secret, obj) = Secret::find_by_id(secret_id)
26 .find_also_related(Object)
27 .one(&inner.db)
28 .await?
29 .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?;
30 Ok(ObjectModel(secret, obj.unwrap()).into())
31 }
32
33 pub async fn get_object_database_id(&self, object_id: ObjectId) -> MetaResult<DatabaseId> {
34 let inner = self.inner.read().await;
35 let (database_id,): (Option<DatabaseId>,) = Object::find_by_id(object_id)
36 .select_only()
37 .select_column(object::Column::DatabaseId)
38 .into_tuple()
39 .one(&inner.db)
40 .await?
41 .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?;
42 Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?)
43 }
44
45 pub async fn get_connection_by_id(
46 &self,
47 connection_id: ConnectionId,
48 ) -> MetaResult<PbConnection> {
49 let inner = self.inner.read().await;
50 let (conn, obj) = Connection::find_by_id(connection_id)
51 .find_also_related(Object)
52 .one(&inner.db)
53 .await?
54 .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
55
56 Ok(ObjectModel(conn, obj.unwrap()).into())
57 }
58
59 pub async fn get_table_by_name(
60 &self,
61 database_name: &str,
62 table_name: &str,
63 ) -> MetaResult<Option<PbTable>> {
64 let inner = self.inner.read().await;
65 let table_obj = Table::find()
66 .find_also_related(Object)
67 .join(JoinType::InnerJoin, object::Relation::Database2.def())
68 .filter(
69 table::Column::Name
70 .eq(table_name)
71 .and(database::Column::Name.eq(database_name)),
72 )
73 .one(&inner.db)
74 .await?;
75 Ok(table_obj.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()))
76 }
77
78 pub async fn get_table_associated_source_id(
79 &self,
80 table_id: TableId,
81 ) -> MetaResult<Option<SourceId>> {
82 let inner = self.inner.read().await;
83 Table::find_by_id(table_id)
84 .select_only()
85 .select_column(table::Column::OptionalAssociatedSourceId)
86 .into_tuple()
87 .one(&inner.db)
88 .await?
89 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))
90 }
91
92 pub async fn get_table_by_id(&self, table_id: TableId) -> MetaResult<PbTable> {
93 let inner = self.inner.read().await;
94 let table_obj = Table::find_by_id(table_id)
95 .find_also_related(Object)
96 .one(&inner.db)
97 .await?;
98 if let Some((table, obj)) = table_obj {
99 Ok(ObjectModel(table, obj.unwrap()).into())
100 } else {
101 Err(MetaError::catalog_id_not_found("table", table_id))
102 }
103 }
104
105 pub async fn get_user_created_table_by_ids(
106 &self,
107 table_ids: Vec<TableId>,
108 ) -> MetaResult<Vec<PbTable>> {
109 let inner = self.inner.read().await;
110 let table_objs = Table::find()
111 .find_also_related(Object)
112 .filter(
113 table::Column::TableId
114 .is_in(table_ids.clone())
115 .and(table::Column::TableType.eq(TableType::Table)),
116 )
117 .all(&inner.db)
118 .await?;
119 let tables = table_objs
120 .into_iter()
121 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
122 .collect();
123 Ok(tables)
124 }
125
126 pub async fn get_table_by_ids(
127 &self,
128 table_ids: Vec<TableId>,
129 include_dropped_table: bool,
130 ) -> MetaResult<Vec<PbTable>> {
131 let inner = self.inner.read().await;
132 let table_objs = Table::find()
133 .find_also_related(Object)
134 .filter(table::Column::TableId.is_in(table_ids.clone()))
135 .all(&inner.db)
136 .await?;
137 let tables = table_objs
138 .into_iter()
139 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into());
140 let tables = if include_dropped_table {
141 tables
142 .chain(inner.dropped_tables.iter().filter_map(|(id, t)| {
143 if table_ids.contains(id) {
144 Some(t.clone())
145 } else {
146 None
147 }
148 }))
149 .collect()
150 } else {
151 tables.collect()
152 };
153 Ok(tables)
154 }
155
156 pub async fn get_table_columns(&self, id: TableId) -> MetaResult<Vec<ColumnCatalog>> {
157 let inner = self.inner.read().await;
158 Ok(get_table_columns(&inner.db, id)
159 .await?
160 .to_protobuf()
161 .into_iter()
162 .map(|col| col.into())
163 .collect())
164 }
165
166 pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
167 let inner = self.inner.read().await;
168 let sink_objs = Sink::find()
169 .find_also_related(Object)
170 .filter(sink::Column::TargetTable.eq(table_id))
171 .all(&inner.db)
172 .await?;
173 Ok(sink_objs
174 .into_iter()
175 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
176 .collect())
177 }
178
179 pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
180 let inner = self.inner.read().await;
181 let sink_objs = Sink::find()
182 .find_also_related(Object)
183 .filter(sink::Column::SinkId.is_in(sink_ids))
184 .all(&inner.db)
185 .await?;
186 Ok(sink_objs
187 .into_iter()
188 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
189 .collect())
190 }
191
192 pub async fn get_sink_auto_refresh_schema_from(
193 &self,
194 table_id: TableId,
195 ) -> MetaResult<Vec<PbSink>> {
196 let inner = self.inner.read().await;
197 let sink_objs = Sink::find()
198 .find_also_related(Object)
199 .filter(sink::Column::AutoRefreshSchemaFromTable.eq(table_id))
200 .all(&inner.db)
201 .await?;
202 Ok(sink_objs
203 .into_iter()
204 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
205 .collect())
206 }
207
208 pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
209 let inner = self.inner.read().await;
210 let tables: Vec<I32Array> = Fragment::find()
211 .select_only()
212 .column(fragment::Column::StateTableIds)
213 .filter(fragment::Column::JobId.eq(sink_id))
214 .into_tuple()
215 .all(&inner.db)
216 .await?;
217 Ok(tables
218 .into_iter()
219 .flat_map(|ids| ids.into_inner().into_iter())
220 .collect())
221 }
222
223 pub async fn get_subscription_by_id(
224 &self,
225 subscription_id: SubscriptionId,
226 ) -> MetaResult<PbSubscription> {
227 let inner = self.inner.read().await;
228 let subscription_objs = Subscription::find()
229 .find_also_related(Object)
230 .filter(subscription::Column::SubscriptionId.eq(subscription_id))
231 .all(&inner.db)
232 .await?;
233 let subscription: PbSubscription = subscription_objs
234 .into_iter()
235 .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
236 .find_or_first(|_| true)
237 .ok_or_else(|| anyhow!("cannot find subscription with id {}", subscription_id))?;
238
239 Ok(subscription)
240 }
241
242 pub async fn get_mv_depended_subscriptions(
243 &self,
244 database_id: Option<DatabaseId>,
245 ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
246 let inner = self.inner.read().await;
247 let select = Subscription::find()
248 .select_only()
249 .select_column(subscription::Column::SubscriptionId)
250 .select_column(subscription::Column::DependentTableId)
251 .select_column(subscription::Column::RetentionSeconds)
252 .select_column(object::Column::DatabaseId)
253 .join(JoinType::InnerJoin, subscription::Relation::Object.def());
254 let select = if let Some(database_id) = database_id {
255 select.filter(object::Column::DatabaseId.eq(database_id))
256 } else {
257 select
258 };
259 let subscription_objs: Vec<(SubscriptionId, ObjectId, i64, DatabaseId)> =
260 select.into_tuple().all(&inner.db).await?;
261 let mut map: HashMap<_, HashMap<_, HashMap<_, _>>> = HashMap::new();
262 for (subscription_id, dependent_table_id, retention_seconds, database_id) in
264 subscription_objs
265 {
266 map.entry(database_id)
267 .or_default()
268 .entry(dependent_table_id)
269 .or_default()
270 .insert(subscription_id, retention_seconds as _);
271 }
272 Ok(map)
273 }
274
275 pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
276 let inner = self.inner.read().await;
277 let table_options: Vec<(TableId, Option<i32>)> = Table::find()
278 .select_only()
279 .columns([table::Column::TableId, table::Column::RetentionSeconds])
280 .into_tuple::<(TableId, Option<i32>)>()
281 .all(&inner.db)
282 .await?;
283
284 Ok(table_options
285 .into_iter()
286 .map(|(id, retention_seconds)| {
287 (
288 id,
289 TableOption {
290 retention_seconds: retention_seconds.map(|i| i.try_into().unwrap()),
291 },
292 )
293 })
294 .collect())
295 }
296
297 pub async fn get_all_streaming_parallelisms(
298 &self,
299 ) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
300 let inner = self.inner.read().await;
301
302 let job_parallelisms = StreamingJob::find()
303 .select_only()
304 .columns([
305 streaming_job::Column::JobId,
306 streaming_job::Column::Parallelism,
307 ])
308 .into_tuple::<(ObjectId, StreamingParallelism)>()
309 .all(&inner.db)
310 .await?;
311
312 Ok(job_parallelisms
313 .into_iter()
314 .collect::<HashMap<ObjectId, StreamingParallelism>>())
315 }
316
317 pub async fn get_table_name_type_mapping(
318 &self,
319 ) -> MetaResult<HashMap<TableId, (String, String)>> {
320 let inner = self.inner.read().await;
321 let table_name_types: Vec<(TableId, String, TableType)> = Table::find()
322 .select_only()
323 .columns([
324 table::Column::TableId,
325 table::Column::Name,
326 table::Column::TableType,
327 ])
328 .into_tuple()
329 .all(&inner.db)
330 .await?;
331 Ok(table_name_types
332 .into_iter()
333 .map(|(id, name, table_type)| {
334 (
335 id,
336 (name, PbTableType::from(table_type).as_str_name().to_owned()),
337 )
338 })
339 .collect())
340 }
341
342 pub async fn get_table_by_cdc_table_id(
343 &self,
344 cdc_table_id: &String,
345 ) -> MetaResult<Vec<PbTable>> {
346 let inner = self.inner.read().await;
347 let table_objs = Table::find()
348 .find_also_related(Object)
349 .filter(table::Column::CdcTableId.eq(cdc_table_id))
350 .all(&inner.db)
351 .await?;
352 Ok(table_objs
353 .into_iter()
354 .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
355 .collect())
356 }
357
358 pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
359 let inner = self.inner.read().await;
360
361 let mut table_ids: Vec<TableId> = StreamingJob::find()
363 .select_only()
364 .column(streaming_job::Column::JobId)
365 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
366 .into_tuple()
367 .all(&inner.db)
368 .await?;
369
370 let internal_table_ids: Vec<TableId> = Table::find()
372 .select_only()
373 .column(table::Column::TableId)
374 .filter(table::Column::BelongsToJobId.is_in(table_ids.clone()))
375 .into_tuple()
376 .all(&inner.db)
377 .await?;
378 table_ids.extend(internal_table_ids);
379
380 Ok(table_ids)
381 }
382
383 pub async fn get_versioned_table_schemas(&self) -> MetaResult<HashMap<TableId, Vec<i32>>> {
386 let res = self
387 .list_all_state_tables()
388 .await?
389 .into_iter()
390 .filter_map(|t| {
391 if t.version.is_some() {
392 let ret = (
393 t.id.try_into().unwrap(),
394 t.columns
395 .iter()
396 .map(|c| c.column_desc.as_ref().unwrap().column_id)
397 .collect_vec(),
398 );
399 return Some(ret);
400 }
401 None
402 })
403 .collect();
404 Ok(res)
405 }
406
407 pub async fn get_existing_job_resource_group(
408 &self,
409 streaming_job_id: ObjectId,
410 ) -> MetaResult<String> {
411 let inner = self.inner.read().await;
412 get_existing_job_resource_group(&inner.db, streaming_job_id).await
413 }
414
415 pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
416 let inner = self.inner.read().await;
417 get_database_resource_group(&inner.db, database_id).await
418 }
419
420 pub async fn get_existing_job_resource_groups(
421 &self,
422 streaming_job_ids: Vec<ObjectId>,
423 ) -> MetaResult<HashMap<ObjectId, String>> {
424 let inner = self.inner.read().await;
425 let mut resource_groups = HashMap::new();
426 for job_id in streaming_job_ids {
427 let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
428 resource_groups.insert(job_id, resource_group);
429 }
430
431 Ok(resource_groups)
432 }
433
434 pub async fn get_existing_job_database_resource_group(
435 &self,
436 streaming_job_id: ObjectId,
437 ) -> MetaResult<String> {
438 let inner = self.inner.read().await;
439 let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
440 .select_only()
441 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
442 .column(object::Column::DatabaseId)
443 .into_tuple()
444 .one(&inner.db)
445 .await?
446 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
447
448 get_database_resource_group(&inner.db, database_id).await
449 }
450
451 pub async fn get_job_streaming_parallelisms(
452 &self,
453 streaming_job_id: ObjectId,
454 ) -> MetaResult<StreamingParallelism> {
455 let inner = self.inner.read().await;
456
457 let job_parallelism: StreamingParallelism = StreamingJob::find_by_id(streaming_job_id)
458 .select_only()
459 .column(streaming_job::Column::Parallelism)
460 .into_tuple()
461 .one(&inner.db)
462 .await?
463 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
464
465 Ok(job_parallelism)
466 }
467
468 pub async fn get_fragment_streaming_job_id(
469 &self,
470 fragment_id: FragmentId,
471 ) -> MetaResult<ObjectId> {
472 let inner = self.inner.read().await;
473 let job_id: ObjectId = Fragment::find_by_id(fragment_id)
474 .select_only()
475 .column(fragment::Column::JobId)
476 .into_tuple()
477 .one(&inner.db)
478 .await?
479 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
480 Ok(job_id)
481 }
482
483 pub async fn list_table_objects(
485 &self,
486 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
487 let inner = self.inner.read().await;
488 Ok(Object::find()
489 .select_only()
490 .join(JoinType::InnerJoin, object::Relation::Table.def())
491 .join(JoinType::InnerJoin, object::Relation::Database2.def())
492 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
493 .column(object::Column::Oid)
494 .column(database::Column::Name)
495 .column(schema::Column::Name)
496 .column(table::Column::Name)
497 .column(database::Column::ResourceGroup)
498 .into_tuple()
499 .all(&inner.db)
500 .await?)
501 }
502
503 pub async fn list_source_objects(
505 &self,
506 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
507 let inner = self.inner.read().await;
508 Ok(Object::find()
509 .select_only()
510 .join(JoinType::InnerJoin, object::Relation::Source.def())
511 .join(JoinType::InnerJoin, object::Relation::Database2.def())
512 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
513 .column(object::Column::Oid)
514 .column(database::Column::Name)
515 .column(schema::Column::Name)
516 .column(source::Column::Name)
517 .column(database::Column::ResourceGroup)
518 .into_tuple()
519 .all(&inner.db)
520 .await?)
521 }
522
523 pub async fn list_sink_objects(
525 &self,
526 ) -> MetaResult<Vec<(TableId, String, String, String, String)>> {
527 let inner = self.inner.read().await;
528 Ok(Object::find()
529 .select_only()
530 .join(JoinType::InnerJoin, object::Relation::Sink.def())
531 .join(JoinType::InnerJoin, object::Relation::Database2.def())
532 .join(JoinType::InnerJoin, object::Relation::Schema2.def())
533 .column(object::Column::Oid)
534 .column(database::Column::Name)
535 .column(schema::Column::Name)
536 .column(sink::Column::Name)
537 .column(database::Column::ResourceGroup)
538 .into_tuple()
539 .all(&inner.db)
540 .await?)
541 }
542
543 pub async fn get_streaming_job_status(
544 &self,
545 streaming_job_id: ObjectId,
546 ) -> MetaResult<JobStatus> {
547 let inner = self.inner.read().await;
548 let status = StreamingJob::find_by_id(streaming_job_id)
549 .select_only()
550 .column(streaming_job::Column::JobStatus)
551 .into_tuple()
552 .one(&inner.db)
553 .await?
554 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
555 Ok(status)
556 }
557}