risingwave_frontend/catalog/
root_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::id::ObjectId;
21use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
22use risingwave_common::types::DataType;
23use risingwave_connector::sink::catalog::SinkCatalog;
24use risingwave_pb::catalog::{
25    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
26    PbSubscription, PbTable, PbView,
27};
28use risingwave_pb::hummock::HummockVersionStats;
29
30use super::function_catalog::FunctionCatalog;
31use super::source_catalog::SourceCatalog;
32use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
33use super::view_catalog::ViewCatalog;
34use super::{
35    CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
36};
37use crate::catalog::connection_catalog::ConnectionCatalog;
38use crate::catalog::database_catalog::DatabaseCatalog;
39use crate::catalog::schema_catalog::SchemaCatalog;
40use crate::catalog::secret_catalog::SecretCatalog;
41use crate::catalog::system_catalog::{
42    SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
43};
44use crate::catalog::table_catalog::TableCatalog;
45use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
46use crate::expr::{Expr, ExprImpl};
47
/// Describes where an object lookup should search: either one explicitly named schema or a
/// search path combined with the current user name.
#[derive(Copy, Clone)]
pub enum SchemaPath<'a> {
    /// A single schema name; only this schema is consulted.
    Name(&'a str),
    /// (`search_path`, `user_name`).
    Path(&'a SearchPath, &'a str),
}
54
55impl<'a> SchemaPath<'a> {
56    pub fn new(
57        schema_name: Option<&'a str>,
58        search_path: &'a SearchPath,
59        user_name: &'a str,
60    ) -> Self {
61        match schema_name {
62            Some(schema_name) => SchemaPath::Name(schema_name),
63            None => SchemaPath::Path(search_path, user_name),
64        }
65    }
66
67    /// Call function `f` for each schema name. Return the first `Some` result.
68    pub fn try_find<T, E>(
69        &self,
70        mut f: impl FnMut(&str) -> Result<Option<T>, E>,
71    ) -> Result<Option<(T, &'a str)>, E> {
72        match self {
73            SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
74            SchemaPath::Path(search_path, user_name) => {
75                for schema_name in search_path.path() {
76                    let mut schema_name: &str = schema_name;
77                    if schema_name == USER_NAME_WILD_CARD {
78                        schema_name = user_name;
79                    }
80                    if let Ok(Some(res)) = f(schema_name) {
81                        return Ok(Some((res, schema_name)));
82                    }
83                }
84                Ok(None)
85            }
86        }
87    }
88}
89
/// Root catalog of database catalog. It manages all database/schema/table in memory on frontend.
/// It is protected by a `RwLock`. Only [`crate::observer::FrontendObserverNode`]
/// will acquire the write lock and sync it with the meta catalog. In other situations, it is
/// read only.
///
/// - catalog (root catalog)
///   - database catalog
///     - schema catalog
///       - function catalog (i.e., user defined function)
///       - table/sink/source/index/view catalog
///         - column catalog
pub struct Catalog {
    /// Database catalogs keyed by database name.
    database_by_name: HashMap<String, DatabaseCatalog>,
    /// Maps a database id to the name used as key in `database_by_name`.
    db_name_by_id: HashMap<DatabaseId, String>,
    /// all table catalogs in the cluster identified by universal unique table id.
    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
    /// Table statistics; not touched by any method visible in this part of the file.
    table_stats: HummockVersionStats,
}
108
109#[expect(clippy::derivable_impls)]
110impl Default for Catalog {
111    fn default() -> Self {
112        Self {
113            database_by_name: HashMap::new(),
114            db_name_by_id: HashMap::new(),
115            table_by_id: HashMap::new(),
116            table_stats: HummockVersionStats::default(),
117        }
118    }
119}
120
121impl Catalog {
122    fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
123        let name = self.db_name_by_id.get(&db_id)?;
124        self.database_by_name.get_mut(name)
125    }
126
127    pub fn clear(&mut self) {
128        self.database_by_name.clear();
129        self.db_name_by_id.clear();
130        self.table_by_id.clear();
131    }
132
133    pub fn create_database(&mut self, db: &PbDatabase) {
134        let name = db.name.clone();
135        let id = db.id;
136
137        self.database_by_name
138            .try_insert(name.clone(), db.into())
139            .unwrap();
140        self.db_name_by_id.try_insert(id, name).unwrap();
141    }
142
143    pub fn create_schema(&mut self, proto: &PbSchema) {
144        let database_id = proto.database_id;
145        let id = proto.id;
146        self.get_database_mut(database_id)
147            .unwrap()
148            .create_schema(proto);
149
150        for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
151            self.get_database_mut(database_id)
152                .unwrap()
153                .get_schema_mut(id)
154                .unwrap()
155                .create_sys_table(sys_table);
156        }
157        for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
158            sys_view.database_id = database_id;
159            sys_view.schema_id = id;
160            self.get_database_mut(database_id)
161                .unwrap()
162                .get_schema_mut(id)
163                .unwrap()
164                .create_sys_view(Arc::new(sys_view));
165        }
166    }
167
168    pub fn create_table(&mut self, proto: &PbTable) {
169        let table = self
170            .get_database_mut(proto.database_id)
171            .unwrap()
172            .get_schema_mut(proto.schema_id)
173            .unwrap()
174            .create_table(proto);
175        self.table_by_id.insert(proto.id, table);
176    }
177
178    pub fn create_index(&mut self, proto: &PbIndex) {
179        self.get_database_mut(proto.database_id)
180            .unwrap()
181            .get_schema_mut(proto.schema_id)
182            .unwrap()
183            .create_index(proto);
184    }
185
186    pub fn create_source(&mut self, proto: &PbSource) {
187        self.get_database_mut(proto.database_id)
188            .unwrap()
189            .get_schema_mut(proto.schema_id)
190            .unwrap()
191            .create_source(proto);
192    }
193
194    pub fn create_sink(&mut self, proto: &PbSink) {
195        self.get_database_mut(proto.database_id)
196            .unwrap()
197            .get_schema_mut(proto.schema_id)
198            .unwrap()
199            .create_sink(proto);
200    }
201
202    pub fn create_subscription(&mut self, proto: &PbSubscription) {
203        self.get_database_mut(proto.database_id)
204            .unwrap()
205            .get_schema_mut(proto.schema_id)
206            .unwrap()
207            .create_subscription(proto);
208    }
209
210    pub fn create_secret(&mut self, proto: &PbSecret) {
211        self.get_database_mut(proto.database_id)
212            .unwrap()
213            .get_schema_mut(proto.schema_id)
214            .unwrap()
215            .create_secret(proto);
216    }
217
218    pub fn create_view(&mut self, proto: &PbView) {
219        self.get_database_mut(proto.database_id)
220            .unwrap()
221            .get_schema_mut(proto.schema_id)
222            .unwrap()
223            .create_view(proto);
224    }
225
226    pub fn create_function(&mut self, proto: &PbFunction) {
227        self.get_database_mut(proto.database_id)
228            .unwrap()
229            .get_schema_mut(proto.schema_id)
230            .unwrap()
231            .create_function(proto);
232    }
233
234    pub fn create_connection(&mut self, proto: &PbConnection) {
235        self.get_database_mut(proto.database_id)
236            .unwrap()
237            .get_schema_mut(proto.schema_id)
238            .unwrap()
239            .create_connection(proto);
240    }
241
242    pub fn drop_connection(
243        &mut self,
244        db_id: DatabaseId,
245        schema_id: SchemaId,
246        connection_id: ConnectionId,
247    ) {
248        self.get_database_mut(db_id)
249            .unwrap()
250            .get_schema_mut(schema_id)
251            .unwrap()
252            .drop_connection(connection_id);
253    }
254
255    pub fn update_connection(&mut self, proto: &PbConnection) {
256        let database = self.get_database_mut(proto.database_id).unwrap();
257        let schema = database.get_schema_mut(proto.schema_id).unwrap();
258        if schema.get_connection_by_id(proto.id).is_some() {
259            schema.update_connection(proto);
260        } else {
261            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
262            schema.create_connection(proto);
263            database
264                .iter_schemas_mut()
265                .find(|schema| {
266                    schema.id() != proto.schema_id
267                        && schema.get_connection_by_id(proto.id).is_some()
268                })
269                .unwrap()
270                .drop_connection(proto.id);
271        }
272    }
273
274    pub fn update_secret(&mut self, proto: &PbSecret) {
275        let database = self.get_database_mut(proto.database_id).unwrap();
276        let schema = database.get_schema_mut(proto.schema_id).unwrap();
277        let secret_id = proto.id;
278        if schema.get_secret_by_id(secret_id).is_some() {
279            schema.update_secret(proto);
280        } else {
281            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
282            schema.create_secret(proto);
283            database
284                .iter_schemas_mut()
285                .find(|schema| {
286                    schema.id() != proto.schema_id && schema.get_secret_by_id(secret_id).is_some()
287                })
288                .unwrap()
289                .drop_secret(secret_id);
290        }
291    }
292
293    pub fn drop_database(&mut self, db_id: DatabaseId) {
294        let name = self.db_name_by_id.remove(&db_id).unwrap();
295        let database = self.database_by_name.remove(&name).unwrap();
296        database.iter_all_table_ids().for_each(|table| {
297            self.table_by_id.remove(&table);
298        });
299    }
300
301    pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
302        self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
303    }
304
305    pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
306        self.table_by_id.remove(&tb_id);
307        self.get_database_mut(db_id)
308            .unwrap()
309            .get_schema_mut(schema_id)
310            .unwrap()
311            .drop_table(tb_id);
312    }
313
314    pub fn update_table(&mut self, proto: &PbTable) {
315        let database = self.get_database_mut(proto.database_id).unwrap();
316        let schema = database.get_schema_mut(proto.schema_id).unwrap();
317        let table = if schema.get_table_by_id(proto.id).is_some() {
318            schema.update_table(proto)
319        } else {
320            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
321            let new_table = schema.create_table(proto);
322            database
323                .iter_schemas_mut()
324                .find(|schema| {
325                    schema.id() != proto.schema_id
326                        && schema.get_created_table_by_id(proto.id).is_some()
327                })
328                .unwrap()
329                .drop_table(proto.id);
330            new_table
331        };
332
333        self.table_by_id.insert(proto.id, table);
334    }
335
336    pub fn update_database(&mut self, proto: &PbDatabase) {
337        let id = proto.id;
338        let name = proto.name.clone();
339
340        let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
341        if old_database_name != name {
342            let mut database = self.database_by_name.remove(&old_database_name).unwrap();
343            database.name.clone_from(&name);
344            database.owner = proto.owner;
345            self.database_by_name.insert(name.clone(), database);
346            self.db_name_by_id.insert(id, name);
347        } else {
348            let database = self.get_database_mut(id).unwrap();
349            database.name = name;
350            database.owner = proto.owner;
351            database.barrier_interval_ms = proto.barrier_interval_ms;
352            database.checkpoint_frequency = proto.checkpoint_frequency;
353        }
354    }
355
356    pub fn update_schema(&mut self, proto: &PbSchema) {
357        self.get_database_mut(proto.database_id)
358            .unwrap()
359            .update_schema(proto);
360    }
361
362    pub fn update_index(&mut self, proto: &PbIndex) {
363        let database = self.get_database_mut(proto.database_id).unwrap();
364        let schema = database.get_schema_mut(proto.schema_id).unwrap();
365        if schema.get_index_by_id(proto.id).is_some() {
366            schema.update_index(proto);
367        } else {
368            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
369            schema.create_index(proto);
370            database
371                .iter_schemas_mut()
372                .find(|schema| {
373                    schema.id() != proto.schema_id && schema.get_index_by_id(proto.id).is_some()
374                })
375                .unwrap()
376                .drop_index(proto.id);
377        }
378    }
379
380    pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
381        self.get_database_mut(db_id)
382            .unwrap()
383            .get_schema_mut(schema_id)
384            .unwrap()
385            .drop_source(source_id);
386    }
387
388    pub fn update_source(&mut self, proto: &PbSource) {
389        let database = self.get_database_mut(proto.database_id).unwrap();
390        let schema = database.get_schema_mut(proto.schema_id).unwrap();
391        if schema.get_source_by_id(proto.id).is_some() {
392            schema.update_source(proto);
393        } else {
394            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
395            schema.create_source(proto);
396            database
397                .iter_schemas_mut()
398                .find(|schema| {
399                    schema.id() != proto.schema_id && schema.get_source_by_id(proto.id).is_some()
400                })
401                .unwrap()
402                .drop_source(proto.id);
403        }
404    }
405
406    pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
407        self.get_database_mut(db_id)
408            .unwrap()
409            .get_schema_mut(schema_id)
410            .unwrap()
411            .drop_sink(sink_id);
412    }
413
414    pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
415        self.get_database_mut(db_id)
416            .unwrap()
417            .get_schema_mut(schema_id)
418            .unwrap()
419            .drop_secret(secret_id);
420    }
421
422    pub fn update_sink(&mut self, proto: &PbSink) {
423        let database = self.get_database_mut(proto.database_id).unwrap();
424        let schema = database.get_schema_mut(proto.schema_id).unwrap();
425        if schema.get_sink_by_id(proto.id).is_some() {
426            schema.update_sink(proto);
427        } else {
428            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
429            schema.create_sink(proto);
430            database
431                .iter_schemas_mut()
432                .find(|schema| {
433                    schema.id() != proto.schema_id && schema.get_sink_by_id(proto.id).is_some()
434                })
435                .unwrap()
436                .drop_sink(proto.id);
437        }
438    }
439
440    pub fn drop_subscription(
441        &mut self,
442        db_id: DatabaseId,
443        schema_id: SchemaId,
444        subscription_id: SubscriptionId,
445    ) {
446        self.get_database_mut(db_id)
447            .unwrap()
448            .get_schema_mut(schema_id)
449            .unwrap()
450            .drop_subscription(subscription_id);
451    }
452
453    pub fn update_subscription(&mut self, proto: &PbSubscription) {
454        let database = self.get_database_mut(proto.database_id).unwrap();
455        let schema = database.get_schema_mut(proto.schema_id).unwrap();
456        if schema.get_subscription_by_id(proto.id).is_some() {
457            schema.update_subscription(proto);
458        } else {
459            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
460            schema.create_subscription(proto);
461            database
462                .iter_schemas_mut()
463                .find(|schema| {
464                    schema.id() != proto.schema_id
465                        && schema.get_subscription_by_id(proto.id).is_some()
466                })
467                .unwrap()
468                .drop_subscription(proto.id);
469        }
470    }
471
472    pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
473        self.get_database_mut(db_id)
474            .unwrap()
475            .get_schema_mut(schema_id)
476            .unwrap()
477            .drop_index(index_id);
478    }
479
480    pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
481        self.get_database_mut(db_id)
482            .unwrap()
483            .get_schema_mut(schema_id)
484            .unwrap()
485            .drop_view(view_id);
486    }
487
488    pub fn update_view(&mut self, proto: &PbView) {
489        let database = self.get_database_mut(proto.database_id).unwrap();
490        let schema = database.get_schema_mut(proto.schema_id).unwrap();
491        if schema.get_view_by_id(proto.id).is_some() {
492            schema.update_view(proto);
493        } else {
494            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
495            schema.create_view(proto);
496            database
497                .iter_schemas_mut()
498                .find(|schema| {
499                    schema.id() != proto.schema_id && schema.get_view_by_id(proto.id).is_some()
500                })
501                .unwrap()
502                .drop_view(proto.id);
503        }
504    }
505
506    pub fn drop_function(
507        &mut self,
508        db_id: DatabaseId,
509        schema_id: SchemaId,
510        function_id: FunctionId,
511    ) {
512        self.get_database_mut(db_id)
513            .unwrap()
514            .get_schema_mut(schema_id)
515            .unwrap()
516            .drop_function(function_id);
517    }
518
519    pub fn update_function(&mut self, proto: &PbFunction) {
520        let database = self.get_database_mut(proto.database_id).unwrap();
521        let schema = database.get_schema_mut(proto.schema_id).unwrap();
522        if schema.get_function_by_id(proto.id).is_some() {
523            schema.update_function(proto);
524        } else {
525            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
526            schema.create_function(proto);
527            database
528                .iter_schemas_mut()
529                .find(|schema| {
530                    schema.id() != proto.schema_id && schema.get_function_by_id(proto.id).is_some()
531                })
532                .unwrap()
533                .drop_function(proto.id);
534        }
535
536        self.get_database_mut(proto.database_id)
537            .unwrap()
538            .get_schema_mut(proto.schema_id)
539            .unwrap()
540            .update_function(proto);
541    }
542
543    pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
544        self.database_by_name
545            .get(db_name)
546            .ok_or_else(|| CatalogError::NotFound("database", db_name.to_owned()))
547    }
548
549    pub fn get_database_by_id(&self, db_id: DatabaseId) -> CatalogResult<&DatabaseCatalog> {
550        let db_name = self
551            .db_name_by_id
552            .get(&db_id)
553            .ok_or_else(|| CatalogError::NotFound("db_id", db_id.to_string()))?;
554        self.database_by_name
555            .get(db_name)
556            .ok_or_else(|| CatalogError::NotFound("database", db_name.clone()))
557    }
558
559    pub fn find_schema_secret_by_secret_id(
560        &self,
561        db_name: &str,
562        secret_id: SecretId,
563    ) -> CatalogResult<(String, String)> {
564        let db = self.get_database_by_name(db_name)?;
565        let schema_secret = db
566            .iter_schemas()
567            .find_map(|schema| {
568                schema
569                    .get_secret_by_id(secret_id)
570                    .map(|secret| (schema.name(), secret.name.clone()))
571            })
572            .ok_or_else(|| CatalogError::NotFound("secret", secret_id.to_string()))?;
573        Ok(schema_secret)
574    }
575
576    pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
577        Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
578    }
579
580    pub fn iter_schemas(
581        &self,
582        db_name: &str,
583    ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
584        Ok(self.get_database_by_name(db_name)?.iter_schemas())
585    }
586
587    pub fn get_all_database_names(&self) -> Vec<String> {
588        self.database_by_name.keys().cloned().collect_vec()
589    }
590
591    pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
592        self.database_by_name.values()
593    }
594
595    pub fn get_schema_by_name(
596        &self,
597        db_name: &str,
598        schema_name: &str,
599    ) -> CatalogResult<&SchemaCatalog> {
600        self.get_database_by_name(db_name)?
601            .get_schema_by_name(schema_name)
602            .ok_or_else(|| CatalogError::NotFound("schema", schema_name.to_owned()))
603    }
604
605    pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
606        self.get_any_table_by_id(table_id)
607            .map(|table| table.name.clone())
608    }
609
610    pub fn get_schema_by_id(
611        &self,
612        db_id: DatabaseId,
613        schema_id: SchemaId,
614    ) -> CatalogResult<&SchemaCatalog> {
615        self.get_database_by_id(db_id)?
616            .get_schema_by_id(schema_id)
617            .ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
618    }
619
620    /// Refer to [`SearchPath`].
621    pub fn first_valid_schema(
622        &self,
623        db_name: &str,
624        search_path: &SearchPath,
625        user_name: &str,
626    ) -> CatalogResult<&SchemaCatalog> {
627        for path in search_path.real_path() {
628            let mut schema_name: &str = path;
629            if schema_name == USER_NAME_WILD_CARD {
630                schema_name = user_name;
631            }
632
633            if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
634                return schema_catalog;
635            }
636        }
637        Err(CatalogError::NotFound(
638            "first valid schema",
639            "no schema has been selected to create in".to_owned(),
640        ))
641    }
642
643    pub fn get_source_by_id<'a>(
644        &self,
645        db_name: &'a str,
646        schema_path: SchemaPath<'a>,
647        source_id: SourceId,
648    ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
649        schema_path
650            .try_find(|schema_name| {
651                Ok(self
652                    .get_schema_by_name(db_name, schema_name)?
653                    .get_source_by_id(source_id))
654            })?
655            .ok_or_else(|| CatalogError::NotFound("source", source_id.to_string()))
656    }
657
658    pub fn get_table_by_name<'a>(
659        &self,
660        db_name: &str,
661        schema_path: SchemaPath<'a>,
662        table_name: &str,
663        bind_creating: bool,
664    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
665        schema_path
666            .try_find(|schema_name| {
667                Ok(self
668                    .get_schema_by_name(db_name, schema_name)?
669                    .get_table_by_name(table_name, bind_creating))
670            })?
671            .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
672    }
673
674    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
675    /// Retrieves all tables, created or creating.
676    pub fn get_any_table_by_name<'a>(
677        &self,
678        db_name: &str,
679        schema_path: SchemaPath<'a>,
680        table_name: &str,
681    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
682        self.get_table_by_name(db_name, schema_path, table_name, true)
683    }
684
685    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
686    /// Retrieves only created tables.
687    pub fn get_created_table_by_name<'a>(
688        &self,
689        db_name: &str,
690        schema_path: SchemaPath<'a>,
691        table_name: &str,
692    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
693        self.get_table_by_name(db_name, schema_path, table_name, false)
694    }
695
696    pub fn get_any_table_by_id(&self, table_id: TableId) -> CatalogResult<&Arc<TableCatalog>> {
697        self.table_by_id
698            .get(&table_id)
699            .ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
700    }
701
702    /// This function is similar to `get_table_by_id` expect that a table must be in a given database.
703    pub fn get_created_table_by_id_with_db(
704        &self,
705        db_name: &str,
706        table_id: u32,
707    ) -> CatalogResult<&Arc<TableCatalog>> {
708        let table_id = TableId::from(table_id);
709        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
710            if let Some(table) = schema.get_created_table_by_id(table_id) {
711                return Ok(table);
712            }
713        }
714        Err(CatalogError::NotFound("table id", table_id.to_string()))
715    }
716
717    pub fn iter_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
718        self.table_by_id.values()
719    }
720
721    pub fn iter_backfilling_internal_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
722        self.table_by_id
723            .values()
724            .filter(|t| t.is_internal_table() && !t.is_created())
725    }
726
727    // Used by test_utils only.
728    pub fn alter_table_name_by_id(&mut self, table_id: TableId, table_name: &str) {
729        let mut found = false;
730        for database in self.database_by_name.values() {
731            if !found {
732                for schema in database.iter_schemas() {
733                    if schema.iter_user_table().any(|t| t.id() == table_id) {
734                        found = true;
735                        break;
736                    }
737                }
738            }
739        }
740
741        if found {
742            let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
743            table.name = table_name.to_owned();
744            self.update_table(&table);
745        }
746    }
747
748    #[cfg(test)]
749    pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
750        self.table_by_id.insert(
751            table_id,
752            Arc::new(TableCatalog {
753                fragment_id,
754                ..Default::default()
755            }),
756        );
757    }
758
759    pub fn get_sys_table_by_name(
760        &self,
761        db_name: &str,
762        schema_name: &str,
763        table_name: &str,
764    ) -> CatalogResult<&Arc<SystemTableCatalog>> {
765        self.get_schema_by_name(db_name, schema_name)?
766            .get_system_table_by_name(table_name)
767            .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
768    }
769
770    pub fn get_source_by_name<'a>(
771        &self,
772        db_name: &str,
773        schema_path: SchemaPath<'a>,
774        source_name: &str,
775    ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
776        schema_path
777            .try_find(|schema_name| {
778                Ok(self
779                    .get_schema_by_name(db_name, schema_name)?
780                    .get_source_by_name(source_name))
781            })?
782            .ok_or_else(|| CatalogError::NotFound("source", source_name.to_owned()))
783    }
784
785    pub fn get_sink_by_name<'a>(
786        &self,
787        db_name: &str,
788        schema_path: SchemaPath<'a>,
789        sink_name: &str,
790        bind_creating: bool,
791    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
792        schema_path
793            .try_find(|schema_name| {
794                Ok(self
795                    .get_schema_by_name(db_name, schema_name)?
796                    .get_sink_by_name(sink_name, bind_creating))
797            })?
798            .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_owned()))
799    }
800
801    pub fn get_any_sink_by_name<'a>(
802        &self,
803        db_name: &str,
804        schema_path: SchemaPath<'a>,
805        sink_name: &str,
806    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
807        self.get_sink_by_name(db_name, schema_path, sink_name, true)
808    }
809
810    pub fn get_created_sink_by_name<'a>(
811        &self,
812        db_name: &str,
813        schema_path: SchemaPath<'a>,
814        sink_name: &str,
815    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
816        self.get_sink_by_name(db_name, schema_path, sink_name, false)
817    }
818
819    pub fn get_subscription_by_name<'a>(
820        &self,
821        db_name: &str,
822        schema_path: SchemaPath<'a>,
823        subscription_name: &str,
824    ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
825        schema_path
826            .try_find(|schema_name| {
827                Ok(self
828                    .get_schema_by_name(db_name, schema_name)?
829                    .get_subscription_by_name(subscription_name))
830            })?
831            .ok_or_else(|| CatalogError::NotFound("subscription", subscription_name.to_owned()))
832    }
833
834    pub fn get_index_by_name<'a>(
835        &self,
836        db_name: &str,
837        schema_path: SchemaPath<'a>,
838        index_name: &str,
839    ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
840        schema_path
841            .try_find(|schema_name| {
842                Ok(self
843                    .get_schema_by_name(db_name, schema_name)?
844                    .get_created_index_by_name(index_name))
845            })?
846            .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
847    }
848
849    pub fn get_any_index_by_name<'a>(
850        &self,
851        db_name: &str,
852        schema_path: SchemaPath<'a>,
853        index_name: &str,
854    ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
855        schema_path
856            .try_find(|schema_name| {
857                Ok(self
858                    .get_schema_by_name(db_name, schema_name)?
859                    .get_any_index_by_name(index_name))
860            })?
861            .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
862    }
863
864    pub fn get_index_by_id(
865        &self,
866        db_name: &str,
867        index_id: u32,
868    ) -> CatalogResult<&Arc<IndexCatalog>> {
869        let index_id = IndexId::from(index_id);
870        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
871            if let Some(index) = schema.get_index_by_id(index_id) {
872                return Ok(index);
873            }
874        }
875        Err(CatalogError::NotFound("index", index_id.to_string()))
876    }
877
878    pub fn get_view_by_name<'a>(
879        &self,
880        db_name: &str,
881        schema_path: SchemaPath<'a>,
882        view_name: &str,
883    ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
884        schema_path
885            .try_find(|schema_name| {
886                Ok(self
887                    .get_schema_by_name(db_name, schema_name)?
888                    .get_view_by_name(view_name))
889            })?
890            .ok_or_else(|| CatalogError::NotFound("view", view_name.to_owned()))
891    }
892
893    pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
894        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
895            if let Some(view) = schema.get_view_by_id(ViewId::from(view_id)) {
896                return Ok(view.clone());
897            }
898        }
899        Err(CatalogError::NotFound("view", view_id.to_string()))
900    }
901
902    pub fn get_secret_by_name<'a>(
903        &self,
904        db_name: &str,
905        schema_path: SchemaPath<'a>,
906        secret_name: &str,
907    ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
908        schema_path
909            .try_find(|schema_name| {
910                Ok(self
911                    .get_schema_by_name(db_name, schema_name)?
912                    .get_secret_by_name(secret_name))
913            })?
914            .ok_or_else(|| CatalogError::NotFound("secret", secret_name.to_owned()))
915    }
916
917    pub fn get_connection_by_id(
918        &self,
919        db_name: &str,
920        connection_id: ConnectionId,
921    ) -> CatalogResult<&Arc<ConnectionCatalog>> {
922        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
923            if let Some(conn) = schema.get_connection_by_id(connection_id) {
924                return Ok(conn);
925            }
926        }
927        Err(CatalogError::NotFound(
928            "connection",
929            connection_id.to_string(),
930        ))
931    }
932
933    pub fn get_connection_by_name<'a>(
934        &self,
935        db_name: &str,
936        schema_path: SchemaPath<'a>,
937        connection_name: &str,
938    ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
939        schema_path
940            .try_find(|schema_name| {
941                Ok(self
942                    .get_schema_by_name(db_name, schema_name)?
943                    .get_connection_by_name(connection_name))
944            })?
945            .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_owned()))
946    }
947
948    pub fn get_function_by_name_inputs<'a>(
949        &self,
950        db_name: &str,
951        schema_path: SchemaPath<'a>,
952        function_name: &str,
953        inputs: &mut [ExprImpl],
954    ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
955        schema_path
956            .try_find(|schema_name| {
957                Ok(self
958                    .get_schema_by_name(db_name, schema_name)?
959                    .get_function_by_name_inputs(function_name, inputs))
960            })?
961            .ok_or_else(|| {
962                CatalogError::NotFound(
963                    "function",
964                    format!(
965                        "{}({})",
966                        function_name,
967                        inputs
968                            .iter()
969                            .map(|a| a.return_type().to_string())
970                            .join(", ")
971                    ),
972                )
973            })
974    }
975
976    pub fn get_function_by_name_args<'a>(
977        &self,
978        db_name: &str,
979        schema_path: SchemaPath<'a>,
980        function_name: &str,
981        args: &[DataType],
982    ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
983        schema_path
984            .try_find(|schema_name| {
985                Ok(self
986                    .get_schema_by_name(db_name, schema_name)?
987                    .get_function_by_name_args(function_name, args))
988            })?
989            .ok_or_else(|| {
990                CatalogError::NotFound(
991                    "function",
992                    format!(
993                        "{}({})",
994                        function_name,
995                        args.iter().map(|a| a.to_string()).join(", ")
996                    ),
997                )
998            })
999    }
1000
1001    /// Gets all functions with the given name.
1002    pub fn get_functions_by_name<'a>(
1003        &self,
1004        db_name: &str,
1005        schema_path: SchemaPath<'a>,
1006        function_name: &str,
1007    ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
1008        schema_path
1009            .try_find(|schema_name| {
1010                Ok(self
1011                    .get_schema_by_name(db_name, schema_name)?
1012                    .get_functions_by_name(function_name))
1013            })?
1014            .ok_or_else(|| CatalogError::NotFound("function", function_name.to_owned()))
1015    }
1016
1017    /// Check if the name duplicates with existing table, materialized view or source.
1018    pub fn check_relation_name_duplicated(
1019        &self,
1020        db_name: &str,
1021        schema_name: &str,
1022        relation_name: &str,
1023    ) -> CatalogResult<()> {
1024        let schema = self.get_schema_by_name(db_name, schema_name)?;
1025
1026        if let Some(table) = schema.get_any_table_by_name(relation_name) {
1027            let is_creating = table.stream_job_status == StreamJobStatus::Creating;
1028            if table.is_index() {
1029                Err(CatalogError::Duplicated(
1030                    "index",
1031                    relation_name.to_owned(),
1032                    is_creating,
1033                ))
1034            } else if table.is_mview() {
1035                Err(CatalogError::Duplicated(
1036                    "materialized view",
1037                    relation_name.to_owned(),
1038                    is_creating,
1039                ))
1040            } else {
1041                Err(CatalogError::Duplicated(
1042                    "table",
1043                    relation_name.to_owned(),
1044                    is_creating,
1045                ))
1046            }
1047        } else if schema.get_source_by_name(relation_name).is_some() {
1048            Err(CatalogError::duplicated("source", relation_name.to_owned()))
1049        } else if let Some(sink) = schema.get_any_sink_by_name(relation_name) {
1050            Err(CatalogError::Duplicated(
1051                "sink",
1052                relation_name.to_owned(),
1053                !sink.is_created(),
1054            ))
1055        } else if schema.get_view_by_name(relation_name).is_some() {
1056            Err(CatalogError::duplicated("view", relation_name.to_owned()))
1057        } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
1058            let is_not_created = subscription.subscription_state != SubscriptionState::Created;
1059            Err(CatalogError::Duplicated(
1060                "subscription",
1061                relation_name.to_owned(),
1062                is_not_created,
1063            ))
1064        } else {
1065            Ok(())
1066        }
1067    }
1068
1069    pub fn check_function_name_duplicated(
1070        &self,
1071        db_name: &str,
1072        schema_name: &str,
1073        function_name: &str,
1074        arg_types: &[DataType],
1075    ) -> CatalogResult<()> {
1076        let schema = self.get_schema_by_name(db_name, schema_name)?;
1077
1078        if schema
1079            .get_function_by_name_args(function_name, arg_types)
1080            .is_some()
1081        {
1082            let name = format!(
1083                "{function_name}({})",
1084                arg_types.iter().map(|t| t.to_string()).join(",")
1085            );
1086            Err(CatalogError::duplicated("function", name))
1087        } else {
1088            Ok(())
1089        }
1090    }
1091
1092    /// Check if the name duplicates with existing connection.
1093    pub fn check_connection_name_duplicated(
1094        &self,
1095        db_name: &str,
1096        schema_name: &str,
1097        connection_name: &str,
1098    ) -> CatalogResult<()> {
1099        let schema = self.get_schema_by_name(db_name, schema_name)?;
1100
1101        if schema.get_connection_by_name(connection_name).is_some() {
1102            Err(CatalogError::duplicated(
1103                "connection",
1104                connection_name.to_owned(),
1105            ))
1106        } else {
1107            Ok(())
1108        }
1109    }
1110
1111    pub fn check_secret_name_duplicated(
1112        &self,
1113        db_name: &str,
1114        schema_name: &str,
1115        secret_name: &str,
1116    ) -> CatalogResult<()> {
1117        let schema = self.get_schema_by_name(db_name, schema_name)?;
1118
1119        if schema.get_secret_by_name(secret_name).is_some() {
1120            Err(CatalogError::duplicated("secret", secret_name.to_owned()))
1121        } else {
1122            Ok(())
1123        }
1124    }
1125
1126    pub fn table_stats(&self) -> &HummockVersionStats {
1127        &self.table_stats
1128    }
1129
1130    pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
1131        self.table_stats = table_stats;
1132    }
1133
1134    pub fn get_all_indexes_related_to_object(
1135        &self,
1136        db_id: DatabaseId,
1137        schema_id: SchemaId,
1138        mv_id: TableId,
1139    ) -> Vec<Arc<IndexCatalog>> {
1140        self.get_database_by_id(db_id)
1141            .unwrap()
1142            .get_schema_by_id(schema_id)
1143            .unwrap()
1144            .get_any_indexes_by_table_id(mv_id)
1145    }
1146
1147    pub fn get_id_by_class_name(
1148        &self,
1149        db_name: &str,
1150        schema_path: SchemaPath<'_>,
1151        class_name: &str,
1152    ) -> CatalogResult<ObjectId> {
1153        schema_path
1154            .try_find(|schema_name| {
1155                let schema = self.get_schema_by_name(db_name, schema_name)?;
1156                #[allow(clippy::manual_map)]
1157                if let Some(item) = schema.get_system_table_by_name(class_name) {
1158                    Ok(Some(item.id().as_object_id()))
1159                } else if let Some(item) = schema.get_any_table_by_name(class_name) {
1160                    Ok(Some(item.id().as_object_id()))
1161                } else if let Some(item) = schema.get_any_index_by_name(class_name) {
1162                    Ok(Some(item.id.as_object_id()))
1163                } else if let Some(item) = schema.get_source_by_name(class_name) {
1164                    Ok(Some(item.id.as_object_id()))
1165                } else if let Some(item) = schema.get_view_by_name(class_name) {
1166                    Ok(Some(item.id.as_object_id()))
1167                } else if let Some(item) = schema.get_any_sink_by_name(class_name) {
1168                    Ok(Some(item.id.as_object_id()))
1169                } else {
1170                    Ok(None)
1171                }
1172            })?
1173            .map(|(id, _)| id)
1174            .ok_or_else(|| CatalogError::NotFound("class", class_name.to_owned()))
1175    }
1176}