risingwave_frontend/catalog/
root_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23use risingwave_pb::catalog::{
24    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
25    PbSubscription, PbTable, PbView,
26};
27use risingwave_pb::hummock::HummockVersionStats;
28
29use super::function_catalog::FunctionCatalog;
30use super::source_catalog::SourceCatalog;
31use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
32use super::view_catalog::ViewCatalog;
33use super::{
34    CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
35};
36use crate::catalog::connection_catalog::ConnectionCatalog;
37use crate::catalog::database_catalog::DatabaseCatalog;
38use crate::catalog::schema_catalog::SchemaCatalog;
39use crate::catalog::secret_catalog::SecretCatalog;
40use crate::catalog::system_catalog::{
41    SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
42};
43use crate::catalog::table_catalog::TableCatalog;
44use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
45use crate::expr::{Expr, ExprImpl};
46
/// Describes where a schema-scoped lookup should search: either one explicitly named schema or
/// the schemas listed in a search path.
#[derive(Copy, Clone)]
pub enum SchemaPath<'a> {
    /// A single schema, given by name.
    Name(&'a str),
    /// (`search_path`, `user_name`).
    Path(&'a SearchPath, &'a str),
}
53
54impl<'a> SchemaPath<'a> {
55    pub fn new(
56        schema_name: Option<&'a str>,
57        search_path: &'a SearchPath,
58        user_name: &'a str,
59    ) -> Self {
60        match schema_name {
61            Some(schema_name) => SchemaPath::Name(schema_name),
62            None => SchemaPath::Path(search_path, user_name),
63        }
64    }
65
66    /// Call function `f` for each schema name. Return the first `Some` result.
67    pub fn try_find<T, E>(
68        &self,
69        mut f: impl FnMut(&str) -> Result<Option<T>, E>,
70    ) -> Result<Option<(T, &'a str)>, E> {
71        match self {
72            SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
73            SchemaPath::Path(search_path, user_name) => {
74                for schema_name in search_path.path() {
75                    let mut schema_name: &str = schema_name;
76                    if schema_name == USER_NAME_WILD_CARD {
77                        schema_name = user_name;
78                    }
79                    if let Ok(Some(res)) = f(schema_name) {
80                        return Ok(Some((res, schema_name)));
81                    }
82                }
83                Ok(None)
84            }
85        }
86    }
87}
88
/// Root catalog of database catalog. It manages all database/schema/table in memory on frontend.
/// It is protected by a `RwLock`. Only [`crate::observer::FrontendObserverNode`]
/// will acquire the write lock and sync it with the meta catalog. In other situations, it is
/// read only.
///
/// - catalog (root catalog)
///   - database catalog
///     - schema catalog
///       - function catalog (i.e., user defined function)
///       - table/sink/source/index/view catalog
///         - column catalog
pub struct Catalog {
    /// Catalog version; starts at `0` in [`Default`].
    version: CatalogVersion,
    /// Database catalogs keyed by database name.
    database_by_name: HashMap<String, DatabaseCatalog>,
    /// Database id to database name; used to resolve an id into `database_by_name`.
    db_name_by_id: HashMap<DatabaseId, String>,
    /// all table catalogs in the cluster identified by universal unique table id.
    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
    /// NOTE(review): presumably the latest per-table statistics reported by Hummock — confirm
    /// against the code that writes this field.
    table_stats: HummockVersionStats,
}
108
109#[expect(clippy::derivable_impls)]
110impl Default for Catalog {
111    fn default() -> Self {
112        Self {
113            version: 0,
114            database_by_name: HashMap::new(),
115            db_name_by_id: HashMap::new(),
116            table_by_id: HashMap::new(),
117            table_stats: HummockVersionStats::default(),
118        }
119    }
120}
121
122impl Catalog {
123    fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
124        let name = self.db_name_by_id.get(&db_id)?;
125        self.database_by_name.get_mut(name)
126    }
127
128    pub fn clear(&mut self) {
129        self.database_by_name.clear();
130        self.db_name_by_id.clear();
131        self.table_by_id.clear();
132    }
133
134    pub fn create_database(&mut self, db: &PbDatabase) {
135        let name = db.name.clone();
136        let id = db.id;
137
138        self.database_by_name
139            .try_insert(name.clone(), db.into())
140            .unwrap();
141        self.db_name_by_id.try_insert(id, name).unwrap();
142    }
143
144    pub fn create_schema(&mut self, proto: &PbSchema) {
145        self.get_database_mut(proto.database_id)
146            .unwrap()
147            .create_schema(proto);
148
149        for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
150            self.get_database_mut(proto.database_id)
151                .unwrap()
152                .get_schema_mut(proto.id)
153                .unwrap()
154                .create_sys_table(sys_table);
155        }
156        for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
157            sys_view.database_id = proto.database_id;
158            sys_view.schema_id = proto.id;
159            self.get_database_mut(proto.database_id)
160                .unwrap()
161                .get_schema_mut(proto.id)
162                .unwrap()
163                .create_sys_view(Arc::new(sys_view));
164        }
165    }
166
167    pub fn create_table(&mut self, proto: &PbTable) {
168        let table = self
169            .get_database_mut(proto.database_id)
170            .unwrap()
171            .get_schema_mut(proto.schema_id)
172            .unwrap()
173            .create_table(proto);
174        self.table_by_id.insert(proto.id.into(), table);
175    }
176
177    pub fn create_index(&mut self, proto: &PbIndex) {
178        self.get_database_mut(proto.database_id)
179            .unwrap()
180            .get_schema_mut(proto.schema_id)
181            .unwrap()
182            .create_index(proto);
183    }
184
185    pub fn create_source(&mut self, proto: &PbSource) {
186        self.get_database_mut(proto.database_id)
187            .unwrap()
188            .get_schema_mut(proto.schema_id)
189            .unwrap()
190            .create_source(proto);
191    }
192
193    pub fn create_sink(&mut self, proto: &PbSink) {
194        self.get_database_mut(proto.database_id)
195            .unwrap()
196            .get_schema_mut(proto.schema_id)
197            .unwrap()
198            .create_sink(proto);
199    }
200
201    pub fn create_subscription(&mut self, proto: &PbSubscription) {
202        self.get_database_mut(proto.database_id)
203            .unwrap()
204            .get_schema_mut(proto.schema_id)
205            .unwrap()
206            .create_subscription(proto);
207    }
208
209    pub fn create_secret(&mut self, proto: &PbSecret) {
210        self.get_database_mut(proto.database_id)
211            .unwrap()
212            .get_schema_mut(proto.schema_id)
213            .unwrap()
214            .create_secret(proto);
215    }
216
217    pub fn create_view(&mut self, proto: &PbView) {
218        self.get_database_mut(proto.database_id)
219            .unwrap()
220            .get_schema_mut(proto.schema_id)
221            .unwrap()
222            .create_view(proto);
223    }
224
225    pub fn create_function(&mut self, proto: &PbFunction) {
226        self.get_database_mut(proto.database_id)
227            .unwrap()
228            .get_schema_mut(proto.schema_id)
229            .unwrap()
230            .create_function(proto);
231    }
232
233    pub fn create_connection(&mut self, proto: &PbConnection) {
234        self.get_database_mut(proto.database_id)
235            .unwrap()
236            .get_schema_mut(proto.schema_id)
237            .unwrap()
238            .create_connection(proto);
239    }
240
241    pub fn drop_connection(
242        &mut self,
243        db_id: DatabaseId,
244        schema_id: SchemaId,
245        connection_id: ConnectionId,
246    ) {
247        self.get_database_mut(db_id)
248            .unwrap()
249            .get_schema_mut(schema_id)
250            .unwrap()
251            .drop_connection(connection_id);
252    }
253
254    pub fn update_connection(&mut self, proto: &PbConnection) {
255        let database = self.get_database_mut(proto.database_id).unwrap();
256        let schema = database.get_schema_mut(proto.schema_id).unwrap();
257        if schema.get_connection_by_id(&proto.id).is_some() {
258            schema.update_connection(proto);
259        } else {
260            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
261            schema.create_connection(proto);
262            database
263                .iter_schemas_mut()
264                .find(|schema| {
265                    schema.id() != proto.schema_id
266                        && schema.get_connection_by_id(&proto.id).is_some()
267                })
268                .unwrap()
269                .drop_connection(proto.id);
270        }
271    }
272
273    pub fn update_secret(&mut self, proto: &PbSecret) {
274        let database = self.get_database_mut(proto.database_id).unwrap();
275        let schema = database.get_schema_mut(proto.schema_id).unwrap();
276        let secret_id = SecretId::new(proto.id);
277        if schema.get_secret_by_id(&secret_id).is_some() {
278            schema.update_secret(proto);
279        } else {
280            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
281            schema.create_secret(proto);
282            database
283                .iter_schemas_mut()
284                .find(|schema| {
285                    schema.id() != proto.schema_id && schema.get_secret_by_id(&secret_id).is_some()
286                })
287                .unwrap()
288                .drop_secret(secret_id);
289        }
290    }
291
292    pub fn drop_database(&mut self, db_id: DatabaseId) {
293        let name = self.db_name_by_id.remove(&db_id).unwrap();
294        let database = self.database_by_name.remove(&name).unwrap();
295        database.iter_all_table_ids().for_each(|table| {
296            self.table_by_id.remove(&table);
297        });
298    }
299
300    pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
301        self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
302    }
303
304    pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
305        self.table_by_id.remove(&tb_id);
306        self.get_database_mut(db_id)
307            .unwrap()
308            .get_schema_mut(schema_id)
309            .unwrap()
310            .drop_table(tb_id);
311    }
312
313    pub fn update_table(&mut self, proto: &PbTable) {
314        let database = self.get_database_mut(proto.database_id).unwrap();
315        let schema = database.get_schema_mut(proto.schema_id).unwrap();
316        let table = if schema.get_table_by_id(&proto.id.into()).is_some() {
317            schema.update_table(proto)
318        } else {
319            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
320            let new_table = schema.create_table(proto);
321            database
322                .iter_schemas_mut()
323                .find(|schema| {
324                    schema.id() != proto.schema_id
325                        && schema.get_created_table_by_id(&proto.id.into()).is_some()
326                })
327                .unwrap()
328                .drop_table(proto.id.into());
329            new_table
330        };
331
332        self.table_by_id.insert(proto.id.into(), table);
333    }
334
335    pub fn update_database(&mut self, proto: &PbDatabase) {
336        let id = proto.id;
337        let name = proto.name.clone();
338
339        let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
340        if old_database_name != name {
341            let mut database = self.database_by_name.remove(&old_database_name).unwrap();
342            database.name.clone_from(&name);
343            database.owner = proto.owner;
344            self.database_by_name.insert(name.clone(), database);
345            self.db_name_by_id.insert(id, name);
346        } else {
347            let database = self.get_database_mut(id).unwrap();
348            database.name = name;
349            database.owner = proto.owner;
350        }
351    }
352
353    pub fn update_schema(&mut self, proto: &PbSchema) {
354        self.get_database_mut(proto.database_id)
355            .unwrap()
356            .update_schema(proto);
357    }
358
359    pub fn update_index(&mut self, proto: &PbIndex) {
360        let database = self.get_database_mut(proto.database_id).unwrap();
361        let schema = database.get_schema_mut(proto.schema_id).unwrap();
362        if schema.get_index_by_id(&proto.id.into()).is_some() {
363            schema.update_index(proto);
364        } else {
365            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
366            schema.create_index(proto);
367            database
368                .iter_schemas_mut()
369                .find(|schema| {
370                    schema.id() != proto.schema_id
371                        && schema.get_index_by_id(&proto.id.into()).is_some()
372                })
373                .unwrap()
374                .drop_index(proto.id.into());
375        }
376    }
377
378    pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
379        self.get_database_mut(db_id)
380            .unwrap()
381            .get_schema_mut(schema_id)
382            .unwrap()
383            .drop_source(source_id);
384    }
385
386    pub fn update_source(&mut self, proto: &PbSource) {
387        let database = self.get_database_mut(proto.database_id).unwrap();
388        let schema = database.get_schema_mut(proto.schema_id).unwrap();
389        if schema.get_source_by_id(&proto.id).is_some() {
390            schema.update_source(proto);
391        } else {
392            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
393            schema.create_source(proto);
394            database
395                .iter_schemas_mut()
396                .find(|schema| {
397                    schema.id() != proto.schema_id && schema.get_source_by_id(&proto.id).is_some()
398                })
399                .unwrap()
400                .drop_source(proto.id);
401        }
402    }
403
404    pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
405        self.get_database_mut(db_id)
406            .unwrap()
407            .get_schema_mut(schema_id)
408            .unwrap()
409            .drop_sink(sink_id);
410    }
411
412    pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
413        self.get_database_mut(db_id)
414            .unwrap()
415            .get_schema_mut(schema_id)
416            .unwrap()
417            .drop_secret(secret_id);
418    }
419
420    pub fn update_sink(&mut self, proto: &PbSink) {
421        let database = self.get_database_mut(proto.database_id).unwrap();
422        let schema = database.get_schema_mut(proto.schema_id).unwrap();
423        if schema.get_sink_by_id(&proto.id).is_some() {
424            schema.update_sink(proto);
425        } else {
426            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
427            schema.create_sink(proto);
428            database
429                .iter_schemas_mut()
430                .find(|schema| {
431                    schema.id() != proto.schema_id && schema.get_sink_by_id(&proto.id).is_some()
432                })
433                .unwrap()
434                .drop_sink(proto.id);
435        }
436    }
437
438    pub fn drop_subscription(
439        &mut self,
440        db_id: DatabaseId,
441        schema_id: SchemaId,
442        subscription_id: SubscriptionId,
443    ) {
444        self.get_database_mut(db_id)
445            .unwrap()
446            .get_schema_mut(schema_id)
447            .unwrap()
448            .drop_subscription(subscription_id);
449    }
450
451    pub fn update_subscription(&mut self, proto: &PbSubscription) {
452        let database = self.get_database_mut(proto.database_id).unwrap();
453        let schema = database.get_schema_mut(proto.schema_id).unwrap();
454        if schema.get_subscription_by_id(&proto.id).is_some() {
455            schema.update_subscription(proto);
456        } else {
457            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
458            schema.create_subscription(proto);
459            database
460                .iter_schemas_mut()
461                .find(|schema| {
462                    schema.id() != proto.schema_id
463                        && schema.get_subscription_by_id(&proto.id).is_some()
464                })
465                .unwrap()
466                .drop_subscription(proto.id);
467        }
468    }
469
470    pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
471        self.get_database_mut(db_id)
472            .unwrap()
473            .get_schema_mut(schema_id)
474            .unwrap()
475            .drop_index(index_id);
476    }
477
478    pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
479        self.get_database_mut(db_id)
480            .unwrap()
481            .get_schema_mut(schema_id)
482            .unwrap()
483            .drop_view(view_id);
484    }
485
486    pub fn update_view(&mut self, proto: &PbView) {
487        let database = self.get_database_mut(proto.database_id).unwrap();
488        let schema = database.get_schema_mut(proto.schema_id).unwrap();
489        if schema.get_view_by_id(&proto.id).is_some() {
490            schema.update_view(proto);
491        } else {
492            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
493            schema.create_view(proto);
494            database
495                .iter_schemas_mut()
496                .find(|schema| {
497                    schema.id() != proto.schema_id && schema.get_view_by_id(&proto.id).is_some()
498                })
499                .unwrap()
500                .drop_view(proto.id);
501        }
502    }
503
504    pub fn drop_function(
505        &mut self,
506        db_id: DatabaseId,
507        schema_id: SchemaId,
508        function_id: FunctionId,
509    ) {
510        self.get_database_mut(db_id)
511            .unwrap()
512            .get_schema_mut(schema_id)
513            .unwrap()
514            .drop_function(function_id);
515    }
516
517    pub fn update_function(&mut self, proto: &PbFunction) {
518        let database = self.get_database_mut(proto.database_id).unwrap();
519        let schema = database.get_schema_mut(proto.schema_id).unwrap();
520        if schema.get_function_by_id(proto.id.into()).is_some() {
521            schema.update_function(proto);
522        } else {
523            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
524            schema.create_function(proto);
525            database
526                .iter_schemas_mut()
527                .find(|schema| {
528                    schema.id() != proto.schema_id
529                        && schema.get_function_by_id(proto.id.into()).is_some()
530                })
531                .unwrap()
532                .drop_function(proto.id.into());
533        }
534
535        self.get_database_mut(proto.database_id)
536            .unwrap()
537            .get_schema_mut(proto.schema_id)
538            .unwrap()
539            .update_function(proto);
540    }
541
542    pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
543        self.database_by_name
544            .get(db_name)
545            .ok_or_else(|| CatalogError::NotFound("database", db_name.to_owned()))
546    }
547
548    pub fn get_database_by_id(&self, db_id: &DatabaseId) -> CatalogResult<&DatabaseCatalog> {
549        let db_name = self
550            .db_name_by_id
551            .get(db_id)
552            .ok_or_else(|| CatalogError::NotFound("db_id", db_id.to_string()))?;
553        self.database_by_name
554            .get(db_name)
555            .ok_or_else(|| CatalogError::NotFound("database", db_name.to_string()))
556    }
557
558    pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
559        Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
560    }
561
562    pub fn iter_schemas(
563        &self,
564        db_name: &str,
565    ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
566        Ok(self.get_database_by_name(db_name)?.iter_schemas())
567    }
568
569    pub fn get_all_database_names(&self) -> Vec<String> {
570        self.database_by_name.keys().cloned().collect_vec()
571    }
572
573    pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
574        self.database_by_name.values()
575    }
576
577    pub fn get_schema_by_name(
578        &self,
579        db_name: &str,
580        schema_name: &str,
581    ) -> CatalogResult<&SchemaCatalog> {
582        self.get_database_by_name(db_name)?
583            .get_schema_by_name(schema_name)
584            .ok_or_else(|| CatalogError::NotFound("schema", schema_name.to_owned()))
585    }
586
587    pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
588        self.get_any_table_by_id(&table_id)
589            .map(|table| table.name.clone())
590    }
591
592    pub fn get_schema_by_id(
593        &self,
594        db_id: &DatabaseId,
595        schema_id: &SchemaId,
596    ) -> CatalogResult<&SchemaCatalog> {
597        self.get_database_by_id(db_id)?
598            .get_schema_by_id(schema_id)
599            .ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
600    }
601
602    /// Refer to [`SearchPath`].
603    pub fn first_valid_schema(
604        &self,
605        db_name: &str,
606        search_path: &SearchPath,
607        user_name: &str,
608    ) -> CatalogResult<&SchemaCatalog> {
609        for path in search_path.real_path() {
610            let mut schema_name: &str = path;
611            if schema_name == USER_NAME_WILD_CARD {
612                schema_name = user_name;
613            }
614
615            if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
616                return schema_catalog;
617            }
618        }
619        Err(CatalogError::NotFound(
620            "first valid schema",
621            "no schema has been selected to create in".to_owned(),
622        ))
623    }
624
625    pub fn get_source_by_id<'a>(
626        &self,
627        db_name: &'a str,
628        schema_path: SchemaPath<'a>,
629        source_id: &SourceId,
630    ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
631        schema_path
632            .try_find(|schema_name| {
633                Ok(self
634                    .get_schema_by_name(db_name, schema_name)?
635                    .get_source_by_id(source_id))
636            })?
637            .ok_or_else(|| CatalogError::NotFound("source", source_id.to_string()))
638    }
639
640    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
641    /// Retrieves all tables, created or creating.
642    pub fn get_any_table_by_name<'a>(
643        &self,
644        db_name: &str,
645        schema_path: SchemaPath<'a>,
646        table_name: &str,
647    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
648        schema_path
649            .try_find(|schema_name| {
650                Ok(self
651                    .get_schema_by_name(db_name, schema_name)?
652                    .get_table_by_name(table_name))
653            })?
654            .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
655    }
656
657    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
658    /// Retrieves only created tables.
659    pub fn get_created_table_by_name<'a>(
660        &self,
661        db_name: &str,
662        schema_path: SchemaPath<'a>,
663        table_name: &str,
664    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
665        schema_path
666            .try_find(|schema_name| {
667                Ok(self
668                    .get_schema_by_name(db_name, schema_name)?
669                    .get_created_table_by_name(table_name))
670            })?
671            .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
672    }
673
674    pub fn get_any_table_by_id(&self, table_id: &TableId) -> CatalogResult<&Arc<TableCatalog>> {
675        self.table_by_id
676            .get(table_id)
677            .ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
678    }
679
680    /// This function is similar to `get_table_by_id` expect that a table must be in a given database.
681    pub fn get_created_table_by_id_with_db(
682        &self,
683        db_name: &str,
684        table_id: u32,
685    ) -> CatalogResult<&Arc<TableCatalog>> {
686        let table_id = TableId::from(table_id);
687        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
688            if let Some(table) = schema.get_created_table_by_id(&table_id) {
689                return Ok(table);
690            }
691        }
692        Err(CatalogError::NotFound("table id", table_id.to_string()))
693    }
694
695    // Used by test_utils only.
696    pub fn alter_table_name_by_id(&mut self, table_id: &TableId, table_name: &str) {
697        let mut found = false;
698        for database in self.database_by_name.values() {
699            if !found {
700                for schema in database.iter_schemas() {
701                    if schema.iter_user_table().any(|t| t.id() == *table_id) {
702                        found = true;
703                        break;
704                    }
705                }
706            }
707        }
708
709        if found {
710            let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
711            table.name = table_name.to_owned();
712            self.update_table(&table);
713        }
714    }
715
716    #[cfg(test)]
717    pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
718        self.table_by_id.insert(
719            table_id,
720            Arc::new(TableCatalog {
721                fragment_id,
722                ..Default::default()
723            }),
724        );
725    }
726
727    pub fn get_sys_table_by_name(
728        &self,
729        db_name: &str,
730        schema_name: &str,
731        table_name: &str,
732    ) -> CatalogResult<&Arc<SystemTableCatalog>> {
733        self.get_schema_by_name(db_name, schema_name)?
734            .get_system_table_by_name(table_name)
735            .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
736    }
737
738    pub fn get_source_by_name<'a>(
739        &self,
740        db_name: &str,
741        schema_path: SchemaPath<'a>,
742        source_name: &str,
743    ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
744        schema_path
745            .try_find(|schema_name| {
746                Ok(self
747                    .get_schema_by_name(db_name, schema_name)?
748                    .get_source_by_name(source_name))
749            })?
750            .ok_or_else(|| CatalogError::NotFound("source", source_name.to_owned()))
751    }
752
753    pub fn get_sink_by_name<'a>(
754        &self,
755        db_name: &str,
756        schema_path: SchemaPath<'a>,
757        sink_name: &str,
758    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
759        schema_path
760            .try_find(|schema_name| {
761                Ok(self
762                    .get_schema_by_name(db_name, schema_name)?
763                    .get_sink_by_name(sink_name))
764            })?
765            .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_owned()))
766    }
767
768    pub fn get_subscription_by_name<'a>(
769        &self,
770        db_name: &str,
771        schema_path: SchemaPath<'a>,
772        subscription_name: &str,
773    ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
774        schema_path
775            .try_find(|schema_name| {
776                Ok(self
777                    .get_schema_by_name(db_name, schema_name)?
778                    .get_subscription_by_name(subscription_name))
779            })?
780            .ok_or_else(|| CatalogError::NotFound("subscription", subscription_name.to_owned()))
781    }
782
783    pub fn get_index_by_name<'a>(
784        &self,
785        db_name: &str,
786        schema_path: SchemaPath<'a>,
787        index_name: &str,
788    ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
789        schema_path
790            .try_find(|schema_name| {
791                Ok(self
792                    .get_schema_by_name(db_name, schema_name)?
793                    .get_index_by_name(index_name))
794            })?
795            .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
796    }
797
798    pub fn get_index_by_id(
799        &self,
800        db_name: &str,
801        index_id: u32,
802    ) -> CatalogResult<&Arc<IndexCatalog>> {
803        let index_id = IndexId::from(index_id);
804        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
805            if let Some(index) = schema.get_index_by_id(&index_id) {
806                return Ok(index);
807            }
808        }
809        Err(CatalogError::NotFound("index", index_id.to_string()))
810    }
811
812    pub fn get_view_by_name<'a>(
813        &self,
814        db_name: &str,
815        schema_path: SchemaPath<'a>,
816        view_name: &str,
817    ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
818        schema_path
819            .try_find(|schema_name| {
820                Ok(self
821                    .get_schema_by_name(db_name, schema_name)?
822                    .get_view_by_name(view_name))
823            })?
824            .ok_or_else(|| CatalogError::NotFound("view", view_name.to_owned()))
825    }
826
827    pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
828        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
829            if let Some(view) = schema.get_view_by_id(&ViewId::from(view_id)) {
830                return Ok(view.clone());
831            }
832        }
833        Err(CatalogError::NotFound("view", view_id.to_string()))
834    }
835
836    pub fn get_secret_by_name<'a>(
837        &self,
838        db_name: &str,
839        schema_path: SchemaPath<'a>,
840        secret_name: &str,
841    ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
842        schema_path
843            .try_find(|schema_name| {
844                Ok(self
845                    .get_schema_by_name(db_name, schema_name)?
846                    .get_secret_by_name(secret_name))
847            })?
848            .ok_or_else(|| CatalogError::NotFound("secret", secret_name.to_owned()))
849    }
850
851    pub fn get_connection_by_name<'a>(
852        &self,
853        db_name: &str,
854        schema_path: SchemaPath<'a>,
855        connection_name: &str,
856    ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
857        schema_path
858            .try_find(|schema_name| {
859                Ok(self
860                    .get_schema_by_name(db_name, schema_name)?
861                    .get_connection_by_name(connection_name))
862            })?
863            .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_owned()))
864    }
865
866    pub fn get_function_by_name_inputs<'a>(
867        &self,
868        db_name: &str,
869        schema_path: SchemaPath<'a>,
870        function_name: &str,
871        inputs: &mut [ExprImpl],
872    ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
873        schema_path
874            .try_find(|schema_name| {
875                Ok(self
876                    .get_schema_by_name(db_name, schema_name)?
877                    .get_function_by_name_inputs(function_name, inputs))
878            })?
879            .ok_or_else(|| {
880                CatalogError::NotFound(
881                    "function",
882                    format!(
883                        "{}({})",
884                        function_name,
885                        inputs
886                            .iter()
887                            .map(|a| a.return_type().to_string())
888                            .join(", ")
889                    ),
890                )
891            })
892    }
893
894    pub fn get_function_by_name_args<'a>(
895        &self,
896        db_name: &str,
897        schema_path: SchemaPath<'a>,
898        function_name: &str,
899        args: &[DataType],
900    ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
901        schema_path
902            .try_find(|schema_name| {
903                Ok(self
904                    .get_schema_by_name(db_name, schema_name)?
905                    .get_function_by_name_args(function_name, args))
906            })?
907            .ok_or_else(|| {
908                CatalogError::NotFound(
909                    "function",
910                    format!(
911                        "{}({})",
912                        function_name,
913                        args.iter().map(|a| a.to_string()).join(", ")
914                    ),
915                )
916            })
917    }
918
919    /// Gets all functions with the given name.
920    pub fn get_functions_by_name<'a>(
921        &self,
922        db_name: &str,
923        schema_path: SchemaPath<'a>,
924        function_name: &str,
925    ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
926        schema_path
927            .try_find(|schema_name| {
928                Ok(self
929                    .get_schema_by_name(db_name, schema_name)?
930                    .get_functions_by_name(function_name))
931            })?
932            .ok_or_else(|| CatalogError::NotFound("function", function_name.to_owned()))
933    }
934
935    /// Check if the name duplicates with existing table, materialized view or source.
936    pub fn check_relation_name_duplicated(
937        &self,
938        db_name: &str,
939        schema_name: &str,
940        relation_name: &str,
941    ) -> CatalogResult<()> {
942        let schema = self.get_schema_by_name(db_name, schema_name)?;
943
944        if let Some(table) = schema.get_table_by_name(relation_name) {
945            let is_creating = table.stream_job_status == StreamJobStatus::Creating;
946            if table.is_index() {
947                Err(CatalogError::Duplicated(
948                    "index",
949                    relation_name.to_owned(),
950                    is_creating,
951                ))
952            } else if table.is_mview() {
953                Err(CatalogError::Duplicated(
954                    "materialized view",
955                    relation_name.to_owned(),
956                    is_creating,
957                ))
958            } else {
959                Err(CatalogError::Duplicated(
960                    "table",
961                    relation_name.to_owned(),
962                    is_creating,
963                ))
964            }
965        } else if schema.get_source_by_name(relation_name).is_some() {
966            Err(CatalogError::duplicated("source", relation_name.to_owned()))
967        } else if schema.get_sink_by_name(relation_name).is_some() {
968            Err(CatalogError::duplicated("sink", relation_name.to_owned()))
969        } else if schema.get_view_by_name(relation_name).is_some() {
970            Err(CatalogError::duplicated("view", relation_name.to_owned()))
971        } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
972            let is_not_created = subscription.subscription_state != SubscriptionState::Created;
973            Err(CatalogError::Duplicated(
974                "subscription",
975                relation_name.to_owned(),
976                is_not_created,
977            ))
978        } else {
979            Ok(())
980        }
981    }
982
983    pub fn check_function_name_duplicated(
984        &self,
985        db_name: &str,
986        schema_name: &str,
987        function_name: &str,
988        arg_types: &[DataType],
989    ) -> CatalogResult<()> {
990        let schema = self.get_schema_by_name(db_name, schema_name)?;
991
992        if schema
993            .get_function_by_name_args(function_name, arg_types)
994            .is_some()
995        {
996            let name = format!(
997                "{function_name}({})",
998                arg_types.iter().map(|t| t.to_string()).join(",")
999            );
1000            Err(CatalogError::duplicated("function", name))
1001        } else {
1002            Ok(())
1003        }
1004    }
1005
1006    /// Check if the name duplicates with existing connection.
1007    pub fn check_connection_name_duplicated(
1008        &self,
1009        db_name: &str,
1010        schema_name: &str,
1011        connection_name: &str,
1012    ) -> CatalogResult<()> {
1013        let schema = self.get_schema_by_name(db_name, schema_name)?;
1014
1015        if schema.get_connection_by_name(connection_name).is_some() {
1016            Err(CatalogError::duplicated(
1017                "connection",
1018                connection_name.to_owned(),
1019            ))
1020        } else {
1021            Ok(())
1022        }
1023    }
1024
1025    pub fn check_secret_name_duplicated(
1026        &self,
1027        db_name: &str,
1028        schema_name: &str,
1029        secret_name: &str,
1030    ) -> CatalogResult<()> {
1031        let schema = self.get_schema_by_name(db_name, schema_name)?;
1032
1033        if schema.get_secret_by_name(secret_name).is_some() {
1034            Err(CatalogError::duplicated("secret", secret_name.to_owned()))
1035        } else {
1036            Ok(())
1037        }
1038    }
1039
1040    /// Get the catalog cache's catalog version.
1041    pub fn version(&self) -> u64 {
1042        self.version
1043    }
1044
1045    /// Set the catalog cache's catalog version.
1046    pub fn set_version(&mut self, catalog_version: CatalogVersion) {
1047        self.version = catalog_version;
1048    }
1049
1050    pub fn table_stats(&self) -> &HummockVersionStats {
1051        &self.table_stats
1052    }
1053
1054    pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
1055        self.table_stats = table_stats;
1056    }
1057
1058    pub fn get_all_indexes_related_to_object(
1059        &self,
1060        db_id: DatabaseId,
1061        schema_id: SchemaId,
1062        mv_id: TableId,
1063    ) -> Vec<Arc<IndexCatalog>> {
1064        self.get_database_by_id(&db_id)
1065            .unwrap()
1066            .get_schema_by_id(&schema_id)
1067            .unwrap()
1068            .get_indexes_by_table_id(&mv_id)
1069    }
1070
1071    pub fn get_id_by_class_name(
1072        &self,
1073        db_name: &str,
1074        schema_path: SchemaPath<'_>,
1075        class_name: &str,
1076    ) -> CatalogResult<u32> {
1077        schema_path
1078            .try_find(|schema_name| {
1079                let schema = self.get_schema_by_name(db_name, schema_name)?;
1080                #[allow(clippy::manual_map)]
1081                if let Some(item) = schema.get_system_table_by_name(class_name) {
1082                    Ok(Some(item.id().into()))
1083                } else if let Some(item) = schema.get_created_table_by_name(class_name) {
1084                    Ok(Some(item.id().into()))
1085                } else if let Some(item) = schema.get_index_by_name(class_name) {
1086                    Ok(Some(item.id.into()))
1087                } else if let Some(item) = schema.get_source_by_name(class_name) {
1088                    Ok(Some(item.id))
1089                } else if let Some(item) = schema.get_view_by_name(class_name) {
1090                    Ok(Some(item.id))
1091                } else {
1092                    Ok(None)
1093                }
1094            })?
1095            .map(|(id, _)| id)
1096            .ok_or_else(|| CatalogError::NotFound("class", class_name.to_owned()))
1097    }
1098}