1use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23use risingwave_pb::catalog::{
24 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
25 PbSubscription, PbTable, PbView,
26};
27use risingwave_pb::hummock::HummockVersionStats;
28
29use super::function_catalog::FunctionCatalog;
30use super::source_catalog::SourceCatalog;
31use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
32use super::view_catalog::ViewCatalog;
33use super::{
34 CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
35};
36use crate::catalog::connection_catalog::ConnectionCatalog;
37use crate::catalog::database_catalog::DatabaseCatalog;
38use crate::catalog::schema_catalog::SchemaCatalog;
39use crate::catalog::secret_catalog::SecretCatalog;
40use crate::catalog::system_catalog::{
41 SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
42};
43use crate::catalog::table_catalog::TableCatalog;
44use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
45use crate::expr::{Expr, ExprImpl};
46
47#[derive(Copy, Clone)]
48pub enum SchemaPath<'a> {
49 Name(&'a str),
50 Path(&'a SearchPath, &'a str),
52}
53
54impl<'a> SchemaPath<'a> {
55 pub fn new(
56 schema_name: Option<&'a str>,
57 search_path: &'a SearchPath,
58 user_name: &'a str,
59 ) -> Self {
60 match schema_name {
61 Some(schema_name) => SchemaPath::Name(schema_name),
62 None => SchemaPath::Path(search_path, user_name),
63 }
64 }
65
66 pub fn try_find<T, E>(
68 &self,
69 mut f: impl FnMut(&str) -> Result<Option<T>, E>,
70 ) -> Result<Option<(T, &'a str)>, E> {
71 match self {
72 SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
73 SchemaPath::Path(search_path, user_name) => {
74 for schema_name in search_path.path() {
75 let mut schema_name: &str = schema_name;
76 if schema_name == USER_NAME_WILD_CARD {
77 schema_name = user_name;
78 }
79 if let Ok(Some(res)) = f(schema_name) {
80 return Ok(Some((res, schema_name)));
81 }
82 }
83 Ok(None)
84 }
85 }
86 }
87}
88
89pub struct Catalog {
101 version: CatalogVersion,
102 database_by_name: HashMap<String, DatabaseCatalog>,
103 db_name_by_id: HashMap<DatabaseId, String>,
104 table_by_id: HashMap<TableId, Arc<TableCatalog>>,
106 table_stats: HummockVersionStats,
107}
108
109#[expect(clippy::derivable_impls)]
110impl Default for Catalog {
111 fn default() -> Self {
112 Self {
113 version: 0,
114 database_by_name: HashMap::new(),
115 db_name_by_id: HashMap::new(),
116 table_by_id: HashMap::new(),
117 table_stats: HummockVersionStats::default(),
118 }
119 }
120}
121
122impl Catalog {
123 fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
124 let name = self.db_name_by_id.get(&db_id)?;
125 self.database_by_name.get_mut(name)
126 }
127
128 pub fn clear(&mut self) {
129 self.database_by_name.clear();
130 self.db_name_by_id.clear();
131 self.table_by_id.clear();
132 }
133
134 pub fn create_database(&mut self, db: &PbDatabase) {
135 let name = db.name.clone();
136 let id = db.id;
137
138 self.database_by_name
139 .try_insert(name.clone(), db.into())
140 .unwrap();
141 self.db_name_by_id.try_insert(id, name).unwrap();
142 }
143
144 pub fn create_schema(&mut self, proto: &PbSchema) {
145 self.get_database_mut(proto.database_id)
146 .unwrap()
147 .create_schema(proto);
148
149 for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
150 self.get_database_mut(proto.database_id)
151 .unwrap()
152 .get_schema_mut(proto.id)
153 .unwrap()
154 .create_sys_table(sys_table);
155 }
156 for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
157 sys_view.database_id = proto.database_id;
158 sys_view.schema_id = proto.id;
159 self.get_database_mut(proto.database_id)
160 .unwrap()
161 .get_schema_mut(proto.id)
162 .unwrap()
163 .create_sys_view(Arc::new(sys_view));
164 }
165 }
166
167 pub fn create_table(&mut self, proto: &PbTable) {
168 let table = self
169 .get_database_mut(proto.database_id)
170 .unwrap()
171 .get_schema_mut(proto.schema_id)
172 .unwrap()
173 .create_table(proto);
174 self.table_by_id.insert(proto.id.into(), table);
175 }
176
177 pub fn create_index(&mut self, proto: &PbIndex) {
178 self.get_database_mut(proto.database_id)
179 .unwrap()
180 .get_schema_mut(proto.schema_id)
181 .unwrap()
182 .create_index(proto);
183 }
184
185 pub fn create_source(&mut self, proto: &PbSource) {
186 self.get_database_mut(proto.database_id)
187 .unwrap()
188 .get_schema_mut(proto.schema_id)
189 .unwrap()
190 .create_source(proto);
191 }
192
193 pub fn create_sink(&mut self, proto: &PbSink) {
194 self.get_database_mut(proto.database_id)
195 .unwrap()
196 .get_schema_mut(proto.schema_id)
197 .unwrap()
198 .create_sink(proto);
199 }
200
201 pub fn create_subscription(&mut self, proto: &PbSubscription) {
202 self.get_database_mut(proto.database_id)
203 .unwrap()
204 .get_schema_mut(proto.schema_id)
205 .unwrap()
206 .create_subscription(proto);
207 }
208
209 pub fn create_secret(&mut self, proto: &PbSecret) {
210 self.get_database_mut(proto.database_id)
211 .unwrap()
212 .get_schema_mut(proto.schema_id)
213 .unwrap()
214 .create_secret(proto);
215 }
216
217 pub fn create_view(&mut self, proto: &PbView) {
218 self.get_database_mut(proto.database_id)
219 .unwrap()
220 .get_schema_mut(proto.schema_id)
221 .unwrap()
222 .create_view(proto);
223 }
224
225 pub fn create_function(&mut self, proto: &PbFunction) {
226 self.get_database_mut(proto.database_id)
227 .unwrap()
228 .get_schema_mut(proto.schema_id)
229 .unwrap()
230 .create_function(proto);
231 }
232
233 pub fn create_connection(&mut self, proto: &PbConnection) {
234 self.get_database_mut(proto.database_id)
235 .unwrap()
236 .get_schema_mut(proto.schema_id)
237 .unwrap()
238 .create_connection(proto);
239 }
240
241 pub fn drop_connection(
242 &mut self,
243 db_id: DatabaseId,
244 schema_id: SchemaId,
245 connection_id: ConnectionId,
246 ) {
247 self.get_database_mut(db_id)
248 .unwrap()
249 .get_schema_mut(schema_id)
250 .unwrap()
251 .drop_connection(connection_id);
252 }
253
254 pub fn update_connection(&mut self, proto: &PbConnection) {
255 let database = self.get_database_mut(proto.database_id).unwrap();
256 let schema = database.get_schema_mut(proto.schema_id).unwrap();
257 if schema.get_connection_by_id(&proto.id).is_some() {
258 schema.update_connection(proto);
259 } else {
260 schema.create_connection(proto);
262 database
263 .iter_schemas_mut()
264 .find(|schema| {
265 schema.id() != proto.schema_id
266 && schema.get_connection_by_id(&proto.id).is_some()
267 })
268 .unwrap()
269 .drop_connection(proto.id);
270 }
271 }
272
273 pub fn update_secret(&mut self, proto: &PbSecret) {
274 let database = self.get_database_mut(proto.database_id).unwrap();
275 let schema = database.get_schema_mut(proto.schema_id).unwrap();
276 let secret_id = SecretId::new(proto.id);
277 if schema.get_secret_by_id(&secret_id).is_some() {
278 schema.update_secret(proto);
279 } else {
280 schema.create_secret(proto);
282 database
283 .iter_schemas_mut()
284 .find(|schema| {
285 schema.id() != proto.schema_id && schema.get_secret_by_id(&secret_id).is_some()
286 })
287 .unwrap()
288 .drop_secret(secret_id);
289 }
290 }
291
292 pub fn drop_database(&mut self, db_id: DatabaseId) {
293 let name = self.db_name_by_id.remove(&db_id).unwrap();
294 let database = self.database_by_name.remove(&name).unwrap();
295 database.iter_all_table_ids().for_each(|table| {
296 self.table_by_id.remove(&table);
297 });
298 }
299
300 pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
301 self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
302 }
303
304 pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
305 self.table_by_id.remove(&tb_id);
306 self.get_database_mut(db_id)
307 .unwrap()
308 .get_schema_mut(schema_id)
309 .unwrap()
310 .drop_table(tb_id);
311 }
312
313 pub fn update_table(&mut self, proto: &PbTable) {
314 let database = self.get_database_mut(proto.database_id).unwrap();
315 let schema = database.get_schema_mut(proto.schema_id).unwrap();
316 let table = if schema.get_table_by_id(&proto.id.into()).is_some() {
317 schema.update_table(proto)
318 } else {
319 let new_table = schema.create_table(proto);
321 database
322 .iter_schemas_mut()
323 .find(|schema| {
324 schema.id() != proto.schema_id
325 && schema.get_created_table_by_id(&proto.id.into()).is_some()
326 })
327 .unwrap()
328 .drop_table(proto.id.into());
329 new_table
330 };
331
332 self.table_by_id.insert(proto.id.into(), table);
333 }
334
335 pub fn update_database(&mut self, proto: &PbDatabase) {
336 let id = proto.id;
337 let name = proto.name.clone();
338
339 let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
340 if old_database_name != name {
341 let mut database = self.database_by_name.remove(&old_database_name).unwrap();
342 database.name.clone_from(&name);
343 database.owner = proto.owner;
344 self.database_by_name.insert(name.clone(), database);
345 self.db_name_by_id.insert(id, name);
346 } else {
347 let database = self.get_database_mut(id).unwrap();
348 database.name = name;
349 database.owner = proto.owner;
350 database.barrier_interval_ms = proto.barrier_interval_ms;
351 database.checkpoint_frequency = proto.checkpoint_frequency;
352 }
353 }
354
355 pub fn update_schema(&mut self, proto: &PbSchema) {
356 self.get_database_mut(proto.database_id)
357 .unwrap()
358 .update_schema(proto);
359 }
360
361 pub fn update_index(&mut self, proto: &PbIndex) {
362 let database = self.get_database_mut(proto.database_id).unwrap();
363 let schema = database.get_schema_mut(proto.schema_id).unwrap();
364 if schema.get_index_by_id(&proto.id.into()).is_some() {
365 schema.update_index(proto);
366 } else {
367 schema.create_index(proto);
369 database
370 .iter_schemas_mut()
371 .find(|schema| {
372 schema.id() != proto.schema_id
373 && schema.get_index_by_id(&proto.id.into()).is_some()
374 })
375 .unwrap()
376 .drop_index(proto.id.into());
377 }
378 }
379
380 pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
381 self.get_database_mut(db_id)
382 .unwrap()
383 .get_schema_mut(schema_id)
384 .unwrap()
385 .drop_source(source_id);
386 }
387
388 pub fn update_source(&mut self, proto: &PbSource) {
389 let database = self.get_database_mut(proto.database_id).unwrap();
390 let schema = database.get_schema_mut(proto.schema_id).unwrap();
391 if schema.get_source_by_id(&proto.id).is_some() {
392 schema.update_source(proto);
393 } else {
394 schema.create_source(proto);
396 database
397 .iter_schemas_mut()
398 .find(|schema| {
399 schema.id() != proto.schema_id && schema.get_source_by_id(&proto.id).is_some()
400 })
401 .unwrap()
402 .drop_source(proto.id);
403 }
404 }
405
406 pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
407 self.get_database_mut(db_id)
408 .unwrap()
409 .get_schema_mut(schema_id)
410 .unwrap()
411 .drop_sink(sink_id);
412 }
413
414 pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
415 self.get_database_mut(db_id)
416 .unwrap()
417 .get_schema_mut(schema_id)
418 .unwrap()
419 .drop_secret(secret_id);
420 }
421
422 pub fn update_sink(&mut self, proto: &PbSink) {
423 let database = self.get_database_mut(proto.database_id).unwrap();
424 let schema = database.get_schema_mut(proto.schema_id).unwrap();
425 if schema.get_sink_by_id(&proto.id).is_some() {
426 schema.update_sink(proto);
427 } else {
428 schema.create_sink(proto);
430 database
431 .iter_schemas_mut()
432 .find(|schema| {
433 schema.id() != proto.schema_id && schema.get_sink_by_id(&proto.id).is_some()
434 })
435 .unwrap()
436 .drop_sink(proto.id);
437 }
438 }
439
440 pub fn drop_subscription(
441 &mut self,
442 db_id: DatabaseId,
443 schema_id: SchemaId,
444 subscription_id: SubscriptionId,
445 ) {
446 self.get_database_mut(db_id)
447 .unwrap()
448 .get_schema_mut(schema_id)
449 .unwrap()
450 .drop_subscription(subscription_id);
451 }
452
453 pub fn update_subscription(&mut self, proto: &PbSubscription) {
454 let database = self.get_database_mut(proto.database_id).unwrap();
455 let schema = database.get_schema_mut(proto.schema_id).unwrap();
456 if schema.get_subscription_by_id(&proto.id).is_some() {
457 schema.update_subscription(proto);
458 } else {
459 schema.create_subscription(proto);
461 database
462 .iter_schemas_mut()
463 .find(|schema| {
464 schema.id() != proto.schema_id
465 && schema.get_subscription_by_id(&proto.id).is_some()
466 })
467 .unwrap()
468 .drop_subscription(proto.id);
469 }
470 }
471
472 pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
473 self.get_database_mut(db_id)
474 .unwrap()
475 .get_schema_mut(schema_id)
476 .unwrap()
477 .drop_index(index_id);
478 }
479
480 pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
481 self.get_database_mut(db_id)
482 .unwrap()
483 .get_schema_mut(schema_id)
484 .unwrap()
485 .drop_view(view_id);
486 }
487
488 pub fn update_view(&mut self, proto: &PbView) {
489 let database = self.get_database_mut(proto.database_id).unwrap();
490 let schema = database.get_schema_mut(proto.schema_id).unwrap();
491 if schema.get_view_by_id(&proto.id).is_some() {
492 schema.update_view(proto);
493 } else {
494 schema.create_view(proto);
496 database
497 .iter_schemas_mut()
498 .find(|schema| {
499 schema.id() != proto.schema_id && schema.get_view_by_id(&proto.id).is_some()
500 })
501 .unwrap()
502 .drop_view(proto.id);
503 }
504 }
505
506 pub fn drop_function(
507 &mut self,
508 db_id: DatabaseId,
509 schema_id: SchemaId,
510 function_id: FunctionId,
511 ) {
512 self.get_database_mut(db_id)
513 .unwrap()
514 .get_schema_mut(schema_id)
515 .unwrap()
516 .drop_function(function_id);
517 }
518
519 pub fn update_function(&mut self, proto: &PbFunction) {
520 let database = self.get_database_mut(proto.database_id).unwrap();
521 let schema = database.get_schema_mut(proto.schema_id).unwrap();
522 if schema.get_function_by_id(proto.id.into()).is_some() {
523 schema.update_function(proto);
524 } else {
525 schema.create_function(proto);
527 database
528 .iter_schemas_mut()
529 .find(|schema| {
530 schema.id() != proto.schema_id
531 && schema.get_function_by_id(proto.id.into()).is_some()
532 })
533 .unwrap()
534 .drop_function(proto.id.into());
535 }
536
537 self.get_database_mut(proto.database_id)
538 .unwrap()
539 .get_schema_mut(proto.schema_id)
540 .unwrap()
541 .update_function(proto);
542 }
543
544 pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
545 self.database_by_name
546 .get(db_name)
547 .ok_or_else(|| CatalogError::NotFound("database", db_name.to_owned()))
548 }
549
550 pub fn get_database_by_id(&self, db_id: &DatabaseId) -> CatalogResult<&DatabaseCatalog> {
551 let db_name = self
552 .db_name_by_id
553 .get(db_id)
554 .ok_or_else(|| CatalogError::NotFound("db_id", db_id.to_string()))?;
555 self.database_by_name
556 .get(db_name)
557 .ok_or_else(|| CatalogError::NotFound("database", db_name.to_string()))
558 }
559
560 pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
561 Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
562 }
563
564 pub fn iter_schemas(
565 &self,
566 db_name: &str,
567 ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
568 Ok(self.get_database_by_name(db_name)?.iter_schemas())
569 }
570
571 pub fn get_all_database_names(&self) -> Vec<String> {
572 self.database_by_name.keys().cloned().collect_vec()
573 }
574
575 pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
576 self.database_by_name.values()
577 }
578
579 pub fn get_schema_by_name(
580 &self,
581 db_name: &str,
582 schema_name: &str,
583 ) -> CatalogResult<&SchemaCatalog> {
584 self.get_database_by_name(db_name)?
585 .get_schema_by_name(schema_name)
586 .ok_or_else(|| CatalogError::NotFound("schema", schema_name.to_owned()))
587 }
588
589 pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
590 self.get_any_table_by_id(&table_id)
591 .map(|table| table.name.clone())
592 }
593
594 pub fn get_schema_by_id(
595 &self,
596 db_id: &DatabaseId,
597 schema_id: &SchemaId,
598 ) -> CatalogResult<&SchemaCatalog> {
599 self.get_database_by_id(db_id)?
600 .get_schema_by_id(schema_id)
601 .ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
602 }
603
604 pub fn first_valid_schema(
606 &self,
607 db_name: &str,
608 search_path: &SearchPath,
609 user_name: &str,
610 ) -> CatalogResult<&SchemaCatalog> {
611 for path in search_path.real_path() {
612 let mut schema_name: &str = path;
613 if schema_name == USER_NAME_WILD_CARD {
614 schema_name = user_name;
615 }
616
617 if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
618 return schema_catalog;
619 }
620 }
621 Err(CatalogError::NotFound(
622 "first valid schema",
623 "no schema has been selected to create in".to_owned(),
624 ))
625 }
626
627 pub fn get_source_by_id<'a>(
628 &self,
629 db_name: &'a str,
630 schema_path: SchemaPath<'a>,
631 source_id: &SourceId,
632 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
633 schema_path
634 .try_find(|schema_name| {
635 Ok(self
636 .get_schema_by_name(db_name, schema_name)?
637 .get_source_by_id(source_id))
638 })?
639 .ok_or_else(|| CatalogError::NotFound("source", source_id.to_string()))
640 }
641
642 pub fn get_table_by_name<'a>(
643 &self,
644 db_name: &str,
645 schema_path: SchemaPath<'a>,
646 table_name: &str,
647 bind_creating_relations: bool,
648 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
649 schema_path
650 .try_find(|schema_name| {
651 Ok(self
652 .get_schema_by_name(db_name, schema_name)?
653 .get_table_by_name(table_name, bind_creating_relations))
654 })?
655 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
656 }
657
658 pub fn get_any_table_by_name<'a>(
661 &self,
662 db_name: &str,
663 schema_path: SchemaPath<'a>,
664 table_name: &str,
665 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
666 self.get_table_by_name(db_name, schema_path, table_name, true)
667 }
668
669 pub fn get_created_table_by_name<'a>(
672 &self,
673 db_name: &str,
674 schema_path: SchemaPath<'a>,
675 table_name: &str,
676 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
677 self.get_table_by_name(db_name, schema_path, table_name, false)
678 }
679
680 pub fn get_any_table_by_id(&self, table_id: &TableId) -> CatalogResult<&Arc<TableCatalog>> {
681 self.table_by_id
682 .get(table_id)
683 .ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
684 }
685
686 pub fn get_created_table_by_id_with_db(
688 &self,
689 db_name: &str,
690 table_id: u32,
691 ) -> CatalogResult<&Arc<TableCatalog>> {
692 let table_id = TableId::from(table_id);
693 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
694 if let Some(table) = schema.get_created_table_by_id(&table_id) {
695 return Ok(table);
696 }
697 }
698 Err(CatalogError::NotFound("table id", table_id.to_string()))
699 }
700
701 pub fn iter_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
702 self.table_by_id.values()
703 }
704
705 pub fn iter_backfilling_internal_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
706 self.table_by_id
707 .values()
708 .filter(|t| t.is_internal_table() && !t.is_created())
709 }
710
711 pub fn alter_table_name_by_id(&mut self, table_id: &TableId, table_name: &str) {
713 let mut found = false;
714 for database in self.database_by_name.values() {
715 if !found {
716 for schema in database.iter_schemas() {
717 if schema.iter_user_table().any(|t| t.id() == *table_id) {
718 found = true;
719 break;
720 }
721 }
722 }
723 }
724
725 if found {
726 let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
727 table.name = table_name.to_owned();
728 self.update_table(&table);
729 }
730 }
731
732 #[cfg(test)]
733 pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
734 self.table_by_id.insert(
735 table_id,
736 Arc::new(TableCatalog {
737 fragment_id,
738 ..Default::default()
739 }),
740 );
741 }
742
743 pub fn get_sys_table_by_name(
744 &self,
745 db_name: &str,
746 schema_name: &str,
747 table_name: &str,
748 ) -> CatalogResult<&Arc<SystemTableCatalog>> {
749 self.get_schema_by_name(db_name, schema_name)?
750 .get_system_table_by_name(table_name)
751 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
752 }
753
754 pub fn get_source_by_name<'a>(
755 &self,
756 db_name: &str,
757 schema_path: SchemaPath<'a>,
758 source_name: &str,
759 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
760 schema_path
761 .try_find(|schema_name| {
762 Ok(self
763 .get_schema_by_name(db_name, schema_name)?
764 .get_source_by_name(source_name))
765 })?
766 .ok_or_else(|| CatalogError::NotFound("source", source_name.to_owned()))
767 }
768
769 pub fn get_sink_by_name<'a>(
770 &self,
771 db_name: &str,
772 schema_path: SchemaPath<'a>,
773 sink_name: &str,
774 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
775 schema_path
776 .try_find(|schema_name| {
777 Ok(self
778 .get_schema_by_name(db_name, schema_name)?
779 .get_sink_by_name(sink_name))
780 })?
781 .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_owned()))
782 }
783
784 pub fn get_subscription_by_name<'a>(
785 &self,
786 db_name: &str,
787 schema_path: SchemaPath<'a>,
788 subscription_name: &str,
789 ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
790 schema_path
791 .try_find(|schema_name| {
792 Ok(self
793 .get_schema_by_name(db_name, schema_name)?
794 .get_subscription_by_name(subscription_name))
795 })?
796 .ok_or_else(|| CatalogError::NotFound("subscription", subscription_name.to_owned()))
797 }
798
799 pub fn get_index_by_name<'a>(
800 &self,
801 db_name: &str,
802 schema_path: SchemaPath<'a>,
803 index_name: &str,
804 ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
805 schema_path
806 .try_find(|schema_name| {
807 Ok(self
808 .get_schema_by_name(db_name, schema_name)?
809 .get_index_by_name(index_name))
810 })?
811 .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
812 }
813
814 pub fn get_index_by_id(
815 &self,
816 db_name: &str,
817 index_id: u32,
818 ) -> CatalogResult<&Arc<IndexCatalog>> {
819 let index_id = IndexId::from(index_id);
820 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
821 if let Some(index) = schema.get_index_by_id(&index_id) {
822 return Ok(index);
823 }
824 }
825 Err(CatalogError::NotFound("index", index_id.to_string()))
826 }
827
828 pub fn get_view_by_name<'a>(
829 &self,
830 db_name: &str,
831 schema_path: SchemaPath<'a>,
832 view_name: &str,
833 ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
834 schema_path
835 .try_find(|schema_name| {
836 Ok(self
837 .get_schema_by_name(db_name, schema_name)?
838 .get_view_by_name(view_name))
839 })?
840 .ok_or_else(|| CatalogError::NotFound("view", view_name.to_owned()))
841 }
842
843 pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
844 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
845 if let Some(view) = schema.get_view_by_id(&ViewId::from(view_id)) {
846 return Ok(view.clone());
847 }
848 }
849 Err(CatalogError::NotFound("view", view_id.to_string()))
850 }
851
852 pub fn get_secret_by_name<'a>(
853 &self,
854 db_name: &str,
855 schema_path: SchemaPath<'a>,
856 secret_name: &str,
857 ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
858 schema_path
859 .try_find(|schema_name| {
860 Ok(self
861 .get_schema_by_name(db_name, schema_name)?
862 .get_secret_by_name(secret_name))
863 })?
864 .ok_or_else(|| CatalogError::NotFound("secret", secret_name.to_owned()))
865 }
866
867 pub fn get_connection_by_name<'a>(
868 &self,
869 db_name: &str,
870 schema_path: SchemaPath<'a>,
871 connection_name: &str,
872 ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
873 schema_path
874 .try_find(|schema_name| {
875 Ok(self
876 .get_schema_by_name(db_name, schema_name)?
877 .get_connection_by_name(connection_name))
878 })?
879 .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_owned()))
880 }
881
882 pub fn get_function_by_name_inputs<'a>(
883 &self,
884 db_name: &str,
885 schema_path: SchemaPath<'a>,
886 function_name: &str,
887 inputs: &mut [ExprImpl],
888 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
889 schema_path
890 .try_find(|schema_name| {
891 Ok(self
892 .get_schema_by_name(db_name, schema_name)?
893 .get_function_by_name_inputs(function_name, inputs))
894 })?
895 .ok_or_else(|| {
896 CatalogError::NotFound(
897 "function",
898 format!(
899 "{}({})",
900 function_name,
901 inputs
902 .iter()
903 .map(|a| a.return_type().to_string())
904 .join(", ")
905 ),
906 )
907 })
908 }
909
910 pub fn get_function_by_name_args<'a>(
911 &self,
912 db_name: &str,
913 schema_path: SchemaPath<'a>,
914 function_name: &str,
915 args: &[DataType],
916 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
917 schema_path
918 .try_find(|schema_name| {
919 Ok(self
920 .get_schema_by_name(db_name, schema_name)?
921 .get_function_by_name_args(function_name, args))
922 })?
923 .ok_or_else(|| {
924 CatalogError::NotFound(
925 "function",
926 format!(
927 "{}({})",
928 function_name,
929 args.iter().map(|a| a.to_string()).join(", ")
930 ),
931 )
932 })
933 }
934
935 pub fn get_functions_by_name<'a>(
937 &self,
938 db_name: &str,
939 schema_path: SchemaPath<'a>,
940 function_name: &str,
941 ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
942 schema_path
943 .try_find(|schema_name| {
944 Ok(self
945 .get_schema_by_name(db_name, schema_name)?
946 .get_functions_by_name(function_name))
947 })?
948 .ok_or_else(|| CatalogError::NotFound("function", function_name.to_owned()))
949 }
950
951 pub fn check_relation_name_duplicated(
953 &self,
954 db_name: &str,
955 schema_name: &str,
956 relation_name: &str,
957 ) -> CatalogResult<()> {
958 let schema = self.get_schema_by_name(db_name, schema_name)?;
959
960 if let Some(table) = schema.get_any_table_by_name(relation_name) {
961 let is_creating = table.stream_job_status == StreamJobStatus::Creating;
962 if table.is_index() {
963 Err(CatalogError::Duplicated(
964 "index",
965 relation_name.to_owned(),
966 is_creating,
967 ))
968 } else if table.is_mview() {
969 Err(CatalogError::Duplicated(
970 "materialized view",
971 relation_name.to_owned(),
972 is_creating,
973 ))
974 } else {
975 Err(CatalogError::Duplicated(
976 "table",
977 relation_name.to_owned(),
978 is_creating,
979 ))
980 }
981 } else if schema.get_source_by_name(relation_name).is_some() {
982 Err(CatalogError::duplicated("source", relation_name.to_owned()))
983 } else if schema.get_sink_by_name(relation_name).is_some() {
984 Err(CatalogError::duplicated("sink", relation_name.to_owned()))
985 } else if schema.get_view_by_name(relation_name).is_some() {
986 Err(CatalogError::duplicated("view", relation_name.to_owned()))
987 } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
988 let is_not_created = subscription.subscription_state != SubscriptionState::Created;
989 Err(CatalogError::Duplicated(
990 "subscription",
991 relation_name.to_owned(),
992 is_not_created,
993 ))
994 } else {
995 Ok(())
996 }
997 }
998
999 pub fn check_function_name_duplicated(
1000 &self,
1001 db_name: &str,
1002 schema_name: &str,
1003 function_name: &str,
1004 arg_types: &[DataType],
1005 ) -> CatalogResult<()> {
1006 let schema = self.get_schema_by_name(db_name, schema_name)?;
1007
1008 if schema
1009 .get_function_by_name_args(function_name, arg_types)
1010 .is_some()
1011 {
1012 let name = format!(
1013 "{function_name}({})",
1014 arg_types.iter().map(|t| t.to_string()).join(",")
1015 );
1016 Err(CatalogError::duplicated("function", name))
1017 } else {
1018 Ok(())
1019 }
1020 }
1021
1022 pub fn check_connection_name_duplicated(
1024 &self,
1025 db_name: &str,
1026 schema_name: &str,
1027 connection_name: &str,
1028 ) -> CatalogResult<()> {
1029 let schema = self.get_schema_by_name(db_name, schema_name)?;
1030
1031 if schema.get_connection_by_name(connection_name).is_some() {
1032 Err(CatalogError::duplicated(
1033 "connection",
1034 connection_name.to_owned(),
1035 ))
1036 } else {
1037 Ok(())
1038 }
1039 }
1040
1041 pub fn check_secret_name_duplicated(
1042 &self,
1043 db_name: &str,
1044 schema_name: &str,
1045 secret_name: &str,
1046 ) -> CatalogResult<()> {
1047 let schema = self.get_schema_by_name(db_name, schema_name)?;
1048
1049 if schema.get_secret_by_name(secret_name).is_some() {
1050 Err(CatalogError::duplicated("secret", secret_name.to_owned()))
1051 } else {
1052 Ok(())
1053 }
1054 }
1055
1056 pub fn version(&self) -> u64 {
1058 self.version
1059 }
1060
1061 pub fn set_version(&mut self, catalog_version: CatalogVersion) {
1063 self.version = catalog_version;
1064 }
1065
1066 pub fn table_stats(&self) -> &HummockVersionStats {
1067 &self.table_stats
1068 }
1069
1070 pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
1071 self.table_stats = table_stats;
1072 }
1073
1074 pub fn get_all_indexes_related_to_object(
1075 &self,
1076 db_id: DatabaseId,
1077 schema_id: SchemaId,
1078 mv_id: TableId,
1079 ) -> Vec<Arc<IndexCatalog>> {
1080 self.get_database_by_id(&db_id)
1081 .unwrap()
1082 .get_schema_by_id(&schema_id)
1083 .unwrap()
1084 .get_indexes_by_table_id(&mv_id)
1085 }
1086
1087 pub fn get_id_by_class_name(
1088 &self,
1089 db_name: &str,
1090 schema_path: SchemaPath<'_>,
1091 class_name: &str,
1092 ) -> CatalogResult<u32> {
1093 schema_path
1094 .try_find(|schema_name| {
1095 let schema = self.get_schema_by_name(db_name, schema_name)?;
1096 #[allow(clippy::manual_map)]
1097 if let Some(item) = schema.get_system_table_by_name(class_name) {
1098 Ok(Some(item.id().into()))
1099 } else if let Some(item) = schema.get_created_table_by_name(class_name) {
1100 Ok(Some(item.id().into()))
1101 } else if let Some(item) = schema.get_index_by_name(class_name) {
1102 Ok(Some(item.id.into()))
1103 } else if let Some(item) = schema.get_source_by_name(class_name) {
1104 Ok(Some(item.id))
1105 } else if let Some(item) = schema.get_view_by_name(class_name) {
1106 Ok(Some(item.id))
1107 } else {
1108 Ok(None)
1109 }
1110 })?
1111 .map(|(id, _)| id)
1112 .ok_or_else(|| CatalogError::NotFound("class", class_name.to_owned()))
1113 }
1114}