risingwave_frontend/catalog/
schema_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry::{Occupied, Vacant};
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25    PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26    PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, OwnedGrantObject, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
45#[derive(Clone, Debug)]
46pub struct SchemaCatalog {
47    id: SchemaId,
48    pub name: String,
49    pub database_id: DatabaseId,
50    /// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
51    table_by_name: HashMap<String, Arc<TableCatalog>>,
52    /// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
53    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
54    source_by_name: HashMap<String, Arc<SourceCatalog>>,
55    source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
56    sink_by_name: HashMap<String, Arc<SinkCatalog>>,
57    sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
58    /// reverted index of (`sink.target_table` -> `sink_id`s)
59    table_incoming_sinks: HashMap<TableId, HashSet<SinkId>>,
60    subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
61    subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
62    index_by_name: HashMap<String, Arc<IndexCatalog>>,
63    index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
64    indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
65    view_by_name: HashMap<String, Arc<ViewCatalog>>,
66    view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
67    function_registry: FunctionRegistry,
68    function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
69    function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
70    connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
71    connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
72    secret_by_name: HashMap<String, Arc<SecretCatalog>>,
73    secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,
74
75    _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
76    _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,
77
78    // This field is currently used only for `show connections`
79    connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
80    // This field is currently used only for `show connections`
81    connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
82    // This field only available when schema is "pg_catalog". Meanwhile, others will be empty.
83    system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
84    pub owner: u32,
85}
86
87impl SchemaCatalog {
88    pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
89        let name = prost.name.clone();
90        let id = prost.id;
91        let table: TableCatalog = prost.into();
92        let table_ref = Arc::new(table);
93
94        self.table_by_name
95            .try_insert(name, table_ref.clone())
96            .unwrap();
97        self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
98        table_ref
99    }
100
101    pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
102        self.system_table_by_name
103            .try_insert(sys_table.name.clone(), sys_table)
104            .unwrap();
105    }
106
107    pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
108        self.view_by_name
109            .try_insert(sys_view.name().to_owned(), sys_view.clone())
110            .unwrap();
111        self.view_by_id
112            .try_insert(sys_view.id, sys_view.clone())
113            .unwrap();
114    }
115
116    pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
117        let name = prost.name.clone();
118        let id = prost.id;
119        let table: TableCatalog = prost.into();
120        let table_ref = Arc::new(table);
121
122        let old_table = self.table_by_id.get(&id).unwrap();
123        // check if the table name gets updated.
124        if old_table.name() != name
125            && let Some(t) = self.table_by_name.get(old_table.name())
126            && t.id == id
127        {
128            self.table_by_name.remove(old_table.name());
129        }
130
131        self.table_by_name.insert(name, table_ref.clone());
132        self.table_by_id.insert(id, table_ref.clone());
133        table_ref
134    }
135
136    pub fn update_index(&mut self, prost: &PbIndex) {
137        let name = prost.name.clone();
138        let id = prost.id;
139        let old_index = self.index_by_id.get(&id).unwrap();
140        let index_table = self.get_created_table_by_id(prost.index_table_id).unwrap();
141        let primary_table = self
142            .get_created_table_by_id(prost.primary_table_id)
143            .unwrap();
144        let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
145        let index_ref = Arc::new(index);
146
147        // check if the index name gets updated.
148        if old_index.name != name
149            && let Some(idx) = self.index_by_name.get(&old_index.name)
150            && idx.id == id
151        {
152            self.index_by_name.remove(&old_index.name);
153        }
154        self.index_by_name.insert(name, index_ref.clone());
155        self.index_by_id.insert(id, index_ref.clone());
156
157        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
158            Occupied(mut entry) => {
159                let pos = entry
160                    .get()
161                    .iter()
162                    .position(|x| x.id == index_ref.id)
163                    .unwrap();
164                *entry.get_mut().get_mut(pos).unwrap() = index_ref;
165            }
166            Vacant(_entry) => {
167                unreachable!()
168            }
169        };
170    }
171
172    pub fn drop_table(&mut self, id: TableId) {
173        if let Some(table_ref) = self.table_by_id.remove(&id) {
174            self.table_by_name.remove(&table_ref.name).unwrap();
175            self.indexes_by_table_id.remove(&table_ref.id);
176        } else {
177            tracing::warn!(
178                %id,
179                "table not found when dropping, frontend might not be notified yet"
180            );
181        }
182    }
183
184    pub fn create_index(&mut self, prost: &PbIndex) {
185        let name = prost.name.clone();
186        let id = prost.id;
187        let index_table = self.get_table_by_id(prost.index_table_id).unwrap();
188        let primary_table = self
189            .get_created_table_by_id(prost.primary_table_id)
190            .unwrap();
191        let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
192        let index_ref = Arc::new(index);
193
194        self.index_by_name
195            .try_insert(name, index_ref.clone())
196            .unwrap();
197        self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
198        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
199            Occupied(mut entry) => {
200                entry.get_mut().push(index_ref);
201            }
202            Vacant(entry) => {
203                entry.insert(vec![index_ref]);
204            }
205        };
206    }
207
208    pub fn drop_index(&mut self, id: IndexId) {
209        let index_ref = self.index_by_id.remove(&id).unwrap();
210        self.index_by_name.remove(&index_ref.name).unwrap();
211        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
212            Occupied(mut entry) => {
213                let pos = entry
214                    .get_mut()
215                    .iter()
216                    .position(|x| x.id == index_ref.id)
217                    .unwrap();
218                entry.get_mut().remove(pos);
219            }
220            Vacant(_entry) => (),
221        };
222    }
223
224    pub fn create_source(&mut self, prost: &PbSource) {
225        let name = prost.name.clone();
226        let id = prost.id;
227        let source = SourceCatalog::from(prost);
228        let source_ref = Arc::new(source);
229
230        if let Some(connection_id) = source_ref.connection_id {
231            self.connection_source_ref
232                .entry(connection_id)
233                .and_modify(|sources| sources.push(source_ref.id))
234                .or_insert(vec![source_ref.id]);
235        }
236
237        self.source_by_name
238            .try_insert(name, source_ref.clone())
239            .unwrap();
240        self.source_by_id.try_insert(id, source_ref).unwrap();
241    }
242
243    pub fn drop_source(&mut self, id: SourceId) {
244        let source_ref = self.source_by_id.remove(&id).unwrap();
245        self.source_by_name.remove(&source_ref.name).unwrap();
246        if let Some(connection_id) = source_ref.connection_id
247            && let Occupied(mut e) = self.connection_source_ref.entry(connection_id)
248        {
249            let source_ids = e.get_mut();
250            source_ids.retain_mut(|sid| *sid != id);
251            if source_ids.is_empty() {
252                e.remove_entry();
253            }
254        }
255    }
256
257    pub fn update_source(&mut self, prost: &PbSource) {
258        let name = prost.name.clone();
259        let id = prost.id;
260        let source = SourceCatalog::from(prost);
261        let source_ref = Arc::new(source);
262
263        let old_source = self.source_by_id.get(&id).unwrap();
264        // check if the source name gets updated.
265        if old_source.name != name
266            && let Some(src) = self.source_by_name.get(&old_source.name)
267            && src.id == id
268        {
269            self.source_by_name.remove(&old_source.name);
270        }
271
272        self.source_by_name.insert(name, source_ref.clone());
273        self.source_by_id.insert(id, source_ref);
274    }
275
276    pub fn create_sink(&mut self, prost: &PbSink) {
277        let name = prost.name.clone();
278        let id = prost.id;
279        let sink = SinkCatalog::from(prost);
280        let sink_ref = Arc::new(sink);
281
282        if let Some(connection_id) = sink_ref.connection_id {
283            self.connection_sink_ref
284                .entry(connection_id)
285                .and_modify(|sinks| sinks.push(id))
286                .or_insert(vec![id]);
287        }
288
289        if let Some(target_table) = sink_ref.target_table {
290            assert!(
291                self.table_incoming_sinks
292                    .entry(target_table)
293                    .or_default()
294                    .insert(sink_ref.id)
295            );
296        }
297
298        self.sink_by_name
299            .try_insert(name, sink_ref.clone())
300            .unwrap();
301        self.sink_by_id.try_insert(id, sink_ref).unwrap();
302    }
303
304    pub fn drop_sink(&mut self, id: SinkId) {
305        if let Some(sink_ref) = self.sink_by_id.remove(&id) {
306            self.sink_by_name.remove(&sink_ref.name).unwrap();
307            if let Some(connection_id) = sink_ref.connection_id
308                && let Occupied(mut e) = self.connection_sink_ref.entry(connection_id)
309            {
310                let sink_ids = e.get_mut();
311                sink_ids.retain_mut(|sid| *sid != id);
312                if sink_ids.is_empty() {
313                    e.remove_entry();
314                }
315            }
316            if let Some(target_table) = sink_ref.target_table {
317                let incoming_sinks = self
318                    .table_incoming_sinks
319                    .get_mut(&target_table)
320                    .expect("should exists");
321                assert!(incoming_sinks.remove(&sink_ref.id));
322                if incoming_sinks.is_empty() {
323                    self.table_incoming_sinks.remove(&target_table);
324                }
325            }
326        } else {
327            tracing::warn!(
328                %id,
329                "sink not found when dropping, frontend might not be notified yet"
330            );
331        }
332    }
333
334    pub fn update_sink(&mut self, prost: &PbSink) {
335        let name = prost.name.clone();
336        let id = prost.id;
337        let sink = SinkCatalog::from(prost);
338        let sink_ref = Arc::new(sink);
339
340        let old_sink = self.sink_by_id.get(&id).unwrap();
341        assert_eq!(sink_ref.target_table, old_sink.target_table);
342        // check if the sink name gets updated.
343        if old_sink.name != name
344            && let Some(s) = self.sink_by_name.get(&old_sink.name)
345            && s.id == id
346        {
347            self.sink_by_name.remove(&old_sink.name);
348        }
349
350        self.sink_by_name.insert(name, sink_ref.clone());
351        self.sink_by_id.insert(id, sink_ref);
352    }
353
354    pub fn table_incoming_sinks(&self, table_id: TableId) -> Option<&HashSet<SinkId>> {
355        self.table_incoming_sinks.get(&table_id)
356    }
357
358    pub fn create_subscription(&mut self, prost: &PbSubscription) {
359        let name = prost.name.clone();
360        let id = prost.id;
361        let subscription_catalog = SubscriptionCatalog::from(prost);
362        let subscription_ref = Arc::new(subscription_catalog);
363
364        self.subscription_by_name
365            .try_insert(name, subscription_ref.clone())
366            .unwrap();
367        self.subscription_by_id
368            .try_insert(id, subscription_ref)
369            .unwrap();
370    }
371
372    pub fn drop_subscription(&mut self, id: SubscriptionId) {
373        let subscription_ref = self.subscription_by_id.remove(&id);
374        if let Some(subscription_ref) = subscription_ref {
375            self.subscription_by_name.remove(&subscription_ref.name);
376        }
377    }
378
379    pub fn update_subscription(&mut self, prost: &PbSubscription) {
380        let name = prost.name.clone();
381        let id = prost.id;
382        let subscription = SubscriptionCatalog::from(prost);
383        let subscription_ref = Arc::new(subscription);
384
385        let old_subscription = self.subscription_by_id.get(&id).unwrap();
386        // check if the subscription name gets updated.
387        if old_subscription.name != name
388            && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
389            && s.id == id
390        {
391            self.subscription_by_name.remove(&old_subscription.name);
392        }
393
394        self.subscription_by_name
395            .insert(name, subscription_ref.clone());
396        self.subscription_by_id.insert(id, subscription_ref);
397    }
398
399    pub fn create_view(&mut self, prost: &PbView) {
400        let name = prost.name.clone();
401        let id = prost.id;
402        let view = ViewCatalog::from(prost);
403        let view_ref = Arc::new(view);
404
405        self.view_by_name
406            .try_insert(name, view_ref.clone())
407            .unwrap();
408        self.view_by_id.try_insert(id, view_ref).unwrap();
409    }
410
411    pub fn drop_view(&mut self, id: ViewId) {
412        let view_ref = self.view_by_id.remove(&id).unwrap();
413        self.view_by_name.remove(&view_ref.name).unwrap();
414    }
415
416    pub fn update_view(&mut self, prost: &PbView) {
417        let name = prost.name.clone();
418        let id = prost.id;
419        let view = ViewCatalog::from(prost);
420        let view_ref = Arc::new(view);
421
422        let old_view = self.view_by_id.get(&id).unwrap();
423        // check if the view name gets updated.
424        if old_view.name != name
425            && let Some(v) = self.view_by_name.get(old_view.name())
426            && v.id == id
427        {
428            self.view_by_name.remove(&old_view.name);
429        }
430
431        self.view_by_name.insert(name, view_ref.clone());
432        self.view_by_id.insert(id, view_ref);
433    }
434
435    pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
436        FuncSign {
437            name: FuncName::Udf(func.name.clone()),
438            inputs_type: func
439                .arg_types
440                .iter()
441                .map(|t| t.clone().into())
442                .collect_vec(),
443            variadic: false,
444            ret_type: func.return_type.clone().into(),
445            build: FuncBuilder::Udf,
446            // dummy type infer, will not use this result
447            type_infer: |_| Ok(DataType::Boolean),
448            deprecated: false,
449        }
450    }
451
452    pub fn create_function(&mut self, prost: &PbFunction) {
453        let name = prost.name.clone();
454        let id = prost.id;
455        let function = FunctionCatalog::from(prost);
456        let args = function.arg_types.clone();
457        let function_ref = Arc::new(function);
458
459        self.function_registry
460            .insert(Self::get_func_sign(&function_ref));
461        self.function_by_name
462            .entry(name)
463            .or_default()
464            .try_insert(args, function_ref.clone())
465            .expect("function already exists with same argument types");
466        self.function_by_id
467            .try_insert(id, function_ref)
468            .expect("function id exists");
469    }
470
471    pub fn drop_function(&mut self, id: FunctionId) {
472        let function_ref = self
473            .function_by_id
474            .remove(&id)
475            .expect("function not found by id");
476
477        self.function_registry
478            .remove(Self::get_func_sign(&function_ref))
479            .expect("function not found in registry");
480
481        self.function_by_name
482            .get_mut(&function_ref.name)
483            .expect("function not found by name")
484            .remove(&function_ref.arg_types)
485            .expect("function not found by argument types");
486    }
487
488    pub fn update_function(&mut self, prost: &PbFunction) {
489        let name = prost.name.clone();
490        let id = prost.id;
491        let function = FunctionCatalog::from(prost);
492        let function_ref = Arc::new(function);
493
494        let old_function_by_id = self.function_by_id.get(&id).unwrap();
495        let old_function_by_name = self
496            .function_by_name
497            .get_mut(&old_function_by_id.name)
498            .unwrap();
499        // check if the function name gets updated.
500        if old_function_by_id.name != name
501            && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
502            && f.id == id
503        {
504            old_function_by_name.remove(&old_function_by_id.arg_types);
505            if old_function_by_name.is_empty() {
506                self.function_by_name.remove(&old_function_by_id.name);
507            }
508        }
509
510        self.function_by_name
511            .entry(name)
512            .or_default()
513            .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
514        self.function_by_id.insert(id, function_ref);
515    }
516
517    pub fn create_connection(&mut self, prost: &PbConnection) {
518        let name = prost.name.clone();
519        let id = prost.id;
520        let connection = ConnectionCatalog::from(prost);
521        let connection_ref = Arc::new(connection);
522        self.connection_by_name
523            .try_insert(name, connection_ref.clone())
524            .unwrap();
525        self.connection_by_id
526            .try_insert(id, connection_ref)
527            .unwrap();
528    }
529
530    pub fn update_connection(&mut self, prost: &PbConnection) {
531        let name = prost.name.clone();
532        let id = prost.id;
533        let connection = ConnectionCatalog::from(prost);
534        let connection_ref = Arc::new(connection);
535
536        let old_connection = self.connection_by_id.get(&id).unwrap();
537        // check if the connection name gets updated.
538        if old_connection.name != name
539            && let Some(conn) = self.connection_by_name.get(&old_connection.name)
540            && conn.id == id
541        {
542            self.connection_by_name.remove(&old_connection.name);
543        }
544
545        self.connection_by_name.insert(name, connection_ref.clone());
546        self.connection_by_id.insert(id, connection_ref);
547    }
548
549    pub fn drop_connection(&mut self, connection_id: ConnectionId) {
550        let connection_ref = self
551            .connection_by_id
552            .remove(&connection_id)
553            .expect("connection not found by id");
554        self.connection_by_name
555            .remove(&connection_ref.name)
556            .expect("connection not found by name");
557    }
558
559    pub fn create_secret(&mut self, prost: &PbSecret) {
560        let name = prost.name.clone();
561        let id = prost.id;
562        let secret = SecretCatalog::from(prost);
563        let secret_ref = Arc::new(secret);
564
565        self.secret_by_id
566            .try_insert(id, secret_ref.clone())
567            .unwrap();
568        self.secret_by_name.try_insert(name, secret_ref).unwrap();
569    }
570
571    pub fn update_secret(&mut self, prost: &PbSecret) {
572        let name = prost.name.clone();
573        let id = prost.id;
574        let secret = SecretCatalog::from(prost);
575        let secret_ref = Arc::new(secret);
576
577        let old_secret = self.secret_by_id.get(&id).unwrap();
578        // check if the secret name gets updated.
579        if old_secret.name != name
580            && let Some(s) = self.secret_by_name.get(&old_secret.name)
581            && s.id == id
582        {
583            self.secret_by_name.remove(&old_secret.name);
584        }
585
586        self.secret_by_name.insert(name, secret_ref.clone());
587        self.secret_by_id.insert(id, secret_ref);
588    }
589
590    pub fn drop_secret(&mut self, secret_id: SecretId) {
591        let secret_ref = self
592            .secret_by_id
593            .remove(&secret_id)
594            .expect("secret not found by id");
595        self.secret_by_name
596            .remove(&secret_ref.name)
597            .expect("secret not found by name");
598    }
599
600    pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
601        self.table_by_name.values()
602    }
603
604    pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
605        self.table_by_name.values().filter(|v| v.is_user_table())
606    }
607
608    pub fn iter_user_table_with_acl<'a>(
609        &'a self,
610        user: &'a UserCatalog,
611    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
612        self.table_by_name
613            .values()
614            .filter(|v| v.is_user_table() && has_access_to_object(user, v.id, v.owner))
615    }
616
617    pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
618        self.table_by_name
619            .values()
620            .filter(|v| v.is_internal_table())
621    }
622
623    pub fn iter_internal_table_with_acl<'a>(
624        &'a self,
625        user: &'a UserCatalog,
626    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
627        self.table_by_name
628            .values()
629            .filter(|v| v.is_internal_table() && has_access_to_object(user, v.id, v.owner))
630    }
631
632    /// Iterate all non-internal tables, including user tables, materialized views and indices.
633    pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
634        self.table_by_name
635            .values()
636            .filter(|v| !v.is_internal_table())
637    }
638
639    pub fn iter_table_mv_indices_with_acl<'a>(
640        &'a self,
641        user: &'a UserCatalog,
642    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
643        self.table_by_name
644            .values()
645            .filter(|v| !v.is_internal_table() && has_access_to_object(user, v.id, v.owner))
646    }
647
648    /// Iterate all materialized views, excluding the indices.
649    pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
650        self.table_by_name.values().filter(|v| v.is_mview())
651    }
652
653    pub fn iter_all_mvs_with_acl<'a>(
654        &'a self,
655        user: &'a UserCatalog,
656    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
657        self.table_by_name
658            .values()
659            .filter(|v| v.is_mview() && has_access_to_object(user, v.id, v.owner))
660    }
661
662    /// Iterate created materialized views, excluding the indices.
663    pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
664        self.table_by_name
665            .values()
666            .filter(|v| v.is_mview() && v.is_created())
667    }
668
669    pub fn iter_created_mvs_with_acl<'a>(
670        &'a self,
671        user: &'a UserCatalog,
672    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
673        self.table_by_name
674            .values()
675            .filter(|v| v.is_mview() && v.is_created() && has_access_to_object(user, v.id, v.owner))
676    }
677
678    /// Iterate all indices
679    pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
680        self.index_by_name.values()
681    }
682
683    pub fn iter_index_with_acl<'a>(
684        &'a self,
685        user: &'a UserCatalog,
686    ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
687        self.index_by_name
688            .values()
689            .filter(|idx| has_access_to_object(user, idx.id, idx.owner()))
690    }
691
692    /// Iterate all sources
693    pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
694        self.source_by_name.values()
695    }
696
697    pub fn iter_source_with_acl<'a>(
698        &'a self,
699        user: &'a UserCatalog,
700    ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
701        self.source_by_name
702            .values()
703            .filter(|s| has_access_to_object(user, s.id, s.owner))
704    }
705
706    pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
707        self.sink_by_name.values()
708    }
709
710    pub fn iter_sink_with_acl<'a>(
711        &'a self,
712        user: &'a UserCatalog,
713    ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
714        self.sink_by_name
715            .values()
716            .filter(|s| has_access_to_object(user, s.id, s.owner.user_id))
717    }
718
719    pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
720        self.subscription_by_name.values()
721    }
722
723    pub fn iter_subscription_with_acl<'a>(
724        &'a self,
725        user: &'a UserCatalog,
726    ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
727        self.subscription_by_name
728            .values()
729            .filter(|s| has_access_to_object(user, s.id, s.owner.user_id))
730    }
731
732    pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
733        self.view_by_name.values()
734    }
735
736    pub fn iter_view_with_acl<'a>(
737        &'a self,
738        user: &'a UserCatalog,
739    ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
740        self.view_by_name
741            .values()
742            .filter(|v| v.is_system_view() || has_access_to_object(user, v.id, v.owner))
743    }
744
745    pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
746        self.function_by_name.values().flat_map(|v| v.values())
747    }
748
749    pub fn iter_function_with_acl<'a>(
750        &'a self,
751        user: &'a UserCatalog,
752    ) -> impl Iterator<Item = &'a Arc<FunctionCatalog>> {
753        self.function_by_name
754            .values()
755            .flat_map(|v| v.values())
756            .filter(|f| has_access_to_object(user, f.id, f.owner))
757    }
758
759    pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
760        self.connection_by_name.values()
761    }
762
763    pub fn iter_connections_with_acl<'a>(
764        &'a self,
765        user: &'a UserCatalog,
766    ) -> impl Iterator<Item = &'a Arc<ConnectionCatalog>> {
767        self.connection_by_name
768            .values()
769            .filter(|c| has_access_to_object(user, c.id, c.owner))
770    }
771
772    pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
773        self.secret_by_name.values()
774    }
775
776    pub fn iter_secret_with_acl<'a>(
777        &'a self,
778        user: &'a UserCatalog,
779    ) -> impl Iterator<Item = &'a Arc<SecretCatalog>> {
780        self.secret_by_name
781            .values()
782            .filter(|s| has_access_to_object(user, s.id, s.owner))
783    }
784
785    pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
786        self.system_table_by_name.values()
787    }
788
789    pub fn get_table_by_name(
790        &self,
791        table_name: &str,
792        bind_creating_relations: bool,
793    ) -> Option<&Arc<TableCatalog>> {
794        self.table_by_name
795            .get(table_name)
796            .filter(|&table| bind_creating_relations || table.is_created())
797    }
798
799    pub fn get_any_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
800        self.get_table_by_name(table_name, true)
801    }
802
803    pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
804        self.get_table_by_name(table_name, false)
805    }
806
807    pub fn get_table_by_id(&self, table_id: TableId) -> Option<&Arc<TableCatalog>> {
808        self.table_by_id.get(&table_id)
809    }
810
811    pub fn get_created_table_by_id(&self, table_id: TableId) -> Option<&Arc<TableCatalog>> {
812        self.table_by_id
813            .get(&table_id)
814            .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
815    }
816
817    pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
818        self.view_by_name.get(view_name)
819    }
820
821    pub fn get_view_by_id(&self, view_id: ViewId) -> Option<&Arc<ViewCatalog>> {
822        self.view_by_id.get(&view_id)
823    }
824
825    pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
826        self.source_by_name.get(source_name)
827    }
828
829    pub fn get_source_by_id(&self, source_id: SourceId) -> Option<&Arc<SourceCatalog>> {
830        self.source_by_id.get(&source_id)
831    }
832
833    pub fn get_sink_by_name(
834        &self,
835        sink_name: &str,
836        bind_creating: bool,
837    ) -> Option<&Arc<SinkCatalog>> {
838        self.sink_by_name
839            .get(sink_name)
840            .filter(|s| bind_creating || s.is_created())
841    }
842
843    pub fn get_any_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
844        self.get_sink_by_name(sink_name, true)
845    }
846
847    pub fn get_created_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
848        self.get_sink_by_name(sink_name, false)
849    }
850
851    pub fn get_sink_by_id(&self, sink_id: SinkId) -> Option<&Arc<SinkCatalog>> {
852        self.sink_by_id.get(&sink_id)
853    }
854
855    pub fn get_subscription_by_name(
856        &self,
857        subscription_name: &str,
858    ) -> Option<&Arc<SubscriptionCatalog>> {
859        self.subscription_by_name.get(subscription_name)
860    }
861
862    pub fn get_subscription_by_id(
863        &self,
864        subscription_id: SubscriptionId,
865    ) -> Option<&Arc<SubscriptionCatalog>> {
866        self.subscription_by_id.get(&subscription_id)
867    }
868
869    pub fn get_index_by_name(
870        &self,
871        index_name: &str,
872        bind_creating: bool,
873    ) -> Option<&Arc<IndexCatalog>> {
874        self.index_by_name
875            .get(index_name)
876            .filter(|i| bind_creating || i.is_created())
877    }
878
879    pub fn get_any_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
880        self.get_index_by_name(index_name, true)
881    }
882
883    pub fn get_created_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
884        self.get_index_by_name(index_name, false)
885    }
886
887    pub fn get_index_by_id(&self, index_id: IndexId) -> Option<&Arc<IndexCatalog>> {
888        self.index_by_id.get(&index_id)
889    }
890
891    pub fn get_indexes_by_table_id(
892        &self,
893        table_id: TableId,
894        include_creating: bool,
895    ) -> Vec<Arc<IndexCatalog>> {
896        self.indexes_by_table_id
897            .get(&table_id)
898            .cloned()
899            .unwrap_or_default()
900            .into_iter()
901            .filter(|i| include_creating || i.is_created())
902            .collect()
903    }
904
905    pub fn get_any_indexes_by_table_id(&self, table_id: TableId) -> Vec<Arc<IndexCatalog>> {
906        self.get_indexes_by_table_id(table_id, true)
907    }
908
909    pub fn get_created_indexes_by_table_id(&self, table_id: TableId) -> Vec<Arc<IndexCatalog>> {
910        self.get_indexes_by_table_id(table_id, false)
911    }
912
913    pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
914        self.system_table_by_name.get(table_name)
915    }
916
917    pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
918        self.table_by_id
919            .get(&table_id)
920            .map(|table| table.name.clone())
921    }
922
923    pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
924        self.function_by_id.get(&function_id)
925    }
926
927    pub fn get_function_by_name_inputs(
928        &self,
929        name: &str,
930        inputs: &mut [ExprImpl],
931    ) -> Option<&Arc<FunctionCatalog>> {
932        infer_type_with_sigmap(
933            FuncName::Udf(name.to_owned()),
934            inputs,
935            &self.function_registry,
936        )
937        .ok()?;
938        let args = inputs.iter().map(|x| x.return_type()).collect_vec();
939        self.function_by_name.get(name)?.get(&args)
940    }
941
942    pub fn get_function_by_name_args(
943        &self,
944        name: &str,
945        args: &[DataType],
946    ) -> Option<&Arc<FunctionCatalog>> {
947        let args = args.iter().map(|x| Some(x.clone())).collect_vec();
948        let func = infer_type_name(
949            &self.function_registry,
950            FuncName::Udf(name.to_owned()),
951            &args,
952        )
953        .ok()?;
954
955        let args = func
956            .inputs_type
957            .iter()
958            .filter_map(|x| {
959                if let SigDataType::Exact(t) = x {
960                    Some(t.clone())
961                } else {
962                    None
963                }
964            })
965            .collect_vec();
966
967        self.function_by_name.get(name)?.get(&args)
968    }
969
970    pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
971        let functions = self.function_by_name.get(name)?;
972        if functions.is_empty() {
973            return None;
974        }
975        Some(functions.values().collect())
976    }
977
978    pub fn get_connection_by_id(
979        &self,
980        connection_id: ConnectionId,
981    ) -> Option<&Arc<ConnectionCatalog>> {
982        self.connection_by_id.get(&connection_id)
983    }
984
985    pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
986        self.connection_by_name.get(connection_name)
987    }
988
989    pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
990        self.secret_by_name.get(secret_name)
991    }
992
993    pub fn get_secret_by_id(&self, secret_id: SecretId) -> Option<&Arc<SecretCatalog>> {
994        self.secret_by_id.get(&secret_id)
995    }
996
997    /// get all sources referencing the connection
998    pub fn get_source_ids_by_connection(
999        &self,
1000        connection_id: ConnectionId,
1001    ) -> Option<Vec<SourceId>> {
1002        self.connection_source_ref
1003            .get(&connection_id)
1004            .map(|c| c.to_owned())
1005    }
1006
1007    /// get all sinks referencing the connection
1008    pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
1009        self.connection_sink_ref
1010            .get(&connection_id)
1011            .map(|s| s.to_owned())
1012    }
1013
1014    pub fn get_grant_object_by_oid(&self, oid: ObjectId) -> Option<OwnedGrantObject> {
1015        #[allow(clippy::manual_map)]
1016        if let Some(table) = self.get_created_table_by_id(oid.as_table_id()) {
1017            Some(OwnedGrantObject {
1018                owner: table.owner,
1019                object: Object::TableId(oid.as_raw_id()),
1020            })
1021        } else if let Some(index) = self.get_index_by_id(oid.as_index_id()) {
1022            Some(OwnedGrantObject {
1023                owner: index.owner(),
1024                object: Object::TableId(oid.as_raw_id()),
1025            })
1026        } else if let Some(source) = self.get_source_by_id(oid.as_source_id()) {
1027            Some(OwnedGrantObject {
1028                owner: source.owner,
1029                object: Object::SourceId(oid.as_raw_id()),
1030            })
1031        } else if let Some(sink) = self.get_sink_by_id(oid.as_sink_id()) {
1032            Some(OwnedGrantObject {
1033                owner: sink.owner.user_id,
1034                object: Object::SinkId(oid.as_raw_id()),
1035            })
1036        } else if let Some(view) = self.get_view_by_id(oid.as_view_id()) {
1037            Some(OwnedGrantObject {
1038                owner: view.owner,
1039                object: Object::ViewId(oid.as_raw_id()),
1040            })
1041        } else if let Some(function) = self.get_function_by_id(oid.as_function_id()) {
1042            Some(OwnedGrantObject {
1043                owner: function.owner(),
1044                object: Object::FunctionId(oid.as_raw_id()),
1045            })
1046        } else if let Some(subscription) = self.get_subscription_by_id(oid.as_subscription_id()) {
1047            Some(OwnedGrantObject {
1048                owner: subscription.owner.user_id,
1049                object: Object::SubscriptionId(oid.as_raw_id()),
1050            })
1051        } else if let Some(connection) = self.get_connection_by_id(oid.as_connection_id()) {
1052            Some(OwnedGrantObject {
1053                owner: connection.owner,
1054                object: Object::ConnectionId(oid.as_raw_id()),
1055            })
1056        } else if let Some(secret) = self.get_secret_by_id(oid.as_secret_id()) {
1057            Some(OwnedGrantObject {
1058                owner: secret.owner,
1059                object: Object::SecretId(oid.as_raw_id()),
1060            })
1061        } else {
1062            None
1063        }
1064    }
1065
1066    pub fn contains_object(&self, oid: ObjectId) -> bool {
1067        self.table_by_id.contains_key(&oid.as_table_id())
1068            || self.index_by_id.contains_key(&oid.as_index_id())
1069            || self.source_by_id.contains_key(&oid.as_source_id())
1070            || self.sink_by_id.contains_key(&oid.as_sink_id())
1071            || self.view_by_id.contains_key(&oid.as_view_id())
1072            || self.function_by_id.contains_key(&oid.as_function_id())
1073            || self
1074                .subscription_by_id
1075                .contains_key(&oid.as_subscription_id())
1076            || self.connection_by_id.contains_key(&oid.as_connection_id())
1077    }
1078
1079    pub fn id(&self) -> SchemaId {
1080        self.id
1081    }
1082
1083    pub fn database_id(&self) -> DatabaseId {
1084        self.database_id
1085    }
1086
1087    pub fn name(&self) -> String {
1088        self.name.clone()
1089    }
1090}
1091
1092impl OwnedByUserCatalog for SchemaCatalog {
1093    fn owner(&self) -> UserId {
1094        self.owner
1095    }
1096}
1097
1098impl From<&PbSchema> for SchemaCatalog {
1099    fn from(schema: &PbSchema) -> Self {
1100        Self {
1101            id: schema.id,
1102            owner: schema.owner,
1103            name: schema.name.clone(),
1104            database_id: schema.database_id,
1105            table_by_name: HashMap::new(),
1106            table_by_id: HashMap::new(),
1107            source_by_name: HashMap::new(),
1108            source_by_id: HashMap::new(),
1109            sink_by_name: HashMap::new(),
1110            sink_by_id: HashMap::new(),
1111            table_incoming_sinks: HashMap::new(),
1112            index_by_name: HashMap::new(),
1113            index_by_id: HashMap::new(),
1114            indexes_by_table_id: HashMap::new(),
1115            system_table_by_name: HashMap::new(),
1116            view_by_name: HashMap::new(),
1117            view_by_id: HashMap::new(),
1118            function_registry: FunctionRegistry::default(),
1119            function_by_name: HashMap::new(),
1120            function_by_id: HashMap::new(),
1121            connection_by_name: HashMap::new(),
1122            connection_by_id: HashMap::new(),
1123            secret_by_name: HashMap::new(),
1124            secret_by_id: HashMap::new(),
1125            _secret_source_ref: HashMap::new(),
1126            _secret_sink_ref: HashMap::new(),
1127            connection_source_ref: HashMap::new(),
1128            connection_sink_ref: HashMap::new(),
1129            subscription_by_name: HashMap::new(),
1130            subscription_by_id: HashMap::new(),
1131        }
1132    }
1133}