risingwave_frontend/catalog/
root_catalog.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::id::ObjectId;
21use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
22use risingwave_common::types::DataType;
23use risingwave_connector::sink::catalog::SinkCatalog;
24use risingwave_pb::catalog::{
25    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
26    PbSubscription, PbTable, PbView,
27};
28use risingwave_pb::common::PbObjectType;
29use risingwave_pb::hummock::HummockVersionStats;
30use risingwave_pb::meta::ObjectDependency as PbObjectDependency;
31
32use super::function_catalog::FunctionCatalog;
33use super::source_catalog::SourceCatalog;
34use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
35use super::view_catalog::ViewCatalog;
36use super::{
37    CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
38};
39use crate::catalog::connection_catalog::ConnectionCatalog;
40use crate::catalog::database_catalog::DatabaseCatalog;
41use crate::catalog::schema_catalog::SchemaCatalog;
42use crate::catalog::secret_catalog::SecretCatalog;
43use crate::catalog::system_catalog::{
44    SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
45};
46use crate::catalog::table_catalog::TableCatalog;
47use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
48use crate::expr::{Expr, ExprImpl};
49
/// Describes which schema(s) a catalog lookup should consult.
#[derive(Copy, Clone)]
pub enum SchemaPath<'a> {
    /// A single, explicitly named schema.
    Name(&'a str),
    /// (`search_path`, `user_name`).
    ///
    /// The schemas listed in the search path are consulted in order; an entry equal to
    /// `USER_NAME_WILD_CARD` is replaced by `user_name`.
    Path(&'a SearchPath, &'a str),
}
56
57impl<'a> SchemaPath<'a> {
58    pub fn new(
59        schema_name: Option<&'a str>,
60        search_path: &'a SearchPath,
61        user_name: &'a str,
62    ) -> Self {
63        match schema_name {
64            Some(schema_name) => SchemaPath::Name(schema_name),
65            None => SchemaPath::Path(search_path, user_name),
66        }
67    }
68
69    /// Call function `f` for each schema name. Return the first `Some` result.
70    pub fn try_find<T, E>(
71        &self,
72        mut f: impl FnMut(&str) -> Result<Option<T>, E>,
73    ) -> Result<Option<(T, &'a str)>, E> {
74        match self {
75            SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
76            SchemaPath::Path(search_path, user_name) => {
77                for schema_name in search_path.path() {
78                    let mut schema_name: &str = schema_name;
79                    if schema_name == USER_NAME_WILD_CARD {
80                        schema_name = user_name;
81                    }
82                    if let Ok(Some(res)) = f(schema_name) {
83                        return Ok(Some((res, schema_name)));
84                    }
85                }
86                Ok(None)
87            }
88        }
89    }
90}
91
/// Frontend-side form of a dependency record between two catalog objects, converted from
/// [`PbObjectDependency`].
#[derive(Clone, Debug)]
pub struct ObjectDependency {
    /// Id copied from the protobuf `object_id` field (presumably the dependent object —
    /// confirm against the meta service definition).
    pub object_id: ObjectId,
    /// Id copied from the protobuf `referenced_object_id` field.
    pub referenced_object_id: ObjectId,
    /// Decoded type of the referenced object; `PbObjectType::Unspecified` when the raw
    /// protobuf value cannot be decoded.
    pub referenced_object_type: PbObjectType,
}
98
99impl From<PbObjectDependency> for ObjectDependency {
100    fn from(dependency: PbObjectDependency) -> Self {
101        let referenced_object_type = PbObjectType::try_from(dependency.referenced_object_type)
102            .unwrap_or(PbObjectType::Unspecified);
103        Self {
104            object_id: dependency.object_id,
105            referenced_object_id: dependency.referenced_object_id,
106            referenced_object_type,
107        }
108    }
109}
110
/// Root catalog of database catalog. It manages all database/schema/table in memory on frontend.
/// It is protected by a `RwLock`. Only [`crate::observer::FrontendObserverNode`]
/// will acquire the write lock and sync it with the meta catalog. In other situations, it is
/// read only.
///
/// - catalog (root catalog)
///   - database catalog
///     - schema catalog
///       - function catalog (i.e., user defined function)
///       - table/sink/source/index/view catalog
///        - column catalog
pub struct Catalog {
    /// Database catalogs keyed by database name.
    database_by_name: HashMap<String, DatabaseCatalog>,
    /// Database id to database name, used to resolve id-based lookups into `database_by_name`.
    db_name_by_id: HashMap<DatabaseId, String>,
    /// all table catalogs in the cluster identified by universal unique table id.
    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
    /// Table statistics; note that `clear` does not reset this field.
    table_stats: HummockVersionStats,
    /// Dependency records grouped by an object id (presumably the dependent object's id —
    /// confirm against the code that populates this map).
    object_dependencies: HashMap<ObjectId, Vec<ObjectDependency>>,
}
130
131#[expect(clippy::derivable_impls)]
132impl Default for Catalog {
133    fn default() -> Self {
134        Self {
135            database_by_name: HashMap::new(),
136            db_name_by_id: HashMap::new(),
137            table_by_id: HashMap::new(),
138            table_stats: HummockVersionStats::default(),
139            object_dependencies: HashMap::new(),
140        }
141    }
142}
143
144impl Catalog {
145    fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
146        let name = self.db_name_by_id.get(&db_id)?;
147        self.database_by_name.get_mut(name)
148    }
149
150    pub fn clear(&mut self) {
151        self.database_by_name.clear();
152        self.db_name_by_id.clear();
153        self.table_by_id.clear();
154        self.object_dependencies.clear();
155    }
156
157    pub fn create_database(&mut self, db: &PbDatabase) {
158        let name = db.name.clone();
159        let id = db.id;
160
161        self.database_by_name
162            .try_insert(name.clone(), db.into())
163            .unwrap();
164        self.db_name_by_id.try_insert(id, name).unwrap();
165    }
166
167    pub fn create_schema(&mut self, proto: &PbSchema) {
168        let database_id = proto.database_id;
169        let id = proto.id;
170        self.get_database_mut(database_id)
171            .unwrap()
172            .create_schema(proto);
173
174        for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
175            self.get_database_mut(database_id)
176                .unwrap()
177                .get_schema_mut(id)
178                .unwrap()
179                .create_sys_table(sys_table);
180        }
181        for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
182            sys_view.database_id = database_id;
183            sys_view.schema_id = id;
184            self.get_database_mut(database_id)
185                .unwrap()
186                .get_schema_mut(id)
187                .unwrap()
188                .create_sys_view(Arc::new(sys_view));
189        }
190    }
191
192    pub fn create_table(&mut self, proto: &PbTable) {
193        let table = self
194            .get_database_mut(proto.database_id)
195            .unwrap()
196            .get_schema_mut(proto.schema_id)
197            .unwrap()
198            .create_table(proto);
199        self.table_by_id.insert(proto.id, table);
200    }
201
202    pub fn create_index(&mut self, proto: &PbIndex) {
203        self.get_database_mut(proto.database_id)
204            .unwrap()
205            .get_schema_mut(proto.schema_id)
206            .unwrap()
207            .create_index(proto);
208    }
209
210    pub fn create_source(&mut self, proto: &PbSource) {
211        self.get_database_mut(proto.database_id)
212            .unwrap()
213            .get_schema_mut(proto.schema_id)
214            .unwrap()
215            .create_source(proto);
216    }
217
218    pub fn create_sink(&mut self, proto: &PbSink) {
219        self.get_database_mut(proto.database_id)
220            .unwrap()
221            .get_schema_mut(proto.schema_id)
222            .unwrap()
223            .create_sink(proto);
224    }
225
226    pub fn create_subscription(&mut self, proto: &PbSubscription) {
227        self.get_database_mut(proto.database_id)
228            .unwrap()
229            .get_schema_mut(proto.schema_id)
230            .unwrap()
231            .create_subscription(proto);
232    }
233
234    pub fn create_secret(&mut self, proto: &PbSecret) {
235        self.get_database_mut(proto.database_id)
236            .unwrap()
237            .get_schema_mut(proto.schema_id)
238            .unwrap()
239            .create_secret(proto);
240    }
241
242    pub fn create_view(&mut self, proto: &PbView) {
243        self.get_database_mut(proto.database_id)
244            .unwrap()
245            .get_schema_mut(proto.schema_id)
246            .unwrap()
247            .create_view(proto);
248    }
249
250    pub fn create_function(&mut self, proto: &PbFunction) {
251        self.get_database_mut(proto.database_id)
252            .unwrap()
253            .get_schema_mut(proto.schema_id)
254            .unwrap()
255            .create_function(proto);
256    }
257
258    pub fn create_connection(&mut self, proto: &PbConnection) {
259        self.get_database_mut(proto.database_id)
260            .unwrap()
261            .get_schema_mut(proto.schema_id)
262            .unwrap()
263            .create_connection(proto);
264    }
265
266    pub fn drop_connection(
267        &mut self,
268        db_id: DatabaseId,
269        schema_id: SchemaId,
270        connection_id: ConnectionId,
271    ) {
272        self.get_database_mut(db_id)
273            .unwrap()
274            .get_schema_mut(schema_id)
275            .unwrap()
276            .drop_connection(connection_id);
277    }
278
279    pub fn update_connection(&mut self, proto: &PbConnection) {
280        let database = self.get_database_mut(proto.database_id).unwrap();
281        let schema = database.get_schema_mut(proto.schema_id).unwrap();
282        if schema.get_connection_by_id(proto.id).is_some() {
283            schema.update_connection(proto);
284        } else {
285            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
286            schema.create_connection(proto);
287            database
288                .iter_schemas_mut()
289                .find(|schema| {
290                    schema.id() != proto.schema_id
291                        && schema.get_connection_by_id(proto.id).is_some()
292                })
293                .unwrap()
294                .drop_connection(proto.id);
295        }
296    }
297
298    pub fn update_secret(&mut self, proto: &PbSecret) {
299        let database = self.get_database_mut(proto.database_id).unwrap();
300        let schema = database.get_schema_mut(proto.schema_id).unwrap();
301        let secret_id = proto.id;
302        if schema.get_secret_by_id(secret_id).is_some() {
303            schema.update_secret(proto);
304        } else {
305            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
306            schema.create_secret(proto);
307            database
308                .iter_schemas_mut()
309                .find(|schema| {
310                    schema.id() != proto.schema_id && schema.get_secret_by_id(secret_id).is_some()
311                })
312                .unwrap()
313                .drop_secret(secret_id);
314        }
315    }
316
317    pub fn drop_database(&mut self, db_id: DatabaseId) {
318        let name = self.db_name_by_id.remove(&db_id).unwrap();
319        let database = self.database_by_name.remove(&name).unwrap();
320        let object_ids: HashSet<ObjectId> = database.iter_object_ids().collect();
321        self.remove_object_dependencies_for_objects(&object_ids);
322        database.iter_all_table_ids().for_each(|table| {
323            self.table_by_id.remove(&table);
324        });
325    }
326
327    pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
328        let object_ids = self
329            .get_database_by_id(db_id)
330            .ok()
331            .and_then(|db| db.get_schema_by_id(schema_id))
332            .map(|schema| schema.iter_object_ids().collect::<HashSet<_>>())
333            .unwrap_or_default();
334        self.remove_object_dependencies_for_objects(&object_ids);
335        self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
336    }
337
338    pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
339        self.remove_object_dependencies_for_object(tb_id.as_object_id());
340        self.table_by_id.remove(&tb_id);
341        self.get_database_mut(db_id)
342            .unwrap()
343            .get_schema_mut(schema_id)
344            .unwrap()
345            .drop_table(tb_id);
346    }
347
348    pub fn update_table(&mut self, proto: &PbTable) {
349        let database = self.get_database_mut(proto.database_id).unwrap();
350        let schema = database.get_schema_mut(proto.schema_id).unwrap();
351        let table = if schema.get_table_by_id(proto.id).is_some() {
352            schema.update_table(proto)
353        } else {
354            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
355            let new_table = schema.create_table(proto);
356            database
357                .iter_schemas_mut()
358                .find(|schema| {
359                    schema.id() != proto.schema_id
360                        && schema.get_created_table_by_id(proto.id).is_some()
361                })
362                .unwrap()
363                .drop_table(proto.id);
364            new_table
365        };
366
367        self.table_by_id.insert(proto.id, table);
368    }
369
370    pub fn update_database(&mut self, proto: &PbDatabase) {
371        let id = proto.id;
372        let name = proto.name.clone();
373
374        let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
375        if old_database_name != name {
376            let mut database = self.database_by_name.remove(&old_database_name).unwrap();
377            database.name.clone_from(&name);
378            database.owner = proto.owner;
379            self.database_by_name.insert(name.clone(), database);
380            self.db_name_by_id.insert(id, name);
381        } else {
382            let database = self.get_database_mut(id).unwrap();
383            database.name = name;
384            database.owner = proto.owner;
385            database.barrier_interval_ms = proto.barrier_interval_ms;
386            database.checkpoint_frequency = proto.checkpoint_frequency;
387        }
388    }
389
390    pub fn update_schema(&mut self, proto: &PbSchema) {
391        self.get_database_mut(proto.database_id)
392            .unwrap()
393            .update_schema(proto);
394    }
395
396    pub fn update_index(&mut self, proto: &PbIndex) {
397        let database = self.get_database_mut(proto.database_id).unwrap();
398        let schema = database.get_schema_mut(proto.schema_id).unwrap();
399        if schema.get_index_by_id(proto.id).is_some() {
400            schema.update_index(proto);
401        } else {
402            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
403            schema.create_index(proto);
404            database
405                .iter_schemas_mut()
406                .find(|schema| {
407                    schema.id() != proto.schema_id && schema.get_index_by_id(proto.id).is_some()
408                })
409                .unwrap()
410                .drop_index(proto.id);
411        }
412    }
413
414    pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
415        self.remove_object_dependencies_for_object(source_id.as_object_id());
416        self.get_database_mut(db_id)
417            .unwrap()
418            .get_schema_mut(schema_id)
419            .unwrap()
420            .drop_source(source_id);
421    }
422
423    pub fn update_source(&mut self, proto: &PbSource) {
424        let database = self.get_database_mut(proto.database_id).unwrap();
425        let schema = database.get_schema_mut(proto.schema_id).unwrap();
426        if schema.get_source_by_id(proto.id).is_some() {
427            schema.update_source(proto);
428        } else {
429            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
430            schema.create_source(proto);
431            database
432                .iter_schemas_mut()
433                .find(|schema| {
434                    schema.id() != proto.schema_id && schema.get_source_by_id(proto.id).is_some()
435                })
436                .unwrap()
437                .drop_source(proto.id);
438        }
439    }
440
441    pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
442        self.remove_object_dependencies_for_object(sink_id.as_object_id());
443        self.get_database_mut(db_id)
444            .unwrap()
445            .get_schema_mut(schema_id)
446            .unwrap()
447            .drop_sink(sink_id);
448    }
449
450    pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
451        self.remove_object_dependencies_for_object(secret_id.as_object_id());
452        self.get_database_mut(db_id)
453            .unwrap()
454            .get_schema_mut(schema_id)
455            .unwrap()
456            .drop_secret(secret_id);
457    }
458
459    pub fn update_sink(&mut self, proto: &PbSink) {
460        let database = self.get_database_mut(proto.database_id).unwrap();
461        let schema = database.get_schema_mut(proto.schema_id).unwrap();
462        if schema.get_sink_by_id(proto.id).is_some() {
463            schema.update_sink(proto);
464        } else {
465            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
466            schema.create_sink(proto);
467            database
468                .iter_schemas_mut()
469                .find(|schema| {
470                    schema.id() != proto.schema_id && schema.get_sink_by_id(proto.id).is_some()
471                })
472                .unwrap()
473                .drop_sink(proto.id);
474        }
475    }
476
477    pub fn drop_subscription(
478        &mut self,
479        db_id: DatabaseId,
480        schema_id: SchemaId,
481        subscription_id: SubscriptionId,
482    ) {
483        self.remove_object_dependencies_for_object(subscription_id.as_object_id());
484        self.get_database_mut(db_id)
485            .unwrap()
486            .get_schema_mut(schema_id)
487            .unwrap()
488            .drop_subscription(subscription_id);
489    }
490
491    pub fn update_subscription(&mut self, proto: &PbSubscription) {
492        let database = self.get_database_mut(proto.database_id).unwrap();
493        let schema = database.get_schema_mut(proto.schema_id).unwrap();
494        if schema.get_subscription_by_id(proto.id).is_some() {
495            schema.update_subscription(proto);
496        } else {
497            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
498            schema.create_subscription(proto);
499            database
500                .iter_schemas_mut()
501                .find(|schema| {
502                    schema.id() != proto.schema_id
503                        && schema.get_subscription_by_id(proto.id).is_some()
504                })
505                .unwrap()
506                .drop_subscription(proto.id);
507        }
508    }
509
510    pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
511        self.remove_object_dependencies_for_object(index_id.as_object_id());
512        self.get_database_mut(db_id)
513            .unwrap()
514            .get_schema_mut(schema_id)
515            .unwrap()
516            .drop_index(index_id);
517    }
518
519    pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
520        self.remove_object_dependencies_for_object(view_id.as_object_id());
521        self.get_database_mut(db_id)
522            .unwrap()
523            .get_schema_mut(schema_id)
524            .unwrap()
525            .drop_view(view_id);
526    }
527
528    pub fn update_view(&mut self, proto: &PbView) {
529        let database = self.get_database_mut(proto.database_id).unwrap();
530        let schema = database.get_schema_mut(proto.schema_id).unwrap();
531        if schema.get_view_by_id(proto.id).is_some() {
532            schema.update_view(proto);
533        } else {
534            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
535            schema.create_view(proto);
536            database
537                .iter_schemas_mut()
538                .find(|schema| {
539                    schema.id() != proto.schema_id && schema.get_view_by_id(proto.id).is_some()
540                })
541                .unwrap()
542                .drop_view(proto.id);
543        }
544    }
545
546    pub fn drop_function(
547        &mut self,
548        db_id: DatabaseId,
549        schema_id: SchemaId,
550        function_id: FunctionId,
551    ) {
552        self.remove_object_dependencies_for_object(function_id.as_object_id());
553        self.get_database_mut(db_id)
554            .unwrap()
555            .get_schema_mut(schema_id)
556            .unwrap()
557            .drop_function(function_id);
558    }
559
560    pub fn update_function(&mut self, proto: &PbFunction) {
561        let database = self.get_database_mut(proto.database_id).unwrap();
562        let schema = database.get_schema_mut(proto.schema_id).unwrap();
563        if schema.get_function_by_id(proto.id).is_some() {
564            schema.update_function(proto);
565        } else {
566            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
567            schema.create_function(proto);
568            database
569                .iter_schemas_mut()
570                .find(|schema| {
571                    schema.id() != proto.schema_id && schema.get_function_by_id(proto.id).is_some()
572                })
573                .unwrap()
574                .drop_function(proto.id);
575        }
576
577        self.get_database_mut(proto.database_id)
578            .unwrap()
579            .get_schema_mut(proto.schema_id)
580            .unwrap()
581            .update_function(proto);
582    }
583
584    pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
585        self.database_by_name
586            .get(db_name)
587            .ok_or_else(|| CatalogError::not_found("database", db_name))
588    }
589
590    pub fn get_database_by_id(&self, db_id: DatabaseId) -> CatalogResult<&DatabaseCatalog> {
591        let db_name = self
592            .db_name_by_id
593            .get(&db_id)
594            .ok_or_else(|| CatalogError::not_found("db_id", db_id.to_string()))?;
595        self.database_by_name
596            .get(db_name)
597            .ok_or_else(|| CatalogError::not_found("database", db_name))
598    }
599
600    pub fn find_schema_secret_by_secret_id(
601        &self,
602        db_name: &str,
603        secret_id: SecretId,
604    ) -> CatalogResult<(String, String)> {
605        let db = self.get_database_by_name(db_name)?;
606        let schema_secret = db
607            .iter_schemas()
608            .find_map(|schema| {
609                schema
610                    .get_secret_by_id(secret_id)
611                    .map(|secret| (schema.name(), secret.name.clone()))
612            })
613            .ok_or_else(|| CatalogError::not_found("secret", secret_id.to_string()))?;
614        Ok(schema_secret)
615    }
616
617    pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
618        Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
619    }
620
621    pub fn iter_schemas(
622        &self,
623        db_name: &str,
624    ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
625        Ok(self.get_database_by_name(db_name)?.iter_schemas())
626    }
627
628    pub fn get_all_database_names(&self) -> Vec<String> {
629        self.database_by_name.keys().cloned().collect_vec()
630    }
631
632    pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
633        self.database_by_name.values()
634    }
635
636    pub fn get_schema_by_name(
637        &self,
638        db_name: &str,
639        schema_name: &str,
640    ) -> CatalogResult<&SchemaCatalog> {
641        self.get_database_by_name(db_name)?
642            .get_schema_by_name(schema_name)
643            .ok_or_else(|| CatalogError::not_found("schema", schema_name))
644    }
645
646    pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
647        self.get_any_table_by_id(table_id)
648            .map(|table| table.name.clone())
649    }
650
651    pub fn get_schema_by_id(
652        &self,
653        db_id: DatabaseId,
654        schema_id: SchemaId,
655    ) -> CatalogResult<&SchemaCatalog> {
656        self.get_database_by_id(db_id)?
657            .get_schema_by_id(schema_id)
658            .ok_or_else(|| CatalogError::not_found("schema_id", schema_id.to_string()))
659    }
660
661    /// Refer to [`SearchPath`].
662    pub fn first_valid_schema(
663        &self,
664        db_name: &str,
665        search_path: &SearchPath,
666        user_name: &str,
667    ) -> CatalogResult<&SchemaCatalog> {
668        for path in search_path.real_path() {
669            let mut schema_name: &str = path;
670            if schema_name == USER_NAME_WILD_CARD {
671                schema_name = user_name;
672            }
673
674            if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
675                return schema_catalog;
676            }
677        }
678        Err(CatalogError::not_found(
679            "first valid schema",
680            "no schema has been selected to create in".to_owned(),
681        ))
682    }
683
684    pub fn get_source_by_id<'a>(
685        &self,
686        db_name: &'a str,
687        schema_path: SchemaPath<'a>,
688        source_id: SourceId,
689    ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
690        schema_path
691            .try_find(|schema_name| -> CatalogResult<_> {
692                Ok(self
693                    .get_schema_by_name(db_name, schema_name)?
694                    .get_source_by_id(source_id))
695            })?
696            .ok_or_else(|| CatalogError::not_found("source", source_id.to_string()))
697    }
698
699    pub fn get_table_by_name<'a>(
700        &self,
701        db_name: &str,
702        schema_path: SchemaPath<'a>,
703        table_name: &str,
704        bind_creating: bool,
705    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
706        schema_path
707            .try_find(|schema_name| -> CatalogResult<_> {
708                Ok(self
709                    .get_schema_by_name(db_name, schema_name)?
710                    .get_table_by_name(table_name, bind_creating))
711            })?
712            .ok_or_else(|| CatalogError::not_found("table", table_name))
713    }
714
715    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
716    /// Retrieves all tables, created or creating.
717    pub fn get_any_table_by_name<'a>(
718        &self,
719        db_name: &str,
720        schema_path: SchemaPath<'a>,
721        table_name: &str,
722    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
723        self.get_table_by_name(db_name, schema_path, table_name, true)
724    }
725
726    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
727    /// Retrieves only created tables.
728    pub fn get_created_table_by_name<'a>(
729        &self,
730        db_name: &str,
731        schema_path: SchemaPath<'a>,
732        table_name: &str,
733    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
734        self.get_table_by_name(db_name, schema_path, table_name, false)
735    }
736
737    pub fn get_any_table_by_id(&self, table_id: TableId) -> CatalogResult<&Arc<TableCatalog>> {
738        self.table_by_id
739            .get(&table_id)
740            .ok_or_else(|| CatalogError::not_found("table id", table_id.to_string()))
741    }
742
743    /// This function is similar to `get_table_by_id` expect that a table must be in a given database.
744    pub fn get_created_table_by_id_with_db(
745        &self,
746        db_name: &str,
747        table_id: u32,
748    ) -> CatalogResult<&Arc<TableCatalog>> {
749        let table_id = TableId::from(table_id);
750        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
751            if let Some(table) = schema.get_created_table_by_id(table_id) {
752                return Ok(table);
753            }
754        }
755        Err(CatalogError::not_found("table id", table_id.to_string()))
756    }
757
758    pub fn iter_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
759        self.table_by_id.values()
760    }
761
762    pub fn iter_backfilling_internal_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
763        self.table_by_id
764            .values()
765            .filter(|t| t.is_internal_table() && !t.is_created())
766    }
767
768    // Used by test_utils only.
769    pub fn alter_table_name_by_id(&mut self, table_id: TableId, table_name: &str) {
770        let mut found = false;
771        for database in self.database_by_name.values() {
772            if !found {
773                for schema in database.iter_schemas() {
774                    if schema.iter_user_table().any(|t| t.id() == table_id) {
775                        found = true;
776                        break;
777                    }
778                }
779            }
780        }
781
782        if found {
783            let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
784            table.name = table_name.to_owned();
785            self.update_table(&table);
786        }
787    }
788
789    #[cfg(test)]
790    pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
791        self.table_by_id.insert(
792            table_id,
793            Arc::new(TableCatalog {
794                fragment_id,
795                ..Default::default()
796            }),
797        );
798    }
799
800    pub fn get_sys_table_by_name(
801        &self,
802        db_name: &str,
803        schema_name: &str,
804        table_name: &str,
805    ) -> CatalogResult<&Arc<SystemTableCatalog>> {
806        self.get_schema_by_name(db_name, schema_name)?
807            .get_system_table_by_name(table_name)
808            .ok_or_else(|| CatalogError::not_found("table", table_name))
809    }
810
811    pub fn get_source_by_name<'a>(
812        &self,
813        db_name: &str,
814        schema_path: SchemaPath<'a>,
815        source_name: &str,
816    ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
817        schema_path
818            .try_find(|schema_name| -> CatalogResult<_> {
819                Ok(self
820                    .get_schema_by_name(db_name, schema_name)?
821                    .get_source_by_name(source_name))
822            })?
823            .ok_or_else(|| CatalogError::not_found("source", source_name))
824    }
825
826    pub fn get_sink_by_name<'a>(
827        &self,
828        db_name: &str,
829        schema_path: SchemaPath<'a>,
830        sink_name: &str,
831        bind_creating: bool,
832    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
833        schema_path
834            .try_find(|schema_name| -> CatalogResult<_> {
835                Ok(self
836                    .get_schema_by_name(db_name, schema_name)?
837                    .get_sink_by_name(sink_name, bind_creating))
838            })?
839            .ok_or_else(|| CatalogError::not_found("sink", sink_name))
840    }
841
842    pub fn get_any_sink_by_name<'a>(
843        &self,
844        db_name: &str,
845        schema_path: SchemaPath<'a>,
846        sink_name: &str,
847    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
848        self.get_sink_by_name(db_name, schema_path, sink_name, true)
849    }
850
851    pub fn get_created_sink_by_name<'a>(
852        &self,
853        db_name: &str,
854        schema_path: SchemaPath<'a>,
855        sink_name: &str,
856    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
857        self.get_sink_by_name(db_name, schema_path, sink_name, false)
858    }
859
860    pub fn get_subscription_by_name<'a>(
861        &self,
862        db_name: &str,
863        schema_path: SchemaPath<'a>,
864        subscription_name: &str,
865    ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
866        schema_path
867            .try_find(|schema_name| -> CatalogResult<_> {
868                Ok(self
869                    .get_schema_by_name(db_name, schema_name)?
870                    .get_subscription_by_name(subscription_name))
871            })?
872            .ok_or_else(|| CatalogError::not_found("subscription", subscription_name))
873    }
874
875    pub fn get_index_by_name<'a>(
876        &self,
877        db_name: &str,
878        schema_path: SchemaPath<'a>,
879        index_name: &str,
880    ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
881        schema_path
882            .try_find(|schema_name| -> CatalogResult<_> {
883                Ok(self
884                    .get_schema_by_name(db_name, schema_name)?
885                    .get_created_index_by_name(index_name))
886            })?
887            .ok_or_else(|| CatalogError::not_found("index", index_name))
888    }
889
890    pub fn get_any_index_by_name<'a>(
891        &self,
892        db_name: &str,
893        schema_path: SchemaPath<'a>,
894        index_name: &str,
895    ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
896        schema_path
897            .try_find(|schema_name| -> CatalogResult<_> {
898                Ok(self
899                    .get_schema_by_name(db_name, schema_name)?
900                    .get_any_index_by_name(index_name))
901            })?
902            .ok_or_else(|| CatalogError::not_found("index", index_name))
903    }
904
905    pub fn get_index_by_id(
906        &self,
907        db_name: &str,
908        index_id: u32,
909    ) -> CatalogResult<&Arc<IndexCatalog>> {
910        let index_id = IndexId::from(index_id);
911        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
912            if let Some(index) = schema.get_index_by_id(index_id) {
913                return Ok(index);
914            }
915        }
916        Err(CatalogError::not_found("index", index_id.to_string()))
917    }
918
919    pub fn get_view_by_name<'a>(
920        &self,
921        db_name: &str,
922        schema_path: SchemaPath<'a>,
923        view_name: &str,
924    ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
925        schema_path
926            .try_find(|schema_name| -> CatalogResult<_> {
927                Ok(self
928                    .get_schema_by_name(db_name, schema_name)?
929                    .get_view_by_name(view_name))
930            })?
931            .ok_or_else(|| CatalogError::not_found("view", view_name))
932    }
933
934    pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
935        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
936            if let Some(view) = schema.get_view_by_id(ViewId::from(view_id)) {
937                return Ok(view.clone());
938            }
939        }
940        Err(CatalogError::not_found("view", view_id.to_string()))
941    }
942
943    pub fn get_secret_by_name<'a>(
944        &self,
945        db_name: &str,
946        schema_path: SchemaPath<'a>,
947        secret_name: &str,
948    ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
949        schema_path
950            .try_find(|schema_name| -> CatalogResult<_> {
951                Ok(self
952                    .get_schema_by_name(db_name, schema_name)?
953                    .get_secret_by_name(secret_name))
954            })?
955            .ok_or_else(|| CatalogError::not_found("secret", secret_name))
956    }
957
958    pub fn get_connection_by_id(
959        &self,
960        db_name: &str,
961        connection_id: ConnectionId,
962    ) -> CatalogResult<&Arc<ConnectionCatalog>> {
963        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
964            if let Some(conn) = schema.get_connection_by_id(connection_id) {
965                return Ok(conn);
966            }
967        }
968        Err(CatalogError::not_found(
969            "connection",
970            connection_id.to_string(),
971        ))
972    }
973
974    pub fn get_connection_by_name<'a>(
975        &self,
976        db_name: &str,
977        schema_path: SchemaPath<'a>,
978        connection_name: &str,
979    ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
980        schema_path
981            .try_find(|schema_name| -> CatalogResult<_> {
982                Ok(self
983                    .get_schema_by_name(db_name, schema_name)?
984                    .get_connection_by_name(connection_name))
985            })?
986            .ok_or_else(|| CatalogError::not_found("connection", connection_name))
987    }
988
989    pub fn get_function_by_name_inputs<'a>(
990        &self,
991        db_name: &str,
992        schema_path: SchemaPath<'a>,
993        function_name: &str,
994        inputs: &mut [ExprImpl],
995    ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
996        schema_path
997            .try_find(|schema_name| -> CatalogResult<_> {
998                Ok(self
999                    .get_schema_by_name(db_name, schema_name)?
1000                    .get_function_by_name_inputs(function_name, inputs))
1001            })?
1002            .ok_or_else(|| {
1003                CatalogError::not_found(
1004                    "function",
1005                    format!(
1006                        "{}({})",
1007                        function_name,
1008                        inputs
1009                            .iter()
1010                            .map(|a| a.return_type().to_string())
1011                            .join(", ")
1012                    ),
1013                )
1014            })
1015    }
1016
1017    pub fn get_function_by_name_args<'a>(
1018        &self,
1019        db_name: &str,
1020        schema_path: SchemaPath<'a>,
1021        function_name: &str,
1022        args: &[DataType],
1023    ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
1024        schema_path
1025            .try_find(|schema_name| -> CatalogResult<_> {
1026                Ok(self
1027                    .get_schema_by_name(db_name, schema_name)?
1028                    .get_function_by_name_args(function_name, args))
1029            })?
1030            .ok_or_else(|| {
1031                CatalogError::not_found(
1032                    "function",
1033                    format!(
1034                        "{}({})",
1035                        function_name,
1036                        args.iter().map(|a| a.to_string()).join(", ")
1037                    ),
1038                )
1039            })
1040    }
1041
1042    /// Gets all functions with the given name.
1043    pub fn get_functions_by_name<'a>(
1044        &self,
1045        db_name: &str,
1046        schema_path: SchemaPath<'a>,
1047        function_name: &str,
1048    ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
1049        schema_path
1050            .try_find(|schema_name| -> CatalogResult<_> {
1051                Ok(self
1052                    .get_schema_by_name(db_name, schema_name)?
1053                    .get_functions_by_name(function_name))
1054            })?
1055            .ok_or_else(|| CatalogError::not_found("function", function_name))
1056    }
1057
1058    /// Check if the name duplicates with existing table, materialized view or source.
1059    pub fn check_relation_name_duplicated(
1060        &self,
1061        db_name: &str,
1062        schema_name: &str,
1063        relation_name: &str,
1064    ) -> CatalogResult<()> {
1065        let schema = self.get_schema_by_name(db_name, schema_name)?;
1066
1067        if let Some(table) = schema.get_any_table_by_name(relation_name) {
1068            let is_creating = table.stream_job_status == StreamJobStatus::Creating;
1069            if table.is_index() {
1070                Err(CatalogError::duplicated_under_creation(
1071                    "index",
1072                    relation_name.to_owned(),
1073                    is_creating,
1074                ))
1075            } else if table.is_mview() {
1076                Err(CatalogError::duplicated_under_creation(
1077                    "materialized view",
1078                    relation_name.to_owned(),
1079                    is_creating,
1080                ))
1081            } else {
1082                Err(CatalogError::duplicated_under_creation(
1083                    "table",
1084                    relation_name.to_owned(),
1085                    is_creating,
1086                ))
1087            }
1088        } else if schema.get_source_by_name(relation_name).is_some() {
1089            Err(CatalogError::duplicated("source", relation_name))
1090        } else if let Some(sink) = schema.get_any_sink_by_name(relation_name) {
1091            Err(CatalogError::duplicated_under_creation(
1092                "sink",
1093                relation_name.to_owned(),
1094                !sink.is_created(),
1095            ))
1096        } else if schema.get_view_by_name(relation_name).is_some() {
1097            Err(CatalogError::duplicated("view", relation_name))
1098        } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
1099            let is_not_created = subscription.subscription_state != SubscriptionState::Created;
1100            Err(CatalogError::duplicated_under_creation(
1101                "subscription",
1102                relation_name.to_owned(),
1103                is_not_created,
1104            ))
1105        } else {
1106            Ok(())
1107        }
1108    }
1109
1110    pub fn check_function_name_duplicated(
1111        &self,
1112        db_name: &str,
1113        schema_name: &str,
1114        function_name: &str,
1115        arg_types: &[DataType],
1116    ) -> CatalogResult<()> {
1117        let schema = self.get_schema_by_name(db_name, schema_name)?;
1118
1119        if schema
1120            .get_function_by_name_args(function_name, arg_types)
1121            .is_some()
1122        {
1123            let name = format!(
1124                "{function_name}({})",
1125                arg_types.iter().map(|t| t.to_string()).join(",")
1126            );
1127            Err(CatalogError::duplicated("function", name))
1128        } else {
1129            Ok(())
1130        }
1131    }
1132
1133    /// Check if the name duplicates with existing connection.
1134    pub fn check_connection_name_duplicated(
1135        &self,
1136        db_name: &str,
1137        schema_name: &str,
1138        connection_name: &str,
1139    ) -> CatalogResult<()> {
1140        let schema = self.get_schema_by_name(db_name, schema_name)?;
1141
1142        if schema.get_connection_by_name(connection_name).is_some() {
1143            Err(CatalogError::duplicated("connection", connection_name))
1144        } else {
1145            Ok(())
1146        }
1147    }
1148
1149    pub fn check_secret_name_duplicated(
1150        &self,
1151        db_name: &str,
1152        schema_name: &str,
1153        secret_name: &str,
1154    ) -> CatalogResult<()> {
1155        let schema = self.get_schema_by_name(db_name, schema_name)?;
1156
1157        if schema.get_secret_by_name(secret_name).is_some() {
1158            Err(CatalogError::duplicated("secret", secret_name))
1159        } else {
1160            Ok(())
1161        }
1162    }
1163
    /// Returns a shared reference to the `HummockVersionStats` currently stored in this catalog.
    pub fn table_stats(&self) -> &HummockVersionStats {
        &self.table_stats
    }
1167
    /// Replaces the stored `HummockVersionStats` with `table_stats`.
    pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
        self.table_stats = table_stats;
    }
1171
    /// Replaces the recorded object dependencies with `dependencies`.
    ///
    /// Clears `object_dependencies` first and then delegates to `insert_object_dependencies`.
    pub fn set_object_dependencies(&mut self, dependencies: Vec<PbObjectDependency>) {
        // Drop everything recorded so far so that only the new set remains.
        self.object_dependencies.clear();
        self.insert_object_dependencies(dependencies);
    }
1176
1177    pub fn insert_object_dependencies(&mut self, dependencies: Vec<PbObjectDependency>) {
1178        let mut grouped: HashMap<ObjectId, Vec<ObjectDependency>> = HashMap::new();
1179        for dependency in dependencies {
1180            let dependency = ObjectDependency::from(dependency);
1181            grouped
1182                .entry(dependency.object_id)
1183                .or_default()
1184                .push(dependency);
1185        }
1186        for (object_id, deps) in grouped {
1187            self.object_dependencies.insert(object_id, deps);
1188        }
1189    }
1190
1191    pub fn iter_object_dependencies(&self) -> impl Iterator<Item = &ObjectDependency> {
1192        self.object_dependencies
1193            .values()
1194            .flat_map(|deps| deps.iter())
1195    }
1196
1197    fn remove_object_dependencies_for_object(&mut self, object_id: ObjectId) {
1198        let mut object_ids = HashSet::new();
1199        object_ids.insert(object_id);
1200        self.remove_object_dependencies_for_objects(&object_ids);
1201    }
1202
1203    fn remove_object_dependencies_for_objects(&mut self, object_ids: &HashSet<ObjectId>) {
1204        if object_ids.is_empty() {
1205            return;
1206        }
1207        self.object_dependencies.retain(|object_id, deps| {
1208            if object_ids.contains(object_id) {
1209                return false;
1210            }
1211            deps.retain(|dep| !object_ids.contains(&dep.referenced_object_id));
1212            !deps.is_empty()
1213        });
1214    }
1215
1216    pub fn get_all_indexes_related_to_object(
1217        &self,
1218        db_id: DatabaseId,
1219        schema_id: SchemaId,
1220        mv_id: TableId,
1221    ) -> Vec<Arc<IndexCatalog>> {
1222        self.get_database_by_id(db_id)
1223            .unwrap()
1224            .get_schema_by_id(schema_id)
1225            .unwrap()
1226            .get_any_indexes_by_table_id(mv_id)
1227    }
1228
1229    pub fn get_id_by_class_name(
1230        &self,
1231        db_name: &str,
1232        schema_path: SchemaPath<'_>,
1233        class_name: &str,
1234    ) -> CatalogResult<ObjectId> {
1235        schema_path
1236            .try_find(|schema_name| -> CatalogResult<_> {
1237                let schema = self.get_schema_by_name(db_name, schema_name)?;
1238                #[allow(clippy::manual_map)]
1239                if let Some(item) = schema.get_system_table_by_name(class_name) {
1240                    Ok(Some(item.id().as_object_id()))
1241                } else if let Some(item) = schema.get_any_table_by_name(class_name) {
1242                    Ok(Some(item.id().as_object_id()))
1243                } else if let Some(item) = schema.get_any_index_by_name(class_name) {
1244                    Ok(Some(item.id.as_object_id()))
1245                } else if let Some(item) = schema.get_source_by_name(class_name) {
1246                    Ok(Some(item.id.as_object_id()))
1247                } else if let Some(item) = schema.get_view_by_name(class_name) {
1248                    Ok(Some(item.id.as_object_id()))
1249                } else if let Some(item) = schema.get_any_sink_by_name(class_name) {
1250                    Ok(Some(item.id.as_object_id()))
1251                } else {
1252                    Ok(None)
1253                }
1254            })?
1255            .map(|(id, _)| id)
1256            .ok_or_else(|| CatalogError::not_found("class", class_name))
1257    }
1258}