risingwave_frontend/catalog/
root_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23use risingwave_pb::catalog::{
24    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
25    PbSubscription, PbTable, PbView,
26};
27use risingwave_pb::hummock::HummockVersionStats;
28
29use super::function_catalog::FunctionCatalog;
30use super::source_catalog::SourceCatalog;
31use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
32use super::view_catalog::ViewCatalog;
33use super::{
34    CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
35};
36use crate::catalog::connection_catalog::ConnectionCatalog;
37use crate::catalog::database_catalog::DatabaseCatalog;
38use crate::catalog::schema_catalog::SchemaCatalog;
39use crate::catalog::secret_catalog::SecretCatalog;
40use crate::catalog::system_catalog::{
41    SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
42};
43use crate::catalog::table_catalog::TableCatalog;
44use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
45use crate::expr::{Expr, ExprImpl};
46
47#[derive(Copy, Clone)]
48pub enum SchemaPath<'a> {
49    Name(&'a str),
50    /// (`search_path`, `user_name`).
51    Path(&'a SearchPath, &'a str),
52}
53
54impl<'a> SchemaPath<'a> {
55    pub fn new(
56        schema_name: Option<&'a str>,
57        search_path: &'a SearchPath,
58        user_name: &'a str,
59    ) -> Self {
60        match schema_name {
61            Some(schema_name) => SchemaPath::Name(schema_name),
62            None => SchemaPath::Path(search_path, user_name),
63        }
64    }
65
66    /// Call function `f` for each schema name. Return the first `Some` result.
67    pub fn try_find<T, E>(
68        &self,
69        mut f: impl FnMut(&str) -> Result<Option<T>, E>,
70    ) -> Result<Option<(T, &'a str)>, E> {
71        match self {
72            SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
73            SchemaPath::Path(search_path, user_name) => {
74                for schema_name in search_path.path() {
75                    let mut schema_name: &str = schema_name;
76                    if schema_name == USER_NAME_WILD_CARD {
77                        schema_name = user_name;
78                    }
79                    if let Ok(Some(res)) = f(schema_name) {
80                        return Ok(Some((res, schema_name)));
81                    }
82                }
83                Ok(None)
84            }
85        }
86    }
87}
88
89/// Root catalog of database catalog. It manages all database/schema/table in memory on frontend.
90/// It is protected by a `RwLock`. Only [`crate::observer::FrontendObserverNode`]
91/// will acquire the write lock and sync it with the meta catalog. In other situations, it is
92/// read only.
93///
94/// - catalog (root catalog)
95///   - database catalog
96///     - schema catalog
97///       - function catalog (i.e., user defined function)
98///       - table/sink/source/index/view catalog
99///        - column catalog
100pub struct Catalog {
101    database_by_name: HashMap<String, DatabaseCatalog>,
102    db_name_by_id: HashMap<DatabaseId, String>,
103    /// all table catalogs in the cluster identified by universal unique table id.
104    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
105    table_stats: HummockVersionStats,
106}
107
108#[expect(clippy::derivable_impls)]
109impl Default for Catalog {
110    fn default() -> Self {
111        Self {
112            database_by_name: HashMap::new(),
113            db_name_by_id: HashMap::new(),
114            table_by_id: HashMap::new(),
115            table_stats: HummockVersionStats::default(),
116        }
117    }
118}
119
120impl Catalog {
121    fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
122        let name = self.db_name_by_id.get(&db_id)?;
123        self.database_by_name.get_mut(name)
124    }
125
126    pub fn clear(&mut self) {
127        self.database_by_name.clear();
128        self.db_name_by_id.clear();
129        self.table_by_id.clear();
130    }
131
132    pub fn create_database(&mut self, db: &PbDatabase) {
133        let name = db.name.clone();
134        let id = db.id;
135
136        self.database_by_name
137            .try_insert(name.clone(), db.into())
138            .unwrap();
139        self.db_name_by_id.try_insert(id, name).unwrap();
140    }
141
142    pub fn create_schema(&mut self, proto: &PbSchema) {
143        self.get_database_mut(proto.database_id)
144            .unwrap()
145            .create_schema(proto);
146
147        for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
148            self.get_database_mut(proto.database_id)
149                .unwrap()
150                .get_schema_mut(proto.id)
151                .unwrap()
152                .create_sys_table(sys_table);
153        }
154        for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
155            sys_view.database_id = proto.database_id;
156            sys_view.schema_id = proto.id;
157            self.get_database_mut(proto.database_id)
158                .unwrap()
159                .get_schema_mut(proto.id)
160                .unwrap()
161                .create_sys_view(Arc::new(sys_view));
162        }
163    }
164
165    pub fn create_table(&mut self, proto: &PbTable) {
166        let table = self
167            .get_database_mut(proto.database_id)
168            .unwrap()
169            .get_schema_mut(proto.schema_id)
170            .unwrap()
171            .create_table(proto);
172        self.table_by_id.insert(proto.id.into(), table);
173    }
174
175    pub fn create_index(&mut self, proto: &PbIndex) {
176        self.get_database_mut(proto.database_id)
177            .unwrap()
178            .get_schema_mut(proto.schema_id)
179            .unwrap()
180            .create_index(proto);
181    }
182
183    pub fn create_source(&mut self, proto: &PbSource) {
184        self.get_database_mut(proto.database_id)
185            .unwrap()
186            .get_schema_mut(proto.schema_id)
187            .unwrap()
188            .create_source(proto);
189    }
190
191    pub fn create_sink(&mut self, proto: &PbSink) {
192        self.get_database_mut(proto.database_id)
193            .unwrap()
194            .get_schema_mut(proto.schema_id)
195            .unwrap()
196            .create_sink(proto);
197    }
198
199    pub fn create_subscription(&mut self, proto: &PbSubscription) {
200        self.get_database_mut(proto.database_id)
201            .unwrap()
202            .get_schema_mut(proto.schema_id)
203            .unwrap()
204            .create_subscription(proto);
205    }
206
207    pub fn create_secret(&mut self, proto: &PbSecret) {
208        self.get_database_mut(proto.database_id)
209            .unwrap()
210            .get_schema_mut(proto.schema_id)
211            .unwrap()
212            .create_secret(proto);
213    }
214
215    pub fn create_view(&mut self, proto: &PbView) {
216        self.get_database_mut(proto.database_id)
217            .unwrap()
218            .get_schema_mut(proto.schema_id)
219            .unwrap()
220            .create_view(proto);
221    }
222
223    pub fn create_function(&mut self, proto: &PbFunction) {
224        self.get_database_mut(proto.database_id)
225            .unwrap()
226            .get_schema_mut(proto.schema_id)
227            .unwrap()
228            .create_function(proto);
229    }
230
231    pub fn create_connection(&mut self, proto: &PbConnection) {
232        self.get_database_mut(proto.database_id)
233            .unwrap()
234            .get_schema_mut(proto.schema_id)
235            .unwrap()
236            .create_connection(proto);
237    }
238
239    pub fn drop_connection(
240        &mut self,
241        db_id: DatabaseId,
242        schema_id: SchemaId,
243        connection_id: ConnectionId,
244    ) {
245        self.get_database_mut(db_id)
246            .unwrap()
247            .get_schema_mut(schema_id)
248            .unwrap()
249            .drop_connection(connection_id);
250    }
251
252    pub fn update_connection(&mut self, proto: &PbConnection) {
253        let database = self.get_database_mut(proto.database_id).unwrap();
254        let schema = database.get_schema_mut(proto.schema_id).unwrap();
255        if schema.get_connection_by_id(&proto.id).is_some() {
256            schema.update_connection(proto);
257        } else {
258            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
259            schema.create_connection(proto);
260            database
261                .iter_schemas_mut()
262                .find(|schema| {
263                    schema.id() != proto.schema_id
264                        && schema.get_connection_by_id(&proto.id).is_some()
265                })
266                .unwrap()
267                .drop_connection(proto.id);
268        }
269    }
270
271    pub fn update_secret(&mut self, proto: &PbSecret) {
272        let database = self.get_database_mut(proto.database_id).unwrap();
273        let schema = database.get_schema_mut(proto.schema_id).unwrap();
274        let secret_id = SecretId::new(proto.id);
275        if schema.get_secret_by_id(&secret_id).is_some() {
276            schema.update_secret(proto);
277        } else {
278            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
279            schema.create_secret(proto);
280            database
281                .iter_schemas_mut()
282                .find(|schema| {
283                    schema.id() != proto.schema_id && schema.get_secret_by_id(&secret_id).is_some()
284                })
285                .unwrap()
286                .drop_secret(secret_id);
287        }
288    }
289
290    pub fn drop_database(&mut self, db_id: DatabaseId) {
291        let name = self.db_name_by_id.remove(&db_id).unwrap();
292        let database = self.database_by_name.remove(&name).unwrap();
293        database.iter_all_table_ids().for_each(|table| {
294            self.table_by_id.remove(&table);
295        });
296    }
297
298    pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
299        self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
300    }
301
302    pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
303        self.table_by_id.remove(&tb_id);
304        self.get_database_mut(db_id)
305            .unwrap()
306            .get_schema_mut(schema_id)
307            .unwrap()
308            .drop_table(tb_id);
309    }
310
311    pub fn update_table(&mut self, proto: &PbTable) {
312        let database = self.get_database_mut(proto.database_id).unwrap();
313        let schema = database.get_schema_mut(proto.schema_id).unwrap();
314        let table = if schema.get_table_by_id(&proto.id.into()).is_some() {
315            schema.update_table(proto)
316        } else {
317            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
318            let new_table = schema.create_table(proto);
319            database
320                .iter_schemas_mut()
321                .find(|schema| {
322                    schema.id() != proto.schema_id
323                        && schema.get_created_table_by_id(&proto.id.into()).is_some()
324                })
325                .unwrap()
326                .drop_table(proto.id.into());
327            new_table
328        };
329
330        self.table_by_id.insert(proto.id.into(), table);
331    }
332
333    pub fn update_database(&mut self, proto: &PbDatabase) {
334        let id = proto.id;
335        let name = proto.name.clone();
336
337        let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
338        if old_database_name != name {
339            let mut database = self.database_by_name.remove(&old_database_name).unwrap();
340            database.name.clone_from(&name);
341            database.owner = proto.owner;
342            self.database_by_name.insert(name.clone(), database);
343            self.db_name_by_id.insert(id, name);
344        } else {
345            let database = self.get_database_mut(id).unwrap();
346            database.name = name;
347            database.owner = proto.owner;
348            database.barrier_interval_ms = proto.barrier_interval_ms;
349            database.checkpoint_frequency = proto.checkpoint_frequency;
350        }
351    }
352
353    pub fn update_schema(&mut self, proto: &PbSchema) {
354        self.get_database_mut(proto.database_id)
355            .unwrap()
356            .update_schema(proto);
357    }
358
359    pub fn update_index(&mut self, proto: &PbIndex) {
360        let database = self.get_database_mut(proto.database_id).unwrap();
361        let schema = database.get_schema_mut(proto.schema_id).unwrap();
362        if schema.get_index_by_id(&proto.id.into()).is_some() {
363            schema.update_index(proto);
364        } else {
365            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
366            schema.create_index(proto);
367            database
368                .iter_schemas_mut()
369                .find(|schema| {
370                    schema.id() != proto.schema_id
371                        && schema.get_index_by_id(&proto.id.into()).is_some()
372                })
373                .unwrap()
374                .drop_index(proto.id.into());
375        }
376    }
377
378    pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
379        self.get_database_mut(db_id)
380            .unwrap()
381            .get_schema_mut(schema_id)
382            .unwrap()
383            .drop_source(source_id);
384    }
385
386    pub fn update_source(&mut self, proto: &PbSource) {
387        let database = self.get_database_mut(proto.database_id).unwrap();
388        let schema = database.get_schema_mut(proto.schema_id).unwrap();
389        if schema.get_source_by_id(&proto.id).is_some() {
390            schema.update_source(proto);
391        } else {
392            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
393            schema.create_source(proto);
394            database
395                .iter_schemas_mut()
396                .find(|schema| {
397                    schema.id() != proto.schema_id && schema.get_source_by_id(&proto.id).is_some()
398                })
399                .unwrap()
400                .drop_source(proto.id);
401        }
402    }
403
404    pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
405        self.get_database_mut(db_id)
406            .unwrap()
407            .get_schema_mut(schema_id)
408            .unwrap()
409            .drop_sink(sink_id);
410    }
411
412    pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
413        self.get_database_mut(db_id)
414            .unwrap()
415            .get_schema_mut(schema_id)
416            .unwrap()
417            .drop_secret(secret_id);
418    }
419
420    pub fn update_sink(&mut self, proto: &PbSink) {
421        let database = self.get_database_mut(proto.database_id).unwrap();
422        let schema = database.get_schema_mut(proto.schema_id).unwrap();
423        if schema.get_sink_by_id(&proto.id).is_some() {
424            schema.update_sink(proto);
425        } else {
426            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
427            schema.create_sink(proto);
428            database
429                .iter_schemas_mut()
430                .find(|schema| {
431                    schema.id() != proto.schema_id && schema.get_sink_by_id(&proto.id).is_some()
432                })
433                .unwrap()
434                .drop_sink(proto.id);
435        }
436    }
437
438    pub fn drop_subscription(
439        &mut self,
440        db_id: DatabaseId,
441        schema_id: SchemaId,
442        subscription_id: SubscriptionId,
443    ) {
444        self.get_database_mut(db_id)
445            .unwrap()
446            .get_schema_mut(schema_id)
447            .unwrap()
448            .drop_subscription(subscription_id);
449    }
450
451    pub fn update_subscription(&mut self, proto: &PbSubscription) {
452        let database = self.get_database_mut(proto.database_id).unwrap();
453        let schema = database.get_schema_mut(proto.schema_id).unwrap();
454        if schema.get_subscription_by_id(&proto.id).is_some() {
455            schema.update_subscription(proto);
456        } else {
457            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
458            schema.create_subscription(proto);
459            database
460                .iter_schemas_mut()
461                .find(|schema| {
462                    schema.id() != proto.schema_id
463                        && schema.get_subscription_by_id(&proto.id).is_some()
464                })
465                .unwrap()
466                .drop_subscription(proto.id);
467        }
468    }
469
470    pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
471        self.get_database_mut(db_id)
472            .unwrap()
473            .get_schema_mut(schema_id)
474            .unwrap()
475            .drop_index(index_id);
476    }
477
478    pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
479        self.get_database_mut(db_id)
480            .unwrap()
481            .get_schema_mut(schema_id)
482            .unwrap()
483            .drop_view(view_id);
484    }
485
486    pub fn update_view(&mut self, proto: &PbView) {
487        let database = self.get_database_mut(proto.database_id).unwrap();
488        let schema = database.get_schema_mut(proto.schema_id).unwrap();
489        if schema.get_view_by_id(&proto.id).is_some() {
490            schema.update_view(proto);
491        } else {
492            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
493            schema.create_view(proto);
494            database
495                .iter_schemas_mut()
496                .find(|schema| {
497                    schema.id() != proto.schema_id && schema.get_view_by_id(&proto.id).is_some()
498                })
499                .unwrap()
500                .drop_view(proto.id);
501        }
502    }
503
504    pub fn drop_function(
505        &mut self,
506        db_id: DatabaseId,
507        schema_id: SchemaId,
508        function_id: FunctionId,
509    ) {
510        self.get_database_mut(db_id)
511            .unwrap()
512            .get_schema_mut(schema_id)
513            .unwrap()
514            .drop_function(function_id);
515    }
516
517    pub fn update_function(&mut self, proto: &PbFunction) {
518        let database = self.get_database_mut(proto.database_id).unwrap();
519        let schema = database.get_schema_mut(proto.schema_id).unwrap();
520        if schema.get_function_by_id(proto.id.into()).is_some() {
521            schema.update_function(proto);
522        } else {
523            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
524            schema.create_function(proto);
525            database
526                .iter_schemas_mut()
527                .find(|schema| {
528                    schema.id() != proto.schema_id
529                        && schema.get_function_by_id(proto.id.into()).is_some()
530                })
531                .unwrap()
532                .drop_function(proto.id.into());
533        }
534
535        self.get_database_mut(proto.database_id)
536            .unwrap()
537            .get_schema_mut(proto.schema_id)
538            .unwrap()
539            .update_function(proto);
540    }
541
542    pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
543        self.database_by_name
544            .get(db_name)
545            .ok_or_else(|| CatalogError::NotFound("database", db_name.to_owned()))
546    }
547
548    pub fn get_database_by_id(&self, db_id: &DatabaseId) -> CatalogResult<&DatabaseCatalog> {
549        let db_name = self
550            .db_name_by_id
551            .get(db_id)
552            .ok_or_else(|| CatalogError::NotFound("db_id", db_id.to_string()))?;
553        self.database_by_name
554            .get(db_name)
555            .ok_or_else(|| CatalogError::NotFound("database", db_name.clone()))
556    }
557
558    pub fn find_schema_secret_by_secret_id(
559        &self,
560        db_name: &str,
561        secret_id: SecretId,
562    ) -> CatalogResult<(String, String)> {
563        let db = self.get_database_by_name(db_name)?;
564        let schema_secret = db
565            .iter_schemas()
566            .find_map(|schema| {
567                schema
568                    .get_secret_by_id(&secret_id)
569                    .map(|secret| (schema.name(), secret.name.clone()))
570            })
571            .ok_or_else(|| CatalogError::NotFound("secret", secret_id.to_string()))?;
572        Ok(schema_secret)
573    }
574
575    pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
576        Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
577    }
578
579    pub fn iter_schemas(
580        &self,
581        db_name: &str,
582    ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
583        Ok(self.get_database_by_name(db_name)?.iter_schemas())
584    }
585
586    pub fn get_all_database_names(&self) -> Vec<String> {
587        self.database_by_name.keys().cloned().collect_vec()
588    }
589
590    pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
591        self.database_by_name.values()
592    }
593
594    pub fn get_schema_by_name(
595        &self,
596        db_name: &str,
597        schema_name: &str,
598    ) -> CatalogResult<&SchemaCatalog> {
599        self.get_database_by_name(db_name)?
600            .get_schema_by_name(schema_name)
601            .ok_or_else(|| CatalogError::NotFound("schema", schema_name.to_owned()))
602    }
603
604    pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
605        self.get_any_table_by_id(&table_id)
606            .map(|table| table.name.clone())
607    }
608
609    pub fn get_schema_by_id(
610        &self,
611        db_id: &DatabaseId,
612        schema_id: &SchemaId,
613    ) -> CatalogResult<&SchemaCatalog> {
614        self.get_database_by_id(db_id)?
615            .get_schema_by_id(schema_id)
616            .ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
617    }
618
619    /// Refer to [`SearchPath`].
620    pub fn first_valid_schema(
621        &self,
622        db_name: &str,
623        search_path: &SearchPath,
624        user_name: &str,
625    ) -> CatalogResult<&SchemaCatalog> {
626        for path in search_path.real_path() {
627            let mut schema_name: &str = path;
628            if schema_name == USER_NAME_WILD_CARD {
629                schema_name = user_name;
630            }
631
632            if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
633                return schema_catalog;
634            }
635        }
636        Err(CatalogError::NotFound(
637            "first valid schema",
638            "no schema has been selected to create in".to_owned(),
639        ))
640    }
641
642    pub fn get_source_by_id<'a>(
643        &self,
644        db_name: &'a str,
645        schema_path: SchemaPath<'a>,
646        source_id: &SourceId,
647    ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
648        schema_path
649            .try_find(|schema_name| {
650                Ok(self
651                    .get_schema_by_name(db_name, schema_name)?
652                    .get_source_by_id(source_id))
653            })?
654            .ok_or_else(|| CatalogError::NotFound("source", source_id.to_string()))
655    }
656
657    pub fn get_table_by_name<'a>(
658        &self,
659        db_name: &str,
660        schema_path: SchemaPath<'a>,
661        table_name: &str,
662        bind_creating: bool,
663    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
664        schema_path
665            .try_find(|schema_name| {
666                Ok(self
667                    .get_schema_by_name(db_name, schema_name)?
668                    .get_table_by_name(table_name, bind_creating))
669            })?
670            .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
671    }
672
673    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
674    /// Retrieves all tables, created or creating.
675    pub fn get_any_table_by_name<'a>(
676        &self,
677        db_name: &str,
678        schema_path: SchemaPath<'a>,
679        table_name: &str,
680    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
681        self.get_table_by_name(db_name, schema_path, table_name, true)
682    }
683
684    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
685    /// Retrieves only created tables.
686    pub fn get_created_table_by_name<'a>(
687        &self,
688        db_name: &str,
689        schema_path: SchemaPath<'a>,
690        table_name: &str,
691    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
692        self.get_table_by_name(db_name, schema_path, table_name, false)
693    }
694
695    pub fn get_any_table_by_id(&self, table_id: &TableId) -> CatalogResult<&Arc<TableCatalog>> {
696        self.table_by_id
697            .get(table_id)
698            .ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
699    }
700
701    /// This function is similar to `get_table_by_id` expect that a table must be in a given database.
702    pub fn get_created_table_by_id_with_db(
703        &self,
704        db_name: &str,
705        table_id: u32,
706    ) -> CatalogResult<&Arc<TableCatalog>> {
707        let table_id = TableId::from(table_id);
708        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
709            if let Some(table) = schema.get_created_table_by_id(&table_id) {
710                return Ok(table);
711            }
712        }
713        Err(CatalogError::NotFound("table id", table_id.to_string()))
714    }
715
716    pub fn iter_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
717        self.table_by_id.values()
718    }
719
720    pub fn iter_backfilling_internal_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
721        self.table_by_id
722            .values()
723            .filter(|t| t.is_internal_table() && !t.is_created())
724    }
725
726    // Used by test_utils only.
727    pub fn alter_table_name_by_id(&mut self, table_id: &TableId, table_name: &str) {
728        let mut found = false;
729        for database in self.database_by_name.values() {
730            if !found {
731                for schema in database.iter_schemas() {
732                    if schema.iter_user_table().any(|t| t.id() == *table_id) {
733                        found = true;
734                        break;
735                    }
736                }
737            }
738        }
739
740        if found {
741            let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
742            table.name = table_name.to_owned();
743            self.update_table(&table);
744        }
745    }
746
747    #[cfg(test)]
748    pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
749        self.table_by_id.insert(
750            table_id,
751            Arc::new(TableCatalog {
752                fragment_id,
753                ..Default::default()
754            }),
755        );
756    }
757
758    pub fn get_sys_table_by_name(
759        &self,
760        db_name: &str,
761        schema_name: &str,
762        table_name: &str,
763    ) -> CatalogResult<&Arc<SystemTableCatalog>> {
764        self.get_schema_by_name(db_name, schema_name)?
765            .get_system_table_by_name(table_name)
766            .ok_or_else(|| CatalogError::NotFound("table", table_name.to_owned()))
767    }
768
769    pub fn get_source_by_name<'a>(
770        &self,
771        db_name: &str,
772        schema_path: SchemaPath<'a>,
773        source_name: &str,
774    ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
775        schema_path
776            .try_find(|schema_name| {
777                Ok(self
778                    .get_schema_by_name(db_name, schema_name)?
779                    .get_source_by_name(source_name))
780            })?
781            .ok_or_else(|| CatalogError::NotFound("source", source_name.to_owned()))
782    }
783
784    pub fn get_sink_by_name<'a>(
785        &self,
786        db_name: &str,
787        schema_path: SchemaPath<'a>,
788        sink_name: &str,
789        bind_creating: bool,
790    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
791        schema_path
792            .try_find(|schema_name| {
793                Ok(self
794                    .get_schema_by_name(db_name, schema_name)?
795                    .get_sink_by_name(sink_name, bind_creating))
796            })?
797            .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_owned()))
798    }
799
800    pub fn get_any_sink_by_name<'a>(
801        &self,
802        db_name: &str,
803        schema_path: SchemaPath<'a>,
804        sink_name: &str,
805    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
806        self.get_sink_by_name(db_name, schema_path, sink_name, true)
807    }
808
809    pub fn get_created_sink_by_name<'a>(
810        &self,
811        db_name: &str,
812        schema_path: SchemaPath<'a>,
813        sink_name: &str,
814    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
815        self.get_sink_by_name(db_name, schema_path, sink_name, false)
816    }
817
818    pub fn get_subscription_by_name<'a>(
819        &self,
820        db_name: &str,
821        schema_path: SchemaPath<'a>,
822        subscription_name: &str,
823    ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
824        schema_path
825            .try_find(|schema_name| {
826                Ok(self
827                    .get_schema_by_name(db_name, schema_name)?
828                    .get_subscription_by_name(subscription_name))
829            })?
830            .ok_or_else(|| CatalogError::NotFound("subscription", subscription_name.to_owned()))
831    }
832
833    pub fn get_index_by_name<'a>(
834        &self,
835        db_name: &str,
836        schema_path: SchemaPath<'a>,
837        index_name: &str,
838    ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
839        schema_path
840            .try_find(|schema_name| {
841                Ok(self
842                    .get_schema_by_name(db_name, schema_name)?
843                    .get_created_index_by_name(index_name))
844            })?
845            .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
846    }
847
848    pub fn get_any_index_by_name<'a>(
849        &self,
850        db_name: &str,
851        schema_path: SchemaPath<'a>,
852        index_name: &str,
853    ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
854        schema_path
855            .try_find(|schema_name| {
856                Ok(self
857                    .get_schema_by_name(db_name, schema_name)?
858                    .get_any_index_by_name(index_name))
859            })?
860            .ok_or_else(|| CatalogError::NotFound("index", index_name.to_owned()))
861    }
862
863    pub fn get_index_by_id(
864        &self,
865        db_name: &str,
866        index_id: u32,
867    ) -> CatalogResult<&Arc<IndexCatalog>> {
868        let index_id = IndexId::from(index_id);
869        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
870            if let Some(index) = schema.get_index_by_id(&index_id) {
871                return Ok(index);
872            }
873        }
874        Err(CatalogError::NotFound("index", index_id.to_string()))
875    }
876
877    pub fn get_view_by_name<'a>(
878        &self,
879        db_name: &str,
880        schema_path: SchemaPath<'a>,
881        view_name: &str,
882    ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
883        schema_path
884            .try_find(|schema_name| {
885                Ok(self
886                    .get_schema_by_name(db_name, schema_name)?
887                    .get_view_by_name(view_name))
888            })?
889            .ok_or_else(|| CatalogError::NotFound("view", view_name.to_owned()))
890    }
891
892    pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
893        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
894            if let Some(view) = schema.get_view_by_id(&ViewId::from(view_id)) {
895                return Ok(view.clone());
896            }
897        }
898        Err(CatalogError::NotFound("view", view_id.to_string()))
899    }
900
901    pub fn get_secret_by_name<'a>(
902        &self,
903        db_name: &str,
904        schema_path: SchemaPath<'a>,
905        secret_name: &str,
906    ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
907        schema_path
908            .try_find(|schema_name| {
909                Ok(self
910                    .get_schema_by_name(db_name, schema_name)?
911                    .get_secret_by_name(secret_name))
912            })?
913            .ok_or_else(|| CatalogError::NotFound("secret", secret_name.to_owned()))
914    }
915
916    pub fn get_connection_by_id(
917        &self,
918        db_name: &str,
919        connection_id: ConnectionId,
920    ) -> CatalogResult<&Arc<ConnectionCatalog>> {
921        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
922            if let Some(conn) = schema.get_connection_by_id(&connection_id) {
923                return Ok(conn);
924            }
925        }
926        Err(CatalogError::NotFound(
927            "connection",
928            connection_id.to_string(),
929        ))
930    }
931
932    pub fn get_connection_by_name<'a>(
933        &self,
934        db_name: &str,
935        schema_path: SchemaPath<'a>,
936        connection_name: &str,
937    ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
938        schema_path
939            .try_find(|schema_name| {
940                Ok(self
941                    .get_schema_by_name(db_name, schema_name)?
942                    .get_connection_by_name(connection_name))
943            })?
944            .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_owned()))
945    }
946
947    pub fn get_function_by_name_inputs<'a>(
948        &self,
949        db_name: &str,
950        schema_path: SchemaPath<'a>,
951        function_name: &str,
952        inputs: &mut [ExprImpl],
953    ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
954        schema_path
955            .try_find(|schema_name| {
956                Ok(self
957                    .get_schema_by_name(db_name, schema_name)?
958                    .get_function_by_name_inputs(function_name, inputs))
959            })?
960            .ok_or_else(|| {
961                CatalogError::NotFound(
962                    "function",
963                    format!(
964                        "{}({})",
965                        function_name,
966                        inputs
967                            .iter()
968                            .map(|a| a.return_type().to_string())
969                            .join(", ")
970                    ),
971                )
972            })
973    }
974
975    pub fn get_function_by_name_args<'a>(
976        &self,
977        db_name: &str,
978        schema_path: SchemaPath<'a>,
979        function_name: &str,
980        args: &[DataType],
981    ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
982        schema_path
983            .try_find(|schema_name| {
984                Ok(self
985                    .get_schema_by_name(db_name, schema_name)?
986                    .get_function_by_name_args(function_name, args))
987            })?
988            .ok_or_else(|| {
989                CatalogError::NotFound(
990                    "function",
991                    format!(
992                        "{}({})",
993                        function_name,
994                        args.iter().map(|a| a.to_string()).join(", ")
995                    ),
996                )
997            })
998    }
999
1000    /// Gets all functions with the given name.
1001    pub fn get_functions_by_name<'a>(
1002        &self,
1003        db_name: &str,
1004        schema_path: SchemaPath<'a>,
1005        function_name: &str,
1006    ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
1007        schema_path
1008            .try_find(|schema_name| {
1009                Ok(self
1010                    .get_schema_by_name(db_name, schema_name)?
1011                    .get_functions_by_name(function_name))
1012            })?
1013            .ok_or_else(|| CatalogError::NotFound("function", function_name.to_owned()))
1014    }
1015
1016    /// Check if the name duplicates with existing table, materialized view or source.
1017    pub fn check_relation_name_duplicated(
1018        &self,
1019        db_name: &str,
1020        schema_name: &str,
1021        relation_name: &str,
1022    ) -> CatalogResult<()> {
1023        let schema = self.get_schema_by_name(db_name, schema_name)?;
1024
1025        if let Some(table) = schema.get_any_table_by_name(relation_name) {
1026            let is_creating = table.stream_job_status == StreamJobStatus::Creating;
1027            if table.is_index() {
1028                Err(CatalogError::Duplicated(
1029                    "index",
1030                    relation_name.to_owned(),
1031                    is_creating,
1032                ))
1033            } else if table.is_mview() {
1034                Err(CatalogError::Duplicated(
1035                    "materialized view",
1036                    relation_name.to_owned(),
1037                    is_creating,
1038                ))
1039            } else {
1040                Err(CatalogError::Duplicated(
1041                    "table",
1042                    relation_name.to_owned(),
1043                    is_creating,
1044                ))
1045            }
1046        } else if schema.get_source_by_name(relation_name).is_some() {
1047            Err(CatalogError::duplicated("source", relation_name.to_owned()))
1048        } else if let Some(sink) = schema.get_any_sink_by_name(relation_name) {
1049            Err(CatalogError::Duplicated(
1050                "sink",
1051                relation_name.to_owned(),
1052                !sink.is_created(),
1053            ))
1054        } else if schema.get_view_by_name(relation_name).is_some() {
1055            Err(CatalogError::duplicated("view", relation_name.to_owned()))
1056        } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
1057            let is_not_created = subscription.subscription_state != SubscriptionState::Created;
1058            Err(CatalogError::Duplicated(
1059                "subscription",
1060                relation_name.to_owned(),
1061                is_not_created,
1062            ))
1063        } else {
1064            Ok(())
1065        }
1066    }
1067
1068    pub fn check_function_name_duplicated(
1069        &self,
1070        db_name: &str,
1071        schema_name: &str,
1072        function_name: &str,
1073        arg_types: &[DataType],
1074    ) -> CatalogResult<()> {
1075        let schema = self.get_schema_by_name(db_name, schema_name)?;
1076
1077        if schema
1078            .get_function_by_name_args(function_name, arg_types)
1079            .is_some()
1080        {
1081            let name = format!(
1082                "{function_name}({})",
1083                arg_types.iter().map(|t| t.to_string()).join(",")
1084            );
1085            Err(CatalogError::duplicated("function", name))
1086        } else {
1087            Ok(())
1088        }
1089    }
1090
1091    /// Check if the name duplicates with existing connection.
1092    pub fn check_connection_name_duplicated(
1093        &self,
1094        db_name: &str,
1095        schema_name: &str,
1096        connection_name: &str,
1097    ) -> CatalogResult<()> {
1098        let schema = self.get_schema_by_name(db_name, schema_name)?;
1099
1100        if schema.get_connection_by_name(connection_name).is_some() {
1101            Err(CatalogError::duplicated(
1102                "connection",
1103                connection_name.to_owned(),
1104            ))
1105        } else {
1106            Ok(())
1107        }
1108    }
1109
1110    pub fn check_secret_name_duplicated(
1111        &self,
1112        db_name: &str,
1113        schema_name: &str,
1114        secret_name: &str,
1115    ) -> CatalogResult<()> {
1116        let schema = self.get_schema_by_name(db_name, schema_name)?;
1117
1118        if schema.get_secret_by_name(secret_name).is_some() {
1119            Err(CatalogError::duplicated("secret", secret_name.to_owned()))
1120        } else {
1121            Ok(())
1122        }
1123    }
1124
1125    pub fn table_stats(&self) -> &HummockVersionStats {
1126        &self.table_stats
1127    }
1128
1129    pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
1130        self.table_stats = table_stats;
1131    }
1132
1133    pub fn get_all_indexes_related_to_object(
1134        &self,
1135        db_id: DatabaseId,
1136        schema_id: SchemaId,
1137        mv_id: TableId,
1138    ) -> Vec<Arc<IndexCatalog>> {
1139        self.get_database_by_id(&db_id)
1140            .unwrap()
1141            .get_schema_by_id(&schema_id)
1142            .unwrap()
1143            .get_any_indexes_by_table_id(&mv_id)
1144    }
1145
1146    pub fn get_id_by_class_name(
1147        &self,
1148        db_name: &str,
1149        schema_path: SchemaPath<'_>,
1150        class_name: &str,
1151    ) -> CatalogResult<u32> {
1152        schema_path
1153            .try_find(|schema_name| {
1154                let schema = self.get_schema_by_name(db_name, schema_name)?;
1155                #[allow(clippy::manual_map)]
1156                if let Some(item) = schema.get_system_table_by_name(class_name) {
1157                    Ok(Some(item.id().into()))
1158                } else if let Some(item) = schema.get_any_table_by_name(class_name) {
1159                    Ok(Some(item.id().into()))
1160                } else if let Some(item) = schema.get_any_index_by_name(class_name) {
1161                    Ok(Some(item.id.into()))
1162                } else if let Some(item) = schema.get_source_by_name(class_name) {
1163                    Ok(Some(item.id))
1164                } else if let Some(item) = schema.get_view_by_name(class_name) {
1165                    Ok(Some(item.id))
1166                } else if let Some(item) = schema.get_any_sink_by_name(class_name) {
1167                    Ok(Some(item.id.into()))
1168                } else {
1169                    Ok(None)
1170                }
1171            })?
1172            .map(|(id, _)| id)
1173            .ok_or_else(|| CatalogError::NotFound("class", class_name.to_owned()))
1174    }
1175}