1use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::id::ObjectId;
21use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
22use risingwave_common::types::DataType;
23use risingwave_connector::sink::catalog::SinkCatalog;
24use risingwave_pb::catalog::{
25 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
26 PbSubscription, PbTable, PbView,
27};
28use risingwave_pb::hummock::HummockVersionStats;
29
30use super::function_catalog::FunctionCatalog;
31use super::source_catalog::SourceCatalog;
32use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
33use super::view_catalog::ViewCatalog;
34use super::{
35 CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
36};
37use crate::catalog::connection_catalog::ConnectionCatalog;
38use crate::catalog::database_catalog::DatabaseCatalog;
39use crate::catalog::schema_catalog::SchemaCatalog;
40use crate::catalog::secret_catalog::SecretCatalog;
41use crate::catalog::system_catalog::{
42 SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
43};
44use crate::catalog::table_catalog::TableCatalog;
45use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
46use crate::expr::{Expr, ExprImpl};
47
/// Describes which schema(s) a catalog lookup should consult.
#[derive(Copy, Clone)]
pub enum SchemaPath<'a> {
    /// A single, explicitly given schema name.
    Name(&'a str),
    /// A search path plus the user name that is substituted for the
    /// `USER_NAME_WILD_CARD` entry of that path.
    Path(&'a SearchPath, &'a str),
}
54
55impl<'a> SchemaPath<'a> {
56 pub fn new(
57 schema_name: Option<&'a str>,
58 search_path: &'a SearchPath,
59 user_name: &'a str,
60 ) -> Self {
61 match schema_name {
62 Some(schema_name) => SchemaPath::Name(schema_name),
63 None => SchemaPath::Path(search_path, user_name),
64 }
65 }
66
67 pub fn try_find<T, E>(
69 &self,
70 mut f: impl FnMut(&str) -> Result<Option<T>, E>,
71 ) -> Result<Option<(T, &'a str)>, E> {
72 match self {
73 SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
74 SchemaPath::Path(search_path, user_name) => {
75 for schema_name in search_path.path() {
76 let mut schema_name: &str = schema_name;
77 if schema_name == USER_NAME_WILD_CARD {
78 schema_name = user_name;
79 }
80 if let Ok(Some(res)) = f(schema_name) {
81 return Ok(Some((res, schema_name)));
82 }
83 }
84 Ok(None)
85 }
86 }
87 }
88}
89
/// In-memory root catalog holding all databases and a flat table index.
pub struct Catalog {
    /// Databases keyed by name; this map owns the `DatabaseCatalog` values.
    database_by_name: HashMap<String, DatabaseCatalog>,
    /// Maps a database id to its current name (the key of `database_by_name`).
    db_name_by_id: HashMap<DatabaseId, String>,
    /// Tables of all databases, keyed by table id.
    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
    /// Stats exposed through `table_stats` / replaced by `set_table_stats`.
    table_stats: HummockVersionStats,
}
108
109#[expect(clippy::derivable_impls)]
110impl Default for Catalog {
111 fn default() -> Self {
112 Self {
113 database_by_name: HashMap::new(),
114 db_name_by_id: HashMap::new(),
115 table_by_id: HashMap::new(),
116 table_stats: HummockVersionStats::default(),
117 }
118 }
119}
120
121impl Catalog {
122 fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
123 let name = self.db_name_by_id.get(&db_id)?;
124 self.database_by_name.get_mut(name)
125 }
126
127 pub fn clear(&mut self) {
128 self.database_by_name.clear();
129 self.db_name_by_id.clear();
130 self.table_by_id.clear();
131 }
132
133 pub fn create_database(&mut self, db: &PbDatabase) {
134 let name = db.name.clone();
135 let id = db.id;
136
137 self.database_by_name
138 .try_insert(name.clone(), db.into())
139 .unwrap();
140 self.db_name_by_id.try_insert(id, name).unwrap();
141 }
142
143 pub fn create_schema(&mut self, proto: &PbSchema) {
144 let database_id = proto.database_id;
145 let id = proto.id;
146 self.get_database_mut(database_id)
147 .unwrap()
148 .create_schema(proto);
149
150 for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
151 self.get_database_mut(database_id)
152 .unwrap()
153 .get_schema_mut(id)
154 .unwrap()
155 .create_sys_table(sys_table);
156 }
157 for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
158 sys_view.database_id = database_id;
159 sys_view.schema_id = id;
160 self.get_database_mut(database_id)
161 .unwrap()
162 .get_schema_mut(id)
163 .unwrap()
164 .create_sys_view(Arc::new(sys_view));
165 }
166 }
167
168 pub fn create_table(&mut self, proto: &PbTable) {
169 let table = self
170 .get_database_mut(proto.database_id)
171 .unwrap()
172 .get_schema_mut(proto.schema_id)
173 .unwrap()
174 .create_table(proto);
175 self.table_by_id.insert(proto.id, table);
176 }
177
178 pub fn create_index(&mut self, proto: &PbIndex) {
179 self.get_database_mut(proto.database_id)
180 .unwrap()
181 .get_schema_mut(proto.schema_id)
182 .unwrap()
183 .create_index(proto);
184 }
185
186 pub fn create_source(&mut self, proto: &PbSource) {
187 self.get_database_mut(proto.database_id)
188 .unwrap()
189 .get_schema_mut(proto.schema_id)
190 .unwrap()
191 .create_source(proto);
192 }
193
194 pub fn create_sink(&mut self, proto: &PbSink) {
195 self.get_database_mut(proto.database_id)
196 .unwrap()
197 .get_schema_mut(proto.schema_id)
198 .unwrap()
199 .create_sink(proto);
200 }
201
202 pub fn create_subscription(&mut self, proto: &PbSubscription) {
203 self.get_database_mut(proto.database_id)
204 .unwrap()
205 .get_schema_mut(proto.schema_id)
206 .unwrap()
207 .create_subscription(proto);
208 }
209
210 pub fn create_secret(&mut self, proto: &PbSecret) {
211 self.get_database_mut(proto.database_id)
212 .unwrap()
213 .get_schema_mut(proto.schema_id)
214 .unwrap()
215 .create_secret(proto);
216 }
217
218 pub fn create_view(&mut self, proto: &PbView) {
219 self.get_database_mut(proto.database_id)
220 .unwrap()
221 .get_schema_mut(proto.schema_id)
222 .unwrap()
223 .create_view(proto);
224 }
225
226 pub fn create_function(&mut self, proto: &PbFunction) {
227 self.get_database_mut(proto.database_id)
228 .unwrap()
229 .get_schema_mut(proto.schema_id)
230 .unwrap()
231 .create_function(proto);
232 }
233
234 pub fn create_connection(&mut self, proto: &PbConnection) {
235 self.get_database_mut(proto.database_id)
236 .unwrap()
237 .get_schema_mut(proto.schema_id)
238 .unwrap()
239 .create_connection(proto);
240 }
241
242 pub fn drop_connection(
243 &mut self,
244 db_id: DatabaseId,
245 schema_id: SchemaId,
246 connection_id: ConnectionId,
247 ) {
248 self.get_database_mut(db_id)
249 .unwrap()
250 .get_schema_mut(schema_id)
251 .unwrap()
252 .drop_connection(connection_id);
253 }
254
255 pub fn update_connection(&mut self, proto: &PbConnection) {
256 let database = self.get_database_mut(proto.database_id).unwrap();
257 let schema = database.get_schema_mut(proto.schema_id).unwrap();
258 if schema.get_connection_by_id(proto.id).is_some() {
259 schema.update_connection(proto);
260 } else {
261 schema.create_connection(proto);
263 database
264 .iter_schemas_mut()
265 .find(|schema| {
266 schema.id() != proto.schema_id
267 && schema.get_connection_by_id(proto.id).is_some()
268 })
269 .unwrap()
270 .drop_connection(proto.id);
271 }
272 }
273
274 pub fn update_secret(&mut self, proto: &PbSecret) {
275 let database = self.get_database_mut(proto.database_id).unwrap();
276 let schema = database.get_schema_mut(proto.schema_id).unwrap();
277 let secret_id = proto.id;
278 if schema.get_secret_by_id(secret_id).is_some() {
279 schema.update_secret(proto);
280 } else {
281 schema.create_secret(proto);
283 database
284 .iter_schemas_mut()
285 .find(|schema| {
286 schema.id() != proto.schema_id && schema.get_secret_by_id(secret_id).is_some()
287 })
288 .unwrap()
289 .drop_secret(secret_id);
290 }
291 }
292
293 pub fn drop_database(&mut self, db_id: DatabaseId) {
294 let name = self.db_name_by_id.remove(&db_id).unwrap();
295 let database = self.database_by_name.remove(&name).unwrap();
296 database.iter_all_table_ids().for_each(|table| {
297 self.table_by_id.remove(&table);
298 });
299 }
300
301 pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
302 self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
303 }
304
305 pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
306 self.table_by_id.remove(&tb_id);
307 self.get_database_mut(db_id)
308 .unwrap()
309 .get_schema_mut(schema_id)
310 .unwrap()
311 .drop_table(tb_id);
312 }
313
314 pub fn update_table(&mut self, proto: &PbTable) {
315 let database = self.get_database_mut(proto.database_id).unwrap();
316 let schema = database.get_schema_mut(proto.schema_id).unwrap();
317 let table = if schema.get_table_by_id(proto.id).is_some() {
318 schema.update_table(proto)
319 } else {
320 let new_table = schema.create_table(proto);
322 database
323 .iter_schemas_mut()
324 .find(|schema| {
325 schema.id() != proto.schema_id
326 && schema.get_created_table_by_id(proto.id).is_some()
327 })
328 .unwrap()
329 .drop_table(proto.id);
330 new_table
331 };
332
333 self.table_by_id.insert(proto.id, table);
334 }
335
336 pub fn update_database(&mut self, proto: &PbDatabase) {
337 let id = proto.id;
338 let name = proto.name.clone();
339
340 let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
341 if old_database_name != name {
342 let mut database = self.database_by_name.remove(&old_database_name).unwrap();
343 database.name.clone_from(&name);
344 database.owner = proto.owner;
345 self.database_by_name.insert(name.clone(), database);
346 self.db_name_by_id.insert(id, name);
347 } else {
348 let database = self.get_database_mut(id).unwrap();
349 database.name = name;
350 database.owner = proto.owner;
351 database.barrier_interval_ms = proto.barrier_interval_ms;
352 database.checkpoint_frequency = proto.checkpoint_frequency;
353 }
354 }
355
356 pub fn update_schema(&mut self, proto: &PbSchema) {
357 self.get_database_mut(proto.database_id)
358 .unwrap()
359 .update_schema(proto);
360 }
361
362 pub fn update_index(&mut self, proto: &PbIndex) {
363 let database = self.get_database_mut(proto.database_id).unwrap();
364 let schema = database.get_schema_mut(proto.schema_id).unwrap();
365 if schema.get_index_by_id(proto.id).is_some() {
366 schema.update_index(proto);
367 } else {
368 schema.create_index(proto);
370 database
371 .iter_schemas_mut()
372 .find(|schema| {
373 schema.id() != proto.schema_id && schema.get_index_by_id(proto.id).is_some()
374 })
375 .unwrap()
376 .drop_index(proto.id);
377 }
378 }
379
380 pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
381 self.get_database_mut(db_id)
382 .unwrap()
383 .get_schema_mut(schema_id)
384 .unwrap()
385 .drop_source(source_id);
386 }
387
388 pub fn update_source(&mut self, proto: &PbSource) {
389 let database = self.get_database_mut(proto.database_id).unwrap();
390 let schema = database.get_schema_mut(proto.schema_id).unwrap();
391 if schema.get_source_by_id(proto.id).is_some() {
392 schema.update_source(proto);
393 } else {
394 schema.create_source(proto);
396 database
397 .iter_schemas_mut()
398 .find(|schema| {
399 schema.id() != proto.schema_id && schema.get_source_by_id(proto.id).is_some()
400 })
401 .unwrap()
402 .drop_source(proto.id);
403 }
404 }
405
406 pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
407 self.get_database_mut(db_id)
408 .unwrap()
409 .get_schema_mut(schema_id)
410 .unwrap()
411 .drop_sink(sink_id);
412 }
413
414 pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
415 self.get_database_mut(db_id)
416 .unwrap()
417 .get_schema_mut(schema_id)
418 .unwrap()
419 .drop_secret(secret_id);
420 }
421
422 pub fn update_sink(&mut self, proto: &PbSink) {
423 let database = self.get_database_mut(proto.database_id).unwrap();
424 let schema = database.get_schema_mut(proto.schema_id).unwrap();
425 if schema.get_sink_by_id(proto.id).is_some() {
426 schema.update_sink(proto);
427 } else {
428 schema.create_sink(proto);
430 database
431 .iter_schemas_mut()
432 .find(|schema| {
433 schema.id() != proto.schema_id && schema.get_sink_by_id(proto.id).is_some()
434 })
435 .unwrap()
436 .drop_sink(proto.id);
437 }
438 }
439
440 pub fn drop_subscription(
441 &mut self,
442 db_id: DatabaseId,
443 schema_id: SchemaId,
444 subscription_id: SubscriptionId,
445 ) {
446 self.get_database_mut(db_id)
447 .unwrap()
448 .get_schema_mut(schema_id)
449 .unwrap()
450 .drop_subscription(subscription_id);
451 }
452
453 pub fn update_subscription(&mut self, proto: &PbSubscription) {
454 let database = self.get_database_mut(proto.database_id).unwrap();
455 let schema = database.get_schema_mut(proto.schema_id).unwrap();
456 if schema.get_subscription_by_id(proto.id).is_some() {
457 schema.update_subscription(proto);
458 } else {
459 schema.create_subscription(proto);
461 database
462 .iter_schemas_mut()
463 .find(|schema| {
464 schema.id() != proto.schema_id
465 && schema.get_subscription_by_id(proto.id).is_some()
466 })
467 .unwrap()
468 .drop_subscription(proto.id);
469 }
470 }
471
472 pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
473 self.get_database_mut(db_id)
474 .unwrap()
475 .get_schema_mut(schema_id)
476 .unwrap()
477 .drop_index(index_id);
478 }
479
480 pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
481 self.get_database_mut(db_id)
482 .unwrap()
483 .get_schema_mut(schema_id)
484 .unwrap()
485 .drop_view(view_id);
486 }
487
488 pub fn update_view(&mut self, proto: &PbView) {
489 let database = self.get_database_mut(proto.database_id).unwrap();
490 let schema = database.get_schema_mut(proto.schema_id).unwrap();
491 if schema.get_view_by_id(proto.id).is_some() {
492 schema.update_view(proto);
493 } else {
494 schema.create_view(proto);
496 database
497 .iter_schemas_mut()
498 .find(|schema| {
499 schema.id() != proto.schema_id && schema.get_view_by_id(proto.id).is_some()
500 })
501 .unwrap()
502 .drop_view(proto.id);
503 }
504 }
505
506 pub fn drop_function(
507 &mut self,
508 db_id: DatabaseId,
509 schema_id: SchemaId,
510 function_id: FunctionId,
511 ) {
512 self.get_database_mut(db_id)
513 .unwrap()
514 .get_schema_mut(schema_id)
515 .unwrap()
516 .drop_function(function_id);
517 }
518
519 pub fn update_function(&mut self, proto: &PbFunction) {
520 let database = self.get_database_mut(proto.database_id).unwrap();
521 let schema = database.get_schema_mut(proto.schema_id).unwrap();
522 if schema.get_function_by_id(proto.id).is_some() {
523 schema.update_function(proto);
524 } else {
525 schema.create_function(proto);
527 database
528 .iter_schemas_mut()
529 .find(|schema| {
530 schema.id() != proto.schema_id && schema.get_function_by_id(proto.id).is_some()
531 })
532 .unwrap()
533 .drop_function(proto.id);
534 }
535
536 self.get_database_mut(proto.database_id)
537 .unwrap()
538 .get_schema_mut(proto.schema_id)
539 .unwrap()
540 .update_function(proto);
541 }
542
543 pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
544 self.database_by_name
545 .get(db_name)
546 .ok_or_else(|| CatalogError::NotFound("database", db_name.to_owned()))
547 }
548
549 pub fn get_database_by_id(&self, db_id: DatabaseId) -> CatalogResult<&DatabaseCatalog> {
550 let db_name = self
551 .db_name_by_id
552 .get(&db_id)
553 .ok_or_else(|| CatalogError::NotFound("db_id", db_id.to_string()))?;
554 self.database_by_name
555 .get(db_name)
556 .ok_or_else(|| CatalogError::NotFound("database", db_name.clone()))
557 }
558
559 pub fn find_schema_secret_by_secret_id(
560 &self,
561 db_name: &str,
562 secret_id: SecretId,
563 ) -> CatalogResult<(String, String)> {
564 let db = self.get_database_by_name(db_name)?;
565 let schema_secret = db
566 .iter_schemas()
567 .find_map(|schema| {
568 schema
569 .get_secret_by_id(secret_id)
570 .map(|secret| (schema.name(), secret.name.clone()))
571 })
572 .ok_or_else(|| CatalogError::NotFound("secret", secret_id.to_string()))?;
573 Ok(schema_secret)
574 }
575
576 pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
577 Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
578 }
579
580 pub fn iter_schemas(
581 &self,
582 db_name: &str,
583 ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
584 Ok(self.get_database_by_name(db_name)?.iter_schemas())
585 }
586
587 pub fn get_all_database_names(&self) -> Vec<String> {
588 self.database_by_name.keys().cloned().collect_vec()
589 }
590
/// Returns an iterator over all databases, in the map's iteration order.
pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
    self.database_by_name.values()
}
594
595 pub fn get_schema_by_name(
596 &self,
597 db_name: &str,
598 schema_name: &str,
599 ) -> CatalogResult<&SchemaCatalog> {
600 self.get_database_by_name(db_name)?
601 .get_schema_by_name(schema_name)
602 .ok_or_else(|| CatalogError::NotFound("schema", schema_name.to_owned()))
603 }
604
605 pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
606 self.get_any_table_by_id(table_id)
607 .map(|table| table.name.clone())
608 }
609
610 pub fn get_schema_by_id(
611 &self,
612 db_id: DatabaseId,
613 schema_id: SchemaId,
614 ) -> CatalogResult<&SchemaCatalog> {
615 self.get_database_by_id(db_id)?
616 .get_schema_by_id(schema_id)
617 .ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
618 }
619
620 pub fn first_valid_schema(
622 &self,
623 db_name: &str,
624 search_path: &SearchPath,
625 user_name: &str,
626 ) -> CatalogResult<&SchemaCatalog> {
627 for path in search_path.real_path() {
628 let mut schema_name: &str = path;
629 if schema_name == USER_NAME_WILD_CARD {
630 schema_name = user_name;
631 }
632
633 if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
634 return schema_catalog;
635 }
636 }
637 Err(CatalogError::NotFound(
638 "first valid schema",
639 "no schema has been selected to create in".to_owned(),
640 ))
641 }
642
643 pub fn get_source_by_id<'a>(
644 &self,
645 db_name: &'a str,
646 schema_path: SchemaPath<'a>,
647 source_id: SourceId,
648 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
649 schema_path
650 .try_find(|schema_name| {
651 Ok(self
652 .get_schema_by_name(db_name, schema_name)?
653 .get_source_by_id(source_id))
654 })?
655 .ok_or_else(|| CatalogError::NotFound("source", source_id.to_string()))
656 }
657
658 pub fn get_table_by_name<'a>(
659 &self,
660 db_name: &str,
661 schema_path: SchemaPath<'a>,
662 table_name: &str,
663 bind_creating: bool,
664 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
665 schema_path
666 .try_find(|schema_name| {
667 Ok(self
668 .get_schema_by_name(db_name, schema_name)?
669 .get_table_by_name(table_name, bind_creating))
670 })?
671 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
672 }
673
674 pub fn get_any_table_by_name<'a>(
677 &self,
678 db_name: &str,
679 schema_path: SchemaPath<'a>,
680 table_name: &str,
681 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
682 self.get_table_by_name(db_name, schema_path, table_name, true)
683 }
684
685 pub fn get_created_table_by_name<'a>(
688 &self,
689 db_name: &str,
690 schema_path: SchemaPath<'a>,
691 table_name: &str,
692 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
693 self.get_table_by_name(db_name, schema_path, table_name, false)
694 }
695
696 pub fn get_any_table_by_id(&self, table_id: TableId) -> CatalogResult<&Arc<TableCatalog>> {
697 self.table_by_id
698 .get(&table_id)
699 .ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
700 }
701
702 pub fn get_created_table_by_id_with_db(
704 &self,
705 db_name: &str,
706 table_id: u32,
707 ) -> CatalogResult<&Arc<TableCatalog>> {
708 let table_id = TableId::from(table_id);
709 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
710 if let Some(table) = schema.get_created_table_by_id(table_id) {
711 return Ok(table);
712 }
713 }
714 Err(CatalogError::NotFound("table id", table_id.to_string()))
715 }
716
/// Returns an iterator over every table in the table-id index.
pub fn iter_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
    self.table_by_id.values()
}
720
721 pub fn iter_backfilling_internal_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
722 self.table_by_id
723 .values()
724 .filter(|t| t.is_internal_table() && !t.is_created())
725 }
726
727 pub fn alter_table_name_by_id(&mut self, table_id: TableId, table_name: &str) {
729 let mut found = false;
730 for database in self.database_by_name.values() {
731 if !found {
732 for schema in database.iter_schemas() {
733 if schema.iter_user_table().any(|t| t.id() == table_id) {
734 found = true;
735 break;
736 }
737 }
738 }
739 }
740
741 if found {
742 let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
743 table.name = table_name.to_owned();
744 self.update_table(&table);
745 }
746 }
747
/// Test helper: indexes a default `TableCatalog` carrying only
/// `fragment_id` under `table_id`.
#[cfg(test)]
pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
    let placeholder = TableCatalog {
        fragment_id,
        ..Default::default()
    };
    self.table_by_id.insert(table_id, Arc::new(placeholder));
}
758
759 pub fn get_sys_table_by_name(
760 &self,
761 db_name: &str,
762 schema_name: &str,
763 table_name: &str,
764 ) -> CatalogResult<&Arc<SystemTableCatalog>> {
765 self.get_schema_by_name(db_name, schema_name)?
766 .get_system_table_by_name(table_name)
767 .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
768 }
769
770 pub fn get_source_by_name<'a>(
771 &self,
772 db_name: &str,
773 schema_path: SchemaPath<'a>,
774 source_name: &str,
775 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
776 schema_path
777 .try_find(|schema_name| {
778 Ok(self
779 .get_schema_by_name(db_name, schema_name)?
780 .get_source_by_name(source_name))
781 })?
782 .ok_or_else(|| CatalogError::NotFound("source", source_name.to_owned()))
783 }
784
785 pub fn get_sink_by_name<'a>(
786 &self,
787 db_name: &str,
788 schema_path: SchemaPath<'a>,
789 sink_name: &str,
790 bind_creating: bool,
791 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
792 schema_path
793 .try_find(|schema_name| {
794 Ok(self
795 .get_schema_by_name(db_name, schema_name)?
796 .get_sink_by_name(sink_name, bind_creating))
797 })?
798 .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_owned()))
799 }
800
801 pub fn get_any_sink_by_name<'a>(
802 &self,
803 db_name: &str,
804 schema_path: SchemaPath<'a>,
805 sink_name: &str,
806 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
807 self.get_sink_by_name(db_name, schema_path, sink_name, true)
808 }
809
810 pub fn get_created_sink_by_name<'a>(
811 &self,
812 db_name: &str,
813 schema_path: SchemaPath<'a>,
814 sink_name: &str,
815 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
816 self.get_sink_by_name(db_name, schema_path, sink_name, false)
817 }
818
819 pub fn get_subscription_by_name<'a>(
820 &self,
821 db_name: &str,
822 schema_path: SchemaPath<'a>,
823 subscription_name: &str,
824 ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
825 schema_path
826 .try_find(|schema_name| {
827 Ok(self
828 .get_schema_by_name(db_name, schema_name)?
829 .get_subscription_by_name(subscription_name))
830 })?
831 .ok_or_else(|| CatalogError::NotFound("subscription", subscription_name.to_owned()))
832 }
833
834 pub fn get_index_by_name<'a>(
835 &self,
836 db_name: &str,
837 schema_path: SchemaPath<'a>,
838 index_name: &str,
839 ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
840 schema_path
841 .try_find(|schema_name| {
842 Ok(self
843 .get_schema_by_name(db_name, schema_name)?
844 .get_created_index_by_name(index_name))
845 })?
846 .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
847 }
848
849 pub fn get_any_index_by_name<'a>(
850 &self,
851 db_name: &str,
852 schema_path: SchemaPath<'a>,
853 index_name: &str,
854 ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
855 schema_path
856 .try_find(|schema_name| {
857 Ok(self
858 .get_schema_by_name(db_name, schema_name)?
859 .get_any_index_by_name(index_name))
860 })?
861 .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
862 }
863
864 pub fn get_index_by_id(
865 &self,
866 db_name: &str,
867 index_id: u32,
868 ) -> CatalogResult<&Arc<IndexCatalog>> {
869 let index_id = IndexId::from(index_id);
870 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
871 if let Some(index) = schema.get_index_by_id(index_id) {
872 return Ok(index);
873 }
874 }
875 Err(CatalogError::NotFound("index", index_id.to_string()))
876 }
877
878 pub fn get_view_by_name<'a>(
879 &self,
880 db_name: &str,
881 schema_path: SchemaPath<'a>,
882 view_name: &str,
883 ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
884 schema_path
885 .try_find(|schema_name| {
886 Ok(self
887 .get_schema_by_name(db_name, schema_name)?
888 .get_view_by_name(view_name))
889 })?
890 .ok_or_else(|| CatalogError::NotFound("view", view_name.to_owned()))
891 }
892
893 pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
894 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
895 if let Some(view) = schema.get_view_by_id(ViewId::from(view_id)) {
896 return Ok(view.clone());
897 }
898 }
899 Err(CatalogError::NotFound("view", view_id.to_string()))
900 }
901
902 pub fn get_secret_by_name<'a>(
903 &self,
904 db_name: &str,
905 schema_path: SchemaPath<'a>,
906 secret_name: &str,
907 ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
908 schema_path
909 .try_find(|schema_name| {
910 Ok(self
911 .get_schema_by_name(db_name, schema_name)?
912 .get_secret_by_name(secret_name))
913 })?
914 .ok_or_else(|| CatalogError::NotFound("secret", secret_name.to_owned()))
915 }
916
917 pub fn get_connection_by_id(
918 &self,
919 db_name: &str,
920 connection_id: ConnectionId,
921 ) -> CatalogResult<&Arc<ConnectionCatalog>> {
922 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
923 if let Some(conn) = schema.get_connection_by_id(connection_id) {
924 return Ok(conn);
925 }
926 }
927 Err(CatalogError::NotFound(
928 "connection",
929 connection_id.to_string(),
930 ))
931 }
932
933 pub fn get_connection_by_name<'a>(
934 &self,
935 db_name: &str,
936 schema_path: SchemaPath<'a>,
937 connection_name: &str,
938 ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
939 schema_path
940 .try_find(|schema_name| {
941 Ok(self
942 .get_schema_by_name(db_name, schema_name)?
943 .get_connection_by_name(connection_name))
944 })?
945 .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_owned()))
946 }
947
948 pub fn get_function_by_name_inputs<'a>(
949 &self,
950 db_name: &str,
951 schema_path: SchemaPath<'a>,
952 function_name: &str,
953 inputs: &mut [ExprImpl],
954 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
955 schema_path
956 .try_find(|schema_name| {
957 Ok(self
958 .get_schema_by_name(db_name, schema_name)?
959 .get_function_by_name_inputs(function_name, inputs))
960 })?
961 .ok_or_else(|| {
962 CatalogError::NotFound(
963 "function",
964 format!(
965 "{}({})",
966 function_name,
967 inputs
968 .iter()
969 .map(|a| a.return_type().to_string())
970 .join(", ")
971 ),
972 )
973 })
974 }
975
976 pub fn get_function_by_name_args<'a>(
977 &self,
978 db_name: &str,
979 schema_path: SchemaPath<'a>,
980 function_name: &str,
981 args: &[DataType],
982 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
983 schema_path
984 .try_find(|schema_name| {
985 Ok(self
986 .get_schema_by_name(db_name, schema_name)?
987 .get_function_by_name_args(function_name, args))
988 })?
989 .ok_or_else(|| {
990 CatalogError::NotFound(
991 "function",
992 format!(
993 "{}({})",
994 function_name,
995 args.iter().map(|a| a.to_string()).join(", ")
996 ),
997 )
998 })
999 }
1000
1001 pub fn get_functions_by_name<'a>(
1003 &self,
1004 db_name: &str,
1005 schema_path: SchemaPath<'a>,
1006 function_name: &str,
1007 ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
1008 schema_path
1009 .try_find(|schema_name| {
1010 Ok(self
1011 .get_schema_by_name(db_name, schema_name)?
1012 .get_functions_by_name(function_name))
1013 })?
1014 .ok_or_else(|| CatalogError::NotFound("function", function_name.to_owned()))
1015 }
1016
1017 pub fn check_relation_name_duplicated(
1019 &self,
1020 db_name: &str,
1021 schema_name: &str,
1022 relation_name: &str,
1023 ) -> CatalogResult<()> {
1024 let schema = self.get_schema_by_name(db_name, schema_name)?;
1025
1026 if let Some(table) = schema.get_any_table_by_name(relation_name) {
1027 let is_creating = table.stream_job_status == StreamJobStatus::Creating;
1028 if table.is_index() {
1029 Err(CatalogError::Duplicated(
1030 "index",
1031 relation_name.to_owned(),
1032 is_creating,
1033 ))
1034 } else if table.is_mview() {
1035 Err(CatalogError::Duplicated(
1036 "materialized view",
1037 relation_name.to_owned(),
1038 is_creating,
1039 ))
1040 } else {
1041 Err(CatalogError::Duplicated(
1042 "table",
1043 relation_name.to_owned(),
1044 is_creating,
1045 ))
1046 }
1047 } else if schema.get_source_by_name(relation_name).is_some() {
1048 Err(CatalogError::duplicated("source", relation_name.to_owned()))
1049 } else if let Some(sink) = schema.get_any_sink_by_name(relation_name) {
1050 Err(CatalogError::Duplicated(
1051 "sink",
1052 relation_name.to_owned(),
1053 !sink.is_created(),
1054 ))
1055 } else if schema.get_view_by_name(relation_name).is_some() {
1056 Err(CatalogError::duplicated("view", relation_name.to_owned()))
1057 } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
1058 let is_not_created = subscription.subscription_state != SubscriptionState::Created;
1059 Err(CatalogError::Duplicated(
1060 "subscription",
1061 relation_name.to_owned(),
1062 is_not_created,
1063 ))
1064 } else {
1065 Ok(())
1066 }
1067 }
1068
1069 pub fn check_function_name_duplicated(
1070 &self,
1071 db_name: &str,
1072 schema_name: &str,
1073 function_name: &str,
1074 arg_types: &[DataType],
1075 ) -> CatalogResult<()> {
1076 let schema = self.get_schema_by_name(db_name, schema_name)?;
1077
1078 if schema
1079 .get_function_by_name_args(function_name, arg_types)
1080 .is_some()
1081 {
1082 let name = format!(
1083 "{function_name}({})",
1084 arg_types.iter().map(|t| t.to_string()).join(",")
1085 );
1086 Err(CatalogError::duplicated("function", name))
1087 } else {
1088 Ok(())
1089 }
1090 }
1091
1092 pub fn check_connection_name_duplicated(
1094 &self,
1095 db_name: &str,
1096 schema_name: &str,
1097 connection_name: &str,
1098 ) -> CatalogResult<()> {
1099 let schema = self.get_schema_by_name(db_name, schema_name)?;
1100
1101 if schema.get_connection_by_name(connection_name).is_some() {
1102 Err(CatalogError::duplicated(
1103 "connection",
1104 connection_name.to_owned(),
1105 ))
1106 } else {
1107 Ok(())
1108 }
1109 }
1110
1111 pub fn check_secret_name_duplicated(
1112 &self,
1113 db_name: &str,
1114 schema_name: &str,
1115 secret_name: &str,
1116 ) -> CatalogResult<()> {
1117 let schema = self.get_schema_by_name(db_name, schema_name)?;
1118
1119 if schema.get_secret_by_name(secret_name).is_some() {
1120 Err(CatalogError::duplicated("secret", secret_name.to_owned()))
1121 } else {
1122 Ok(())
1123 }
1124 }
1125
/// Returns the currently stored table stats.
pub fn table_stats(&self) -> &HummockVersionStats {
    &self.table_stats
}
1129
/// Replaces the stored table stats with `table_stats`.
pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
    self.table_stats = table_stats;
}
1133
1134 pub fn get_all_indexes_related_to_object(
1135 &self,
1136 db_id: DatabaseId,
1137 schema_id: SchemaId,
1138 mv_id: TableId,
1139 ) -> Vec<Arc<IndexCatalog>> {
1140 self.get_database_by_id(db_id)
1141 .unwrap()
1142 .get_schema_by_id(schema_id)
1143 .unwrap()
1144 .get_any_indexes_by_table_id(mv_id)
1145 }
1146
1147 pub fn get_id_by_class_name(
1148 &self,
1149 db_name: &str,
1150 schema_path: SchemaPath<'_>,
1151 class_name: &str,
1152 ) -> CatalogResult<ObjectId> {
1153 schema_path
1154 .try_find(|schema_name| {
1155 let schema = self.get_schema_by_name(db_name, schema_name)?;
1156 #[allow(clippy::manual_map)]
1157 if let Some(item) = schema.get_system_table_by_name(class_name) {
1158 Ok(Some(item.id().as_object_id()))
1159 } else if let Some(item) = schema.get_any_table_by_name(class_name) {
1160 Ok(Some(item.id().as_object_id()))
1161 } else if let Some(item) = schema.get_any_index_by_name(class_name) {
1162 Ok(Some(item.id.as_object_id()))
1163 } else if let Some(item) = schema.get_source_by_name(class_name) {
1164 Ok(Some(item.id.as_object_id()))
1165 } else if let Some(item) = schema.get_view_by_name(class_name) {
1166 Ok(Some(item.id.as_object_id()))
1167 } else if let Some(item) = schema.get_any_sink_by_name(class_name) {
1168 Ok(Some(item.id.as_object_id()))
1169 } else {
1170 Ok(None)
1171 }
1172 })?
1173 .map(|(id, _)| id)
1174 .ok_or_else(|| CatalogError::NotFound("class", class_name.to_owned()))
1175 }
1176}