1use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23use risingwave_pb::catalog::{
24 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
25 PbSubscription, PbTable, PbView,
26};
27use risingwave_pb::hummock::HummockVersionStats;
28
29use super::function_catalog::FunctionCatalog;
30use super::source_catalog::SourceCatalog;
31use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
32use super::view_catalog::ViewCatalog;
33use super::{
34 CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
35};
36use crate::catalog::connection_catalog::ConnectionCatalog;
37use crate::catalog::database_catalog::DatabaseCatalog;
38use crate::catalog::schema_catalog::SchemaCatalog;
39use crate::catalog::secret_catalog::SecretCatalog;
40use crate::catalog::system_catalog::{
41 SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
42};
43use crate::catalog::table_catalog::TableCatalog;
44use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
45use crate::expr::{Expr, ExprImpl};
46
47#[derive(Copy, Clone)]
48pub enum SchemaPath<'a> {
49 Name(&'a str),
50 Path(&'a SearchPath, &'a str),
52}
53
54impl<'a> SchemaPath<'a> {
55 pub fn new(
56 schema_name: Option<&'a str>,
57 search_path: &'a SearchPath,
58 user_name: &'a str,
59 ) -> Self {
60 match schema_name {
61 Some(schema_name) => SchemaPath::Name(schema_name),
62 None => SchemaPath::Path(search_path, user_name),
63 }
64 }
65
66 pub fn try_find<T, E>(
68 &self,
69 mut f: impl FnMut(&str) -> Result<Option<T>, E>,
70 ) -> Result<Option<(T, &'a str)>, E> {
71 match self {
72 SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
73 SchemaPath::Path(search_path, user_name) => {
74 for schema_name in search_path.path() {
75 let mut schema_name: &str = schema_name;
76 if schema_name == USER_NAME_WILD_CARD {
77 schema_name = user_name;
78 }
79 if let Ok(Some(res)) = f(schema_name) {
80 return Ok(Some((res, schema_name)));
81 }
82 }
83 Ok(None)
84 }
85 }
86 }
87}
88
89pub struct Catalog {
101 database_by_name: HashMap<String, DatabaseCatalog>,
102 db_name_by_id: HashMap<DatabaseId, String>,
103 table_by_id: HashMap<TableId, Arc<TableCatalog>>,
105 table_stats: HummockVersionStats,
106}
107
108#[expect(clippy::derivable_impls)]
109impl Default for Catalog {
110 fn default() -> Self {
111 Self {
112 database_by_name: HashMap::new(),
113 db_name_by_id: HashMap::new(),
114 table_by_id: HashMap::new(),
115 table_stats: HummockVersionStats::default(),
116 }
117 }
118}
119
120impl Catalog {
121 fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
122 let name = self.db_name_by_id.get(&db_id)?;
123 self.database_by_name.get_mut(name)
124 }
125
126 pub fn clear(&mut self) {
127 self.database_by_name.clear();
128 self.db_name_by_id.clear();
129 self.table_by_id.clear();
130 }
131
132 pub fn create_database(&mut self, db: &PbDatabase) {
133 let name = db.name.clone();
134 let id = db.id;
135
136 self.database_by_name
137 .try_insert(name.clone(), db.into())
138 .unwrap();
139 self.db_name_by_id.try_insert(id, name).unwrap();
140 }
141
142 pub fn create_schema(&mut self, proto: &PbSchema) {
143 self.get_database_mut(proto.database_id)
144 .unwrap()
145 .create_schema(proto);
146
147 for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
148 self.get_database_mut(proto.database_id)
149 .unwrap()
150 .get_schema_mut(proto.id)
151 .unwrap()
152 .create_sys_table(sys_table);
153 }
154 for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
155 sys_view.database_id = proto.database_id;
156 sys_view.schema_id = proto.id;
157 self.get_database_mut(proto.database_id)
158 .unwrap()
159 .get_schema_mut(proto.id)
160 .unwrap()
161 .create_sys_view(Arc::new(sys_view));
162 }
163 }
164
165 pub fn create_table(&mut self, proto: &PbTable) {
166 let table = self
167 .get_database_mut(proto.database_id)
168 .unwrap()
169 .get_schema_mut(proto.schema_id)
170 .unwrap()
171 .create_table(proto);
172 self.table_by_id.insert(proto.id.into(), table);
173 }
174
175 pub fn create_index(&mut self, proto: &PbIndex) {
176 self.get_database_mut(proto.database_id)
177 .unwrap()
178 .get_schema_mut(proto.schema_id)
179 .unwrap()
180 .create_index(proto);
181 }
182
183 pub fn create_source(&mut self, proto: &PbSource) {
184 self.get_database_mut(proto.database_id)
185 .unwrap()
186 .get_schema_mut(proto.schema_id)
187 .unwrap()
188 .create_source(proto);
189 }
190
191 pub fn create_sink(&mut self, proto: &PbSink) {
192 self.get_database_mut(proto.database_id)
193 .unwrap()
194 .get_schema_mut(proto.schema_id)
195 .unwrap()
196 .create_sink(proto);
197 }
198
199 pub fn create_subscription(&mut self, proto: &PbSubscription) {
200 self.get_database_mut(proto.database_id)
201 .unwrap()
202 .get_schema_mut(proto.schema_id)
203 .unwrap()
204 .create_subscription(proto);
205 }
206
207 pub fn create_secret(&mut self, proto: &PbSecret) {
208 self.get_database_mut(proto.database_id)
209 .unwrap()
210 .get_schema_mut(proto.schema_id)
211 .unwrap()
212 .create_secret(proto);
213 }
214
215 pub fn create_view(&mut self, proto: &PbView) {
216 self.get_database_mut(proto.database_id)
217 .unwrap()
218 .get_schema_mut(proto.schema_id)
219 .unwrap()
220 .create_view(proto);
221 }
222
223 pub fn create_function(&mut self, proto: &PbFunction) {
224 self.get_database_mut(proto.database_id)
225 .unwrap()
226 .get_schema_mut(proto.schema_id)
227 .unwrap()
228 .create_function(proto);
229 }
230
231 pub fn create_connection(&mut self, proto: &PbConnection) {
232 self.get_database_mut(proto.database_id)
233 .unwrap()
234 .get_schema_mut(proto.schema_id)
235 .unwrap()
236 .create_connection(proto);
237 }
238
239 pub fn drop_connection(
240 &mut self,
241 db_id: DatabaseId,
242 schema_id: SchemaId,
243 connection_id: ConnectionId,
244 ) {
245 self.get_database_mut(db_id)
246 .unwrap()
247 .get_schema_mut(schema_id)
248 .unwrap()
249 .drop_connection(connection_id);
250 }
251
252 pub fn update_connection(&mut self, proto: &PbConnection) {
253 let database = self.get_database_mut(proto.database_id).unwrap();
254 let schema = database.get_schema_mut(proto.schema_id).unwrap();
255 if schema.get_connection_by_id(&proto.id).is_some() {
256 schema.update_connection(proto);
257 } else {
258 schema.create_connection(proto);
260 database
261 .iter_schemas_mut()
262 .find(|schema| {
263 schema.id() != proto.schema_id
264 && schema.get_connection_by_id(&proto.id).is_some()
265 })
266 .unwrap()
267 .drop_connection(proto.id);
268 }
269 }
270
271 pub fn update_secret(&mut self, proto: &PbSecret) {
272 let database = self.get_database_mut(proto.database_id).unwrap();
273 let schema = database.get_schema_mut(proto.schema_id).unwrap();
274 let secret_id = SecretId::new(proto.id);
275 if schema.get_secret_by_id(&secret_id).is_some() {
276 schema.update_secret(proto);
277 } else {
278 schema.create_secret(proto);
280 database
281 .iter_schemas_mut()
282 .find(|schema| {
283 schema.id() != proto.schema_id && schema.get_secret_by_id(&secret_id).is_some()
284 })
285 .unwrap()
286 .drop_secret(secret_id);
287 }
288 }
289
290 pub fn drop_database(&mut self, db_id: DatabaseId) {
291 let name = self.db_name_by_id.remove(&db_id).unwrap();
292 let database = self.database_by_name.remove(&name).unwrap();
293 database.iter_all_table_ids().for_each(|table| {
294 self.table_by_id.remove(&table);
295 });
296 }
297
298 pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
299 self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
300 }
301
302 pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
303 self.table_by_id.remove(&tb_id);
304 self.get_database_mut(db_id)
305 .unwrap()
306 .get_schema_mut(schema_id)
307 .unwrap()
308 .drop_table(tb_id);
309 }
310
311 pub fn update_table(&mut self, proto: &PbTable) {
312 let database = self.get_database_mut(proto.database_id).unwrap();
313 let schema = database.get_schema_mut(proto.schema_id).unwrap();
314 let table = if schema.get_table_by_id(&proto.id.into()).is_some() {
315 schema.update_table(proto)
316 } else {
317 let new_table = schema.create_table(proto);
319 database
320 .iter_schemas_mut()
321 .find(|schema| {
322 schema.id() != proto.schema_id
323 && schema.get_created_table_by_id(&proto.id.into()).is_some()
324 })
325 .unwrap()
326 .drop_table(proto.id.into());
327 new_table
328 };
329
330 self.table_by_id.insert(proto.id.into(), table);
331 }
332
333 pub fn update_database(&mut self, proto: &PbDatabase) {
334 let id = proto.id;
335 let name = proto.name.clone();
336
337 let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
338 if old_database_name != name {
339 let mut database = self.database_by_name.remove(&old_database_name).unwrap();
340 database.name.clone_from(&name);
341 database.owner = proto.owner;
342 self.database_by_name.insert(name.clone(), database);
343 self.db_name_by_id.insert(id, name);
344 } else {
345 let database = self.get_database_mut(id).unwrap();
346 database.name = name;
347 database.owner = proto.owner;
348 database.barrier_interval_ms = proto.barrier_interval_ms;
349 database.checkpoint_frequency = proto.checkpoint_frequency;
350 }
351 }
352
353 pub fn update_schema(&mut self, proto: &PbSchema) {
354 self.get_database_mut(proto.database_id)
355 .unwrap()
356 .update_schema(proto);
357 }
358
359 pub fn update_index(&mut self, proto: &PbIndex) {
360 let database = self.get_database_mut(proto.database_id).unwrap();
361 let schema = database.get_schema_mut(proto.schema_id).unwrap();
362 if schema.get_index_by_id(&proto.id.into()).is_some() {
363 schema.update_index(proto);
364 } else {
365 schema.create_index(proto);
367 database
368 .iter_schemas_mut()
369 .find(|schema| {
370 schema.id() != proto.schema_id
371 && schema.get_index_by_id(&proto.id.into()).is_some()
372 })
373 .unwrap()
374 .drop_index(proto.id.into());
375 }
376 }
377
378 pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
379 self.get_database_mut(db_id)
380 .unwrap()
381 .get_schema_mut(schema_id)
382 .unwrap()
383 .drop_source(source_id);
384 }
385
386 pub fn update_source(&mut self, proto: &PbSource) {
387 let database = self.get_database_mut(proto.database_id).unwrap();
388 let schema = database.get_schema_mut(proto.schema_id).unwrap();
389 if schema.get_source_by_id(&proto.id).is_some() {
390 schema.update_source(proto);
391 } else {
392 schema.create_source(proto);
394 database
395 .iter_schemas_mut()
396 .find(|schema| {
397 schema.id() != proto.schema_id && schema.get_source_by_id(&proto.id).is_some()
398 })
399 .unwrap()
400 .drop_source(proto.id);
401 }
402 }
403
404 pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
405 self.get_database_mut(db_id)
406 .unwrap()
407 .get_schema_mut(schema_id)
408 .unwrap()
409 .drop_sink(sink_id);
410 }
411
412 pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
413 self.get_database_mut(db_id)
414 .unwrap()
415 .get_schema_mut(schema_id)
416 .unwrap()
417 .drop_secret(secret_id);
418 }
419
420 pub fn update_sink(&mut self, proto: &PbSink) {
421 let database = self.get_database_mut(proto.database_id).unwrap();
422 let schema = database.get_schema_mut(proto.schema_id).unwrap();
423 if schema.get_sink_by_id(&proto.id).is_some() {
424 schema.update_sink(proto);
425 } else {
426 schema.create_sink(proto);
428 database
429 .iter_schemas_mut()
430 .find(|schema| {
431 schema.id() != proto.schema_id && schema.get_sink_by_id(&proto.id).is_some()
432 })
433 .unwrap()
434 .drop_sink(proto.id);
435 }
436 }
437
438 pub fn drop_subscription(
439 &mut self,
440 db_id: DatabaseId,
441 schema_id: SchemaId,
442 subscription_id: SubscriptionId,
443 ) {
444 self.get_database_mut(db_id)
445 .unwrap()
446 .get_schema_mut(schema_id)
447 .unwrap()
448 .drop_subscription(subscription_id);
449 }
450
451 pub fn update_subscription(&mut self, proto: &PbSubscription) {
452 let database = self.get_database_mut(proto.database_id).unwrap();
453 let schema = database.get_schema_mut(proto.schema_id).unwrap();
454 if schema.get_subscription_by_id(&proto.id).is_some() {
455 schema.update_subscription(proto);
456 } else {
457 schema.create_subscription(proto);
459 database
460 .iter_schemas_mut()
461 .find(|schema| {
462 schema.id() != proto.schema_id
463 && schema.get_subscription_by_id(&proto.id).is_some()
464 })
465 .unwrap()
466 .drop_subscription(proto.id);
467 }
468 }
469
470 pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
471 self.get_database_mut(db_id)
472 .unwrap()
473 .get_schema_mut(schema_id)
474 .unwrap()
475 .drop_index(index_id);
476 }
477
478 pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
479 self.get_database_mut(db_id)
480 .unwrap()
481 .get_schema_mut(schema_id)
482 .unwrap()
483 .drop_view(view_id);
484 }
485
486 pub fn update_view(&mut self, proto: &PbView) {
487 let database = self.get_database_mut(proto.database_id).unwrap();
488 let schema = database.get_schema_mut(proto.schema_id).unwrap();
489 if schema.get_view_by_id(&proto.id).is_some() {
490 schema.update_view(proto);
491 } else {
492 schema.create_view(proto);
494 database
495 .iter_schemas_mut()
496 .find(|schema| {
497 schema.id() != proto.schema_id && schema.get_view_by_id(&proto.id).is_some()
498 })
499 .unwrap()
500 .drop_view(proto.id);
501 }
502 }
503
504 pub fn drop_function(
505 &mut self,
506 db_id: DatabaseId,
507 schema_id: SchemaId,
508 function_id: FunctionId,
509 ) {
510 self.get_database_mut(db_id)
511 .unwrap()
512 .get_schema_mut(schema_id)
513 .unwrap()
514 .drop_function(function_id);
515 }
516
517 pub fn update_function(&mut self, proto: &PbFunction) {
518 let database = self.get_database_mut(proto.database_id).unwrap();
519 let schema = database.get_schema_mut(proto.schema_id).unwrap();
520 if schema.get_function_by_id(proto.id.into()).is_some() {
521 schema.update_function(proto);
522 } else {
523 schema.create_function(proto);
525 database
526 .iter_schemas_mut()
527 .find(|schema| {
528 schema.id() != proto.schema_id
529 && schema.get_function_by_id(proto.id.into()).is_some()
530 })
531 .unwrap()
532 .drop_function(proto.id.into());
533 }
534
535 self.get_database_mut(proto.database_id)
536 .unwrap()
537 .get_schema_mut(proto.schema_id)
538 .unwrap()
539 .update_function(proto);
540 }
541
542 pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
543 self.database_by_name
544 .get(db_name)
545 .ok_or_else(|| CatalogError::NotFound("database", db_name.to_owned()))
546 }
547
548 pub fn get_database_by_id(&self, db_id: &DatabaseId) -> CatalogResult<&DatabaseCatalog> {
549 let db_name = self
550 .db_name_by_id
551 .get(db_id)
552 .ok_or_else(|| CatalogError::NotFound("db_id", db_id.to_string()))?;
553 self.database_by_name
554 .get(db_name)
555 .ok_or_else(|| CatalogError::NotFound("database", db_name.to_string()))
556 }
557
558 pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
559 Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
560 }
561
562 pub fn iter_schemas(
563 &self,
564 db_name: &str,
565 ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
566 Ok(self.get_database_by_name(db_name)?.iter_schemas())
567 }
568
569 pub fn get_all_database_names(&self) -> Vec<String> {
570 self.database_by_name.keys().cloned().collect_vec()
571 }
572
573 pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
574 self.database_by_name.values()
575 }
576
577 pub fn get_schema_by_name(
578 &self,
579 db_name: &str,
580 schema_name: &str,
581 ) -> CatalogResult<&SchemaCatalog> {
582 self.get_database_by_name(db_name)?
583 .get_schema_by_name(schema_name)
584 .ok_or_else(|| CatalogError::NotFound("schema", schema_name.to_owned()))
585 }
586
587 pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
588 self.get_any_table_by_id(&table_id)
589 .map(|table| table.name.clone())
590 }
591
592 pub fn get_schema_by_id(
593 &self,
594 db_id: &DatabaseId,
595 schema_id: &SchemaId,
596 ) -> CatalogResult<&SchemaCatalog> {
597 self.get_database_by_id(db_id)?
598 .get_schema_by_id(schema_id)
599 .ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
600 }
601
602 pub fn first_valid_schema(
604 &self,
605 db_name: &str,
606 search_path: &SearchPath,
607 user_name: &str,
608 ) -> CatalogResult<&SchemaCatalog> {
609 for path in search_path.real_path() {
610 let mut schema_name: &str = path;
611 if schema_name == USER_NAME_WILD_CARD {
612 schema_name = user_name;
613 }
614
615 if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
616 return schema_catalog;
617 }
618 }
619 Err(CatalogError::NotFound(
620 "first valid schema",
621 "no schema has been selected to create in".to_owned(),
622 ))
623 }
624
625 pub fn get_source_by_id<'a>(
626 &self,
627 db_name: &'a str,
628 schema_path: SchemaPath<'a>,
629 source_id: &SourceId,
630 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
631 schema_path
632 .try_find(|schema_name| {
633 Ok(self
634 .get_schema_by_name(db_name, schema_name)?
635 .get_source_by_id(source_id))
636 })?
637 .ok_or_else(|| CatalogError::NotFound("source", source_id.to_string()))
638 }
639
640 pub fn get_table_by_name<'a>(
641 &self,
642 db_name: &str,
643 schema_path: SchemaPath<'a>,
644 table_name: &str,
645 bind_creating_relations: bool,
646 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
647 schema_path
648 .try_find(|schema_name| {
649 Ok(self
650 .get_schema_by_name(db_name, schema_name)?
651 .get_table_by_name(table_name, bind_creating_relations))
652 })?
653 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
654 }
655
656 pub fn get_any_table_by_name<'a>(
659 &self,
660 db_name: &str,
661 schema_path: SchemaPath<'a>,
662 table_name: &str,
663 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
664 self.get_table_by_name(db_name, schema_path, table_name, true)
665 }
666
667 pub fn get_created_table_by_name<'a>(
670 &self,
671 db_name: &str,
672 schema_path: SchemaPath<'a>,
673 table_name: &str,
674 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
675 self.get_table_by_name(db_name, schema_path, table_name, false)
676 }
677
678 pub fn get_any_table_by_id(&self, table_id: &TableId) -> CatalogResult<&Arc<TableCatalog>> {
679 self.table_by_id
680 .get(table_id)
681 .ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
682 }
683
684 pub fn get_created_table_by_id_with_db(
686 &self,
687 db_name: &str,
688 table_id: u32,
689 ) -> CatalogResult<&Arc<TableCatalog>> {
690 let table_id = TableId::from(table_id);
691 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
692 if let Some(table) = schema.get_created_table_by_id(&table_id) {
693 return Ok(table);
694 }
695 }
696 Err(CatalogError::NotFound("table id", table_id.to_string()))
697 }
698
699 pub fn iter_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
700 self.table_by_id.values()
701 }
702
703 pub fn iter_backfilling_internal_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
704 self.table_by_id
705 .values()
706 .filter(|t| t.is_internal_table() && !t.is_created())
707 }
708
709 pub fn alter_table_name_by_id(&mut self, table_id: &TableId, table_name: &str) {
711 let mut found = false;
712 for database in self.database_by_name.values() {
713 if !found {
714 for schema in database.iter_schemas() {
715 if schema.iter_user_table().any(|t| t.id() == *table_id) {
716 found = true;
717 break;
718 }
719 }
720 }
721 }
722
723 if found {
724 let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
725 table.name = table_name.to_owned();
726 self.update_table(&table);
727 }
728 }
729
730 #[cfg(test)]
731 pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
732 self.table_by_id.insert(
733 table_id,
734 Arc::new(TableCatalog {
735 fragment_id,
736 ..Default::default()
737 }),
738 );
739 }
740
741 pub fn get_sys_table_by_name(
742 &self,
743 db_name: &str,
744 schema_name: &str,
745 table_name: &str,
746 ) -> CatalogResult<&Arc<SystemTableCatalog>> {
747 self.get_schema_by_name(db_name, schema_name)?
748 .get_system_table_by_name(table_name)
749 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
750 }
751
752 pub fn get_source_by_name<'a>(
753 &self,
754 db_name: &str,
755 schema_path: SchemaPath<'a>,
756 source_name: &str,
757 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
758 schema_path
759 .try_find(|schema_name| {
760 Ok(self
761 .get_schema_by_name(db_name, schema_name)?
762 .get_source_by_name(source_name))
763 })?
764 .ok_or_else(|| CatalogError::NotFound("source", source_name.to_owned()))
765 }
766
767 pub fn get_sink_by_name<'a>(
768 &self,
769 db_name: &str,
770 schema_path: SchemaPath<'a>,
771 sink_name: &str,
772 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
773 schema_path
774 .try_find(|schema_name| {
775 Ok(self
776 .get_schema_by_name(db_name, schema_name)?
777 .get_sink_by_name(sink_name))
778 })?
779 .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_owned()))
780 }
781
782 pub fn get_subscription_by_name<'a>(
783 &self,
784 db_name: &str,
785 schema_path: SchemaPath<'a>,
786 subscription_name: &str,
787 ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
788 schema_path
789 .try_find(|schema_name| {
790 Ok(self
791 .get_schema_by_name(db_name, schema_name)?
792 .get_subscription_by_name(subscription_name))
793 })?
794 .ok_or_else(|| CatalogError::NotFound("subscription", subscription_name.to_owned()))
795 }
796
797 pub fn get_index_by_name<'a>(
798 &self,
799 db_name: &str,
800 schema_path: SchemaPath<'a>,
801 index_name: &str,
802 ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
803 schema_path
804 .try_find(|schema_name| {
805 Ok(self
806 .get_schema_by_name(db_name, schema_name)?
807 .get_index_by_name(index_name))
808 })?
809 .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
810 }
811
812 pub fn get_index_by_id(
813 &self,
814 db_name: &str,
815 index_id: u32,
816 ) -> CatalogResult<&Arc<IndexCatalog>> {
817 let index_id = IndexId::from(index_id);
818 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
819 if let Some(index) = schema.get_index_by_id(&index_id) {
820 return Ok(index);
821 }
822 }
823 Err(CatalogError::NotFound("index", index_id.to_string()))
824 }
825
826 pub fn get_view_by_name<'a>(
827 &self,
828 db_name: &str,
829 schema_path: SchemaPath<'a>,
830 view_name: &str,
831 ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
832 schema_path
833 .try_find(|schema_name| {
834 Ok(self
835 .get_schema_by_name(db_name, schema_name)?
836 .get_view_by_name(view_name))
837 })?
838 .ok_or_else(|| CatalogError::NotFound("view", view_name.to_owned()))
839 }
840
841 pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
842 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
843 if let Some(view) = schema.get_view_by_id(&ViewId::from(view_id)) {
844 return Ok(view.clone());
845 }
846 }
847 Err(CatalogError::NotFound("view", view_id.to_string()))
848 }
849
850 pub fn get_secret_by_name<'a>(
851 &self,
852 db_name: &str,
853 schema_path: SchemaPath<'a>,
854 secret_name: &str,
855 ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
856 schema_path
857 .try_find(|schema_name| {
858 Ok(self
859 .get_schema_by_name(db_name, schema_name)?
860 .get_secret_by_name(secret_name))
861 })?
862 .ok_or_else(|| CatalogError::NotFound("secret", secret_name.to_owned()))
863 }
864
865 pub fn get_connection_by_id(
866 &self,
867 db_name: &str,
868 connection_id: ConnectionId,
869 ) -> CatalogResult<&Arc<ConnectionCatalog>> {
870 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
871 if let Some(conn) = schema.get_connection_by_id(&connection_id) {
872 return Ok(conn);
873 }
874 }
875 Err(CatalogError::NotFound(
876 "connection",
877 connection_id.to_string(),
878 ))
879 }
880
881 pub fn get_connection_by_name<'a>(
882 &self,
883 db_name: &str,
884 schema_path: SchemaPath<'a>,
885 connection_name: &str,
886 ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
887 schema_path
888 .try_find(|schema_name| {
889 Ok(self
890 .get_schema_by_name(db_name, schema_name)?
891 .get_connection_by_name(connection_name))
892 })?
893 .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_owned()))
894 }
895
896 pub fn get_function_by_name_inputs<'a>(
897 &self,
898 db_name: &str,
899 schema_path: SchemaPath<'a>,
900 function_name: &str,
901 inputs: &mut [ExprImpl],
902 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
903 schema_path
904 .try_find(|schema_name| {
905 Ok(self
906 .get_schema_by_name(db_name, schema_name)?
907 .get_function_by_name_inputs(function_name, inputs))
908 })?
909 .ok_or_else(|| {
910 CatalogError::NotFound(
911 "function",
912 format!(
913 "{}({})",
914 function_name,
915 inputs
916 .iter()
917 .map(|a| a.return_type().to_string())
918 .join(", ")
919 ),
920 )
921 })
922 }
923
924 pub fn get_function_by_name_args<'a>(
925 &self,
926 db_name: &str,
927 schema_path: SchemaPath<'a>,
928 function_name: &str,
929 args: &[DataType],
930 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
931 schema_path
932 .try_find(|schema_name| {
933 Ok(self
934 .get_schema_by_name(db_name, schema_name)?
935 .get_function_by_name_args(function_name, args))
936 })?
937 .ok_or_else(|| {
938 CatalogError::NotFound(
939 "function",
940 format!(
941 "{}({})",
942 function_name,
943 args.iter().map(|a| a.to_string()).join(", ")
944 ),
945 )
946 })
947 }
948
949 pub fn get_functions_by_name<'a>(
951 &self,
952 db_name: &str,
953 schema_path: SchemaPath<'a>,
954 function_name: &str,
955 ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
956 schema_path
957 .try_find(|schema_name| {
958 Ok(self
959 .get_schema_by_name(db_name, schema_name)?
960 .get_functions_by_name(function_name))
961 })?
962 .ok_or_else(|| CatalogError::NotFound("function", function_name.to_owned()))
963 }
964
965 pub fn check_relation_name_duplicated(
967 &self,
968 db_name: &str,
969 schema_name: &str,
970 relation_name: &str,
971 ) -> CatalogResult<()> {
972 let schema = self.get_schema_by_name(db_name, schema_name)?;
973
974 if let Some(table) = schema.get_any_table_by_name(relation_name) {
975 let is_creating = table.stream_job_status == StreamJobStatus::Creating;
976 if table.is_index() {
977 Err(CatalogError::Duplicated(
978 "index",
979 relation_name.to_owned(),
980 is_creating,
981 ))
982 } else if table.is_mview() {
983 Err(CatalogError::Duplicated(
984 "materialized view",
985 relation_name.to_owned(),
986 is_creating,
987 ))
988 } else {
989 Err(CatalogError::Duplicated(
990 "table",
991 relation_name.to_owned(),
992 is_creating,
993 ))
994 }
995 } else if schema.get_source_by_name(relation_name).is_some() {
996 Err(CatalogError::duplicated("source", relation_name.to_owned()))
997 } else if schema.get_sink_by_name(relation_name).is_some() {
998 Err(CatalogError::duplicated("sink", relation_name.to_owned()))
999 } else if schema.get_view_by_name(relation_name).is_some() {
1000 Err(CatalogError::duplicated("view", relation_name.to_owned()))
1001 } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
1002 let is_not_created = subscription.subscription_state != SubscriptionState::Created;
1003 Err(CatalogError::Duplicated(
1004 "subscription",
1005 relation_name.to_owned(),
1006 is_not_created,
1007 ))
1008 } else {
1009 Ok(())
1010 }
1011 }
1012
1013 pub fn check_function_name_duplicated(
1014 &self,
1015 db_name: &str,
1016 schema_name: &str,
1017 function_name: &str,
1018 arg_types: &[DataType],
1019 ) -> CatalogResult<()> {
1020 let schema = self.get_schema_by_name(db_name, schema_name)?;
1021
1022 if schema
1023 .get_function_by_name_args(function_name, arg_types)
1024 .is_some()
1025 {
1026 let name = format!(
1027 "{function_name}({})",
1028 arg_types.iter().map(|t| t.to_string()).join(",")
1029 );
1030 Err(CatalogError::duplicated("function", name))
1031 } else {
1032 Ok(())
1033 }
1034 }
1035
1036 pub fn check_connection_name_duplicated(
1038 &self,
1039 db_name: &str,
1040 schema_name: &str,
1041 connection_name: &str,
1042 ) -> CatalogResult<()> {
1043 let schema = self.get_schema_by_name(db_name, schema_name)?;
1044
1045 if schema.get_connection_by_name(connection_name).is_some() {
1046 Err(CatalogError::duplicated(
1047 "connection",
1048 connection_name.to_owned(),
1049 ))
1050 } else {
1051 Ok(())
1052 }
1053 }
1054
1055 pub fn check_secret_name_duplicated(
1056 &self,
1057 db_name: &str,
1058 schema_name: &str,
1059 secret_name: &str,
1060 ) -> CatalogResult<()> {
1061 let schema = self.get_schema_by_name(db_name, schema_name)?;
1062
1063 if schema.get_secret_by_name(secret_name).is_some() {
1064 Err(CatalogError::duplicated("secret", secret_name.to_owned()))
1065 } else {
1066 Ok(())
1067 }
1068 }
1069
1070 pub fn table_stats(&self) -> &HummockVersionStats {
1071 &self.table_stats
1072 }
1073
1074 pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
1075 self.table_stats = table_stats;
1076 }
1077
1078 pub fn get_all_indexes_related_to_object(
1079 &self,
1080 db_id: DatabaseId,
1081 schema_id: SchemaId,
1082 mv_id: TableId,
1083 ) -> Vec<Arc<IndexCatalog>> {
1084 self.get_database_by_id(&db_id)
1085 .unwrap()
1086 .get_schema_by_id(&schema_id)
1087 .unwrap()
1088 .get_indexes_by_table_id(&mv_id)
1089 }
1090
1091 pub fn get_id_by_class_name(
1092 &self,
1093 db_name: &str,
1094 schema_path: SchemaPath<'_>,
1095 class_name: &str,
1096 ) -> CatalogResult<u32> {
1097 schema_path
1098 .try_find(|schema_name| {
1099 let schema = self.get_schema_by_name(db_name, schema_name)?;
1100 #[allow(clippy::manual_map)]
1101 if let Some(item) = schema.get_system_table_by_name(class_name) {
1102 Ok(Some(item.id().into()))
1103 } else if let Some(item) = schema.get_created_table_by_name(class_name) {
1104 Ok(Some(item.id().into()))
1105 } else if let Some(item) = schema.get_index_by_name(class_name) {
1106 Ok(Some(item.id.into()))
1107 } else if let Some(item) = schema.get_source_by_name(class_name) {
1108 Ok(Some(item.id))
1109 } else if let Some(item) = schema.get_view_by_name(class_name) {
1110 Ok(Some(item.id))
1111 } else {
1112 Ok(None)
1113 }
1114 })?
1115 .map(|(id, _)| id)
1116 .ok_or_else(|| CatalogError::NotFound("class", class_name.to_owned()))
1117 }
1118}