Skip to main content

risingwave_frontend/catalog/
root_catalog.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::id::ObjectId;
21use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
22use risingwave_common::types::DataType;
23use risingwave_connector::sink::catalog::SinkCatalog;
24use risingwave_pb::catalog::{
25    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
26    PbSubscription, PbTable, PbView,
27};
28use risingwave_pb::common::PbObjectType;
29use risingwave_pb::hummock::HummockVersionStats;
30use risingwave_pb::meta::ObjectDependency as PbObjectDependency;
31
32use super::function_catalog::FunctionCatalog;
33use super::source_catalog::SourceCatalog;
34use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
35use super::view_catalog::ViewCatalog;
36use super::{
37    CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
38};
39use crate::catalog::connection_catalog::ConnectionCatalog;
40use crate::catalog::database_catalog::DatabaseCatalog;
41use crate::catalog::schema_catalog::SchemaCatalog;
42use crate::catalog::secret_catalog::SecretCatalog;
43use crate::catalog::system_catalog::{
44    SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
45};
46use crate::catalog::table_catalog::TableCatalog;
47use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
48use crate::expr::{Expr, ExprImpl};
49
50#[derive(Copy, Clone)]
51pub enum SchemaPath<'a> {
52    Name(&'a str),
53    /// (`search_path`, `user_name`).
54    Path(&'a SearchPath, &'a str),
55}
56
57impl<'a> SchemaPath<'a> {
58    pub fn new(
59        schema_name: Option<&'a str>,
60        search_path: &'a SearchPath,
61        user_name: &'a str,
62    ) -> Self {
63        match schema_name {
64            Some(schema_name) => SchemaPath::Name(schema_name),
65            None => SchemaPath::Path(search_path, user_name),
66        }
67    }
68
69    /// Call function `f` for each schema name. Return the first `Some` result.
70    pub fn try_find<T, E>(
71        &self,
72        mut f: impl FnMut(&str) -> Result<Option<T>, E>,
73    ) -> Result<Option<(T, &'a str)>, E> {
74        match self {
75            SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
76            SchemaPath::Path(search_path, user_name) => {
77                for schema_name in search_path.path() {
78                    let mut schema_name: &str = schema_name;
79                    if schema_name == USER_NAME_WILD_CARD {
80                        schema_name = user_name;
81                    }
82                    if let Ok(Some(res)) = f(schema_name) {
83                        return Ok(Some((res, schema_name)));
84                    }
85                }
86                Ok(None)
87            }
88        }
89    }
90}
91
92#[derive(Clone, Debug)]
93pub struct ObjectDependency {
94    pub object_id: ObjectId,
95    pub referenced_object_id: ObjectId,
96    pub referenced_object_type: PbObjectType,
97}
98
99impl From<PbObjectDependency> for ObjectDependency {
100    fn from(dependency: PbObjectDependency) -> Self {
101        let referenced_object_type = PbObjectType::try_from(dependency.referenced_object_type)
102            .unwrap_or(PbObjectType::Unspecified);
103        Self {
104            object_id: dependency.object_id,
105            referenced_object_id: dependency.referenced_object_id,
106            referenced_object_type,
107        }
108    }
109}
110
111/// Root catalog of database catalog. It manages all database/schema/table in memory on frontend.
112/// It is protected by a `RwLock`. Only [`crate::observer::FrontendObserverNode`]
113/// will acquire the write lock and sync it with the meta catalog. In other situations, it is
114/// read only.
115///
116/// - catalog (root catalog)
117///   - database catalog
118///     - schema catalog
119///       - function catalog (i.e., user defined function)
120///       - table/sink/source/index/view catalog
121///        - column catalog
122pub struct Catalog {
123    database_by_name: HashMap<String, DatabaseCatalog>,
124    db_name_by_id: HashMap<DatabaseId, String>,
125    /// all table catalogs in the cluster identified by universal unique table id.
126    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
127    table_stats: HummockVersionStats,
128    object_dependencies: HashMap<ObjectId, Vec<ObjectDependency>>,
129}
130
131#[expect(clippy::derivable_impls)]
132impl Default for Catalog {
133    fn default() -> Self {
134        Self {
135            database_by_name: HashMap::new(),
136            db_name_by_id: HashMap::new(),
137            table_by_id: HashMap::new(),
138            table_stats: HummockVersionStats::default(),
139            object_dependencies: HashMap::new(),
140        }
141    }
142}
143
144impl Catalog {
145    fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
146        let name = self.db_name_by_id.get(&db_id)?;
147        self.database_by_name.get_mut(name)
148    }
149
150    pub fn clear(&mut self) {
151        self.database_by_name.clear();
152        self.db_name_by_id.clear();
153        self.table_by_id.clear();
154        self.object_dependencies.clear();
155    }
156
157    pub fn create_database(&mut self, db: &PbDatabase) {
158        let name = db.name.clone();
159        let id = db.id;
160
161        self.database_by_name
162            .try_insert(name.clone(), db.into())
163            .unwrap();
164        self.db_name_by_id.try_insert(id, name).unwrap();
165    }
166
167    pub fn create_schema(&mut self, proto: &PbSchema) {
168        let database_id = proto.database_id;
169        let id = proto.id;
170        self.get_database_mut(database_id)
171            .unwrap()
172            .create_schema(proto);
173
174        for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
175            self.get_database_mut(database_id)
176                .unwrap()
177                .get_schema_mut(id)
178                .unwrap()
179                .create_sys_table(sys_table);
180        }
181        for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
182            sys_view.database_id = database_id;
183            sys_view.schema_id = id;
184            self.get_database_mut(database_id)
185                .unwrap()
186                .get_schema_mut(id)
187                .unwrap()
188                .create_sys_view(Arc::new(sys_view));
189        }
190    }
191
192    pub fn create_table(&mut self, proto: &PbTable) {
193        let table = self
194            .get_database_mut(proto.database_id)
195            .unwrap()
196            .get_schema_mut(proto.schema_id)
197            .unwrap()
198            .create_table(proto);
199        self.table_by_id.insert(proto.id, table);
200    }
201
202    pub fn create_index(&mut self, proto: &PbIndex) {
203        self.get_database_mut(proto.database_id)
204            .unwrap()
205            .get_schema_mut(proto.schema_id)
206            .unwrap()
207            .create_index(proto);
208    }
209
210    pub fn create_source(&mut self, proto: &PbSource) {
211        self.get_database_mut(proto.database_id)
212            .unwrap()
213            .get_schema_mut(proto.schema_id)
214            .unwrap()
215            .create_source(proto);
216    }
217
218    pub fn create_sink(&mut self, proto: &PbSink) {
219        self.get_database_mut(proto.database_id)
220            .unwrap()
221            .get_schema_mut(proto.schema_id)
222            .unwrap()
223            .create_sink(proto);
224    }
225
226    pub fn create_subscription(&mut self, proto: &PbSubscription) {
227        self.get_database_mut(proto.database_id)
228            .unwrap()
229            .get_schema_mut(proto.schema_id)
230            .unwrap()
231            .create_subscription(proto);
232    }
233
234    pub fn create_secret(&mut self, proto: &PbSecret) {
235        self.get_database_mut(proto.database_id)
236            .unwrap()
237            .get_schema_mut(proto.schema_id)
238            .unwrap()
239            .create_secret(proto);
240    }
241
242    pub fn create_view(&mut self, proto: &PbView) {
243        self.get_database_mut(proto.database_id)
244            .unwrap()
245            .get_schema_mut(proto.schema_id)
246            .unwrap()
247            .create_view(proto);
248    }
249
250    pub fn create_function(&mut self, proto: &PbFunction) {
251        self.get_database_mut(proto.database_id)
252            .unwrap()
253            .get_schema_mut(proto.schema_id)
254            .unwrap()
255            .create_function(proto);
256    }
257
258    pub fn create_connection(&mut self, proto: &PbConnection) {
259        self.get_database_mut(proto.database_id)
260            .unwrap()
261            .get_schema_mut(proto.schema_id)
262            .unwrap()
263            .create_connection(proto);
264    }
265
266    pub fn drop_connection(
267        &mut self,
268        db_id: DatabaseId,
269        schema_id: SchemaId,
270        connection_id: ConnectionId,
271    ) {
272        self.get_database_mut(db_id)
273            .unwrap()
274            .get_schema_mut(schema_id)
275            .unwrap()
276            .drop_connection(connection_id);
277    }
278
279    pub fn update_connection(&mut self, proto: &PbConnection) {
280        let database = self.get_database_mut(proto.database_id).unwrap();
281        let schema = database.get_schema_mut(proto.schema_id).unwrap();
282        if schema.get_connection_by_id(proto.id).is_some() {
283            schema.update_connection(proto);
284        } else {
285            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
286            schema.create_connection(proto);
287            database
288                .iter_schemas_mut()
289                .find(|schema| {
290                    schema.id() != proto.schema_id
291                        && schema.get_connection_by_id(proto.id).is_some()
292                })
293                .unwrap()
294                .drop_connection(proto.id);
295        }
296    }
297
298    pub fn update_secret(&mut self, proto: &PbSecret) {
299        let database = self.get_database_mut(proto.database_id).unwrap();
300        let schema = database.get_schema_mut(proto.schema_id).unwrap();
301        let secret_id = proto.id;
302        if schema.get_secret_by_id(secret_id).is_some() {
303            schema.update_secret(proto);
304        } else {
305            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
306            schema.create_secret(proto);
307            database
308                .iter_schemas_mut()
309                .find(|schema| {
310                    schema.id() != proto.schema_id && schema.get_secret_by_id(secret_id).is_some()
311                })
312                .unwrap()
313                .drop_secret(secret_id);
314        }
315    }
316
317    pub fn drop_database(&mut self, db_id: DatabaseId) {
318        let name = self.db_name_by_id.remove(&db_id).unwrap();
319        let database = self.database_by_name.remove(&name).unwrap();
320        let object_ids: HashSet<ObjectId> = database.iter_object_ids().collect();
321        self.remove_object_dependencies_for_objects(&object_ids);
322        database.iter_all_table_ids().for_each(|table| {
323            self.table_by_id.remove(&table);
324        });
325    }
326
327    pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
328        let object_ids = self
329            .get_database_by_id(db_id)
330            .ok()
331            .and_then(|db| db.get_schema_by_id(schema_id))
332            .map(|schema| schema.iter_object_ids().collect::<HashSet<_>>())
333            .unwrap_or_default();
334        self.remove_object_dependencies_for_objects(&object_ids);
335        self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
336    }
337
338    pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
339        self.remove_object_dependencies_for_object(tb_id.as_object_id());
340        self.table_by_id.remove(&tb_id);
341        self.get_database_mut(db_id)
342            .unwrap()
343            .get_schema_mut(schema_id)
344            .unwrap()
345            .drop_table(tb_id);
346    }
347
348    pub fn update_table(&mut self, proto: &PbTable) {
349        let database = self.get_database_mut(proto.database_id).unwrap();
350        let schema = database.get_schema_mut(proto.schema_id).unwrap();
351        let table = if schema.get_table_by_id(proto.id).is_some() {
352            schema.update_table(proto)
353        } else {
354            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
355            let new_table = schema.create_table(proto);
356            database
357                .iter_schemas_mut()
358                .find(|schema| {
359                    schema.id() != proto.schema_id
360                        && schema.get_created_table_by_id(proto.id).is_some()
361                })
362                .unwrap()
363                .drop_table(proto.id);
364            new_table
365        };
366
367        self.table_by_id.insert(proto.id, table);
368    }
369
370    pub fn update_database(&mut self, proto: &PbDatabase) {
371        let id = proto.id;
372        let name = proto.name.clone();
373
374        let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
375        if old_database_name != name {
376            let mut database = self.database_by_name.remove(&old_database_name).unwrap();
377            database.name.clone_from(&name);
378            database.owner = proto.owner;
379            database.resource_group.clone_from(&proto.resource_group);
380            database.barrier_interval_ms = proto.barrier_interval_ms;
381            database.checkpoint_frequency = proto.checkpoint_frequency;
382            self.database_by_name.insert(name.clone(), database);
383            self.db_name_by_id.insert(id, name);
384        } else {
385            let database = self.get_database_mut(id).unwrap();
386            database.name = name;
387            database.owner = proto.owner;
388            database.resource_group = proto.resource_group.clone();
389            database.barrier_interval_ms = proto.barrier_interval_ms;
390            database.checkpoint_frequency = proto.checkpoint_frequency;
391        }
392    }
393
394    pub fn update_schema(&mut self, proto: &PbSchema) {
395        self.get_database_mut(proto.database_id)
396            .unwrap()
397            .update_schema(proto);
398    }
399
400    pub fn update_index(&mut self, proto: &PbIndex) {
401        let database = self.get_database_mut(proto.database_id).unwrap();
402        let schema = database.get_schema_mut(proto.schema_id).unwrap();
403        if schema.get_index_by_id(proto.id).is_some() {
404            schema.update_index(proto);
405        } else {
406            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
407            schema.create_index(proto);
408            database
409                .iter_schemas_mut()
410                .find(|schema| {
411                    schema.id() != proto.schema_id && schema.get_index_by_id(proto.id).is_some()
412                })
413                .unwrap()
414                .drop_index(proto.id);
415        }
416    }
417
418    pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
419        self.remove_object_dependencies_for_object(source_id.as_object_id());
420        self.get_database_mut(db_id)
421            .unwrap()
422            .get_schema_mut(schema_id)
423            .unwrap()
424            .drop_source(source_id);
425    }
426
427    pub fn update_source(&mut self, proto: &PbSource) {
428        let database = self.get_database_mut(proto.database_id).unwrap();
429        let schema = database.get_schema_mut(proto.schema_id).unwrap();
430        if schema.get_source_by_id(proto.id).is_some() {
431            schema.update_source(proto);
432        } else {
433            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
434            schema.create_source(proto);
435            database
436                .iter_schemas_mut()
437                .find(|schema| {
438                    schema.id() != proto.schema_id && schema.get_source_by_id(proto.id).is_some()
439                })
440                .unwrap()
441                .drop_source(proto.id);
442        }
443    }
444
445    pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
446        self.remove_object_dependencies_for_object(sink_id.as_object_id());
447        self.get_database_mut(db_id)
448            .unwrap()
449            .get_schema_mut(schema_id)
450            .unwrap()
451            .drop_sink(sink_id);
452    }
453
454    pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
455        self.remove_object_dependencies_for_object(secret_id.as_object_id());
456        self.get_database_mut(db_id)
457            .unwrap()
458            .get_schema_mut(schema_id)
459            .unwrap()
460            .drop_secret(secret_id);
461    }
462
463    pub fn update_sink(&mut self, proto: &PbSink) {
464        let database = self.get_database_mut(proto.database_id).unwrap();
465        let schema = database.get_schema_mut(proto.schema_id).unwrap();
466        if schema.get_sink_by_id(proto.id).is_some() {
467            schema.update_sink(proto);
468        } else {
469            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
470            schema.create_sink(proto);
471            database
472                .iter_schemas_mut()
473                .find(|schema| {
474                    schema.id() != proto.schema_id && schema.get_sink_by_id(proto.id).is_some()
475                })
476                .unwrap()
477                .drop_sink(proto.id);
478        }
479    }
480
481    pub fn drop_subscription(
482        &mut self,
483        db_id: DatabaseId,
484        schema_id: SchemaId,
485        subscription_id: SubscriptionId,
486    ) {
487        self.remove_object_dependencies_for_object(subscription_id.as_object_id());
488        self.get_database_mut(db_id)
489            .unwrap()
490            .get_schema_mut(schema_id)
491            .unwrap()
492            .drop_subscription(subscription_id);
493    }
494
495    pub fn update_subscription(&mut self, proto: &PbSubscription) {
496        let database = self.get_database_mut(proto.database_id).unwrap();
497        let schema = database.get_schema_mut(proto.schema_id).unwrap();
498        if schema.get_subscription_by_id(proto.id).is_some() {
499            schema.update_subscription(proto);
500        } else {
501            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
502            schema.create_subscription(proto);
503            database
504                .iter_schemas_mut()
505                .find(|schema| {
506                    schema.id() != proto.schema_id
507                        && schema.get_subscription_by_id(proto.id).is_some()
508                })
509                .unwrap()
510                .drop_subscription(proto.id);
511        }
512    }
513
514    pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
515        self.remove_object_dependencies_for_object(index_id.as_object_id());
516        self.get_database_mut(db_id)
517            .unwrap()
518            .get_schema_mut(schema_id)
519            .unwrap()
520            .drop_index(index_id);
521    }
522
523    pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
524        self.remove_object_dependencies_for_object(view_id.as_object_id());
525        self.get_database_mut(db_id)
526            .unwrap()
527            .get_schema_mut(schema_id)
528            .unwrap()
529            .drop_view(view_id);
530    }
531
532    pub fn update_view(&mut self, proto: &PbView) {
533        let database = self.get_database_mut(proto.database_id).unwrap();
534        let schema = database.get_schema_mut(proto.schema_id).unwrap();
535        if schema.get_view_by_id(proto.id).is_some() {
536            schema.update_view(proto);
537        } else {
538            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
539            schema.create_view(proto);
540            database
541                .iter_schemas_mut()
542                .find(|schema| {
543                    schema.id() != proto.schema_id && schema.get_view_by_id(proto.id).is_some()
544                })
545                .unwrap()
546                .drop_view(proto.id);
547        }
548    }
549
550    pub fn drop_function(
551        &mut self,
552        db_id: DatabaseId,
553        schema_id: SchemaId,
554        function_id: FunctionId,
555    ) {
556        self.remove_object_dependencies_for_object(function_id.as_object_id());
557        self.get_database_mut(db_id)
558            .unwrap()
559            .get_schema_mut(schema_id)
560            .unwrap()
561            .drop_function(function_id);
562    }
563
564    pub fn update_function(&mut self, proto: &PbFunction) {
565        let database = self.get_database_mut(proto.database_id).unwrap();
566        let schema = database.get_schema_mut(proto.schema_id).unwrap();
567        if schema.get_function_by_id(proto.id).is_some() {
568            schema.update_function(proto);
569        } else {
570            // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
571            schema.create_function(proto);
572            database
573                .iter_schemas_mut()
574                .find(|schema| {
575                    schema.id() != proto.schema_id && schema.get_function_by_id(proto.id).is_some()
576                })
577                .unwrap()
578                .drop_function(proto.id);
579        }
580
581        self.get_database_mut(proto.database_id)
582            .unwrap()
583            .get_schema_mut(proto.schema_id)
584            .unwrap()
585            .update_function(proto);
586    }
587
588    pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
589        self.database_by_name
590            .get(db_name)
591            .ok_or_else(|| CatalogError::not_found("database", db_name))
592    }
593
594    pub fn get_database_by_id(&self, db_id: DatabaseId) -> CatalogResult<&DatabaseCatalog> {
595        let db_name = self
596            .db_name_by_id
597            .get(&db_id)
598            .ok_or_else(|| CatalogError::not_found("db_id", db_id.to_string()))?;
599        self.database_by_name
600            .get(db_name)
601            .ok_or_else(|| CatalogError::not_found("database", db_name))
602    }
603
604    pub fn find_schema_secret_by_secret_id(
605        &self,
606        db_name: &str,
607        secret_id: SecretId,
608    ) -> CatalogResult<(String, String)> {
609        let db = self.get_database_by_name(db_name)?;
610        let schema_secret = db
611            .iter_schemas()
612            .find_map(|schema| {
613                schema
614                    .get_secret_by_id(secret_id)
615                    .map(|secret| (schema.name(), secret.name.clone()))
616            })
617            .ok_or_else(|| CatalogError::not_found("secret", secret_id.to_string()))?;
618        Ok(schema_secret)
619    }
620
621    pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
622        Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
623    }
624
625    pub fn iter_schemas(
626        &self,
627        db_name: &str,
628    ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
629        Ok(self.get_database_by_name(db_name)?.iter_schemas())
630    }
631
632    pub fn get_all_database_names(&self) -> Vec<String> {
633        self.database_by_name.keys().cloned().collect_vec()
634    }
635
636    pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
637        self.database_by_name.values()
638    }
639
640    pub fn get_schema_by_name(
641        &self,
642        db_name: &str,
643        schema_name: &str,
644    ) -> CatalogResult<&SchemaCatalog> {
645        self.get_database_by_name(db_name)?
646            .get_schema_by_name(schema_name)
647            .ok_or_else(|| CatalogError::not_found("schema", schema_name))
648    }
649
650    pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
651        self.get_any_table_by_id(table_id)
652            .map(|table| table.name.clone())
653    }
654
655    pub fn get_schema_by_id(
656        &self,
657        db_id: DatabaseId,
658        schema_id: SchemaId,
659    ) -> CatalogResult<&SchemaCatalog> {
660        self.get_database_by_id(db_id)?
661            .get_schema_by_id(schema_id)
662            .ok_or_else(|| CatalogError::not_found("schema_id", schema_id.to_string()))
663    }
664
665    /// Refer to [`SearchPath`].
666    pub fn first_valid_schema(
667        &self,
668        db_name: &str,
669        search_path: &SearchPath,
670        user_name: &str,
671    ) -> CatalogResult<&SchemaCatalog> {
672        for path in search_path.real_path() {
673            let mut schema_name: &str = path;
674            if schema_name == USER_NAME_WILD_CARD {
675                schema_name = user_name;
676            }
677
678            if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
679                return schema_catalog;
680            }
681        }
682        Err(CatalogError::not_found(
683            "first valid schema",
684            "no schema has been selected to create in".to_owned(),
685        ))
686    }
687
688    pub fn get_source_by_id<'a>(
689        &self,
690        db_name: &'a str,
691        schema_path: SchemaPath<'a>,
692        source_id: SourceId,
693    ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
694        schema_path
695            .try_find(|schema_name| -> CatalogResult<_> {
696                Ok(self
697                    .get_schema_by_name(db_name, schema_name)?
698                    .get_source_by_id(source_id))
699            })?
700            .ok_or_else(|| CatalogError::not_found("source", source_id.to_string()))
701    }
702
703    pub fn get_table_by_name<'a>(
704        &self,
705        db_name: &str,
706        schema_path: SchemaPath<'a>,
707        table_name: &str,
708        bind_creating: bool,
709    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
710        schema_path
711            .try_find(|schema_name| -> CatalogResult<_> {
712                Ok(self
713                    .get_schema_by_name(db_name, schema_name)?
714                    .get_table_by_name(table_name, bind_creating))
715            })?
716            .ok_or_else(|| CatalogError::not_found("table", table_name))
717    }
718
719    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
720    /// Retrieves all tables, created or creating.
721    pub fn get_any_table_by_name<'a>(
722        &self,
723        db_name: &str,
724        schema_path: SchemaPath<'a>,
725        table_name: &str,
726    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
727        self.get_table_by_name(db_name, schema_path, table_name, true)
728    }
729
730    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
731    /// Retrieves only created tables.
732    pub fn get_created_table_by_name<'a>(
733        &self,
734        db_name: &str,
735        schema_path: SchemaPath<'a>,
736        table_name: &str,
737    ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
738        self.get_table_by_name(db_name, schema_path, table_name, false)
739    }
740
741    pub fn get_any_table_by_id(&self, table_id: TableId) -> CatalogResult<&Arc<TableCatalog>> {
742        self.table_by_id
743            .get(&table_id)
744            .ok_or_else(|| CatalogError::not_found("table id", table_id.to_string()))
745    }
746
747    /// This function is similar to `get_table_by_id` expect that a table must be in a given database.
748    pub fn get_created_table_by_id_with_db(
749        &self,
750        db_name: &str,
751        table_id: u32,
752    ) -> CatalogResult<&Arc<TableCatalog>> {
753        let table_id = TableId::from(table_id);
754        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
755            if let Some(table) = schema.get_created_table_by_id(table_id) {
756                return Ok(table);
757            }
758        }
759        Err(CatalogError::not_found("table id", table_id.to_string()))
760    }
761
762    pub fn iter_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
763        self.table_by_id.values()
764    }
765
766    pub fn iter_backfilling_internal_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
767        self.table_by_id
768            .values()
769            .filter(|t| t.is_internal_table() && !t.is_created())
770    }
771
772    // Used by test_utils only.
773    pub fn alter_table_name_by_id(&mut self, table_id: TableId, table_name: &str) {
774        let mut found = false;
775        for database in self.database_by_name.values() {
776            if !found {
777                for schema in database.iter_schemas() {
778                    if schema.iter_user_table().any(|t| t.id() == table_id) {
779                        found = true;
780                        break;
781                    }
782                }
783            }
784        }
785
786        if found {
787            let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
788            table.name = table_name.to_owned();
789            self.update_table(&table);
790        }
791    }
792
793    #[cfg(test)]
794    pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
795        self.table_by_id.insert(
796            table_id,
797            Arc::new(TableCatalog {
798                fragment_id,
799                ..Default::default()
800            }),
801        );
802    }
803
804    pub fn get_sys_table_by_name(
805        &self,
806        db_name: &str,
807        schema_name: &str,
808        table_name: &str,
809    ) -> CatalogResult<&Arc<SystemTableCatalog>> {
810        self.get_schema_by_name(db_name, schema_name)?
811            .get_system_table_by_name(table_name)
812            .ok_or_else(|| CatalogError::not_found("table", table_name))
813    }
814
815    pub fn get_source_by_name<'a>(
816        &self,
817        db_name: &str,
818        schema_path: SchemaPath<'a>,
819        source_name: &str,
820    ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
821        schema_path
822            .try_find(|schema_name| -> CatalogResult<_> {
823                Ok(self
824                    .get_schema_by_name(db_name, schema_name)?
825                    .get_source_by_name(source_name))
826            })?
827            .ok_or_else(|| CatalogError::not_found("source", source_name))
828    }
829
830    pub fn get_sink_by_name<'a>(
831        &self,
832        db_name: &str,
833        schema_path: SchemaPath<'a>,
834        sink_name: &str,
835        bind_creating: bool,
836    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
837        schema_path
838            .try_find(|schema_name| -> CatalogResult<_> {
839                Ok(self
840                    .get_schema_by_name(db_name, schema_name)?
841                    .get_sink_by_name(sink_name, bind_creating))
842            })?
843            .ok_or_else(|| CatalogError::not_found("sink", sink_name))
844    }
845
846    pub fn get_any_sink_by_name<'a>(
847        &self,
848        db_name: &str,
849        schema_path: SchemaPath<'a>,
850        sink_name: &str,
851    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
852        self.get_sink_by_name(db_name, schema_path, sink_name, true)
853    }
854
855    pub fn get_created_sink_by_name<'a>(
856        &self,
857        db_name: &str,
858        schema_path: SchemaPath<'a>,
859        sink_name: &str,
860    ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
861        self.get_sink_by_name(db_name, schema_path, sink_name, false)
862    }
863
864    pub fn get_subscription_by_name<'a>(
865        &self,
866        db_name: &str,
867        schema_path: SchemaPath<'a>,
868        subscription_name: &str,
869    ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
870        schema_path
871            .try_find(|schema_name| -> CatalogResult<_> {
872                Ok(self
873                    .get_schema_by_name(db_name, schema_name)?
874                    .get_subscription_by_name(subscription_name))
875            })?
876            .ok_or_else(|| CatalogError::not_found("subscription", subscription_name))
877    }
878
879    pub fn get_index_by_name<'a>(
880        &self,
881        db_name: &str,
882        schema_path: SchemaPath<'a>,
883        index_name: &str,
884    ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
885        schema_path
886            .try_find(|schema_name| -> CatalogResult<_> {
887                Ok(self
888                    .get_schema_by_name(db_name, schema_name)?
889                    .get_created_index_by_name(index_name))
890            })?
891            .ok_or_else(|| CatalogError::not_found("index", index_name))
892    }
893
894    pub fn get_any_index_by_name<'a>(
895        &self,
896        db_name: &str,
897        schema_path: SchemaPath<'a>,
898        index_name: &str,
899    ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
900        schema_path
901            .try_find(|schema_name| -> CatalogResult<_> {
902                Ok(self
903                    .get_schema_by_name(db_name, schema_name)?
904                    .get_any_index_by_name(index_name))
905            })?
906            .ok_or_else(|| CatalogError::not_found("index", index_name))
907    }
908
909    pub fn get_index_by_id(
910        &self,
911        db_name: &str,
912        index_id: u32,
913    ) -> CatalogResult<&Arc<IndexCatalog>> {
914        let index_id = IndexId::from(index_id);
915        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
916            if let Some(index) = schema.get_index_by_id(index_id) {
917                return Ok(index);
918            }
919        }
920        Err(CatalogError::not_found("index", index_id.to_string()))
921    }
922
923    pub fn get_view_by_name<'a>(
924        &self,
925        db_name: &str,
926        schema_path: SchemaPath<'a>,
927        view_name: &str,
928    ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
929        schema_path
930            .try_find(|schema_name| -> CatalogResult<_> {
931                Ok(self
932                    .get_schema_by_name(db_name, schema_name)?
933                    .get_view_by_name(view_name))
934            })?
935            .ok_or_else(|| CatalogError::not_found("view", view_name))
936    }
937
938    pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
939        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
940            if let Some(view) = schema.get_view_by_id(ViewId::from(view_id)) {
941                return Ok(view.clone());
942            }
943        }
944        Err(CatalogError::not_found("view", view_id.to_string()))
945    }
946
947    pub fn get_secret_by_name<'a>(
948        &self,
949        db_name: &str,
950        schema_path: SchemaPath<'a>,
951        secret_name: &str,
952    ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
953        schema_path
954            .try_find(|schema_name| -> CatalogResult<_> {
955                Ok(self
956                    .get_schema_by_name(db_name, schema_name)?
957                    .get_secret_by_name(secret_name))
958            })?
959            .ok_or_else(|| CatalogError::not_found("secret", secret_name))
960    }
961
962    pub fn get_connection_by_id(
963        &self,
964        db_name: &str,
965        connection_id: ConnectionId,
966    ) -> CatalogResult<&Arc<ConnectionCatalog>> {
967        for schema in self.get_database_by_name(db_name)?.iter_schemas() {
968            if let Some(conn) = schema.get_connection_by_id(connection_id) {
969                return Ok(conn);
970            }
971        }
972        Err(CatalogError::not_found(
973            "connection",
974            connection_id.to_string(),
975        ))
976    }
977
978    pub fn get_connection_by_name<'a>(
979        &self,
980        db_name: &str,
981        schema_path: SchemaPath<'a>,
982        connection_name: &str,
983    ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
984        schema_path
985            .try_find(|schema_name| -> CatalogResult<_> {
986                Ok(self
987                    .get_schema_by_name(db_name, schema_name)?
988                    .get_connection_by_name(connection_name))
989            })?
990            .ok_or_else(|| CatalogError::not_found("connection", connection_name))
991    }
992
993    pub fn get_function_by_name_inputs<'a>(
994        &self,
995        db_name: &str,
996        schema_path: SchemaPath<'a>,
997        function_name: &str,
998        inputs: &mut [ExprImpl],
999    ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
1000        schema_path
1001            .try_find(|schema_name| -> CatalogResult<_> {
1002                Ok(self
1003                    .get_schema_by_name(db_name, schema_name)?
1004                    .get_function_by_name_inputs(function_name, inputs))
1005            })?
1006            .ok_or_else(|| {
1007                CatalogError::not_found(
1008                    "function",
1009                    format!(
1010                        "{}({})",
1011                        function_name,
1012                        inputs
1013                            .iter()
1014                            .map(|a| a.return_type().to_string())
1015                            .join(", ")
1016                    ),
1017                )
1018            })
1019    }
1020
1021    pub fn get_function_by_name_args<'a>(
1022        &self,
1023        db_name: &str,
1024        schema_path: SchemaPath<'a>,
1025        function_name: &str,
1026        args: &[DataType],
1027    ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
1028        schema_path
1029            .try_find(|schema_name| -> CatalogResult<_> {
1030                Ok(self
1031                    .get_schema_by_name(db_name, schema_name)?
1032                    .get_function_by_name_args(function_name, args))
1033            })?
1034            .ok_or_else(|| {
1035                CatalogError::not_found(
1036                    "function",
1037                    format!(
1038                        "{}({})",
1039                        function_name,
1040                        args.iter().map(|a| a.to_string()).join(", ")
1041                    ),
1042                )
1043            })
1044    }
1045
1046    /// Gets all functions with the given name.
1047    pub fn get_functions_by_name<'a>(
1048        &self,
1049        db_name: &str,
1050        schema_path: SchemaPath<'a>,
1051        function_name: &str,
1052    ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
1053        schema_path
1054            .try_find(|schema_name| -> CatalogResult<_> {
1055                Ok(self
1056                    .get_schema_by_name(db_name, schema_name)?
1057                    .get_functions_by_name(function_name))
1058            })?
1059            .ok_or_else(|| CatalogError::not_found("function", function_name))
1060    }
1061
1062    /// Check if the name duplicates with existing table, materialized view or source.
1063    pub fn check_relation_name_duplicated(
1064        &self,
1065        db_name: &str,
1066        schema_name: &str,
1067        relation_name: &str,
1068    ) -> CatalogResult<()> {
1069        let schema = self.get_schema_by_name(db_name, schema_name)?;
1070
1071        if let Some(table) = schema.get_any_table_by_name(relation_name) {
1072            let is_creating = table.stream_job_status == StreamJobStatus::Creating;
1073            if table.is_index() {
1074                Err(CatalogError::duplicated_under_creation(
1075                    "index",
1076                    relation_name.to_owned(),
1077                    is_creating,
1078                ))
1079            } else if table.is_mview() {
1080                Err(CatalogError::duplicated_under_creation(
1081                    "materialized view",
1082                    relation_name.to_owned(),
1083                    is_creating,
1084                ))
1085            } else {
1086                Err(CatalogError::duplicated_under_creation(
1087                    "table",
1088                    relation_name.to_owned(),
1089                    is_creating,
1090                ))
1091            }
1092        } else if schema.get_source_by_name(relation_name).is_some() {
1093            Err(CatalogError::duplicated("source", relation_name))
1094        } else if let Some(sink) = schema.get_any_sink_by_name(relation_name) {
1095            Err(CatalogError::duplicated_under_creation(
1096                "sink",
1097                relation_name.to_owned(),
1098                !sink.is_created(),
1099            ))
1100        } else if schema.get_view_by_name(relation_name).is_some() {
1101            Err(CatalogError::duplicated("view", relation_name))
1102        } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
1103            let is_not_created = subscription.subscription_state != SubscriptionState::Created;
1104            Err(CatalogError::duplicated_under_creation(
1105                "subscription",
1106                relation_name.to_owned(),
1107                is_not_created,
1108            ))
1109        } else {
1110            Ok(())
1111        }
1112    }
1113
1114    pub fn check_function_name_duplicated(
1115        &self,
1116        db_name: &str,
1117        schema_name: &str,
1118        function_name: &str,
1119        arg_types: &[DataType],
1120    ) -> CatalogResult<()> {
1121        let schema = self.get_schema_by_name(db_name, schema_name)?;
1122
1123        if schema
1124            .get_function_by_name_args(function_name, arg_types)
1125            .is_some()
1126        {
1127            let name = format!(
1128                "{function_name}({})",
1129                arg_types.iter().map(|t| t.to_string()).join(",")
1130            );
1131            Err(CatalogError::duplicated("function", name))
1132        } else {
1133            Ok(())
1134        }
1135    }
1136
1137    /// Check if the name duplicates with existing connection.
1138    pub fn check_connection_name_duplicated(
1139        &self,
1140        db_name: &str,
1141        schema_name: &str,
1142        connection_name: &str,
1143    ) -> CatalogResult<()> {
1144        let schema = self.get_schema_by_name(db_name, schema_name)?;
1145
1146        if schema.get_connection_by_name(connection_name).is_some() {
1147            Err(CatalogError::duplicated("connection", connection_name))
1148        } else {
1149            Ok(())
1150        }
1151    }
1152
1153    pub fn check_secret_name_duplicated(
1154        &self,
1155        db_name: &str,
1156        schema_name: &str,
1157        secret_name: &str,
1158    ) -> CatalogResult<()> {
1159        let schema = self.get_schema_by_name(db_name, schema_name)?;
1160
1161        if schema.get_secret_by_name(secret_name).is_some() {
1162            Err(CatalogError::duplicated("secret", secret_name))
1163        } else {
1164            Ok(())
1165        }
1166    }
1167
1168    pub fn table_stats(&self) -> &HummockVersionStats {
1169        &self.table_stats
1170    }
1171
1172    pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
1173        self.table_stats = table_stats;
1174    }
1175
1176    pub fn set_object_dependencies(&mut self, dependencies: Vec<PbObjectDependency>) {
1177        self.object_dependencies.clear();
1178        self.insert_object_dependencies(dependencies);
1179    }
1180
1181    pub fn insert_object_dependencies(&mut self, dependencies: Vec<PbObjectDependency>) {
1182        let mut grouped: HashMap<ObjectId, Vec<ObjectDependency>> = HashMap::new();
1183        for dependency in dependencies {
1184            let dependency = ObjectDependency::from(dependency);
1185            grouped
1186                .entry(dependency.object_id)
1187                .or_default()
1188                .push(dependency);
1189        }
1190        for (object_id, deps) in grouped {
1191            self.object_dependencies.insert(object_id, deps);
1192        }
1193    }
1194
1195    pub fn iter_object_dependencies(&self) -> impl Iterator<Item = &ObjectDependency> {
1196        self.object_dependencies
1197            .values()
1198            .flat_map(|deps| deps.iter())
1199    }
1200
1201    fn remove_object_dependencies_for_object(&mut self, object_id: ObjectId) {
1202        let mut object_ids = HashSet::new();
1203        object_ids.insert(object_id);
1204        self.remove_object_dependencies_for_objects(&object_ids);
1205    }
1206
1207    fn remove_object_dependencies_for_objects(&mut self, object_ids: &HashSet<ObjectId>) {
1208        if object_ids.is_empty() {
1209            return;
1210        }
1211        self.object_dependencies.retain(|object_id, deps| {
1212            if object_ids.contains(object_id) {
1213                return false;
1214            }
1215            deps.retain(|dep| !object_ids.contains(&dep.referenced_object_id));
1216            !deps.is_empty()
1217        });
1218    }
1219
1220    pub fn get_all_indexes_related_to_object(
1221        &self,
1222        db_id: DatabaseId,
1223        schema_id: SchemaId,
1224        mv_id: TableId,
1225    ) -> Vec<Arc<IndexCatalog>> {
1226        self.get_database_by_id(db_id)
1227            .unwrap()
1228            .get_schema_by_id(schema_id)
1229            .unwrap()
1230            .get_any_indexes_by_table_id(mv_id)
1231    }
1232
1233    pub fn get_id_by_class_name(
1234        &self,
1235        db_name: &str,
1236        schema_path: SchemaPath<'_>,
1237        class_name: &str,
1238    ) -> CatalogResult<ObjectId> {
1239        schema_path
1240            .try_find(|schema_name| -> CatalogResult<_> {
1241                let schema = self.get_schema_by_name(db_name, schema_name)?;
1242
1243                if let Some(item) = schema.get_system_table_by_name(class_name) {
1244                    Ok(Some(item.id().as_object_id()))
1245                } else if let Some(item) = schema.get_any_table_by_name(class_name) {
1246                    Ok(Some(item.id().as_object_id()))
1247                } else if let Some(item) = schema.get_any_index_by_name(class_name) {
1248                    Ok(Some(item.id.as_object_id()))
1249                } else if let Some(item) = schema.get_source_by_name(class_name) {
1250                    Ok(Some(item.id.as_object_id()))
1251                } else if let Some(item) = schema.get_view_by_name(class_name) {
1252                    Ok(Some(item.id.as_object_id()))
1253                } else if let Some(item) = schema.get_any_sink_by_name(class_name) {
1254                    Ok(Some(item.id.as_object_id()))
1255                } else {
1256                    Ok(None)
1257                }
1258            })?
1259            .map(|(id, _)| id)
1260            .ok_or_else(|| CatalogError::not_found("class", class_name))
1261    }
1262}