1use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23use risingwave_pb::catalog::{
24 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
25 PbSubscription, PbTable, PbView,
26};
27use risingwave_pb::hummock::HummockVersionStats;
28
29use super::function_catalog::FunctionCatalog;
30use super::source_catalog::SourceCatalog;
31use super::subscription_catalog::SubscriptionCatalog;
32use super::view_catalog::ViewCatalog;
33use super::{
34 CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
35};
36use crate::catalog::connection_catalog::ConnectionCatalog;
37use crate::catalog::database_catalog::DatabaseCatalog;
38use crate::catalog::schema_catalog::SchemaCatalog;
39use crate::catalog::secret_catalog::SecretCatalog;
40use crate::catalog::system_catalog::{
41 SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
42};
43use crate::catalog::table_catalog::TableCatalog;
44use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
45use crate::expr::{Expr, ExprImpl};
46
/// Describes which schema(s) a name lookup should consult.
#[derive(Copy, Clone)]
pub enum SchemaPath<'a> {
    /// A single, explicitly given schema name.
    Name(&'a str),
    /// A search path plus a user name. When the path is walked (see `try_find`), any
    /// entry equal to `USER_NAME_WILD_CARD` is replaced by the user name.
    Path(&'a SearchPath, &'a str),
}
53
54impl<'a> SchemaPath<'a> {
55 pub fn new(
56 schema_name: Option<&'a str>,
57 search_path: &'a SearchPath,
58 user_name: &'a str,
59 ) -> Self {
60 match schema_name {
61 Some(schema_name) => SchemaPath::Name(schema_name),
62 None => SchemaPath::Path(search_path, user_name),
63 }
64 }
65
66 pub fn try_find<T, E>(
68 &self,
69 mut f: impl FnMut(&str) -> Result<Option<T>, E>,
70 ) -> Result<Option<(T, &'a str)>, E> {
71 match self {
72 SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
73 SchemaPath::Path(search_path, user_name) => {
74 for schema_name in search_path.path() {
75 let mut schema_name: &str = schema_name;
76 if schema_name == USER_NAME_WILD_CARD {
77 schema_name = user_name;
78 }
79 if let Ok(Some(res)) = f(schema_name) {
80 return Ok(Some((res, schema_name)));
81 }
82 }
83 Ok(None)
84 }
85 }
86 }
87}
88
/// Root catalog: holds every database catalog plus a few lookup indexes.
pub struct Catalog {
    /// Catalog version, read and written through `version` / `set_version`.
    version: CatalogVersion,
    /// Database catalogs keyed by database name.
    database_by_name: HashMap<String, DatabaseCatalog>,
    /// Index from database id to database name (the key of `database_by_name`).
    db_name_by_id: HashMap<DatabaseId, String>,
    /// Index from table id to table catalog, maintained by the table create/update/drop methods.
    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
    /// Table statistics, read and written through `table_stats` / `set_table_stats`.
    table_stats: HummockVersionStats,
}
108
// The impl is written by hand even though clippy considers it derivable;
// the `expect` attribute records that the lint is known to fire here.
#[expect(clippy::derivable_impls)]
impl Default for Catalog {
    /// Creates an empty catalog: version `0`, no databases, no tables, default stats.
    fn default() -> Self {
        Self {
            version: 0,
            database_by_name: HashMap::new(),
            db_name_by_id: HashMap::new(),
            table_by_id: HashMap::new(),
            table_stats: HummockVersionStats::default(),
        }
    }
}
121
122impl Catalog {
123 fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
124 let name = self.db_name_by_id.get(&db_id)?;
125 self.database_by_name.get_mut(name)
126 }
127
128 pub fn clear(&mut self) {
129 self.database_by_name.clear();
130 self.db_name_by_id.clear();
131 self.table_by_id.clear();
132 }
133
134 pub fn create_database(&mut self, db: &PbDatabase) {
135 let name = db.name.clone();
136 let id = db.id;
137
138 self.database_by_name
139 .try_insert(name.clone(), db.into())
140 .unwrap();
141 self.db_name_by_id.try_insert(id, name).unwrap();
142 }
143
144 pub fn create_schema(&mut self, proto: &PbSchema) {
145 self.get_database_mut(proto.database_id)
146 .unwrap()
147 .create_schema(proto);
148
149 for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
150 self.get_database_mut(proto.database_id)
151 .unwrap()
152 .get_schema_mut(proto.id)
153 .unwrap()
154 .create_sys_table(sys_table);
155 }
156 for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
157 sys_view.database_id = proto.database_id;
158 sys_view.schema_id = proto.id;
159 self.get_database_mut(proto.database_id)
160 .unwrap()
161 .get_schema_mut(proto.id)
162 .unwrap()
163 .create_sys_view(Arc::new(sys_view));
164 }
165 }
166
167 pub fn create_table(&mut self, proto: &PbTable) {
168 let table = self
169 .get_database_mut(proto.database_id)
170 .unwrap()
171 .get_schema_mut(proto.schema_id)
172 .unwrap()
173 .create_table(proto);
174 self.table_by_id.insert(proto.id.into(), table);
175 }
176
177 pub fn create_index(&mut self, proto: &PbIndex) {
178 self.get_database_mut(proto.database_id)
179 .unwrap()
180 .get_schema_mut(proto.schema_id)
181 .unwrap()
182 .create_index(proto);
183 }
184
185 pub fn create_source(&mut self, proto: &PbSource) {
186 self.get_database_mut(proto.database_id)
187 .unwrap()
188 .get_schema_mut(proto.schema_id)
189 .unwrap()
190 .create_source(proto);
191 }
192
193 pub fn create_sink(&mut self, proto: &PbSink) {
194 self.get_database_mut(proto.database_id)
195 .unwrap()
196 .get_schema_mut(proto.schema_id)
197 .unwrap()
198 .create_sink(proto);
199 }
200
201 pub fn create_subscription(&mut self, proto: &PbSubscription) {
202 self.get_database_mut(proto.database_id)
203 .unwrap()
204 .get_schema_mut(proto.schema_id)
205 .unwrap()
206 .create_subscription(proto);
207 }
208
209 pub fn create_secret(&mut self, proto: &PbSecret) {
210 self.get_database_mut(proto.database_id)
211 .unwrap()
212 .get_schema_mut(proto.schema_id)
213 .unwrap()
214 .create_secret(proto);
215 }
216
217 pub fn create_view(&mut self, proto: &PbView) {
218 self.get_database_mut(proto.database_id)
219 .unwrap()
220 .get_schema_mut(proto.schema_id)
221 .unwrap()
222 .create_view(proto);
223 }
224
225 pub fn create_function(&mut self, proto: &PbFunction) {
226 self.get_database_mut(proto.database_id)
227 .unwrap()
228 .get_schema_mut(proto.schema_id)
229 .unwrap()
230 .create_function(proto);
231 }
232
233 pub fn create_connection(&mut self, proto: &PbConnection) {
234 self.get_database_mut(proto.database_id)
235 .unwrap()
236 .get_schema_mut(proto.schema_id)
237 .unwrap()
238 .create_connection(proto);
239 }
240
241 pub fn drop_connection(
242 &mut self,
243 db_id: DatabaseId,
244 schema_id: SchemaId,
245 connection_id: ConnectionId,
246 ) {
247 self.get_database_mut(db_id)
248 .unwrap()
249 .get_schema_mut(schema_id)
250 .unwrap()
251 .drop_connection(connection_id);
252 }
253
254 pub fn update_connection(&mut self, proto: &PbConnection) {
255 let database = self.get_database_mut(proto.database_id).unwrap();
256 let schema = database.get_schema_mut(proto.schema_id).unwrap();
257 if schema.get_connection_by_id(&proto.id).is_some() {
258 schema.update_connection(proto);
259 } else {
260 schema.create_connection(proto);
262 database
263 .iter_schemas_mut()
264 .find(|schema| {
265 schema.id() != proto.schema_id
266 && schema.get_connection_by_id(&proto.id).is_some()
267 })
268 .unwrap()
269 .drop_connection(proto.id);
270 }
271 }
272
273 pub fn update_secret(&mut self, proto: &PbSecret) {
274 let database = self.get_database_mut(proto.database_id).unwrap();
275 let schema = database.get_schema_mut(proto.schema_id).unwrap();
276 let secret_id = SecretId::new(proto.id);
277 if schema.get_secret_by_id(&secret_id).is_some() {
278 schema.update_secret(proto);
279 } else {
280 schema.create_secret(proto);
282 database
283 .iter_schemas_mut()
284 .find(|schema| {
285 schema.id() != proto.schema_id && schema.get_secret_by_id(&secret_id).is_some()
286 })
287 .unwrap()
288 .drop_secret(secret_id);
289 }
290 }
291
292 pub fn drop_database(&mut self, db_id: DatabaseId) {
293 let name = self.db_name_by_id.remove(&db_id).unwrap();
294 let database = self.database_by_name.remove(&name).unwrap();
295 database.iter_all_table_ids().for_each(|table| {
296 self.table_by_id.remove(&table);
297 });
298 }
299
300 pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
301 self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
302 }
303
304 pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
305 self.table_by_id.remove(&tb_id);
306 self.get_database_mut(db_id)
307 .unwrap()
308 .get_schema_mut(schema_id)
309 .unwrap()
310 .drop_table(tb_id);
311 }
312
313 pub fn update_table(&mut self, proto: &PbTable) {
314 let database = self.get_database_mut(proto.database_id).unwrap();
315 let schema = database.get_schema_mut(proto.schema_id).unwrap();
316 let table = if schema.get_table_by_id(&proto.id.into()).is_some() {
317 schema.update_table(proto)
318 } else {
319 let new_table = schema.create_table(proto);
321 database
322 .iter_schemas_mut()
323 .find(|schema| {
324 schema.id() != proto.schema_id
325 && schema.get_created_table_by_id(&proto.id.into()).is_some()
326 })
327 .unwrap()
328 .drop_table(proto.id.into());
329 new_table
330 };
331
332 self.table_by_id.insert(proto.id.into(), table);
333 }
334
335 pub fn update_database(&mut self, proto: &PbDatabase) {
336 let id = proto.id;
337 let name = proto.name.clone();
338
339 let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
340 if old_database_name != name {
341 let mut database = self.database_by_name.remove(&old_database_name).unwrap();
342 database.name.clone_from(&name);
343 database.owner = proto.owner;
344 self.database_by_name.insert(name.clone(), database);
345 self.db_name_by_id.insert(id, name);
346 } else {
347 let database = self.get_database_mut(id).unwrap();
348 database.name = name;
349 database.owner = proto.owner;
350 }
351 }
352
353 pub fn update_schema(&mut self, proto: &PbSchema) {
354 self.get_database_mut(proto.database_id)
355 .unwrap()
356 .update_schema(proto);
357 }
358
359 pub fn update_index(&mut self, proto: &PbIndex) {
360 let database = self.get_database_mut(proto.database_id).unwrap();
361 let schema = database.get_schema_mut(proto.schema_id).unwrap();
362 if schema.get_index_by_id(&proto.id.into()).is_some() {
363 schema.update_index(proto);
364 } else {
365 schema.create_index(proto);
367 database
368 .iter_schemas_mut()
369 .find(|schema| {
370 schema.id() != proto.schema_id
371 && schema.get_index_by_id(&proto.id.into()).is_some()
372 })
373 .unwrap()
374 .drop_index(proto.id.into());
375 }
376 }
377
378 pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
379 self.get_database_mut(db_id)
380 .unwrap()
381 .get_schema_mut(schema_id)
382 .unwrap()
383 .drop_source(source_id);
384 }
385
386 pub fn update_source(&mut self, proto: &PbSource) {
387 let database = self.get_database_mut(proto.database_id).unwrap();
388 let schema = database.get_schema_mut(proto.schema_id).unwrap();
389 if schema.get_source_by_id(&proto.id).is_some() {
390 schema.update_source(proto);
391 } else {
392 schema.create_source(proto);
394 database
395 .iter_schemas_mut()
396 .find(|schema| {
397 schema.id() != proto.schema_id && schema.get_source_by_id(&proto.id).is_some()
398 })
399 .unwrap()
400 .drop_source(proto.id);
401 }
402 }
403
404 pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
405 self.get_database_mut(db_id)
406 .unwrap()
407 .get_schema_mut(schema_id)
408 .unwrap()
409 .drop_sink(sink_id);
410 }
411
412 pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
413 self.get_database_mut(db_id)
414 .unwrap()
415 .get_schema_mut(schema_id)
416 .unwrap()
417 .drop_secret(secret_id);
418 }
419
420 pub fn update_sink(&mut self, proto: &PbSink) {
421 let database = self.get_database_mut(proto.database_id).unwrap();
422 let schema = database.get_schema_mut(proto.schema_id).unwrap();
423 if schema.get_sink_by_id(&proto.id).is_some() {
424 schema.update_sink(proto);
425 } else {
426 schema.create_sink(proto);
428 database
429 .iter_schemas_mut()
430 .find(|schema| {
431 schema.id() != proto.schema_id && schema.get_sink_by_id(&proto.id).is_some()
432 })
433 .unwrap()
434 .drop_sink(proto.id);
435 }
436 }
437
438 pub fn drop_subscription(
439 &mut self,
440 db_id: DatabaseId,
441 schema_id: SchemaId,
442 subscription_id: SubscriptionId,
443 ) {
444 self.get_database_mut(db_id)
445 .unwrap()
446 .get_schema_mut(schema_id)
447 .unwrap()
448 .drop_subscription(subscription_id);
449 }
450
451 pub fn update_subscription(&mut self, proto: &PbSubscription) {
452 let database = self.get_database_mut(proto.database_id).unwrap();
453 let schema = database.get_schema_mut(proto.schema_id).unwrap();
454 if schema.get_subscription_by_id(&proto.id).is_some() {
455 schema.update_subscription(proto);
456 } else {
457 schema.create_subscription(proto);
459 database
460 .iter_schemas_mut()
461 .find(|schema| {
462 schema.id() != proto.schema_id
463 && schema.get_subscription_by_id(&proto.id).is_some()
464 })
465 .unwrap()
466 .drop_subscription(proto.id);
467 }
468 }
469
470 pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
471 self.get_database_mut(db_id)
472 .unwrap()
473 .get_schema_mut(schema_id)
474 .unwrap()
475 .drop_index(index_id);
476 }
477
478 pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
479 self.get_database_mut(db_id)
480 .unwrap()
481 .get_schema_mut(schema_id)
482 .unwrap()
483 .drop_view(view_id);
484 }
485
486 pub fn update_view(&mut self, proto: &PbView) {
487 let database = self.get_database_mut(proto.database_id).unwrap();
488 let schema = database.get_schema_mut(proto.schema_id).unwrap();
489 if schema.get_view_by_id(&proto.id).is_some() {
490 schema.update_view(proto);
491 } else {
492 schema.create_view(proto);
494 database
495 .iter_schemas_mut()
496 .find(|schema| {
497 schema.id() != proto.schema_id && schema.get_view_by_id(&proto.id).is_some()
498 })
499 .unwrap()
500 .drop_view(proto.id);
501 }
502 }
503
504 pub fn drop_function(
505 &mut self,
506 db_id: DatabaseId,
507 schema_id: SchemaId,
508 function_id: FunctionId,
509 ) {
510 self.get_database_mut(db_id)
511 .unwrap()
512 .get_schema_mut(schema_id)
513 .unwrap()
514 .drop_function(function_id);
515 }
516
517 pub fn update_function(&mut self, proto: &PbFunction) {
518 let database = self.get_database_mut(proto.database_id).unwrap();
519 let schema = database.get_schema_mut(proto.schema_id).unwrap();
520 if schema.get_function_by_id(proto.id.into()).is_some() {
521 schema.update_function(proto);
522 } else {
523 schema.create_function(proto);
525 database
526 .iter_schemas_mut()
527 .find(|schema| {
528 schema.id() != proto.schema_id
529 && schema.get_function_by_id(proto.id.into()).is_some()
530 })
531 .unwrap()
532 .drop_function(proto.id.into());
533 }
534
535 self.get_database_mut(proto.database_id)
536 .unwrap()
537 .get_schema_mut(proto.schema_id)
538 .unwrap()
539 .update_function(proto);
540 }
541
542 pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
543 self.database_by_name
544 .get(db_name)
545 .ok_or_else(|| CatalogError::NotFound("database", db_name.to_owned()))
546 }
547
548 pub fn get_database_by_id(&self, db_id: &DatabaseId) -> CatalogResult<&DatabaseCatalog> {
549 let db_name = self
550 .db_name_by_id
551 .get(db_id)
552 .ok_or_else(|| CatalogError::NotFound("db_id", db_id.to_string()))?;
553 self.database_by_name
554 .get(db_name)
555 .ok_or_else(|| CatalogError::NotFound("database", db_name.to_string()))
556 }
557
558 pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
559 Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
560 }
561
562 pub fn iter_schemas(
563 &self,
564 db_name: &str,
565 ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
566 Ok(self.get_database_by_name(db_name)?.iter_schemas())
567 }
568
569 pub fn get_all_database_names(&self) -> Vec<String> {
570 self.database_by_name.keys().cloned().collect_vec()
571 }
572
    /// Returns an iterator over all database catalogs (in map iteration order).
    pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
        self.database_by_name.values()
    }
576
577 pub fn get_schema_by_name(
578 &self,
579 db_name: &str,
580 schema_name: &str,
581 ) -> CatalogResult<&SchemaCatalog> {
582 self.get_database_by_name(db_name)?
583 .get_schema_by_name(schema_name)
584 .ok_or_else(|| CatalogError::NotFound("schema", schema_name.to_owned()))
585 }
586
587 pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
588 self.get_any_table_by_id(&table_id)
589 .map(|table| table.name.clone())
590 }
591
592 pub fn get_schema_by_id(
593 &self,
594 db_id: &DatabaseId,
595 schema_id: &SchemaId,
596 ) -> CatalogResult<&SchemaCatalog> {
597 self.get_database_by_id(db_id)?
598 .get_schema_by_id(schema_id)
599 .ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
600 }
601
602 pub fn first_valid_schema(
604 &self,
605 db_name: &str,
606 search_path: &SearchPath,
607 user_name: &str,
608 ) -> CatalogResult<&SchemaCatalog> {
609 for path in search_path.real_path() {
610 let mut schema_name: &str = path;
611 if schema_name == USER_NAME_WILD_CARD {
612 schema_name = user_name;
613 }
614
615 if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
616 return schema_catalog;
617 }
618 }
619 Err(CatalogError::NotFound(
620 "first valid schema",
621 "no schema has been selected to create in".to_owned(),
622 ))
623 }
624
625 pub fn get_source_by_id<'a>(
626 &self,
627 db_name: &'a str,
628 schema_path: SchemaPath<'a>,
629 source_id: &SourceId,
630 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
631 schema_path
632 .try_find(|schema_name| {
633 Ok(self
634 .get_schema_by_name(db_name, schema_name)?
635 .get_source_by_id(source_id))
636 })?
637 .ok_or_else(|| CatalogError::NotFound("source", source_id.to_string()))
638 }
639
640 pub fn get_any_table_by_name<'a>(
643 &self,
644 db_name: &str,
645 schema_path: SchemaPath<'a>,
646 table_name: &str,
647 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
648 schema_path
649 .try_find(|schema_name| {
650 Ok(self
651 .get_schema_by_name(db_name, schema_name)?
652 .get_table_by_name(table_name))
653 })?
654 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
655 }
656
657 pub fn get_created_table_by_name<'a>(
660 &self,
661 db_name: &str,
662 schema_path: SchemaPath<'a>,
663 table_name: &str,
664 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
665 schema_path
666 .try_find(|schema_name| {
667 Ok(self
668 .get_schema_by_name(db_name, schema_name)?
669 .get_created_table_by_name(table_name))
670 })?
671 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
672 }
673
674 pub fn get_any_table_by_id(&self, table_id: &TableId) -> CatalogResult<&Arc<TableCatalog>> {
675 self.table_by_id
676 .get(table_id)
677 .ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
678 }
679
680 pub fn get_created_table_by_id_with_db(
682 &self,
683 db_name: &str,
684 table_id: u32,
685 ) -> CatalogResult<&Arc<TableCatalog>> {
686 let table_id = TableId::from(table_id);
687 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
688 if let Some(table) = schema.get_created_table_by_id(&table_id) {
689 return Ok(table);
690 }
691 }
692 Err(CatalogError::NotFound("table id", table_id.to_string()))
693 }
694
695 pub fn alter_table_name_by_id(&mut self, table_id: &TableId, table_name: &str) {
697 let mut found = false;
698 for database in self.database_by_name.values() {
699 if !found {
700 for schema in database.iter_schemas() {
701 if schema.iter_user_table().any(|t| t.id() == *table_id) {
702 found = true;
703 break;
704 }
705 }
706 }
707 }
708
709 if found {
710 let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
711 table.name = table_name.to_owned();
712 self.update_table(&table);
713 }
714 }
715
716 #[cfg(test)]
717 pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
718 self.table_by_id.insert(
719 table_id,
720 Arc::new(TableCatalog {
721 fragment_id,
722 ..Default::default()
723 }),
724 );
725 }
726
727 pub fn get_sys_table_by_name(
728 &self,
729 db_name: &str,
730 schema_name: &str,
731 table_name: &str,
732 ) -> CatalogResult<&Arc<SystemTableCatalog>> {
733 self.get_schema_by_name(db_name, schema_name)
734 .unwrap()
735 .get_system_table_by_name(table_name)
736 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
737 }
738
739 pub fn get_source_by_name<'a>(
740 &self,
741 db_name: &str,
742 schema_path: SchemaPath<'a>,
743 source_name: &str,
744 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
745 schema_path
746 .try_find(|schema_name| {
747 Ok(self
748 .get_schema_by_name(db_name, schema_name)?
749 .get_source_by_name(source_name))
750 })?
751 .ok_or_else(|| CatalogError::NotFound("source", source_name.to_owned()))
752 }
753
754 pub fn get_sink_by_name<'a>(
755 &self,
756 db_name: &str,
757 schema_path: SchemaPath<'a>,
758 sink_name: &str,
759 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
760 schema_path
761 .try_find(|schema_name| {
762 Ok(self
763 .get_schema_by_name(db_name, schema_name)?
764 .get_sink_by_name(sink_name))
765 })?
766 .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_owned()))
767 }
768
769 pub fn get_subscription_by_name<'a>(
770 &self,
771 db_name: &str,
772 schema_path: SchemaPath<'a>,
773 subscription_name: &str,
774 ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
775 schema_path
776 .try_find(|schema_name| {
777 Ok(self
778 .get_schema_by_name(db_name, schema_name)?
779 .get_subscription_by_name(subscription_name))
780 })?
781 .ok_or_else(|| CatalogError::NotFound("subscription", subscription_name.to_owned()))
782 }
783
784 pub fn get_index_by_name<'a>(
785 &self,
786 db_name: &str,
787 schema_path: SchemaPath<'a>,
788 index_name: &str,
789 ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
790 schema_path
791 .try_find(|schema_name| {
792 Ok(self
793 .get_schema_by_name(db_name, schema_name)?
794 .get_index_by_name(index_name))
795 })?
796 .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
797 }
798
799 pub fn get_index_by_id(
800 &self,
801 db_name: &str,
802 index_id: u32,
803 ) -> CatalogResult<&Arc<IndexCatalog>> {
804 let index_id = IndexId::from(index_id);
805 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
806 if let Some(index) = schema.get_index_by_id(&index_id) {
807 return Ok(index);
808 }
809 }
810 Err(CatalogError::NotFound("index", index_id.to_string()))
811 }
812
813 pub fn get_view_by_name<'a>(
814 &self,
815 db_name: &str,
816 schema_path: SchemaPath<'a>,
817 view_name: &str,
818 ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
819 schema_path
820 .try_find(|schema_name| {
821 Ok(self
822 .get_schema_by_name(db_name, schema_name)?
823 .get_view_by_name(view_name))
824 })?
825 .ok_or_else(|| CatalogError::NotFound("view", view_name.to_owned()))
826 }
827
828 pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
829 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
830 if let Some(view) = schema.get_view_by_id(&ViewId::from(view_id)) {
831 return Ok(view.clone());
832 }
833 }
834 Err(CatalogError::NotFound("view", view_id.to_string()))
835 }
836
837 pub fn get_secret_by_name<'a>(
838 &self,
839 db_name: &str,
840 schema_path: SchemaPath<'a>,
841 secret_name: &str,
842 ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
843 schema_path
844 .try_find(|schema_name| {
845 Ok(self
846 .get_schema_by_name(db_name, schema_name)?
847 .get_secret_by_name(secret_name))
848 })?
849 .ok_or_else(|| CatalogError::NotFound("secret", secret_name.to_owned()))
850 }
851
852 pub fn get_connection_by_name<'a>(
853 &self,
854 db_name: &str,
855 schema_path: SchemaPath<'a>,
856 connection_name: &str,
857 ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
858 schema_path
859 .try_find(|schema_name| {
860 Ok(self
861 .get_schema_by_name(db_name, schema_name)?
862 .get_connection_by_name(connection_name))
863 })?
864 .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_owned()))
865 }
866
867 pub fn get_function_by_name_inputs<'a>(
868 &self,
869 db_name: &str,
870 schema_path: SchemaPath<'a>,
871 function_name: &str,
872 inputs: &mut [ExprImpl],
873 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
874 schema_path
875 .try_find(|schema_name| {
876 Ok(self
877 .get_schema_by_name(db_name, schema_name)?
878 .get_function_by_name_inputs(function_name, inputs))
879 })?
880 .ok_or_else(|| {
881 CatalogError::NotFound(
882 "function",
883 format!(
884 "{}({})",
885 function_name,
886 inputs
887 .iter()
888 .map(|a| a.return_type().to_string())
889 .join(", ")
890 ),
891 )
892 })
893 }
894
895 pub fn get_function_by_name_args<'a>(
896 &self,
897 db_name: &str,
898 schema_path: SchemaPath<'a>,
899 function_name: &str,
900 args: &[DataType],
901 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
902 schema_path
903 .try_find(|schema_name| {
904 Ok(self
905 .get_schema_by_name(db_name, schema_name)?
906 .get_function_by_name_args(function_name, args))
907 })?
908 .ok_or_else(|| {
909 CatalogError::NotFound(
910 "function",
911 format!(
912 "{}({})",
913 function_name,
914 args.iter().map(|a| a.to_string()).join(", ")
915 ),
916 )
917 })
918 }
919
920 pub fn get_functions_by_name<'a>(
922 &self,
923 db_name: &str,
924 schema_path: SchemaPath<'a>,
925 function_name: &str,
926 ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
927 schema_path
928 .try_find(|schema_name| {
929 Ok(self
930 .get_schema_by_name(db_name, schema_name)?
931 .get_functions_by_name(function_name))
932 })?
933 .ok_or_else(|| CatalogError::NotFound("function", function_name.to_owned()))
934 }
935
936 pub fn check_relation_name_duplicated(
938 &self,
939 db_name: &str,
940 schema_name: &str,
941 relation_name: &str,
942 ) -> CatalogResult<()> {
943 let schema = self.get_schema_by_name(db_name, schema_name)?;
944
945 if let Some(table) = schema.get_table_by_name(relation_name) {
946 let is_creating = table.stream_job_status == StreamJobStatus::Creating;
947 if table.is_index() {
948 Err(CatalogError::Duplicated(
949 "index",
950 relation_name.to_owned(),
951 is_creating,
952 ))
953 } else if table.is_mview() {
954 Err(CatalogError::Duplicated(
955 "materialized view",
956 relation_name.to_owned(),
957 is_creating,
958 ))
959 } else {
960 Err(CatalogError::Duplicated(
961 "table",
962 relation_name.to_owned(),
963 is_creating,
964 ))
965 }
966 } else if schema.get_source_by_name(relation_name).is_some() {
967 Err(CatalogError::duplicated("source", relation_name.to_owned()))
968 } else if schema.get_sink_by_name(relation_name).is_some() {
969 Err(CatalogError::duplicated("sink", relation_name.to_owned()))
970 } else if schema.get_view_by_name(relation_name).is_some() {
971 Err(CatalogError::duplicated("view", relation_name.to_owned()))
972 } else if schema.get_subscription_by_name(relation_name).is_some() {
973 Err(CatalogError::duplicated(
974 "subscription",
975 relation_name.to_owned(),
976 ))
977 } else {
978 Ok(())
979 }
980 }
981
982 pub fn check_function_name_duplicated(
983 &self,
984 db_name: &str,
985 schema_name: &str,
986 function_name: &str,
987 arg_types: &[DataType],
988 ) -> CatalogResult<()> {
989 let schema = self.get_schema_by_name(db_name, schema_name)?;
990
991 if schema
992 .get_function_by_name_args(function_name, arg_types)
993 .is_some()
994 {
995 let name = format!(
996 "{function_name}({})",
997 arg_types.iter().map(|t| t.to_string()).join(",")
998 );
999 Err(CatalogError::duplicated("function", name))
1000 } else {
1001 Ok(())
1002 }
1003 }
1004
1005 pub fn check_connection_name_duplicated(
1007 &self,
1008 db_name: &str,
1009 schema_name: &str,
1010 connection_name: &str,
1011 ) -> CatalogResult<()> {
1012 let schema = self.get_schema_by_name(db_name, schema_name)?;
1013
1014 if schema.get_connection_by_name(connection_name).is_some() {
1015 Err(CatalogError::duplicated(
1016 "connection",
1017 connection_name.to_owned(),
1018 ))
1019 } else {
1020 Ok(())
1021 }
1022 }
1023
1024 pub fn check_secret_name_duplicated(
1025 &self,
1026 db_name: &str,
1027 schema_name: &str,
1028 secret_name: &str,
1029 ) -> CatalogResult<()> {
1030 let schema = self.get_schema_by_name(db_name, schema_name)?;
1031
1032 if schema.get_secret_by_name(secret_name).is_some() {
1033 Err(CatalogError::duplicated("secret", secret_name.to_owned()))
1034 } else {
1035 Ok(())
1036 }
1037 }
1038
    /// Returns the current catalog version.
    pub fn version(&self) -> u64 {
        self.version
    }
1043
    /// Overwrites the catalog version with `catalog_version`.
    pub fn set_version(&mut self, catalog_version: CatalogVersion) {
        self.version = catalog_version;
    }
1048
    /// Returns a reference to the stored table statistics.
    pub fn table_stats(&self) -> &HummockVersionStats {
        &self.table_stats
    }
1052
    /// Replaces the stored table statistics with `table_stats`.
    pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
        self.table_stats = table_stats;
    }
1056
1057 pub fn get_all_indexes_related_to_object(
1058 &self,
1059 db_id: DatabaseId,
1060 schema_id: SchemaId,
1061 mv_id: TableId,
1062 ) -> Vec<Arc<IndexCatalog>> {
1063 self.get_database_by_id(&db_id)
1064 .unwrap()
1065 .get_schema_by_id(&schema_id)
1066 .unwrap()
1067 .get_indexes_by_table_id(&mv_id)
1068 }
1069
1070 pub fn get_id_by_class_name(
1071 &self,
1072 db_name: &str,
1073 schema_path: SchemaPath<'_>,
1074 class_name: &str,
1075 ) -> CatalogResult<u32> {
1076 schema_path
1077 .try_find(|schema_name| {
1078 let schema = self.get_schema_by_name(db_name, schema_name)?;
1079 #[allow(clippy::manual_map)]
1080 if let Some(item) = schema.get_system_table_by_name(class_name) {
1081 Ok(Some(item.id().into()))
1082 } else if let Some(item) = schema.get_created_table_by_name(class_name) {
1083 Ok(Some(item.id().into()))
1084 } else if let Some(item) = schema.get_index_by_name(class_name) {
1085 Ok(Some(item.id.into()))
1086 } else if let Some(item) = schema.get_source_by_name(class_name) {
1087 Ok(Some(item.id))
1088 } else if let Some(item) = schema.get_view_by_name(class_name) {
1089 Ok(Some(item.id))
1090 } else {
1091 Ok(None)
1092 }
1093 })?
1094 .map(|(id, _)| id)
1095 .ok_or_else(|| CatalogError::NotFound("class", class_name.to_owned()))
1096 }
1097}