risingwave_frontend/catalog/
root_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23use risingwave_pb::catalog::{
24    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
25    PbSubscription, PbTable, PbView,
26};
27use risingwave_pb::hummock::HummockVersionStats;
28
29use super::function_catalog::FunctionCatalog;
30use super::source_catalog::SourceCatalog;
31use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
32use super::view_catalog::ViewCatalog;
33use super::{
34    CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
35};
36use crate::catalog::connection_catalog::ConnectionCatalog;
37use crate::catalog::database_catalog::DatabaseCatalog;
38use crate::catalog::schema_catalog::SchemaCatalog;
39use crate::catalog::secret_catalog::SecretCatalog;
40use crate::catalog::system_catalog::{
41    SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
42};
43use crate::catalog::table_catalog::TableCatalog;
44use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
45use crate::expr::{Expr, ExprImpl};
46
/// Describes which schema(s) a catalog lookup should consult.
///
/// Built by [`SchemaPath::new`]: an explicit schema name yields [`SchemaPath::Name`], otherwise
/// the session search path is used via [`SchemaPath::Path`].
#[derive(Copy, Clone)]
pub enum SchemaPath<'a> {
    /// A single, explicitly specified schema name.
    Name(&'a str),
    /// (`search_path`, `user_name`).
    ///
    /// The user name replaces any search-path entry equal to `USER_NAME_WILD_CARD`.
    Path(&'a SearchPath, &'a str),
}
53
54impl<'a> SchemaPath<'a> {
55    pub fn new(
56        schema_name: Option<&'a str>,
57        search_path: &'a SearchPath,
58        user_name: &'a str,
59    ) -> Self {
60        match schema_name {
61            Some(schema_name) => SchemaPath::Name(schema_name),
62            None => SchemaPath::Path(search_path, user_name),
63        }
64    }
65
66    /// Call function `f` for each schema name. Return the first `Some` result.
67    pub fn try_find<T, E>(
68        &self,
69        mut f: impl FnMut(&str) -> Result<Option<T>, E>,
70    ) -> Result<Option<(T, &'a str)>, E> {
71        match self {
72            SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
73            SchemaPath::Path(search_path, user_name) => {
74                for schema_name in search_path.path() {
75                    let mut schema_name: &str = schema_name;
76                    if schema_name == USER_NAME_WILD_CARD {
77                        schema_name = user_name;
78                    }
79                    if let Ok(Some(res)) = f(schema_name) {
80                        return Ok(Some((res, schema_name)));
81                    }
82                }
83                Ok(None)
84            }
85        }
86    }
87}
88
/// Root catalog of database catalog. It manages all database/schema/table in memory on frontend.
/// It is protected by a `RwLock`. Only [`crate::observer::FrontendObserverNode`]
/// will acquire the write lock and sync it with the meta catalog. In other situations, it is
/// read only.
///
/// - catalog (root catalog)
///   - database catalog
///     - schema catalog
///       - function catalog (i.e., user defined function)
///       - table/sink/source/index/view catalog
///         - column catalog
pub struct Catalog {
    /// Catalog version number; `0` for a freshly constructed (default) catalog.
    version: CatalogVersion,
    /// Database catalogs keyed by database name. This map owns the catalogs.
    database_by_name: HashMap<String, DatabaseCatalog>,
    /// Maps a database id to the name under which it is stored in `database_by_name`.
    db_name_by_id: HashMap<DatabaseId, String>,
    /// all table catalogs in the cluster identified by universal unique table id.
    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
    /// NOTE(review): presumably the latest per-table statistics reported by Hummock — confirm
    /// against the code that writes this field.
    table_stats: HummockVersionStats,
}
108
109#[expect(clippy::derivable_impls)]
110impl Default for Catalog {
111    fn default() -> Self {
112        Self {
113            version: 0,
114            database_by_name: HashMap::new(),
115            db_name_by_id: HashMap::new(),
116            table_by_id: HashMap::new(),
117            table_stats: HummockVersionStats::default(),
118        }
119    }
120}
121
122impl Catalog {
123    fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
124        let name = self.db_name_by_id.get(&db_id)?;
125        self.database_by_name.get_mut(name)
126    }
127
128    pub fn clear(&mut self) {
129        self.database_by_name.clear();
130        self.db_name_by_id.clear();
131        self.table_by_id.clear();
132    }
133
134    pub fn create_database(&mut self, db: &PbDatabase) {
135        let name = db.name.clone();
136        let id = db.id;
137
138        self.database_by_name
139            .try_insert(name.clone(), db.into())
140            .unwrap();
141        self.db_name_by_id.try_insert(id, name).unwrap();
142    }
143
144    pub fn create_schema(&mut self, proto: &PbSchema) {
145        self.get_database_mut(proto.database_id)
146            .unwrap()
147            .create_schema(proto);
148
149        for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
150            self.get_database_mut(proto.database_id)
151                .unwrap()
152                .get_schema_mut(proto.id)
153                .unwrap()
154                .create_sys_table(sys_table);
155        }
156        for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
157            sys_view.database_id = proto.database_id;
158            sys_view.schema_id = proto.id;
159            self.get_database_mut(proto.database_id)
160                .unwrap()
161                .get_schema_mut(proto.id)
162                .unwrap()
163                .create_sys_view(Arc::new(sys_view));
164        }
165    }
166
167    pub fn create_table(&mut self, proto: &PbTable) {
168        let table = self
169            .get_database_mut(proto.database_id)
170            .unwrap()
171            .get_schema_mut(proto.schema_id)
172            .unwrap()
173            .create_table(proto);
174        self.table_by_id.insert(proto.id.into(), table);
175    }
176
177    pub fn create_index(&mut self, proto: &PbIndex) {
178        self.get_database_mut(proto.database_id)
179            .unwrap()
180            .get_schema_mut(proto.schema_id)
181            .unwrap()
182            .create_index(proto);
183    }
184
185    pub fn create_source(&mut self, proto: &PbSource) {
186        self.get_database_mut(proto.database_id)
187            .unwrap()
188            .get_schema_mut(proto.schema_id)
189            .unwrap()
190            .create_source(proto);
191    }
192
193    pub fn create_sink(&mut self, proto: &PbSink) {
194        self.get_database_mut(proto.database_id)
195            .unwrap()
196            .get_schema_mut(proto.schema_id)
197            .unwrap()
198            .create_sink(proto);
199    }
200
201    pub fn create_subscription(&mut self, proto: &PbSubscription) {
202        self.get_database_mut(proto.database_id)
203            .unwrap()
204            .get_schema_mut(proto.schema_id)
205            .unwrap()
206            .create_subscription(proto);
207    }
208
209    pub fn create_secret(&mut self, proto: &PbSecret) {
210        self.get_database_mut(proto.database_id)
211            .unwrap()
212            .get_schema_mut(proto.schema_id)
213            .unwrap()
214            .create_secret(proto);
215    }
216
217    pub fn create_view(&mut self, proto: &PbView) {
218        self.get_database_mut(proto.database_id)
219            .unwrap()
220            .get_schema_mut(proto.schema_id)
221            .unwrap()
222            .create_view(proto);
223    }
224
225    pub fn create_function(&mut self, proto: &PbFunction) {
226        self.get_database_mut(proto.database_id)
227            .unwrap()
228            .get_schema_mut(proto.schema_id)
229            .unwrap()
230            .create_function(proto);
231    }
232
233    pub fn create_connection(&mut self, proto: &PbConnection) {
234        self.get_database_mut(proto.database_id)
235            .unwrap()
236            .get_schema_mut(proto.schema_id)
237            .unwrap()
238            .create_connection(proto);
239    }
240
241    pub fn drop_connection(
242        &mut self,
243        db_id: DatabaseId,
244        schema_id: SchemaId,
245        connection_id: ConnectionId,
246    ) {
247        self.get_database_mut(db_id)
248            .unwrap()
249            .get_schema_mut(schema_id)
250            .unwrap()
251            .drop_connection(connection_id);
252    }
253
254    pub fn update_connection(&mut self, proto: &PbConnection) {
255        let database = self.get_database_mut(proto.database_id).unwrap();
256        let schema = database.get_schema_mut(proto.schema_id).unwrap();
257        if schema.get_connection_by_id(&proto.id).is_some() {
258            schema.update_connection(proto);
259        } else {
260            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
261            schema.create_connection(proto);
262            database
263                .iter_schemas_mut()
264                .find(|schema| {
265                    schema.id() != proto.schema_id
266                        && schema.get_connection_by_id(&proto.id).is_some()
267                })
268                .unwrap()
269                .drop_connection(proto.id);
270        }
271    }
272
273    pub fn update_secret(&mut self, proto: &PbSecret) {
274        let database = self.get_database_mut(proto.database_id).unwrap();
275        let schema = database.get_schema_mut(proto.schema_id).unwrap();
276        let secret_id = SecretId::new(proto.id);
277        if schema.get_secret_by_id(&secret_id).is_some() {
278            schema.update_secret(proto);
279        } else {
280            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
281            schema.create_secret(proto);
282            database
283                .iter_schemas_mut()
284                .find(|schema| {
285                    schema.id() != proto.schema_id && schema.get_secret_by_id(&secret_id).is_some()
286                })
287                .unwrap()
288                .drop_secret(secret_id);
289        }
290    }
291
292    pub fn drop_database(&mut self, db_id: DatabaseId) {
293        let name = self.db_name_by_id.remove(&db_id).unwrap();
294        let database = self.database_by_name.remove(&name).unwrap();
295        database.iter_all_table_ids().for_each(|table| {
296            self.table_by_id.remove(&table);
297        });
298    }
299
300    pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
301        self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
302    }
303
304    pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
305        self.table_by_id.remove(&tb_id);
306        self.get_database_mut(db_id)
307            .unwrap()
308            .get_schema_mut(schema_id)
309            .unwrap()
310            .drop_table(tb_id);
311    }
312
313    pub fn update_table(&mut self, proto: &PbTable) {
314        let database = self.get_database_mut(proto.database_id).unwrap();
315        let schema = database.get_schema_mut(proto.schema_id).unwrap();
316        let table = if schema.get_table_by_id(&proto.id.into()).is_some() {
317            schema.update_table(proto)
318        } else {
319            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
320            let new_table = schema.create_table(proto);
321            database
322                .iter_schemas_mut()
323                .find(|schema| {
324                    schema.id() != proto.schema_id
325                        && schema.get_created_table_by_id(&proto.id.into()).is_some()
326                })
327                .unwrap()
328                .drop_table(proto.id.into());
329            new_table
330        };
331
332        self.table_by_id.insert(proto.id.into(), table);
333    }
334
335    pub fn update_database(&mut self, proto: &PbDatabase) {
336        let id = proto.id;
337        let name = proto.name.clone();
338
339        let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
340        if old_database_name != name {
341            let mut database = self.database_by_name.remove(&old_database_name).unwrap();
342            database.name.clone_from(&name);
343            database.owner = proto.owner;
344            self.database_by_name.insert(name.clone(), database);
345            self.db_name_by_id.insert(id, name);
346        } else {
347            let database = self.get_database_mut(id).unwrap();
348            database.name = name;
349            database.owner = proto.owner;
350            database.barrier_interval_ms = proto.barrier_interval_ms;
351            database.checkpoint_frequency = proto.checkpoint_frequency;
352        }
353    }
354
355    pub fn update_schema(&mut self, proto: &PbSchema) {
356        self.get_database_mut(proto.database_id)
357            .unwrap()
358            .update_schema(proto);
359    }
360
361    pub fn update_index(&mut self, proto: &PbIndex) {
362        let database = self.get_database_mut(proto.database_id).unwrap();
363        let schema = database.get_schema_mut(proto.schema_id).unwrap();
364        if schema.get_index_by_id(&proto.id.into()).is_some() {
365            schema.update_index(proto);
366        } else {
367            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
368            schema.create_index(proto);
369            database
370                .iter_schemas_mut()
371                .find(|schema| {
372                    schema.id() != proto.schema_id
373                        && schema.get_index_by_id(&proto.id.into()).is_some()
374                })
375                .unwrap()
376                .drop_index(proto.id.into());
377        }
378    }
379
380    pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
381        self.get_database_mut(db_id)
382            .unwrap()
383            .get_schema_mut(schema_id)
384            .unwrap()
385            .drop_source(source_id);
386    }
387
388    pub fn update_source(&mut self, proto: &PbSource) {
389        let database = self.get_database_mut(proto.database_id).unwrap();
390        let schema = database.get_schema_mut(proto.schema_id).unwrap();
391        if schema.get_source_by_id(&proto.id).is_some() {
392            schema.update_source(proto);
393        } else {
394            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
395            schema.create_source(proto);
396            database
397                .iter_schemas_mut()
398                .find(|schema| {
399                    schema.id() != proto.schema_id && schema.get_source_by_id(&proto.id).is_some()
400                })
401                .unwrap()
402                .drop_source(proto.id);
403        }
404    }
405
406    pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
407        self.get_database_mut(db_id)
408            .unwrap()
409            .get_schema_mut(schema_id)
410            .unwrap()
411            .drop_sink(sink_id);
412    }
413
414    pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
415        self.get_database_mut(db_id)
416            .unwrap()
417            .get_schema_mut(schema_id)
418            .unwrap()
419            .drop_secret(secret_id);
420    }
421
422    pub fn update_sink(&mut self, proto: &PbSink) {
423        let database = self.get_database_mut(proto.database_id).unwrap();
424        let schema = database.get_schema_mut(proto.schema_id).unwrap();
425        if schema.get_sink_by_id(&proto.id).is_some() {
426            schema.update_sink(proto);
427        } else {
428            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
429            schema.create_sink(proto);
430            database
431                .iter_schemas_mut()
432                .find(|schema| {
433                    schema.id() != proto.schema_id && schema.get_sink_by_id(&proto.id).is_some()
434                })
435                .unwrap()
436                .drop_sink(proto.id);
437        }
438    }
439
440    pub fn drop_subscription(
441        &mut self,
442        db_id: DatabaseId,
443        schema_id: SchemaId,
444        subscription_id: SubscriptionId,
445    ) {
446        self.get_database_mut(db_id)
447            .unwrap()
448            .get_schema_mut(schema_id)
449            .unwrap()
450            .drop_subscription(subscription_id);
451    }
452
453    pub fn update_subscription(&mut self, proto: &PbSubscription) {
454        let database = self.get_database_mut(proto.database_id).unwrap();
455        let schema = database.get_schema_mut(proto.schema_id).unwrap();
456        if schema.get_subscription_by_id(&proto.id).is_some() {
457            schema.update_subscription(proto);
458        } else {
459            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
460            schema.create_subscription(proto);
461            database
462                .iter_schemas_mut()
463                .find(|schema| {
464                    schema.id() != proto.schema_id
465                        && schema.get_subscription_by_id(&proto.id).is_some()
466                })
467                .unwrap()
468                .drop_subscription(proto.id);
469        }
470    }
471
472    pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
473        self.get_database_mut(db_id)
474            .unwrap()
475            .get_schema_mut(schema_id)
476            .unwrap()
477            .drop_index(index_id);
478    }
479
480    pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
481        self.get_database_mut(db_id)
482            .unwrap()
483            .get_schema_mut(schema_id)
484            .unwrap()
485            .drop_view(view_id);
486    }
487
488    pub fn update_view(&mut self, proto: &PbView) {
489        let database = self.get_database_mut(proto.database_id).unwrap();
490        let schema = database.get_schema_mut(proto.schema_id).unwrap();
491        if schema.get_view_by_id(&proto.id).is_some() {
492            schema.update_view(proto);
493        } else {
494            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
495            schema.create_view(proto);
496            database
497                .iter_schemas_mut()
498                .find(|schema| {
499                    schema.id() != proto.schema_id && schema.get_view_by_id(&proto.id).is_some()
500                })
501                .unwrap()
502                .drop_view(proto.id);
503        }
504    }
505
506    pub fn drop_function(
507        &mut self,
508        db_id: DatabaseId,
509        schema_id: SchemaId,
510        function_id: FunctionId,
511    ) {
512        self.get_database_mut(db_id)
513            .unwrap()
514            .get_schema_mut(schema_id)
515            .unwrap()
516            .drop_function(function_id);
517    }
518
519    pub fn update_function(&mut self, proto: &PbFunction) {
520        let database = self.get_database_mut(proto.database_id).unwrap();
521        let schema = database.get_schema_mut(proto.schema_id).unwrap();
522        if schema.get_function_by_id(proto.id.into()).is_some() {
523            schema.update_function(proto);
524        } else {
525            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
526            schema.create_function(proto);
527            database
528                .iter_schemas_mut()
529                .find(|schema| {
530                    schema.id() != proto.schema_id
531                        && schema.get_function_by_id(proto.id.into()).is_some()
532                })
533                .unwrap()
534                .drop_function(proto.id.into());
535        }
536
537        self.get_database_mut(proto.database_id)
538            .unwrap()
539            .get_schema_mut(proto.schema_id)
540            .unwrap()
541            .update_function(proto);
542    }
543
544    pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
545        self.database_by_name
546            .get(db_name)
547            .ok_or_else(|| CatalogError::NotFound("database", db_name.to_owned()))
548    }
549
550    pub fn get_database_by_id(&self, db_id: &DatabaseId) -> CatalogResult<&DatabaseCatalog> {
551        let db_name = self
552            .db_name_by_id
553            .get(db_id)
554            .ok_or_else(|| CatalogError::NotFound("db_id", db_id.to_string()))?;
555        self.database_by_name
556            .get(db_name)
557            .ok_or_else(|| CatalogError::NotFound("database", db_name.to_string()))
558    }
559
560    pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
561        Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
562    }
563
564    pub fn iter_schemas(
565        &self,
566        db_name: &str,
567    ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
568        Ok(self.get_database_by_name(db_name)?.iter_schemas())
569    }
570
571    pub fn get_all_database_names(&self) -> Vec<String> {
572        self.database_by_name.keys().cloned().collect_vec()
573    }
574
575    pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
576        self.database_by_name.values()
577    }
578
579    pub fn get_schema_by_name(
580        &self,
581        db_name: &str,
582        schema_name: &str,
583    ) -> CatalogResult<&SchemaCatalog> {
584        self.get_database_by_name(db_name)?
585            .get_schema_by_name(schema_name)
586            .ok_or_else(|| CatalogError::NotFound("schema", schema_name.to_owned()))
587    }
588
589    pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
590        self.get_any_table_by_id(&table_id)
591            .map(|table| table.name.clone())
592    }
593
594    pub fn get_schema_by_id(
595        &self,
596        db_id: &DatabaseId,
597        schema_id: &SchemaId,
598    ) -> CatalogResult<&SchemaCatalog> {
599        self.get_database_by_id(db_id)?
600            .get_schema_by_id(schema_id)
601            .ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
602    }
603
604    /// Refer to [`SearchPath`].
605    pub fn first_valid_schema(
606        &self,
607        db_name: &str,
608        search_path: &SearchPath,
609        user_name: &str,
610    ) -> CatalogResult<&SchemaCatalog> {
611        for path in search_path.real_path() {
612            let mut schema_name: &str = path;
613            if schema_name == USER_NAME_WILD_CARD {
614                schema_name = user_name;
615            }
616
617            if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
618                return schema_catalog;
619            }
620        }
621        Err(CatalogError::NotFound(
622            "first valid schema",
623            "no schema has been selected to create in".to_owned(),
624        ))
625    }
626
627    pub fn get_source_by_id<'a>(
628        &self,
629        db_name: &'a str,
630        schema_path: SchemaPath<'a>,
631        source_id: &SourceId,
632    ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
633        schema_path
634            .try_find(|schema_name| {
635                Ok(self
636                    .get_schema_by_name(db_name, schema_name)?
637                    .get_source_by_id(source_id))
638            })?
639            .ok_or_else(|| CatalogError::NotFound("source", source_id.to_string()))
640    }
641
642    pub fn get_table_by_name<'a>(
643        &self,
644        db_name: &str,
645        schema_path: SchemaPath<'a>,
646        table_name: &str,
647        bind_creating_relations: bool,
648    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
649        schema_path
650            .try_find(|schema_name| {
651                Ok(self
652                    .get_schema_by_name(db_name, schema_name)?
653                    .get_table_by_name(table_name, bind_creating_relations))
654            })?
655            .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
656    }
657
658    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
659    /// Retrieves all tables, created or creating.
660    pub fn get_any_table_by_name<'a>(
661        &self,
662        db_name: &str,
663        schema_path: SchemaPath<'a>,
664        table_name: &str,
665    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
666        self.get_table_by_name(db_name, schema_path, table_name, true)
667    }
668
669    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
670    /// Retrieves only created tables.
671    pub fn get_created_table_by_name<'a>(
672        &self,
673        db_name: &str,
674        schema_path: SchemaPath<'a>,
675        table_name: &str,
676    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
677        self.get_table_by_name(db_name, schema_path, table_name, false)
678    }
679
680    pub fn get_any_table_by_id(&self, table_id: &TableId) -> CatalogResult<&Arc<TableCatalog>> {
681        self.table_by_id
682            .get(table_id)
683            .ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
684    }
685
686    /// This function is similar to `get_table_by_id` expect that a table must be in a given database.
687    pub fn get_created_table_by_id_with_db(
688        &self,
689        db_name: &str,
690        table_id: u32,
691    ) -> CatalogResult<&Arc<TableCatalog>> {
692        let table_id = TableId::from(table_id);
693        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
694            if let Some(table) = schema.get_created_table_by_id(&table_id) {
695                return Ok(table);
696            }
697        }
698        Err(CatalogError::NotFound("table id", table_id.to_string()))
699    }
700
701    pub fn iter_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
702        self.table_by_id.values()
703    }
704
705    pub fn iter_backfilling_internal_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
706        self.table_by_id
707            .values()
708            .filter(|t| t.is_internal_table() && !t.is_created())
709    }
710
711    // Used by test_utils only.
712    pub fn alter_table_name_by_id(&mut self, table_id: &TableId, table_name: &str) {
713        let mut found = false;
714        for database in self.database_by_name.values() {
715            if !found {
716                for schema in database.iter_schemas() {
717                    if schema.iter_user_table().any(|t| t.id() == *table_id) {
718                        found = true;
719                        break;
720                    }
721                }
722            }
723        }
724
725        if found {
726            let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
727            table.name = table_name.to_owned();
728            self.update_table(&table);
729        }
730    }
731
732    #[cfg(test)]
733    pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
734        self.table_by_id.insert(
735            table_id,
736            Arc::new(TableCatalog {
737                fragment_id,
738                ..Default::default()
739            }),
740        );
741    }
742
743    pub fn get_sys_table_by_name(
744        &self,
745        db_name: &str,
746        schema_name: &str,
747        table_name: &str,
748    ) -> CatalogResult<&Arc<SystemTableCatalog>> {
749        self.get_schema_by_name(db_name, schema_name)?
750            .get_system_table_by_name(table_name)
751            .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
752    }
753
754    pub fn get_source_by_name<'a>(
755        &self,
756        db_name: &str,
757        schema_path: SchemaPath<'a>,
758        source_name: &str,
759    ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
760        schema_path
761            .try_find(|schema_name| {
762                Ok(self
763                    .get_schema_by_name(db_name, schema_name)?
764                    .get_source_by_name(source_name))
765            })?
766            .ok_or_else(|| CatalogError::NotFound("source", source_name.to_owned()))
767    }
768
769    pub fn get_sink_by_name<'a>(
770        &self,
771        db_name: &str,
772        schema_path: SchemaPath<'a>,
773        sink_name: &str,
774    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
775        schema_path
776            .try_find(|schema_name| {
777                Ok(self
778                    .get_schema_by_name(db_name, schema_name)?
779                    .get_sink_by_name(sink_name))
780            })?
781            .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_owned()))
782    }
783
784    pub fn get_subscription_by_name<'a>(
785        &self,
786        db_name: &str,
787        schema_path: SchemaPath<'a>,
788        subscription_name: &str,
789    ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
790        schema_path
791            .try_find(|schema_name| {
792                Ok(self
793                    .get_schema_by_name(db_name, schema_name)?
794                    .get_subscription_by_name(subscription_name))
795            })?
796            .ok_or_else(|| CatalogError::NotFound("subscription", subscription_name.to_owned()))
797    }
798
799    pub fn get_index_by_name<'a>(
800        &self,
801        db_name: &str,
802        schema_path: SchemaPath<'a>,
803        index_name: &str,
804    ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
805        schema_path
806            .try_find(|schema_name| {
807                Ok(self
808                    .get_schema_by_name(db_name, schema_name)?
809                    .get_index_by_name(index_name))
810            })?
811            .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
812    }
813
814    pub fn get_index_by_id(
815        &self,
816        db_name: &str,
817        index_id: u32,
818    ) -> CatalogResult<&Arc<IndexCatalog>> {
819        let index_id = IndexId::from(index_id);
820        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
821            if let Some(index) = schema.get_index_by_id(&index_id) {
822                return Ok(index);
823            }
824        }
825        Err(CatalogError::NotFound("index", index_id.to_string()))
826    }
827
828    pub fn get_view_by_name<'a>(
829        &self,
830        db_name: &str,
831        schema_path: SchemaPath<'a>,
832        view_name: &str,
833    ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
834        schema_path
835            .try_find(|schema_name| {
836                Ok(self
837                    .get_schema_by_name(db_name, schema_name)?
838                    .get_view_by_name(view_name))
839            })?
840            .ok_or_else(|| CatalogError::NotFound("view", view_name.to_owned()))
841    }
842
843    pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
844        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
845            if let Some(view) = schema.get_view_by_id(&ViewId::from(view_id)) {
846                return Ok(view.clone());
847            }
848        }
849        Err(CatalogError::NotFound("view", view_id.to_string()))
850    }
851
852    pub fn get_secret_by_name<'a>(
853        &self,
854        db_name: &str,
855        schema_path: SchemaPath<'a>,
856        secret_name: &str,
857    ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
858        schema_path
859            .try_find(|schema_name| {
860                Ok(self
861                    .get_schema_by_name(db_name, schema_name)?
862                    .get_secret_by_name(secret_name))
863            })?
864            .ok_or_else(|| CatalogError::NotFound("secret", secret_name.to_owned()))
865    }
866
867    pub fn get_connection_by_name<'a>(
868        &self,
869        db_name: &str,
870        schema_path: SchemaPath<'a>,
871        connection_name: &str,
872    ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
873        schema_path
874            .try_find(|schema_name| {
875                Ok(self
876                    .get_schema_by_name(db_name, schema_name)?
877                    .get_connection_by_name(connection_name))
878            })?
879            .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_owned()))
880    }
881
882    pub fn get_function_by_name_inputs<'a>(
883        &self,
884        db_name: &str,
885        schema_path: SchemaPath<'a>,
886        function_name: &str,
887        inputs: &mut [ExprImpl],
888    ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
889        schema_path
890            .try_find(|schema_name| {
891                Ok(self
892                    .get_schema_by_name(db_name, schema_name)?
893                    .get_function_by_name_inputs(function_name, inputs))
894            })?
895            .ok_or_else(|| {
896                CatalogError::NotFound(
897                    "function",
898                    format!(
899                        "{}({})",
900                        function_name,
901                        inputs
902                            .iter()
903                            .map(|a| a.return_type().to_string())
904                            .join(", ")
905                    ),
906                )
907            })
908    }
909
910    pub fn get_function_by_name_args<'a>(
911        &self,
912        db_name: &str,
913        schema_path: SchemaPath<'a>,
914        function_name: &str,
915        args: &[DataType],
916    ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
917        schema_path
918            .try_find(|schema_name| {
919                Ok(self
920                    .get_schema_by_name(db_name, schema_name)?
921                    .get_function_by_name_args(function_name, args))
922            })?
923            .ok_or_else(|| {
924                CatalogError::NotFound(
925                    "function",
926                    format!(
927                        "{}({})",
928                        function_name,
929                        args.iter().map(|a| a.to_string()).join(", ")
930                    ),
931                )
932            })
933    }
934
935    /// Gets all functions with the given name.
936    pub fn get_functions_by_name<'a>(
937        &self,
938        db_name: &str,
939        schema_path: SchemaPath<'a>,
940        function_name: &str,
941    ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
942        schema_path
943            .try_find(|schema_name| {
944                Ok(self
945                    .get_schema_by_name(db_name, schema_name)?
946                    .get_functions_by_name(function_name))
947            })?
948            .ok_or_else(|| CatalogError::NotFound("function", function_name.to_owned()))
949    }
950
951    /// Check if the name duplicates with existing table, materialized view or source.
952    pub fn check_relation_name_duplicated(
953        &self,
954        db_name: &str,
955        schema_name: &str,
956        relation_name: &str,
957    ) -> CatalogResult<()> {
958        let schema = self.get_schema_by_name(db_name, schema_name)?;
959
960        if let Some(table) = schema.get_any_table_by_name(relation_name) {
961            let is_creating = table.stream_job_status == StreamJobStatus::Creating;
962            if table.is_index() {
963                Err(CatalogError::Duplicated(
964                    "index",
965                    relation_name.to_owned(),
966                    is_creating,
967                ))
968            } else if table.is_mview() {
969                Err(CatalogError::Duplicated(
970                    "materialized view",
971                    relation_name.to_owned(),
972                    is_creating,
973                ))
974            } else {
975                Err(CatalogError::Duplicated(
976                    "table",
977                    relation_name.to_owned(),
978                    is_creating,
979                ))
980            }
981        } else if schema.get_source_by_name(relation_name).is_some() {
982            Err(CatalogError::duplicated("source", relation_name.to_owned()))
983        } else if schema.get_sink_by_name(relation_name).is_some() {
984            Err(CatalogError::duplicated("sink", relation_name.to_owned()))
985        } else if schema.get_view_by_name(relation_name).is_some() {
986            Err(CatalogError::duplicated("view", relation_name.to_owned()))
987        } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
988            let is_not_created = subscription.subscription_state != SubscriptionState::Created;
989            Err(CatalogError::Duplicated(
990                "subscription",
991                relation_name.to_owned(),
992                is_not_created,
993            ))
994        } else {
995            Ok(())
996        }
997    }
998
999    pub fn check_function_name_duplicated(
1000        &self,
1001        db_name: &str,
1002        schema_name: &str,
1003        function_name: &str,
1004        arg_types: &[DataType],
1005    ) -> CatalogResult<()> {
1006        let schema = self.get_schema_by_name(db_name, schema_name)?;
1007
1008        if schema
1009            .get_function_by_name_args(function_name, arg_types)
1010            .is_some()
1011        {
1012            let name = format!(
1013                "{function_name}({})",
1014                arg_types.iter().map(|t| t.to_string()).join(",")
1015            );
1016            Err(CatalogError::duplicated("function", name))
1017        } else {
1018            Ok(())
1019        }
1020    }
1021
1022    /// Check if the name duplicates with existing connection.
1023    pub fn check_connection_name_duplicated(
1024        &self,
1025        db_name: &str,
1026        schema_name: &str,
1027        connection_name: &str,
1028    ) -> CatalogResult<()> {
1029        let schema = self.get_schema_by_name(db_name, schema_name)?;
1030
1031        if schema.get_connection_by_name(connection_name).is_some() {
1032            Err(CatalogError::duplicated(
1033                "connection",
1034                connection_name.to_owned(),
1035            ))
1036        } else {
1037            Ok(())
1038        }
1039    }
1040
1041    pub fn check_secret_name_duplicated(
1042        &self,
1043        db_name: &str,
1044        schema_name: &str,
1045        secret_name: &str,
1046    ) -> CatalogResult<()> {
1047        let schema = self.get_schema_by_name(db_name, schema_name)?;
1048
1049        if schema.get_secret_by_name(secret_name).is_some() {
1050            Err(CatalogError::duplicated("secret", secret_name.to_owned()))
1051        } else {
1052            Ok(())
1053        }
1054    }
1055
1056    /// Get the catalog cache's catalog version.
1057    pub fn version(&self) -> u64 {
1058        self.version
1059    }
1060
1061    /// Set the catalog cache's catalog version.
1062    pub fn set_version(&mut self, catalog_version: CatalogVersion) {
1063        self.version = catalog_version;
1064    }
1065
1066    pub fn table_stats(&self) -> &HummockVersionStats {
1067        &self.table_stats
1068    }
1069
1070    pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
1071        self.table_stats = table_stats;
1072    }
1073
1074    pub fn get_all_indexes_related_to_object(
1075        &self,
1076        db_id: DatabaseId,
1077        schema_id: SchemaId,
1078        mv_id: TableId,
1079    ) -> Vec<Arc<IndexCatalog>> {
1080        self.get_database_by_id(&db_id)
1081            .unwrap()
1082            .get_schema_by_id(&schema_id)
1083            .unwrap()
1084            .get_indexes_by_table_id(&mv_id)
1085    }
1086
1087    pub fn get_id_by_class_name(
1088        &self,
1089        db_name: &str,
1090        schema_path: SchemaPath<'_>,
1091        class_name: &str,
1092    ) -> CatalogResult<u32> {
1093        schema_path
1094            .try_find(|schema_name| {
1095                let schema = self.get_schema_by_name(db_name, schema_name)?;
1096                #[allow(clippy::manual_map)]
1097                if let Some(item) = schema.get_system_table_by_name(class_name) {
1098                    Ok(Some(item.id().into()))
1099                } else if let Some(item) = schema.get_created_table_by_name(class_name) {
1100                    Ok(Some(item.id().into()))
1101                } else if let Some(item) = schema.get_index_by_name(class_name) {
1102                    Ok(Some(item.id.into()))
1103                } else if let Some(item) = schema.get_source_by_name(class_name) {
1104                    Ok(Some(item.id))
1105                } else if let Some(item) = schema.get_view_by_name(class_name) {
1106                    Ok(Some(item.id))
1107                } else {
1108                    Ok(None)
1109                }
1110            })?
1111            .map(|(id, _)| id)
1112            .ok_or_else(|| CatalogError::NotFound("class", class_name.to_owned()))
1113    }
1114}