1use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23use risingwave_pb::catalog::{
24 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
25 PbSubscription, PbTable, PbView,
26};
27use risingwave_pb::hummock::HummockVersionStats;
28
29use super::function_catalog::FunctionCatalog;
30use super::source_catalog::SourceCatalog;
31use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
32use super::view_catalog::ViewCatalog;
33use super::{
34 CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
35};
36use crate::catalog::connection_catalog::ConnectionCatalog;
37use crate::catalog::database_catalog::DatabaseCatalog;
38use crate::catalog::schema_catalog::SchemaCatalog;
39use crate::catalog::secret_catalog::SecretCatalog;
40use crate::catalog::system_catalog::{
41 SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
42};
43use crate::catalog::table_catalog::TableCatalog;
44use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
45use crate::expr::{Expr, ExprImpl};
46
47#[derive(Copy, Clone)]
48pub enum SchemaPath<'a> {
49 Name(&'a str),
50 Path(&'a SearchPath, &'a str),
52}
53
54impl<'a> SchemaPath<'a> {
55 pub fn new(
56 schema_name: Option<&'a str>,
57 search_path: &'a SearchPath,
58 user_name: &'a str,
59 ) -> Self {
60 match schema_name {
61 Some(schema_name) => SchemaPath::Name(schema_name),
62 None => SchemaPath::Path(search_path, user_name),
63 }
64 }
65
66 pub fn try_find<T, E>(
68 &self,
69 mut f: impl FnMut(&str) -> Result<Option<T>, E>,
70 ) -> Result<Option<(T, &'a str)>, E> {
71 match self {
72 SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
73 SchemaPath::Path(search_path, user_name) => {
74 for schema_name in search_path.path() {
75 let mut schema_name: &str = schema_name;
76 if schema_name == USER_NAME_WILD_CARD {
77 schema_name = user_name;
78 }
79 if let Ok(Some(res)) = f(schema_name) {
80 return Ok(Some((res, schema_name)));
81 }
82 }
83 Ok(None)
84 }
85 }
86 }
87}
88
89pub struct Catalog {
101 version: CatalogVersion,
102 database_by_name: HashMap<String, DatabaseCatalog>,
103 db_name_by_id: HashMap<DatabaseId, String>,
104 table_by_id: HashMap<TableId, Arc<TableCatalog>>,
106 table_stats: HummockVersionStats,
107}
108
109#[expect(clippy::derivable_impls)]
110impl Default for Catalog {
111 fn default() -> Self {
112 Self {
113 version: 0,
114 database_by_name: HashMap::new(),
115 db_name_by_id: HashMap::new(),
116 table_by_id: HashMap::new(),
117 table_stats: HummockVersionStats::default(),
118 }
119 }
120}
121
122impl Catalog {
123 fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
124 let name = self.db_name_by_id.get(&db_id)?;
125 self.database_by_name.get_mut(name)
126 }
127
128 pub fn clear(&mut self) {
129 self.database_by_name.clear();
130 self.db_name_by_id.clear();
131 self.table_by_id.clear();
132 }
133
134 pub fn create_database(&mut self, db: &PbDatabase) {
135 let name = db.name.clone();
136 let id = db.id;
137
138 self.database_by_name
139 .try_insert(name.clone(), db.into())
140 .unwrap();
141 self.db_name_by_id.try_insert(id, name).unwrap();
142 }
143
144 pub fn create_schema(&mut self, proto: &PbSchema) {
145 self.get_database_mut(proto.database_id)
146 .unwrap()
147 .create_schema(proto);
148
149 for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
150 self.get_database_mut(proto.database_id)
151 .unwrap()
152 .get_schema_mut(proto.id)
153 .unwrap()
154 .create_sys_table(sys_table);
155 }
156 for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
157 sys_view.database_id = proto.database_id;
158 sys_view.schema_id = proto.id;
159 self.get_database_mut(proto.database_id)
160 .unwrap()
161 .get_schema_mut(proto.id)
162 .unwrap()
163 .create_sys_view(Arc::new(sys_view));
164 }
165 }
166
167 pub fn create_table(&mut self, proto: &PbTable) {
168 let table = self
169 .get_database_mut(proto.database_id)
170 .unwrap()
171 .get_schema_mut(proto.schema_id)
172 .unwrap()
173 .create_table(proto);
174 self.table_by_id.insert(proto.id.into(), table);
175 }
176
177 pub fn create_index(&mut self, proto: &PbIndex) {
178 self.get_database_mut(proto.database_id)
179 .unwrap()
180 .get_schema_mut(proto.schema_id)
181 .unwrap()
182 .create_index(proto);
183 }
184
185 pub fn create_source(&mut self, proto: &PbSource) {
186 self.get_database_mut(proto.database_id)
187 .unwrap()
188 .get_schema_mut(proto.schema_id)
189 .unwrap()
190 .create_source(proto);
191 }
192
193 pub fn create_sink(&mut self, proto: &PbSink) {
194 self.get_database_mut(proto.database_id)
195 .unwrap()
196 .get_schema_mut(proto.schema_id)
197 .unwrap()
198 .create_sink(proto);
199 }
200
201 pub fn create_subscription(&mut self, proto: &PbSubscription) {
202 self.get_database_mut(proto.database_id)
203 .unwrap()
204 .get_schema_mut(proto.schema_id)
205 .unwrap()
206 .create_subscription(proto);
207 }
208
209 pub fn create_secret(&mut self, proto: &PbSecret) {
210 self.get_database_mut(proto.database_id)
211 .unwrap()
212 .get_schema_mut(proto.schema_id)
213 .unwrap()
214 .create_secret(proto);
215 }
216
217 pub fn create_view(&mut self, proto: &PbView) {
218 self.get_database_mut(proto.database_id)
219 .unwrap()
220 .get_schema_mut(proto.schema_id)
221 .unwrap()
222 .create_view(proto);
223 }
224
225 pub fn create_function(&mut self, proto: &PbFunction) {
226 self.get_database_mut(proto.database_id)
227 .unwrap()
228 .get_schema_mut(proto.schema_id)
229 .unwrap()
230 .create_function(proto);
231 }
232
233 pub fn create_connection(&mut self, proto: &PbConnection) {
234 self.get_database_mut(proto.database_id)
235 .unwrap()
236 .get_schema_mut(proto.schema_id)
237 .unwrap()
238 .create_connection(proto);
239 }
240
241 pub fn drop_connection(
242 &mut self,
243 db_id: DatabaseId,
244 schema_id: SchemaId,
245 connection_id: ConnectionId,
246 ) {
247 self.get_database_mut(db_id)
248 .unwrap()
249 .get_schema_mut(schema_id)
250 .unwrap()
251 .drop_connection(connection_id);
252 }
253
254 pub fn update_connection(&mut self, proto: &PbConnection) {
255 let database = self.get_database_mut(proto.database_id).unwrap();
256 let schema = database.get_schema_mut(proto.schema_id).unwrap();
257 if schema.get_connection_by_id(&proto.id).is_some() {
258 schema.update_connection(proto);
259 } else {
260 schema.create_connection(proto);
262 database
263 .iter_schemas_mut()
264 .find(|schema| {
265 schema.id() != proto.schema_id
266 && schema.get_connection_by_id(&proto.id).is_some()
267 })
268 .unwrap()
269 .drop_connection(proto.id);
270 }
271 }
272
273 pub fn update_secret(&mut self, proto: &PbSecret) {
274 let database = self.get_database_mut(proto.database_id).unwrap();
275 let schema = database.get_schema_mut(proto.schema_id).unwrap();
276 let secret_id = SecretId::new(proto.id);
277 if schema.get_secret_by_id(&secret_id).is_some() {
278 schema.update_secret(proto);
279 } else {
280 schema.create_secret(proto);
282 database
283 .iter_schemas_mut()
284 .find(|schema| {
285 schema.id() != proto.schema_id && schema.get_secret_by_id(&secret_id).is_some()
286 })
287 .unwrap()
288 .drop_secret(secret_id);
289 }
290 }
291
292 pub fn drop_database(&mut self, db_id: DatabaseId) {
293 let name = self.db_name_by_id.remove(&db_id).unwrap();
294 let database = self.database_by_name.remove(&name).unwrap();
295 database.iter_all_table_ids().for_each(|table| {
296 self.table_by_id.remove(&table);
297 });
298 }
299
300 pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
301 self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
302 }
303
304 pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
305 self.table_by_id.remove(&tb_id);
306 self.get_database_mut(db_id)
307 .unwrap()
308 .get_schema_mut(schema_id)
309 .unwrap()
310 .drop_table(tb_id);
311 }
312
313 pub fn update_table(&mut self, proto: &PbTable) {
314 let database = self.get_database_mut(proto.database_id).unwrap();
315 let schema = database.get_schema_mut(proto.schema_id).unwrap();
316 let table = if schema.get_table_by_id(&proto.id.into()).is_some() {
317 schema.update_table(proto)
318 } else {
319 let new_table = schema.create_table(proto);
321 database
322 .iter_schemas_mut()
323 .find(|schema| {
324 schema.id() != proto.schema_id
325 && schema.get_created_table_by_id(&proto.id.into()).is_some()
326 })
327 .unwrap()
328 .drop_table(proto.id.into());
329 new_table
330 };
331
332 self.table_by_id.insert(proto.id.into(), table);
333 }
334
335 pub fn update_database(&mut self, proto: &PbDatabase) {
336 let id = proto.id;
337 let name = proto.name.clone();
338
339 let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
340 if old_database_name != name {
341 let mut database = self.database_by_name.remove(&old_database_name).unwrap();
342 database.name.clone_from(&name);
343 database.owner = proto.owner;
344 self.database_by_name.insert(name.clone(), database);
345 self.db_name_by_id.insert(id, name);
346 } else {
347 let database = self.get_database_mut(id).unwrap();
348 database.name = name;
349 database.owner = proto.owner;
350 }
351 }
352
353 pub fn update_schema(&mut self, proto: &PbSchema) {
354 self.get_database_mut(proto.database_id)
355 .unwrap()
356 .update_schema(proto);
357 }
358
359 pub fn update_index(&mut self, proto: &PbIndex) {
360 let database = self.get_database_mut(proto.database_id).unwrap();
361 let schema = database.get_schema_mut(proto.schema_id).unwrap();
362 if schema.get_index_by_id(&proto.id.into()).is_some() {
363 schema.update_index(proto);
364 } else {
365 schema.create_index(proto);
367 database
368 .iter_schemas_mut()
369 .find(|schema| {
370 schema.id() != proto.schema_id
371 && schema.get_index_by_id(&proto.id.into()).is_some()
372 })
373 .unwrap()
374 .drop_index(proto.id.into());
375 }
376 }
377
378 pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
379 self.get_database_mut(db_id)
380 .unwrap()
381 .get_schema_mut(schema_id)
382 .unwrap()
383 .drop_source(source_id);
384 }
385
386 pub fn update_source(&mut self, proto: &PbSource) {
387 let database = self.get_database_mut(proto.database_id).unwrap();
388 let schema = database.get_schema_mut(proto.schema_id).unwrap();
389 if schema.get_source_by_id(&proto.id).is_some() {
390 schema.update_source(proto);
391 } else {
392 schema.create_source(proto);
394 database
395 .iter_schemas_mut()
396 .find(|schema| {
397 schema.id() != proto.schema_id && schema.get_source_by_id(&proto.id).is_some()
398 })
399 .unwrap()
400 .drop_source(proto.id);
401 }
402 }
403
404 pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
405 self.get_database_mut(db_id)
406 .unwrap()
407 .get_schema_mut(schema_id)
408 .unwrap()
409 .drop_sink(sink_id);
410 }
411
412 pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
413 self.get_database_mut(db_id)
414 .unwrap()
415 .get_schema_mut(schema_id)
416 .unwrap()
417 .drop_secret(secret_id);
418 }
419
420 pub fn update_sink(&mut self, proto: &PbSink) {
421 let database = self.get_database_mut(proto.database_id).unwrap();
422 let schema = database.get_schema_mut(proto.schema_id).unwrap();
423 if schema.get_sink_by_id(&proto.id).is_some() {
424 schema.update_sink(proto);
425 } else {
426 schema.create_sink(proto);
428 database
429 .iter_schemas_mut()
430 .find(|schema| {
431 schema.id() != proto.schema_id && schema.get_sink_by_id(&proto.id).is_some()
432 })
433 .unwrap()
434 .drop_sink(proto.id);
435 }
436 }
437
438 pub fn drop_subscription(
439 &mut self,
440 db_id: DatabaseId,
441 schema_id: SchemaId,
442 subscription_id: SubscriptionId,
443 ) {
444 self.get_database_mut(db_id)
445 .unwrap()
446 .get_schema_mut(schema_id)
447 .unwrap()
448 .drop_subscription(subscription_id);
449 }
450
451 pub fn update_subscription(&mut self, proto: &PbSubscription) {
452 let database = self.get_database_mut(proto.database_id).unwrap();
453 let schema = database.get_schema_mut(proto.schema_id).unwrap();
454 if schema.get_subscription_by_id(&proto.id).is_some() {
455 schema.update_subscription(proto);
456 } else {
457 schema.create_subscription(proto);
459 database
460 .iter_schemas_mut()
461 .find(|schema| {
462 schema.id() != proto.schema_id
463 && schema.get_subscription_by_id(&proto.id).is_some()
464 })
465 .unwrap()
466 .drop_subscription(proto.id);
467 }
468 }
469
470 pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
471 self.get_database_mut(db_id)
472 .unwrap()
473 .get_schema_mut(schema_id)
474 .unwrap()
475 .drop_index(index_id);
476 }
477
478 pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
479 self.get_database_mut(db_id)
480 .unwrap()
481 .get_schema_mut(schema_id)
482 .unwrap()
483 .drop_view(view_id);
484 }
485
486 pub fn update_view(&mut self, proto: &PbView) {
487 let database = self.get_database_mut(proto.database_id).unwrap();
488 let schema = database.get_schema_mut(proto.schema_id).unwrap();
489 if schema.get_view_by_id(&proto.id).is_some() {
490 schema.update_view(proto);
491 } else {
492 schema.create_view(proto);
494 database
495 .iter_schemas_mut()
496 .find(|schema| {
497 schema.id() != proto.schema_id && schema.get_view_by_id(&proto.id).is_some()
498 })
499 .unwrap()
500 .drop_view(proto.id);
501 }
502 }
503
504 pub fn drop_function(
505 &mut self,
506 db_id: DatabaseId,
507 schema_id: SchemaId,
508 function_id: FunctionId,
509 ) {
510 self.get_database_mut(db_id)
511 .unwrap()
512 .get_schema_mut(schema_id)
513 .unwrap()
514 .drop_function(function_id);
515 }
516
517 pub fn update_function(&mut self, proto: &PbFunction) {
518 let database = self.get_database_mut(proto.database_id).unwrap();
519 let schema = database.get_schema_mut(proto.schema_id).unwrap();
520 if schema.get_function_by_id(proto.id.into()).is_some() {
521 schema.update_function(proto);
522 } else {
523 schema.create_function(proto);
525 database
526 .iter_schemas_mut()
527 .find(|schema| {
528 schema.id() != proto.schema_id
529 && schema.get_function_by_id(proto.id.into()).is_some()
530 })
531 .unwrap()
532 .drop_function(proto.id.into());
533 }
534
535 self.get_database_mut(proto.database_id)
536 .unwrap()
537 .get_schema_mut(proto.schema_id)
538 .unwrap()
539 .update_function(proto);
540 }
541
542 pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
543 self.database_by_name
544 .get(db_name)
545 .ok_or_else(|| CatalogError::NotFound("database", db_name.to_owned()))
546 }
547
548 pub fn get_database_by_id(&self, db_id: &DatabaseId) -> CatalogResult<&DatabaseCatalog> {
549 let db_name = self
550 .db_name_by_id
551 .get(db_id)
552 .ok_or_else(|| CatalogError::NotFound("db_id", db_id.to_string()))?;
553 self.database_by_name
554 .get(db_name)
555 .ok_or_else(|| CatalogError::NotFound("database", db_name.to_string()))
556 }
557
558 pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
559 Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
560 }
561
562 pub fn iter_schemas(
563 &self,
564 db_name: &str,
565 ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
566 Ok(self.get_database_by_name(db_name)?.iter_schemas())
567 }
568
569 pub fn get_all_database_names(&self) -> Vec<String> {
570 self.database_by_name.keys().cloned().collect_vec()
571 }
572
573 pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
574 self.database_by_name.values()
575 }
576
577 pub fn get_schema_by_name(
578 &self,
579 db_name: &str,
580 schema_name: &str,
581 ) -> CatalogResult<&SchemaCatalog> {
582 self.get_database_by_name(db_name)?
583 .get_schema_by_name(schema_name)
584 .ok_or_else(|| CatalogError::NotFound("schema", schema_name.to_owned()))
585 }
586
587 pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
588 self.get_any_table_by_id(&table_id)
589 .map(|table| table.name.clone())
590 }
591
592 pub fn get_schema_by_id(
593 &self,
594 db_id: &DatabaseId,
595 schema_id: &SchemaId,
596 ) -> CatalogResult<&SchemaCatalog> {
597 self.get_database_by_id(db_id)?
598 .get_schema_by_id(schema_id)
599 .ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
600 }
601
602 pub fn first_valid_schema(
604 &self,
605 db_name: &str,
606 search_path: &SearchPath,
607 user_name: &str,
608 ) -> CatalogResult<&SchemaCatalog> {
609 for path in search_path.real_path() {
610 let mut schema_name: &str = path;
611 if schema_name == USER_NAME_WILD_CARD {
612 schema_name = user_name;
613 }
614
615 if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
616 return schema_catalog;
617 }
618 }
619 Err(CatalogError::NotFound(
620 "first valid schema",
621 "no schema has been selected to create in".to_owned(),
622 ))
623 }
624
625 pub fn get_source_by_id<'a>(
626 &self,
627 db_name: &'a str,
628 schema_path: SchemaPath<'a>,
629 source_id: &SourceId,
630 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
631 schema_path
632 .try_find(|schema_name| {
633 Ok(self
634 .get_schema_by_name(db_name, schema_name)?
635 .get_source_by_id(source_id))
636 })?
637 .ok_or_else(|| CatalogError::NotFound("source", source_id.to_string()))
638 }
639
640 pub fn get_any_table_by_name<'a>(
643 &self,
644 db_name: &str,
645 schema_path: SchemaPath<'a>,
646 table_name: &str,
647 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
648 schema_path
649 .try_find(|schema_name| {
650 Ok(self
651 .get_schema_by_name(db_name, schema_name)?
652 .get_table_by_name(table_name))
653 })?
654 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
655 }
656
657 pub fn get_created_table_by_name<'a>(
660 &self,
661 db_name: &str,
662 schema_path: SchemaPath<'a>,
663 table_name: &str,
664 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
665 schema_path
666 .try_find(|schema_name| {
667 Ok(self
668 .get_schema_by_name(db_name, schema_name)?
669 .get_created_table_by_name(table_name))
670 })?
671 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
672 }
673
674 pub fn get_any_table_by_id(&self, table_id: &TableId) -> CatalogResult<&Arc<TableCatalog>> {
675 self.table_by_id
676 .get(table_id)
677 .ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
678 }
679
680 pub fn get_created_table_by_id_with_db(
682 &self,
683 db_name: &str,
684 table_id: u32,
685 ) -> CatalogResult<&Arc<TableCatalog>> {
686 let table_id = TableId::from(table_id);
687 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
688 if let Some(table) = schema.get_created_table_by_id(&table_id) {
689 return Ok(table);
690 }
691 }
692 Err(CatalogError::NotFound("table id", table_id.to_string()))
693 }
694
695 pub fn alter_table_name_by_id(&mut self, table_id: &TableId, table_name: &str) {
697 let mut found = false;
698 for database in self.database_by_name.values() {
699 if !found {
700 for schema in database.iter_schemas() {
701 if schema.iter_user_table().any(|t| t.id() == *table_id) {
702 found = true;
703 break;
704 }
705 }
706 }
707 }
708
709 if found {
710 let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
711 table.name = table_name.to_owned();
712 self.update_table(&table);
713 }
714 }
715
/// Test-only helper: registers a placeholder table under `table_id` in the by-id table
/// index. Only `fragment_id` is set; every other field takes its default value.
#[cfg(test)]
pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
    let placeholder = TableCatalog {
        fragment_id,
        ..Default::default()
    };
    self.table_by_id.insert(table_id, Arc::new(placeholder));
}
726
727 pub fn get_sys_table_by_name(
728 &self,
729 db_name: &str,
730 schema_name: &str,
731 table_name: &str,
732 ) -> CatalogResult<&Arc<SystemTableCatalog>> {
733 self.get_schema_by_name(db_name, schema_name)?
734 .get_system_table_by_name(table_name)
735 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
736 }
737
738 pub fn get_source_by_name<'a>(
739 &self,
740 db_name: &str,
741 schema_path: SchemaPath<'a>,
742 source_name: &str,
743 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
744 schema_path
745 .try_find(|schema_name| {
746 Ok(self
747 .get_schema_by_name(db_name, schema_name)?
748 .get_source_by_name(source_name))
749 })?
750 .ok_or_else(|| CatalogError::NotFound("source", source_name.to_owned()))
751 }
752
753 pub fn get_sink_by_name<'a>(
754 &self,
755 db_name: &str,
756 schema_path: SchemaPath<'a>,
757 sink_name: &str,
758 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
759 schema_path
760 .try_find(|schema_name| {
761 Ok(self
762 .get_schema_by_name(db_name, schema_name)?
763 .get_sink_by_name(sink_name))
764 })?
765 .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_owned()))
766 }
767
768 pub fn get_subscription_by_name<'a>(
769 &self,
770 db_name: &str,
771 schema_path: SchemaPath<'a>,
772 subscription_name: &str,
773 ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
774 schema_path
775 .try_find(|schema_name| {
776 Ok(self
777 .get_schema_by_name(db_name, schema_name)?
778 .get_subscription_by_name(subscription_name))
779 })?
780 .ok_or_else(|| CatalogError::NotFound("subscription", subscription_name.to_owned()))
781 }
782
783 pub fn get_index_by_name<'a>(
784 &self,
785 db_name: &str,
786 schema_path: SchemaPath<'a>,
787 index_name: &str,
788 ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
789 schema_path
790 .try_find(|schema_name| {
791 Ok(self
792 .get_schema_by_name(db_name, schema_name)?
793 .get_index_by_name(index_name))
794 })?
795 .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
796 }
797
798 pub fn get_index_by_id(
799 &self,
800 db_name: &str,
801 index_id: u32,
802 ) -> CatalogResult<&Arc<IndexCatalog>> {
803 let index_id = IndexId::from(index_id);
804 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
805 if let Some(index) = schema.get_index_by_id(&index_id) {
806 return Ok(index);
807 }
808 }
809 Err(CatalogError::NotFound("index", index_id.to_string()))
810 }
811
812 pub fn get_view_by_name<'a>(
813 &self,
814 db_name: &str,
815 schema_path: SchemaPath<'a>,
816 view_name: &str,
817 ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
818 schema_path
819 .try_find(|schema_name| {
820 Ok(self
821 .get_schema_by_name(db_name, schema_name)?
822 .get_view_by_name(view_name))
823 })?
824 .ok_or_else(|| CatalogError::NotFound("view", view_name.to_owned()))
825 }
826
827 pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
828 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
829 if let Some(view) = schema.get_view_by_id(&ViewId::from(view_id)) {
830 return Ok(view.clone());
831 }
832 }
833 Err(CatalogError::NotFound("view", view_id.to_string()))
834 }
835
836 pub fn get_secret_by_name<'a>(
837 &self,
838 db_name: &str,
839 schema_path: SchemaPath<'a>,
840 secret_name: &str,
841 ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
842 schema_path
843 .try_find(|schema_name| {
844 Ok(self
845 .get_schema_by_name(db_name, schema_name)?
846 .get_secret_by_name(secret_name))
847 })?
848 .ok_or_else(|| CatalogError::NotFound("secret", secret_name.to_owned()))
849 }
850
851 pub fn get_connection_by_name<'a>(
852 &self,
853 db_name: &str,
854 schema_path: SchemaPath<'a>,
855 connection_name: &str,
856 ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
857 schema_path
858 .try_find(|schema_name| {
859 Ok(self
860 .get_schema_by_name(db_name, schema_name)?
861 .get_connection_by_name(connection_name))
862 })?
863 .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_owned()))
864 }
865
866 pub fn get_function_by_name_inputs<'a>(
867 &self,
868 db_name: &str,
869 schema_path: SchemaPath<'a>,
870 function_name: &str,
871 inputs: &mut [ExprImpl],
872 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
873 schema_path
874 .try_find(|schema_name| {
875 Ok(self
876 .get_schema_by_name(db_name, schema_name)?
877 .get_function_by_name_inputs(function_name, inputs))
878 })?
879 .ok_or_else(|| {
880 CatalogError::NotFound(
881 "function",
882 format!(
883 "{}({})",
884 function_name,
885 inputs
886 .iter()
887 .map(|a| a.return_type().to_string())
888 .join(", ")
889 ),
890 )
891 })
892 }
893
894 pub fn get_function_by_name_args<'a>(
895 &self,
896 db_name: &str,
897 schema_path: SchemaPath<'a>,
898 function_name: &str,
899 args: &[DataType],
900 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
901 schema_path
902 .try_find(|schema_name| {
903 Ok(self
904 .get_schema_by_name(db_name, schema_name)?
905 .get_function_by_name_args(function_name, args))
906 })?
907 .ok_or_else(|| {
908 CatalogError::NotFound(
909 "function",
910 format!(
911 "{}({})",
912 function_name,
913 args.iter().map(|a| a.to_string()).join(", ")
914 ),
915 )
916 })
917 }
918
919 pub fn get_functions_by_name<'a>(
921 &self,
922 db_name: &str,
923 schema_path: SchemaPath<'a>,
924 function_name: &str,
925 ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
926 schema_path
927 .try_find(|schema_name| {
928 Ok(self
929 .get_schema_by_name(db_name, schema_name)?
930 .get_functions_by_name(function_name))
931 })?
932 .ok_or_else(|| CatalogError::NotFound("function", function_name.to_owned()))
933 }
934
935 pub fn check_relation_name_duplicated(
937 &self,
938 db_name: &str,
939 schema_name: &str,
940 relation_name: &str,
941 ) -> CatalogResult<()> {
942 let schema = self.get_schema_by_name(db_name, schema_name)?;
943
944 if let Some(table) = schema.get_table_by_name(relation_name) {
945 let is_creating = table.stream_job_status == StreamJobStatus::Creating;
946 if table.is_index() {
947 Err(CatalogError::Duplicated(
948 "index",
949 relation_name.to_owned(),
950 is_creating,
951 ))
952 } else if table.is_mview() {
953 Err(CatalogError::Duplicated(
954 "materialized view",
955 relation_name.to_owned(),
956 is_creating,
957 ))
958 } else {
959 Err(CatalogError::Duplicated(
960 "table",
961 relation_name.to_owned(),
962 is_creating,
963 ))
964 }
965 } else if schema.get_source_by_name(relation_name).is_some() {
966 Err(CatalogError::duplicated("source", relation_name.to_owned()))
967 } else if schema.get_sink_by_name(relation_name).is_some() {
968 Err(CatalogError::duplicated("sink", relation_name.to_owned()))
969 } else if schema.get_view_by_name(relation_name).is_some() {
970 Err(CatalogError::duplicated("view", relation_name.to_owned()))
971 } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
972 let is_not_created = subscription.subscription_state != SubscriptionState::Created;
973 Err(CatalogError::Duplicated(
974 "subscription",
975 relation_name.to_owned(),
976 is_not_created,
977 ))
978 } else {
979 Ok(())
980 }
981 }
982
983 pub fn check_function_name_duplicated(
984 &self,
985 db_name: &str,
986 schema_name: &str,
987 function_name: &str,
988 arg_types: &[DataType],
989 ) -> CatalogResult<()> {
990 let schema = self.get_schema_by_name(db_name, schema_name)?;
991
992 if schema
993 .get_function_by_name_args(function_name, arg_types)
994 .is_some()
995 {
996 let name = format!(
997 "{function_name}({})",
998 arg_types.iter().map(|t| t.to_string()).join(",")
999 );
1000 Err(CatalogError::duplicated("function", name))
1001 } else {
1002 Ok(())
1003 }
1004 }
1005
1006 pub fn check_connection_name_duplicated(
1008 &self,
1009 db_name: &str,
1010 schema_name: &str,
1011 connection_name: &str,
1012 ) -> CatalogResult<()> {
1013 let schema = self.get_schema_by_name(db_name, schema_name)?;
1014
1015 if schema.get_connection_by_name(connection_name).is_some() {
1016 Err(CatalogError::duplicated(
1017 "connection",
1018 connection_name.to_owned(),
1019 ))
1020 } else {
1021 Ok(())
1022 }
1023 }
1024
1025 pub fn check_secret_name_duplicated(
1026 &self,
1027 db_name: &str,
1028 schema_name: &str,
1029 secret_name: &str,
1030 ) -> CatalogResult<()> {
1031 let schema = self.get_schema_by_name(db_name, schema_name)?;
1032
1033 if schema.get_secret_by_name(secret_name).is_some() {
1034 Err(CatalogError::duplicated("secret", secret_name.to_owned()))
1035 } else {
1036 Ok(())
1037 }
1038 }
1039
1040 pub fn version(&self) -> u64 {
1042 self.version
1043 }
1044
1045 pub fn set_version(&mut self, catalog_version: CatalogVersion) {
1047 self.version = catalog_version;
1048 }
1049
1050 pub fn table_stats(&self) -> &HummockVersionStats {
1051 &self.table_stats
1052 }
1053
1054 pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
1055 self.table_stats = table_stats;
1056 }
1057
1058 pub fn get_all_indexes_related_to_object(
1059 &self,
1060 db_id: DatabaseId,
1061 schema_id: SchemaId,
1062 mv_id: TableId,
1063 ) -> Vec<Arc<IndexCatalog>> {
1064 self.get_database_by_id(&db_id)
1065 .unwrap()
1066 .get_schema_by_id(&schema_id)
1067 .unwrap()
1068 .get_indexes_by_table_id(&mv_id)
1069 }
1070
1071 pub fn get_id_by_class_name(
1072 &self,
1073 db_name: &str,
1074 schema_path: SchemaPath<'_>,
1075 class_name: &str,
1076 ) -> CatalogResult<u32> {
1077 schema_path
1078 .try_find(|schema_name| {
1079 let schema = self.get_schema_by_name(db_name, schema_name)?;
1080 #[allow(clippy::manual_map)]
1081 if let Some(item) = schema.get_system_table_by_name(class_name) {
1082 Ok(Some(item.id().into()))
1083 } else if let Some(item) = schema.get_created_table_by_name(class_name) {
1084 Ok(Some(item.id().into()))
1085 } else if let Some(item) = schema.get_index_by_name(class_name) {
1086 Ok(Some(item.id.into()))
1087 } else if let Some(item) = schema.get_source_by_name(class_name) {
1088 Ok(Some(item.id))
1089 } else if let Some(item) = schema.get_view_by_name(class_name) {
1090 Ok(Some(item.id))
1091 } else {
1092 Ok(None)
1093 }
1094 })?
1095 .map(|(id, _)| id)
1096 .ok_or_else(|| CatalogError::NotFound("class", class_name.to_owned()))
1097 }
1098}