1use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23use risingwave_pb::catalog::{
24 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
25 PbSubscription, PbTable, PbView,
26};
27use risingwave_pb::hummock::HummockVersionStats;
28
29use super::function_catalog::FunctionCatalog;
30use super::source_catalog::SourceCatalog;
31use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
32use super::view_catalog::ViewCatalog;
33use super::{
34 CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
35};
36use crate::catalog::connection_catalog::ConnectionCatalog;
37use crate::catalog::database_catalog::DatabaseCatalog;
38use crate::catalog::schema_catalog::SchemaCatalog;
39use crate::catalog::secret_catalog::SecretCatalog;
40use crate::catalog::system_catalog::{
41 SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
42};
43use crate::catalog::table_catalog::TableCatalog;
44use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
45use crate::expr::{Expr, ExprImpl};
46
47#[derive(Copy, Clone)]
48pub enum SchemaPath<'a> {
49 Name(&'a str),
50 Path(&'a SearchPath, &'a str),
52}
53
54impl<'a> SchemaPath<'a> {
55 pub fn new(
56 schema_name: Option<&'a str>,
57 search_path: &'a SearchPath,
58 user_name: &'a str,
59 ) -> Self {
60 match schema_name {
61 Some(schema_name) => SchemaPath::Name(schema_name),
62 None => SchemaPath::Path(search_path, user_name),
63 }
64 }
65
66 pub fn try_find<T, E>(
68 &self,
69 mut f: impl FnMut(&str) -> Result<Option<T>, E>,
70 ) -> Result<Option<(T, &'a str)>, E> {
71 match self {
72 SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
73 SchemaPath::Path(search_path, user_name) => {
74 for schema_name in search_path.path() {
75 let mut schema_name: &str = schema_name;
76 if schema_name == USER_NAME_WILD_CARD {
77 schema_name = user_name;
78 }
79 if let Ok(Some(res)) = f(schema_name) {
80 return Ok(Some((res, schema_name)));
81 }
82 }
83 Ok(None)
84 }
85 }
86 }
87}
88
89pub struct Catalog {
101 database_by_name: HashMap<String, DatabaseCatalog>,
102 db_name_by_id: HashMap<DatabaseId, String>,
103 table_by_id: HashMap<TableId, Arc<TableCatalog>>,
105 table_stats: HummockVersionStats,
106}
107
108#[expect(clippy::derivable_impls)]
109impl Default for Catalog {
110 fn default() -> Self {
111 Self {
112 database_by_name: HashMap::new(),
113 db_name_by_id: HashMap::new(),
114 table_by_id: HashMap::new(),
115 table_stats: HummockVersionStats::default(),
116 }
117 }
118}
119
120impl Catalog {
121 fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
122 let name = self.db_name_by_id.get(&db_id)?;
123 self.database_by_name.get_mut(name)
124 }
125
126 pub fn clear(&mut self) {
127 self.database_by_name.clear();
128 self.db_name_by_id.clear();
129 self.table_by_id.clear();
130 }
131
132 pub fn create_database(&mut self, db: &PbDatabase) {
133 let name = db.name.clone();
134 let id = db.id;
135
136 self.database_by_name
137 .try_insert(name.clone(), db.into())
138 .unwrap();
139 self.db_name_by_id.try_insert(id, name).unwrap();
140 }
141
142 pub fn create_schema(&mut self, proto: &PbSchema) {
143 self.get_database_mut(proto.database_id)
144 .unwrap()
145 .create_schema(proto);
146
147 for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
148 self.get_database_mut(proto.database_id)
149 .unwrap()
150 .get_schema_mut(proto.id)
151 .unwrap()
152 .create_sys_table(sys_table);
153 }
154 for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
155 sys_view.database_id = proto.database_id;
156 sys_view.schema_id = proto.id;
157 self.get_database_mut(proto.database_id)
158 .unwrap()
159 .get_schema_mut(proto.id)
160 .unwrap()
161 .create_sys_view(Arc::new(sys_view));
162 }
163 }
164
165 pub fn create_table(&mut self, proto: &PbTable) {
166 let table = self
167 .get_database_mut(proto.database_id)
168 .unwrap()
169 .get_schema_mut(proto.schema_id)
170 .unwrap()
171 .create_table(proto);
172 self.table_by_id.insert(proto.id.into(), table);
173 }
174
175 pub fn create_index(&mut self, proto: &PbIndex) {
176 self.get_database_mut(proto.database_id)
177 .unwrap()
178 .get_schema_mut(proto.schema_id)
179 .unwrap()
180 .create_index(proto);
181 }
182
183 pub fn create_source(&mut self, proto: &PbSource) {
184 self.get_database_mut(proto.database_id)
185 .unwrap()
186 .get_schema_mut(proto.schema_id)
187 .unwrap()
188 .create_source(proto);
189 }
190
191 pub fn create_sink(&mut self, proto: &PbSink) {
192 self.get_database_mut(proto.database_id)
193 .unwrap()
194 .get_schema_mut(proto.schema_id)
195 .unwrap()
196 .create_sink(proto);
197 }
198
199 pub fn create_subscription(&mut self, proto: &PbSubscription) {
200 self.get_database_mut(proto.database_id)
201 .unwrap()
202 .get_schema_mut(proto.schema_id)
203 .unwrap()
204 .create_subscription(proto);
205 }
206
207 pub fn create_secret(&mut self, proto: &PbSecret) {
208 self.get_database_mut(proto.database_id)
209 .unwrap()
210 .get_schema_mut(proto.schema_id)
211 .unwrap()
212 .create_secret(proto);
213 }
214
215 pub fn create_view(&mut self, proto: &PbView) {
216 self.get_database_mut(proto.database_id)
217 .unwrap()
218 .get_schema_mut(proto.schema_id)
219 .unwrap()
220 .create_view(proto);
221 }
222
223 pub fn create_function(&mut self, proto: &PbFunction) {
224 self.get_database_mut(proto.database_id)
225 .unwrap()
226 .get_schema_mut(proto.schema_id)
227 .unwrap()
228 .create_function(proto);
229 }
230
231 pub fn create_connection(&mut self, proto: &PbConnection) {
232 self.get_database_mut(proto.database_id)
233 .unwrap()
234 .get_schema_mut(proto.schema_id)
235 .unwrap()
236 .create_connection(proto);
237 }
238
239 pub fn drop_connection(
240 &mut self,
241 db_id: DatabaseId,
242 schema_id: SchemaId,
243 connection_id: ConnectionId,
244 ) {
245 self.get_database_mut(db_id)
246 .unwrap()
247 .get_schema_mut(schema_id)
248 .unwrap()
249 .drop_connection(connection_id);
250 }
251
252 pub fn update_connection(&mut self, proto: &PbConnection) {
253 let database = self.get_database_mut(proto.database_id).unwrap();
254 let schema = database.get_schema_mut(proto.schema_id).unwrap();
255 if schema.get_connection_by_id(&proto.id).is_some() {
256 schema.update_connection(proto);
257 } else {
258 schema.create_connection(proto);
260 database
261 .iter_schemas_mut()
262 .find(|schema| {
263 schema.id() != proto.schema_id
264 && schema.get_connection_by_id(&proto.id).is_some()
265 })
266 .unwrap()
267 .drop_connection(proto.id);
268 }
269 }
270
271 pub fn update_secret(&mut self, proto: &PbSecret) {
272 let database = self.get_database_mut(proto.database_id).unwrap();
273 let schema = database.get_schema_mut(proto.schema_id).unwrap();
274 let secret_id = SecretId::new(proto.id);
275 if schema.get_secret_by_id(&secret_id).is_some() {
276 schema.update_secret(proto);
277 } else {
278 schema.create_secret(proto);
280 database
281 .iter_schemas_mut()
282 .find(|schema| {
283 schema.id() != proto.schema_id && schema.get_secret_by_id(&secret_id).is_some()
284 })
285 .unwrap()
286 .drop_secret(secret_id);
287 }
288 }
289
290 pub fn drop_database(&mut self, db_id: DatabaseId) {
291 let name = self.db_name_by_id.remove(&db_id).unwrap();
292 let database = self.database_by_name.remove(&name).unwrap();
293 database.iter_all_table_ids().for_each(|table| {
294 self.table_by_id.remove(&table);
295 });
296 }
297
298 pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
299 self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
300 }
301
302 pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
303 self.table_by_id.remove(&tb_id);
304 self.get_database_mut(db_id)
305 .unwrap()
306 .get_schema_mut(schema_id)
307 .unwrap()
308 .drop_table(tb_id);
309 }
310
311 pub fn update_table(&mut self, proto: &PbTable) {
312 let database = self.get_database_mut(proto.database_id).unwrap();
313 let schema = database.get_schema_mut(proto.schema_id).unwrap();
314 let table = if schema.get_table_by_id(&proto.id.into()).is_some() {
315 schema.update_table(proto)
316 } else {
317 let new_table = schema.create_table(proto);
319 database
320 .iter_schemas_mut()
321 .find(|schema| {
322 schema.id() != proto.schema_id
323 && schema.get_created_table_by_id(&proto.id.into()).is_some()
324 })
325 .unwrap()
326 .drop_table(proto.id.into());
327 new_table
328 };
329
330 self.table_by_id.insert(proto.id.into(), table);
331 }
332
333 pub fn update_database(&mut self, proto: &PbDatabase) {
334 let id = proto.id;
335 let name = proto.name.clone();
336
337 let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
338 if old_database_name != name {
339 let mut database = self.database_by_name.remove(&old_database_name).unwrap();
340 database.name.clone_from(&name);
341 database.owner = proto.owner;
342 self.database_by_name.insert(name.clone(), database);
343 self.db_name_by_id.insert(id, name);
344 } else {
345 let database = self.get_database_mut(id).unwrap();
346 database.name = name;
347 database.owner = proto.owner;
348 database.barrier_interval_ms = proto.barrier_interval_ms;
349 database.checkpoint_frequency = proto.checkpoint_frequency;
350 }
351 }
352
353 pub fn update_schema(&mut self, proto: &PbSchema) {
354 self.get_database_mut(proto.database_id)
355 .unwrap()
356 .update_schema(proto);
357 }
358
359 pub fn update_index(&mut self, proto: &PbIndex) {
360 let database = self.get_database_mut(proto.database_id).unwrap();
361 let schema = database.get_schema_mut(proto.schema_id).unwrap();
362 if schema.get_index_by_id(&proto.id.into()).is_some() {
363 schema.update_index(proto);
364 } else {
365 schema.create_index(proto);
367 database
368 .iter_schemas_mut()
369 .find(|schema| {
370 schema.id() != proto.schema_id
371 && schema.get_index_by_id(&proto.id.into()).is_some()
372 })
373 .unwrap()
374 .drop_index(proto.id.into());
375 }
376 }
377
378 pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
379 self.get_database_mut(db_id)
380 .unwrap()
381 .get_schema_mut(schema_id)
382 .unwrap()
383 .drop_source(source_id);
384 }
385
386 pub fn update_source(&mut self, proto: &PbSource) {
387 let database = self.get_database_mut(proto.database_id).unwrap();
388 let schema = database.get_schema_mut(proto.schema_id).unwrap();
389 if schema.get_source_by_id(&proto.id).is_some() {
390 schema.update_source(proto);
391 } else {
392 schema.create_source(proto);
394 database
395 .iter_schemas_mut()
396 .find(|schema| {
397 schema.id() != proto.schema_id && schema.get_source_by_id(&proto.id).is_some()
398 })
399 .unwrap()
400 .drop_source(proto.id);
401 }
402 }
403
404 pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
405 self.get_database_mut(db_id)
406 .unwrap()
407 .get_schema_mut(schema_id)
408 .unwrap()
409 .drop_sink(sink_id);
410 }
411
412 pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
413 self.get_database_mut(db_id)
414 .unwrap()
415 .get_schema_mut(schema_id)
416 .unwrap()
417 .drop_secret(secret_id);
418 }
419
420 pub fn update_sink(&mut self, proto: &PbSink) {
421 let database = self.get_database_mut(proto.database_id).unwrap();
422 let schema = database.get_schema_mut(proto.schema_id).unwrap();
423 if schema.get_sink_by_id(&proto.id).is_some() {
424 schema.update_sink(proto);
425 } else {
426 schema.create_sink(proto);
428 database
429 .iter_schemas_mut()
430 .find(|schema| {
431 schema.id() != proto.schema_id && schema.get_sink_by_id(&proto.id).is_some()
432 })
433 .unwrap()
434 .drop_sink(proto.id);
435 }
436 }
437
438 pub fn drop_subscription(
439 &mut self,
440 db_id: DatabaseId,
441 schema_id: SchemaId,
442 subscription_id: SubscriptionId,
443 ) {
444 self.get_database_mut(db_id)
445 .unwrap()
446 .get_schema_mut(schema_id)
447 .unwrap()
448 .drop_subscription(subscription_id);
449 }
450
451 pub fn update_subscription(&mut self, proto: &PbSubscription) {
452 let database = self.get_database_mut(proto.database_id).unwrap();
453 let schema = database.get_schema_mut(proto.schema_id).unwrap();
454 if schema.get_subscription_by_id(&proto.id).is_some() {
455 schema.update_subscription(proto);
456 } else {
457 schema.create_subscription(proto);
459 database
460 .iter_schemas_mut()
461 .find(|schema| {
462 schema.id() != proto.schema_id
463 && schema.get_subscription_by_id(&proto.id).is_some()
464 })
465 .unwrap()
466 .drop_subscription(proto.id);
467 }
468 }
469
470 pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
471 self.get_database_mut(db_id)
472 .unwrap()
473 .get_schema_mut(schema_id)
474 .unwrap()
475 .drop_index(index_id);
476 }
477
478 pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
479 self.get_database_mut(db_id)
480 .unwrap()
481 .get_schema_mut(schema_id)
482 .unwrap()
483 .drop_view(view_id);
484 }
485
486 pub fn update_view(&mut self, proto: &PbView) {
487 let database = self.get_database_mut(proto.database_id).unwrap();
488 let schema = database.get_schema_mut(proto.schema_id).unwrap();
489 if schema.get_view_by_id(&proto.id).is_some() {
490 schema.update_view(proto);
491 } else {
492 schema.create_view(proto);
494 database
495 .iter_schemas_mut()
496 .find(|schema| {
497 schema.id() != proto.schema_id && schema.get_view_by_id(&proto.id).is_some()
498 })
499 .unwrap()
500 .drop_view(proto.id);
501 }
502 }
503
504 pub fn drop_function(
505 &mut self,
506 db_id: DatabaseId,
507 schema_id: SchemaId,
508 function_id: FunctionId,
509 ) {
510 self.get_database_mut(db_id)
511 .unwrap()
512 .get_schema_mut(schema_id)
513 .unwrap()
514 .drop_function(function_id);
515 }
516
517 pub fn update_function(&mut self, proto: &PbFunction) {
518 let database = self.get_database_mut(proto.database_id).unwrap();
519 let schema = database.get_schema_mut(proto.schema_id).unwrap();
520 if schema.get_function_by_id(proto.id.into()).is_some() {
521 schema.update_function(proto);
522 } else {
523 schema.create_function(proto);
525 database
526 .iter_schemas_mut()
527 .find(|schema| {
528 schema.id() != proto.schema_id
529 && schema.get_function_by_id(proto.id.into()).is_some()
530 })
531 .unwrap()
532 .drop_function(proto.id.into());
533 }
534
535 self.get_database_mut(proto.database_id)
536 .unwrap()
537 .get_schema_mut(proto.schema_id)
538 .unwrap()
539 .update_function(proto);
540 }
541
542 pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
543 self.database_by_name
544 .get(db_name)
545 .ok_or_else(|| CatalogError::NotFound("database", db_name.to_owned()))
546 }
547
548 pub fn get_database_by_id(&self, db_id: &DatabaseId) -> CatalogResult<&DatabaseCatalog> {
549 let db_name = self
550 .db_name_by_id
551 .get(db_id)
552 .ok_or_else(|| CatalogError::NotFound("db_id", db_id.to_string()))?;
553 self.database_by_name
554 .get(db_name)
555 .ok_or_else(|| CatalogError::NotFound("database", db_name.clone()))
556 }
557
558 pub fn find_schema_secret_by_secret_id(
559 &self,
560 db_name: &str,
561 secret_id: SecretId,
562 ) -> CatalogResult<(String, String)> {
563 let db = self.get_database_by_name(db_name)?;
564 let schema_secret = db
565 .iter_schemas()
566 .find_map(|schema| {
567 schema
568 .get_secret_by_id(&secret_id)
569 .map(|secret| (schema.name(), secret.name.clone()))
570 })
571 .ok_or_else(|| CatalogError::NotFound("secret", secret_id.to_string()))?;
572 Ok(schema_secret)
573 }
574
575 pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
576 Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
577 }
578
579 pub fn iter_schemas(
580 &self,
581 db_name: &str,
582 ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
583 Ok(self.get_database_by_name(db_name)?.iter_schemas())
584 }
585
586 pub fn get_all_database_names(&self) -> Vec<String> {
587 self.database_by_name.keys().cloned().collect_vec()
588 }
589
590 pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
591 self.database_by_name.values()
592 }
593
594 pub fn get_schema_by_name(
595 &self,
596 db_name: &str,
597 schema_name: &str,
598 ) -> CatalogResult<&SchemaCatalog> {
599 self.get_database_by_name(db_name)?
600 .get_schema_by_name(schema_name)
601 .ok_or_else(|| CatalogError::NotFound("schema", schema_name.to_owned()))
602 }
603
604 pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
605 self.get_any_table_by_id(&table_id)
606 .map(|table| table.name.clone())
607 }
608
609 pub fn get_schema_by_id(
610 &self,
611 db_id: &DatabaseId,
612 schema_id: &SchemaId,
613 ) -> CatalogResult<&SchemaCatalog> {
614 self.get_database_by_id(db_id)?
615 .get_schema_by_id(schema_id)
616 .ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
617 }
618
619 pub fn first_valid_schema(
621 &self,
622 db_name: &str,
623 search_path: &SearchPath,
624 user_name: &str,
625 ) -> CatalogResult<&SchemaCatalog> {
626 for path in search_path.real_path() {
627 let mut schema_name: &str = path;
628 if schema_name == USER_NAME_WILD_CARD {
629 schema_name = user_name;
630 }
631
632 if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
633 return schema_catalog;
634 }
635 }
636 Err(CatalogError::NotFound(
637 "first valid schema",
638 "no schema has been selected to create in".to_owned(),
639 ))
640 }
641
642 pub fn get_source_by_id<'a>(
643 &self,
644 db_name: &'a str,
645 schema_path: SchemaPath<'a>,
646 source_id: &SourceId,
647 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
648 schema_path
649 .try_find(|schema_name| {
650 Ok(self
651 .get_schema_by_name(db_name, schema_name)?
652 .get_source_by_id(source_id))
653 })?
654 .ok_or_else(|| CatalogError::NotFound("source", source_id.to_string()))
655 }
656
657 pub fn get_table_by_name<'a>(
658 &self,
659 db_name: &str,
660 schema_path: SchemaPath<'a>,
661 table_name: &str,
662 bind_creating: bool,
663 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
664 schema_path
665 .try_find(|schema_name| {
666 Ok(self
667 .get_schema_by_name(db_name, schema_name)?
668 .get_table_by_name(table_name, bind_creating))
669 })?
670 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
671 }
672
673 pub fn get_any_table_by_name<'a>(
676 &self,
677 db_name: &str,
678 schema_path: SchemaPath<'a>,
679 table_name: &str,
680 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
681 self.get_table_by_name(db_name, schema_path, table_name, true)
682 }
683
684 pub fn get_created_table_by_name<'a>(
687 &self,
688 db_name: &str,
689 schema_path: SchemaPath<'a>,
690 table_name: &str,
691 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
692 self.get_table_by_name(db_name, schema_path, table_name, false)
693 }
694
695 pub fn get_any_table_by_id(&self, table_id: &TableId) -> CatalogResult<&Arc<TableCatalog>> {
696 self.table_by_id
697 .get(table_id)
698 .ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
699 }
700
701 pub fn get_created_table_by_id_with_db(
703 &self,
704 db_name: &str,
705 table_id: u32,
706 ) -> CatalogResult<&Arc<TableCatalog>> {
707 let table_id = TableId::from(table_id);
708 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
709 if let Some(table) = schema.get_created_table_by_id(&table_id) {
710 return Ok(table);
711 }
712 }
713 Err(CatalogError::NotFound("table id", table_id.to_string()))
714 }
715
716 pub fn iter_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
717 self.table_by_id.values()
718 }
719
720 pub fn iter_backfilling_internal_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
721 self.table_by_id
722 .values()
723 .filter(|t| t.is_internal_table() && !t.is_created())
724 }
725
726 pub fn alter_table_name_by_id(&mut self, table_id: &TableId, table_name: &str) {
728 let mut found = false;
729 for database in self.database_by_name.values() {
730 if !found {
731 for schema in database.iter_schemas() {
732 if schema.iter_user_table().any(|t| t.id() == *table_id) {
733 found = true;
734 break;
735 }
736 }
737 }
738 }
739
740 if found {
741 let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
742 table.name = table_name.to_owned();
743 self.update_table(&table);
744 }
745 }
746
/// Test helper: indexes an otherwise-default table carrying `fragment_id`
/// under `table_id`.
#[cfg(test)]
pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
    let stub = TableCatalog {
        fragment_id,
        ..Default::default()
    };
    self.table_by_id.insert(table_id, Arc::new(stub));
}
757
758 pub fn get_sys_table_by_name(
759 &self,
760 db_name: &str,
761 schema_name: &str,
762 table_name: &str,
763 ) -> CatalogResult<&Arc<SystemTableCatalog>> {
764 self.get_schema_by_name(db_name, schema_name)?
765 .get_system_table_by_name(table_name)
766 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
767 }
768
769 pub fn get_source_by_name<'a>(
770 &self,
771 db_name: &str,
772 schema_path: SchemaPath<'a>,
773 source_name: &str,
774 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
775 schema_path
776 .try_find(|schema_name| {
777 Ok(self
778 .get_schema_by_name(db_name, schema_name)?
779 .get_source_by_name(source_name))
780 })?
781 .ok_or_else(|| CatalogError::NotFound("source", source_name.to_owned()))
782 }
783
784 pub fn get_sink_by_name<'a>(
785 &self,
786 db_name: &str,
787 schema_path: SchemaPath<'a>,
788 sink_name: &str,
789 bind_creating: bool,
790 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
791 schema_path
792 .try_find(|schema_name| {
793 Ok(self
794 .get_schema_by_name(db_name, schema_name)?
795 .get_sink_by_name(sink_name, bind_creating))
796 })?
797 .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_owned()))
798 }
799
800 pub fn get_any_sink_by_name<'a>(
801 &self,
802 db_name: &str,
803 schema_path: SchemaPath<'a>,
804 sink_name: &str,
805 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
806 self.get_sink_by_name(db_name, schema_path, sink_name, true)
807 }
808
809 pub fn get_created_sink_by_name<'a>(
810 &self,
811 db_name: &str,
812 schema_path: SchemaPath<'a>,
813 sink_name: &str,
814 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
815 self.get_sink_by_name(db_name, schema_path, sink_name, false)
816 }
817
818 pub fn get_subscription_by_name<'a>(
819 &self,
820 db_name: &str,
821 schema_path: SchemaPath<'a>,
822 subscription_name: &str,
823 ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
824 schema_path
825 .try_find(|schema_name| {
826 Ok(self
827 .get_schema_by_name(db_name, schema_name)?
828 .get_subscription_by_name(subscription_name))
829 })?
830 .ok_or_else(|| CatalogError::NotFound("subscription", subscription_name.to_owned()))
831 }
832
833 pub fn get_index_by_name<'a>(
834 &self,
835 db_name: &str,
836 schema_path: SchemaPath<'a>,
837 index_name: &str,
838 ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
839 schema_path
840 .try_find(|schema_name| {
841 Ok(self
842 .get_schema_by_name(db_name, schema_name)?
843 .get_index_by_name(index_name))
844 })?
845 .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
846 }
847
848 pub fn get_index_by_id(
849 &self,
850 db_name: &str,
851 index_id: u32,
852 ) -> CatalogResult<&Arc<IndexCatalog>> {
853 let index_id = IndexId::from(index_id);
854 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
855 if let Some(index) = schema.get_index_by_id(&index_id) {
856 return Ok(index);
857 }
858 }
859 Err(CatalogError::NotFound("index", index_id.to_string()))
860 }
861
862 pub fn get_view_by_name<'a>(
863 &self,
864 db_name: &str,
865 schema_path: SchemaPath<'a>,
866 view_name: &str,
867 ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
868 schema_path
869 .try_find(|schema_name| {
870 Ok(self
871 .get_schema_by_name(db_name, schema_name)?
872 .get_view_by_name(view_name))
873 })?
874 .ok_or_else(|| CatalogError::NotFound("view", view_name.to_owned()))
875 }
876
877 pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
878 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
879 if let Some(view) = schema.get_view_by_id(&ViewId::from(view_id)) {
880 return Ok(view.clone());
881 }
882 }
883 Err(CatalogError::NotFound("view", view_id.to_string()))
884 }
885
886 pub fn get_secret_by_name<'a>(
887 &self,
888 db_name: &str,
889 schema_path: SchemaPath<'a>,
890 secret_name: &str,
891 ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
892 schema_path
893 .try_find(|schema_name| {
894 Ok(self
895 .get_schema_by_name(db_name, schema_name)?
896 .get_secret_by_name(secret_name))
897 })?
898 .ok_or_else(|| CatalogError::NotFound("secret", secret_name.to_owned()))
899 }
900
901 pub fn get_connection_by_id(
902 &self,
903 db_name: &str,
904 connection_id: ConnectionId,
905 ) -> CatalogResult<&Arc<ConnectionCatalog>> {
906 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
907 if let Some(conn) = schema.get_connection_by_id(&connection_id) {
908 return Ok(conn);
909 }
910 }
911 Err(CatalogError::NotFound(
912 "connection",
913 connection_id.to_string(),
914 ))
915 }
916
917 pub fn get_connection_by_name<'a>(
918 &self,
919 db_name: &str,
920 schema_path: SchemaPath<'a>,
921 connection_name: &str,
922 ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
923 schema_path
924 .try_find(|schema_name| {
925 Ok(self
926 .get_schema_by_name(db_name, schema_name)?
927 .get_connection_by_name(connection_name))
928 })?
929 .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_owned()))
930 }
931
932 pub fn get_function_by_name_inputs<'a>(
933 &self,
934 db_name: &str,
935 schema_path: SchemaPath<'a>,
936 function_name: &str,
937 inputs: &mut [ExprImpl],
938 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
939 schema_path
940 .try_find(|schema_name| {
941 Ok(self
942 .get_schema_by_name(db_name, schema_name)?
943 .get_function_by_name_inputs(function_name, inputs))
944 })?
945 .ok_or_else(|| {
946 CatalogError::NotFound(
947 "function",
948 format!(
949 "{}({})",
950 function_name,
951 inputs
952 .iter()
953 .map(|a| a.return_type().to_string())
954 .join(", ")
955 ),
956 )
957 })
958 }
959
960 pub fn get_function_by_name_args<'a>(
961 &self,
962 db_name: &str,
963 schema_path: SchemaPath<'a>,
964 function_name: &str,
965 args: &[DataType],
966 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
967 schema_path
968 .try_find(|schema_name| {
969 Ok(self
970 .get_schema_by_name(db_name, schema_name)?
971 .get_function_by_name_args(function_name, args))
972 })?
973 .ok_or_else(|| {
974 CatalogError::NotFound(
975 "function",
976 format!(
977 "{}({})",
978 function_name,
979 args.iter().map(|a| a.to_string()).join(", ")
980 ),
981 )
982 })
983 }
984
985 pub fn get_functions_by_name<'a>(
987 &self,
988 db_name: &str,
989 schema_path: SchemaPath<'a>,
990 function_name: &str,
991 ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
992 schema_path
993 .try_find(|schema_name| {
994 Ok(self
995 .get_schema_by_name(db_name, schema_name)?
996 .get_functions_by_name(function_name))
997 })?
998 .ok_or_else(|| CatalogError::NotFound("function", function_name.to_owned()))
999 }
1000
1001 pub fn check_relation_name_duplicated(
1003 &self,
1004 db_name: &str,
1005 schema_name: &str,
1006 relation_name: &str,
1007 ) -> CatalogResult<()> {
1008 let schema = self.get_schema_by_name(db_name, schema_name)?;
1009
1010 if let Some(table) = schema.get_any_table_by_name(relation_name) {
1011 let is_creating = table.stream_job_status == StreamJobStatus::Creating;
1012 if table.is_index() {
1013 Err(CatalogError::Duplicated(
1014 "index",
1015 relation_name.to_owned(),
1016 is_creating,
1017 ))
1018 } else if table.is_mview() {
1019 Err(CatalogError::Duplicated(
1020 "materialized view",
1021 relation_name.to_owned(),
1022 is_creating,
1023 ))
1024 } else {
1025 Err(CatalogError::Duplicated(
1026 "table",
1027 relation_name.to_owned(),
1028 is_creating,
1029 ))
1030 }
1031 } else if schema.get_source_by_name(relation_name).is_some() {
1032 Err(CatalogError::duplicated("source", relation_name.to_owned()))
1033 } else if let Some(sink) = schema.get_any_sink_by_name(relation_name) {
1034 Err(CatalogError::Duplicated(
1035 "sink",
1036 relation_name.to_owned(),
1037 !sink.is_created(),
1038 ))
1039 } else if schema.get_view_by_name(relation_name).is_some() {
1040 Err(CatalogError::duplicated("view", relation_name.to_owned()))
1041 } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
1042 let is_not_created = subscription.subscription_state != SubscriptionState::Created;
1043 Err(CatalogError::Duplicated(
1044 "subscription",
1045 relation_name.to_owned(),
1046 is_not_created,
1047 ))
1048 } else {
1049 Ok(())
1050 }
1051 }
1052
1053 pub fn check_function_name_duplicated(
1054 &self,
1055 db_name: &str,
1056 schema_name: &str,
1057 function_name: &str,
1058 arg_types: &[DataType],
1059 ) -> CatalogResult<()> {
1060 let schema = self.get_schema_by_name(db_name, schema_name)?;
1061
1062 if schema
1063 .get_function_by_name_args(function_name, arg_types)
1064 .is_some()
1065 {
1066 let name = format!(
1067 "{function_name}({})",
1068 arg_types.iter().map(|t| t.to_string()).join(",")
1069 );
1070 Err(CatalogError::duplicated("function", name))
1071 } else {
1072 Ok(())
1073 }
1074 }
1075
1076 pub fn check_connection_name_duplicated(
1078 &self,
1079 db_name: &str,
1080 schema_name: &str,
1081 connection_name: &str,
1082 ) -> CatalogResult<()> {
1083 let schema = self.get_schema_by_name(db_name, schema_name)?;
1084
1085 if schema.get_connection_by_name(connection_name).is_some() {
1086 Err(CatalogError::duplicated(
1087 "connection",
1088 connection_name.to_owned(),
1089 ))
1090 } else {
1091 Ok(())
1092 }
1093 }
1094
1095 pub fn check_secret_name_duplicated(
1096 &self,
1097 db_name: &str,
1098 schema_name: &str,
1099 secret_name: &str,
1100 ) -> CatalogResult<()> {
1101 let schema = self.get_schema_by_name(db_name, schema_name)?;
1102
1103 if schema.get_secret_by_name(secret_name).is_some() {
1104 Err(CatalogError::duplicated("secret", secret_name.to_owned()))
1105 } else {
1106 Ok(())
1107 }
1108 }
1109
1110 pub fn table_stats(&self) -> &HummockVersionStats {
1111 &self.table_stats
1112 }
1113
1114 pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
1115 self.table_stats = table_stats;
1116 }
1117
1118 pub fn get_all_indexes_related_to_object(
1119 &self,
1120 db_id: DatabaseId,
1121 schema_id: SchemaId,
1122 mv_id: TableId,
1123 ) -> Vec<Arc<IndexCatalog>> {
1124 self.get_database_by_id(&db_id)
1125 .unwrap()
1126 .get_schema_by_id(&schema_id)
1127 .unwrap()
1128 .get_indexes_by_table_id(&mv_id)
1129 }
1130
1131 pub fn get_id_by_class_name(
1132 &self,
1133 db_name: &str,
1134 schema_path: SchemaPath<'_>,
1135 class_name: &str,
1136 ) -> CatalogResult<u32> {
1137 schema_path
1138 .try_find(|schema_name| {
1139 let schema = self.get_schema_by_name(db_name, schema_name)?;
1140 #[allow(clippy::manual_map)]
1141 if let Some(item) = schema.get_system_table_by_name(class_name) {
1142 Ok(Some(item.id().into()))
1143 } else if let Some(item) = schema.get_created_table_by_name(class_name) {
1144 Ok(Some(item.id().into()))
1145 } else if let Some(item) = schema.get_index_by_name(class_name) {
1146 Ok(Some(item.id.into()))
1147 } else if let Some(item) = schema.get_source_by_name(class_name) {
1148 Ok(Some(item.id))
1149 } else if let Some(item) = schema.get_view_by_name(class_name) {
1150 Ok(Some(item.id))
1151 } else {
1152 Ok(None)
1153 }
1154 })?
1155 .map(|(id, _)| id)
1156 .ok_or_else(|| CatalogError::NotFound("class", class_name.to_owned()))
1157 }
1158}