risingwave_frontend/catalog/
schema_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry::{Occupied, Vacant};
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25    PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26    PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, OwnedGrantObject, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
45#[derive(Clone, Debug)]
46pub struct SchemaCatalog {
47    id: SchemaId,
48    pub name: String,
49    pub database_id: DatabaseId,
50    /// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
51    table_by_name: HashMap<String, Arc<TableCatalog>>,
52    /// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
53    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
54    source_by_name: HashMap<String, Arc<SourceCatalog>>,
55    source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
56    sink_by_name: HashMap<String, Arc<SinkCatalog>>,
57    sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
58    /// reverted index of (`sink.target_table` -> `sink_id`s)
59    table_incoming_sinks: HashMap<TableId, HashSet<SinkId>>,
60    subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
61    subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
62    index_by_name: HashMap<String, Arc<IndexCatalog>>,
63    index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
64    indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
65    view_by_name: HashMap<String, Arc<ViewCatalog>>,
66    view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
67    function_registry: FunctionRegistry,
68    function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
69    function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
70    connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
71    connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
72    secret_by_name: HashMap<String, Arc<SecretCatalog>>,
73    secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,
74
75    _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
76    _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,
77
78    // This field is currently used only for `show connections`
79    connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
80    // This field is currently used only for `show connections`
81    connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
82    // This field only available when schema is "pg_catalog". Meanwhile, others will be empty.
83    system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
84    pub owner: u32,
85}
86
87impl SchemaCatalog {
88    pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
89        let name = prost.name.clone();
90        let id = prost.id.into();
91        let table: TableCatalog = prost.into();
92        let table_ref = Arc::new(table);
93
94        self.table_by_name
95            .try_insert(name, table_ref.clone())
96            .unwrap();
97        self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
98        table_ref
99    }
100
101    pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
102        self.system_table_by_name
103            .try_insert(sys_table.name.clone(), sys_table)
104            .unwrap();
105    }
106
107    pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
108        self.view_by_name
109            .try_insert(sys_view.name().to_owned(), sys_view.clone())
110            .unwrap();
111        self.view_by_id
112            .try_insert(sys_view.id, sys_view.clone())
113            .unwrap();
114    }
115
116    pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
117        let name = prost.name.clone();
118        let id = prost.id.into();
119        let table: TableCatalog = prost.into();
120        let table_ref = Arc::new(table);
121
122        let old_table = self.table_by_id.get(&id).unwrap();
123        // check if the table name gets updated.
124        if old_table.name() != name
125            && let Some(t) = self.table_by_name.get(old_table.name())
126            && t.id == id
127        {
128            self.table_by_name.remove(old_table.name());
129        }
130
131        self.table_by_name.insert(name, table_ref.clone());
132        self.table_by_id.insert(id, table_ref.clone());
133        table_ref
134    }
135
136    pub fn update_index(&mut self, prost: &PbIndex) {
137        let name = prost.name.clone();
138        let id = prost.id.into();
139        let old_index = self.index_by_id.get(&id).unwrap();
140        let index_table = self
141            .get_created_table_by_id(&prost.index_table_id.into())
142            .unwrap();
143        let primary_table = self
144            .get_created_table_by_id(&prost.primary_table_id.into())
145            .unwrap();
146        let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
147        let index_ref = Arc::new(index);
148
149        // check if the index name gets updated.
150        if old_index.name != name
151            && let Some(idx) = self.index_by_name.get(&old_index.name)
152            && idx.id == id
153        {
154            self.index_by_name.remove(&old_index.name);
155        }
156        self.index_by_name.insert(name, index_ref.clone());
157        self.index_by_id.insert(id, index_ref.clone());
158
159        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
160            Occupied(mut entry) => {
161                let pos = entry
162                    .get()
163                    .iter()
164                    .position(|x| x.id == index_ref.id)
165                    .unwrap();
166                *entry.get_mut().get_mut(pos).unwrap() = index_ref;
167            }
168            Vacant(_entry) => {
169                unreachable!()
170            }
171        };
172    }
173
174    pub fn drop_table(&mut self, id: TableId) {
175        if let Some(table_ref) = self.table_by_id.remove(&id) {
176            self.table_by_name.remove(&table_ref.name).unwrap();
177            self.indexes_by_table_id.remove(&table_ref.id);
178        } else {
179            tracing::warn!(
180                id = ?id.table_id,
181                "table not found when dropping, frontend might not be notified yet"
182            );
183        }
184    }
185
186    pub fn create_index(&mut self, prost: &PbIndex) {
187        let name = prost.name.clone();
188        let id = prost.id.into();
189        let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap();
190        let primary_table = self
191            .get_created_table_by_id(&prost.primary_table_id.into())
192            .unwrap();
193        let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
194        let index_ref = Arc::new(index);
195
196        self.index_by_name
197            .try_insert(name, index_ref.clone())
198            .unwrap();
199        self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
200        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
201            Occupied(mut entry) => {
202                entry.get_mut().push(index_ref);
203            }
204            Vacant(entry) => {
205                entry.insert(vec![index_ref]);
206            }
207        };
208    }
209
210    pub fn drop_index(&mut self, id: IndexId) {
211        let index_ref = self.index_by_id.remove(&id).unwrap();
212        self.index_by_name.remove(&index_ref.name).unwrap();
213        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
214            Occupied(mut entry) => {
215                let pos = entry
216                    .get_mut()
217                    .iter()
218                    .position(|x| x.id == index_ref.id)
219                    .unwrap();
220                entry.get_mut().remove(pos);
221            }
222            Vacant(_entry) => (),
223        };
224    }
225
226    pub fn create_source(&mut self, prost: &PbSource) {
227        let name = prost.name.clone();
228        let id = prost.id;
229        let source = SourceCatalog::from(prost);
230        let source_ref = Arc::new(source);
231
232        if let Some(connection_id) = source_ref.connection_id {
233            self.connection_source_ref
234                .entry(connection_id)
235                .and_modify(|sources| sources.push(source_ref.id))
236                .or_insert(vec![source_ref.id]);
237        }
238
239        self.source_by_name
240            .try_insert(name, source_ref.clone())
241            .unwrap();
242        self.source_by_id.try_insert(id, source_ref).unwrap();
243    }
244
245    pub fn drop_source(&mut self, id: SourceId) {
246        let source_ref = self.source_by_id.remove(&id).unwrap();
247        self.source_by_name.remove(&source_ref.name).unwrap();
248        if let Some(connection_id) = source_ref.connection_id
249            && let Occupied(mut e) = self.connection_source_ref.entry(connection_id)
250        {
251            let source_ids = e.get_mut();
252            source_ids.retain_mut(|sid| *sid != id);
253            if source_ids.is_empty() {
254                e.remove_entry();
255            }
256        }
257    }
258
259    pub fn update_source(&mut self, prost: &PbSource) {
260        let name = prost.name.clone();
261        let id = prost.id;
262        let source = SourceCatalog::from(prost);
263        let source_ref = Arc::new(source);
264
265        let old_source = self.source_by_id.get(&id).unwrap();
266        // check if the source name gets updated.
267        if old_source.name != name
268            && let Some(src) = self.source_by_name.get(&old_source.name)
269            && src.id == id
270        {
271            self.source_by_name.remove(&old_source.name);
272        }
273
274        self.source_by_name.insert(name, source_ref.clone());
275        self.source_by_id.insert(id, source_ref);
276    }
277
278    pub fn create_sink(&mut self, prost: &PbSink) {
279        let name = prost.name.clone();
280        let id = prost.id;
281        let sink = SinkCatalog::from(prost);
282        let sink_ref = Arc::new(sink);
283
284        if let Some(connection_id) = sink_ref.connection_id {
285            self.connection_sink_ref
286                .entry(connection_id.0)
287                .and_modify(|sinks| sinks.push(id))
288                .or_insert(vec![id]);
289        }
290
291        if let Some(target_table) = sink_ref.target_table {
292            assert!(
293                self.table_incoming_sinks
294                    .entry(target_table)
295                    .or_default()
296                    .insert(sink_ref.id.sink_id)
297            );
298        }
299
300        self.sink_by_name
301            .try_insert(name, sink_ref.clone())
302            .unwrap();
303        self.sink_by_id.try_insert(id, sink_ref).unwrap();
304    }
305
306    pub fn drop_sink(&mut self, id: SinkId) {
307        if let Some(sink_ref) = self.sink_by_id.remove(&id) {
308            self.sink_by_name.remove(&sink_ref.name).unwrap();
309            if let Some(connection_id) = sink_ref.connection_id
310                && let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0)
311            {
312                let sink_ids = e.get_mut();
313                sink_ids.retain_mut(|sid| *sid != id);
314                if sink_ids.is_empty() {
315                    e.remove_entry();
316                }
317            }
318            if let Some(target_table) = sink_ref.target_table {
319                let incoming_sinks = self
320                    .table_incoming_sinks
321                    .get_mut(&target_table)
322                    .expect("should exists");
323                assert!(incoming_sinks.remove(&sink_ref.id.sink_id));
324                if incoming_sinks.is_empty() {
325                    self.table_incoming_sinks.remove(&target_table);
326                }
327            }
328        } else {
329            tracing::warn!(
330                id,
331                "sink not found when dropping, frontend might not be notified yet"
332            );
333        }
334    }
335
336    pub fn update_sink(&mut self, prost: &PbSink) {
337        let name = prost.name.clone();
338        let id = prost.id;
339        let sink = SinkCatalog::from(prost);
340        let sink_ref = Arc::new(sink);
341
342        let old_sink = self.sink_by_id.get(&id).unwrap();
343        assert_eq!(sink_ref.target_table, old_sink.target_table);
344        // check if the sink name gets updated.
345        if old_sink.name != name
346            && let Some(s) = self.sink_by_name.get(&old_sink.name)
347            && s.id.sink_id == id
348        {
349            self.sink_by_name.remove(&old_sink.name);
350        }
351
352        self.sink_by_name.insert(name, sink_ref.clone());
353        self.sink_by_id.insert(id, sink_ref);
354    }
355
356    pub fn table_incoming_sinks(&self, table_id: TableId) -> Option<&HashSet<SinkId>> {
357        self.table_incoming_sinks.get(&table_id)
358    }
359
360    pub fn create_subscription(&mut self, prost: &PbSubscription) {
361        let name = prost.name.clone();
362        let id = prost.id;
363        let subscription_catalog = SubscriptionCatalog::from(prost);
364        let subscription_ref = Arc::new(subscription_catalog);
365
366        self.subscription_by_name
367            .try_insert(name, subscription_ref.clone())
368            .unwrap();
369        self.subscription_by_id
370            .try_insert(id, subscription_ref)
371            .unwrap();
372    }
373
374    pub fn drop_subscription(&mut self, id: SubscriptionId) {
375        let subscription_ref = self.subscription_by_id.remove(&id);
376        if let Some(subscription_ref) = subscription_ref {
377            self.subscription_by_name.remove(&subscription_ref.name);
378        }
379    }
380
381    pub fn update_subscription(&mut self, prost: &PbSubscription) {
382        let name = prost.name.clone();
383        let id = prost.id;
384        let subscription = SubscriptionCatalog::from(prost);
385        let subscription_ref = Arc::new(subscription);
386
387        let old_subscription = self.subscription_by_id.get(&id).unwrap();
388        // check if the subscription name gets updated.
389        if old_subscription.name != name
390            && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
391            && s.id.subscription_id == id
392        {
393            self.subscription_by_name.remove(&old_subscription.name);
394        }
395
396        self.subscription_by_name
397            .insert(name, subscription_ref.clone());
398        self.subscription_by_id.insert(id, subscription_ref);
399    }
400
401    pub fn create_view(&mut self, prost: &PbView) {
402        let name = prost.name.clone();
403        let id = prost.id;
404        let view = ViewCatalog::from(prost);
405        let view_ref = Arc::new(view);
406
407        self.view_by_name
408            .try_insert(name, view_ref.clone())
409            .unwrap();
410        self.view_by_id.try_insert(id, view_ref).unwrap();
411    }
412
413    pub fn drop_view(&mut self, id: ViewId) {
414        let view_ref = self.view_by_id.remove(&id).unwrap();
415        self.view_by_name.remove(&view_ref.name).unwrap();
416    }
417
418    pub fn update_view(&mut self, prost: &PbView) {
419        let name = prost.name.clone();
420        let id = prost.id;
421        let view = ViewCatalog::from(prost);
422        let view_ref = Arc::new(view);
423
424        let old_view = self.view_by_id.get(&id).unwrap();
425        // check if the view name gets updated.
426        if old_view.name != name
427            && let Some(v) = self.view_by_name.get(old_view.name())
428            && v.id == id
429        {
430            self.view_by_name.remove(&old_view.name);
431        }
432
433        self.view_by_name.insert(name, view_ref.clone());
434        self.view_by_id.insert(id, view_ref);
435    }
436
437    pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
438        FuncSign {
439            name: FuncName::Udf(func.name.clone()),
440            inputs_type: func
441                .arg_types
442                .iter()
443                .map(|t| t.clone().into())
444                .collect_vec(),
445            variadic: false,
446            ret_type: func.return_type.clone().into(),
447            build: FuncBuilder::Udf,
448            // dummy type infer, will not use this result
449            type_infer: |_| Ok(DataType::Boolean),
450            deprecated: false,
451        }
452    }
453
454    pub fn create_function(&mut self, prost: &PbFunction) {
455        let name = prost.name.clone();
456        let id = prost.id;
457        let function = FunctionCatalog::from(prost);
458        let args = function.arg_types.clone();
459        let function_ref = Arc::new(function);
460
461        self.function_registry
462            .insert(Self::get_func_sign(&function_ref));
463        self.function_by_name
464            .entry(name)
465            .or_default()
466            .try_insert(args, function_ref.clone())
467            .expect("function already exists with same argument types");
468        self.function_by_id
469            .try_insert(id.into(), function_ref)
470            .expect("function id exists");
471    }
472
473    pub fn drop_function(&mut self, id: FunctionId) {
474        let function_ref = self
475            .function_by_id
476            .remove(&id)
477            .expect("function not found by id");
478
479        self.function_registry
480            .remove(Self::get_func_sign(&function_ref))
481            .expect("function not found in registry");
482
483        self.function_by_name
484            .get_mut(&function_ref.name)
485            .expect("function not found by name")
486            .remove(&function_ref.arg_types)
487            .expect("function not found by argument types");
488    }
489
490    pub fn update_function(&mut self, prost: &PbFunction) {
491        let name = prost.name.clone();
492        let id = prost.id.into();
493        let function = FunctionCatalog::from(prost);
494        let function_ref = Arc::new(function);
495
496        let old_function_by_id = self.function_by_id.get(&id).unwrap();
497        let old_function_by_name = self
498            .function_by_name
499            .get_mut(&old_function_by_id.name)
500            .unwrap();
501        // check if the function name gets updated.
502        if old_function_by_id.name != name
503            && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
504            && f.id == id
505        {
506            old_function_by_name.remove(&old_function_by_id.arg_types);
507            if old_function_by_name.is_empty() {
508                self.function_by_name.remove(&old_function_by_id.name);
509            }
510        }
511
512        self.function_by_name
513            .entry(name)
514            .or_default()
515            .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
516        self.function_by_id.insert(id, function_ref);
517    }
518
519    pub fn create_connection(&mut self, prost: &PbConnection) {
520        let name = prost.name.clone();
521        let id = prost.id;
522        let connection = ConnectionCatalog::from(prost);
523        let connection_ref = Arc::new(connection);
524        self.connection_by_name
525            .try_insert(name, connection_ref.clone())
526            .unwrap();
527        self.connection_by_id
528            .try_insert(id, connection_ref)
529            .unwrap();
530    }
531
532    pub fn update_connection(&mut self, prost: &PbConnection) {
533        let name = prost.name.clone();
534        let id = prost.id;
535        let connection = ConnectionCatalog::from(prost);
536        let connection_ref = Arc::new(connection);
537
538        let old_connection = self.connection_by_id.get(&id).unwrap();
539        // check if the connection name gets updated.
540        if old_connection.name != name
541            && let Some(conn) = self.connection_by_name.get(&old_connection.name)
542            && conn.id == id
543        {
544            self.connection_by_name.remove(&old_connection.name);
545        }
546
547        self.connection_by_name.insert(name, connection_ref.clone());
548        self.connection_by_id.insert(id, connection_ref);
549    }
550
551    pub fn drop_connection(&mut self, connection_id: ConnectionId) {
552        let connection_ref = self
553            .connection_by_id
554            .remove(&connection_id)
555            .expect("connection not found by id");
556        self.connection_by_name
557            .remove(&connection_ref.name)
558            .expect("connection not found by name");
559    }
560
561    pub fn create_secret(&mut self, prost: &PbSecret) {
562        let name = prost.name.clone();
563        let id = SecretId::new(prost.id);
564        let secret = SecretCatalog::from(prost);
565        let secret_ref = Arc::new(secret);
566
567        self.secret_by_id
568            .try_insert(id, secret_ref.clone())
569            .unwrap();
570        self.secret_by_name
571            .try_insert(name, secret_ref.clone())
572            .unwrap();
573    }
574
575    pub fn update_secret(&mut self, prost: &PbSecret) {
576        let name = prost.name.clone();
577        let id = SecretId::new(prost.id);
578        let secret = SecretCatalog::from(prost);
579        let secret_ref = Arc::new(secret);
580
581        let old_secret = self.secret_by_id.get(&id).unwrap();
582        // check if the secret name gets updated.
583        if old_secret.name != name
584            && let Some(s) = self.secret_by_name.get(&old_secret.name)
585            && s.id == id
586        {
587            self.secret_by_name.remove(&old_secret.name);
588        }
589
590        self.secret_by_name.insert(name, secret_ref.clone());
591        self.secret_by_id.insert(id, secret_ref);
592    }
593
594    pub fn drop_secret(&mut self, secret_id: SecretId) {
595        let secret_ref = self
596            .secret_by_id
597            .remove(&secret_id)
598            .expect("secret not found by id");
599        self.secret_by_name
600            .remove(&secret_ref.name)
601            .expect("secret not found by name");
602    }
603
604    pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
605        self.table_by_name.values()
606    }
607
608    pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
609        self.table_by_name.values().filter(|v| v.is_user_table())
610    }
611
612    pub fn iter_user_table_with_acl<'a>(
613        &'a self,
614        user: &'a UserCatalog,
615    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
616        self.table_by_name.values().filter(|v| {
617            v.is_user_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
618        })
619    }
620
621    pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
622        self.table_by_name
623            .values()
624            .filter(|v| v.is_internal_table())
625    }
626
627    pub fn iter_internal_table_with_acl<'a>(
628        &'a self,
629        user: &'a UserCatalog,
630    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
631        self.table_by_name.values().filter(|v| {
632            v.is_internal_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
633        })
634    }
635
636    /// Iterate all non-internal tables, including user tables, materialized views and indices.
637    pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
638        self.table_by_name
639            .values()
640            .filter(|v| !v.is_internal_table())
641    }
642
643    pub fn iter_table_mv_indices_with_acl<'a>(
644        &'a self,
645        user: &'a UserCatalog,
646    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
647        self.table_by_name.values().filter(|v| {
648            !v.is_internal_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
649        })
650    }
651
652    /// Iterate all materialized views, excluding the indices.
653    pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
654        self.table_by_name.values().filter(|v| v.is_mview())
655    }
656
657    pub fn iter_all_mvs_with_acl<'a>(
658        &'a self,
659        user: &'a UserCatalog,
660    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
661        self.table_by_name.values().filter(|v| {
662            v.is_mview() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
663        })
664    }
665
666    /// Iterate created materialized views, excluding the indices.
667    pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
668        self.table_by_name
669            .values()
670            .filter(|v| v.is_mview() && v.is_created())
671    }
672
673    pub fn iter_created_mvs_with_acl<'a>(
674        &'a self,
675        user: &'a UserCatalog,
676    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
677        self.table_by_name.values().filter(|v| {
678            v.is_mview()
679                && v.is_created()
680                && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
681        })
682    }
683
684    /// Iterate all indices
685    pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
686        self.index_by_name.values()
687    }
688
689    pub fn iter_index_with_acl<'a>(
690        &'a self,
691        user: &'a UserCatalog,
692    ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
693        self.index_by_name
694            .values()
695            .filter(|idx| has_access_to_object(user, &self.name, idx.id.index_id, idx.owner()))
696    }
697
698    /// Iterate all sources
699    pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
700        self.source_by_name.values()
701    }
702
703    pub fn iter_source_with_acl<'a>(
704        &'a self,
705        user: &'a UserCatalog,
706    ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
707        self.source_by_name
708            .values()
709            .filter(|s| has_access_to_object(user, &self.name, s.id, s.owner))
710    }
711
712    pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
713        self.sink_by_name.values()
714    }
715
716    pub fn iter_sink_with_acl<'a>(
717        &'a self,
718        user: &'a UserCatalog,
719    ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
720        self.sink_by_name
721            .values()
722            .filter(|s| has_access_to_object(user, &self.name, s.id.sink_id, s.owner.user_id))
723    }
724
725    pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
726        self.subscription_by_name.values()
727    }
728
729    pub fn iter_subscription_with_acl<'a>(
730        &'a self,
731        user: &'a UserCatalog,
732    ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
733        self.subscription_by_name.values().filter(|s| {
734            has_access_to_object(user, &self.name, s.id.subscription_id, s.owner.user_id)
735        })
736    }
737
738    pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
739        self.view_by_name.values()
740    }
741
742    pub fn iter_view_with_acl<'a>(
743        &'a self,
744        user: &'a UserCatalog,
745    ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
746        self.view_by_name
747            .values()
748            .filter(|v| v.is_system_view() || has_access_to_object(user, &self.name, v.id, v.owner))
749    }
750
751    pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
752        self.function_by_name.values().flat_map(|v| v.values())
753    }
754
755    pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
756        self.connection_by_name.values()
757    }
758
759    pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
760        self.secret_by_name.values()
761    }
762
763    pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
764        self.system_table_by_name.values()
765    }
766
767    pub fn get_table_by_name(
768        &self,
769        table_name: &str,
770        bind_creating_relations: bool,
771    ) -> Option<&Arc<TableCatalog>> {
772        self.table_by_name
773            .get(table_name)
774            .filter(|&table| bind_creating_relations || table.is_created())
775    }
776
777    pub fn get_any_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
778        self.get_table_by_name(table_name, true)
779    }
780
781    pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
782        self.get_table_by_name(table_name, false)
783    }
784
785    pub fn get_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
786        self.table_by_id.get(table_id)
787    }
788
789    pub fn get_created_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
790        self.table_by_id
791            .get(table_id)
792            .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
793    }
794
795    pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
796        self.view_by_name.get(view_name)
797    }
798
799    pub fn get_view_by_id(&self, view_id: &ViewId) -> Option<&Arc<ViewCatalog>> {
800        self.view_by_id.get(view_id)
801    }
802
803    pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
804        self.source_by_name.get(source_name)
805    }
806
807    pub fn get_source_by_id(&self, source_id: &SourceId) -> Option<&Arc<SourceCatalog>> {
808        self.source_by_id.get(source_id)
809    }
810
811    pub fn get_sink_by_name(
812        &self,
813        sink_name: &str,
814        bind_creating: bool,
815    ) -> Option<&Arc<SinkCatalog>> {
816        self.sink_by_name
817            .get(sink_name)
818            .filter(|s| bind_creating || s.is_created())
819    }
820
821    pub fn get_any_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
822        self.get_sink_by_name(sink_name, true)
823    }
824
825    pub fn get_created_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
826        self.get_sink_by_name(sink_name, false)
827    }
828
829    pub fn get_sink_by_id(&self, sink_id: &SinkId) -> Option<&Arc<SinkCatalog>> {
830        self.sink_by_id.get(sink_id)
831    }
832
833    pub fn get_subscription_by_name(
834        &self,
835        subscription_name: &str,
836    ) -> Option<&Arc<SubscriptionCatalog>> {
837        self.subscription_by_name.get(subscription_name)
838    }
839
840    pub fn get_subscription_by_id(
841        &self,
842        subscription_id: &SubscriptionId,
843    ) -> Option<&Arc<SubscriptionCatalog>> {
844        self.subscription_by_id.get(subscription_id)
845    }
846
847    pub fn get_index_by_name(
848        &self,
849        index_name: &str,
850        bind_creating: bool,
851    ) -> Option<&Arc<IndexCatalog>> {
852        self.index_by_name
853            .get(index_name)
854            .filter(|i| bind_creating || i.is_created())
855    }
856
857    pub fn get_any_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
858        self.get_index_by_name(index_name, true)
859    }
860
861    pub fn get_created_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
862        self.get_index_by_name(index_name, false)
863    }
864
865    pub fn get_index_by_id(&self, index_id: &IndexId) -> Option<&Arc<IndexCatalog>> {
866        self.index_by_id.get(index_id)
867    }
868
869    pub fn get_indexes_by_table_id(
870        &self,
871        table_id: &TableId,
872        include_creating: bool,
873    ) -> Vec<Arc<IndexCatalog>> {
874        self.indexes_by_table_id
875            .get(table_id)
876            .cloned()
877            .unwrap_or_default()
878            .into_iter()
879            .filter(|i| include_creating || i.is_created())
880            .collect()
881    }
882
883    pub fn get_any_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
884        self.get_indexes_by_table_id(table_id, true)
885    }
886
887    pub fn get_created_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
888        self.get_indexes_by_table_id(table_id, false)
889    }
890
891    pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
892        self.system_table_by_name.get(table_name)
893    }
894
895    pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
896        self.table_by_id
897            .get(&table_id)
898            .map(|table| table.name.clone())
899    }
900
901    pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
902        self.function_by_id.get(&function_id)
903    }
904
905    pub fn get_function_by_name_inputs(
906        &self,
907        name: &str,
908        inputs: &mut [ExprImpl],
909    ) -> Option<&Arc<FunctionCatalog>> {
910        infer_type_with_sigmap(
911            FuncName::Udf(name.to_owned()),
912            inputs,
913            &self.function_registry,
914        )
915        .ok()?;
916        let args = inputs.iter().map(|x| x.return_type()).collect_vec();
917        self.function_by_name.get(name)?.get(&args)
918    }
919
920    pub fn get_function_by_name_args(
921        &self,
922        name: &str,
923        args: &[DataType],
924    ) -> Option<&Arc<FunctionCatalog>> {
925        let args = args.iter().map(|x| Some(x.clone())).collect_vec();
926        let func = infer_type_name(
927            &self.function_registry,
928            FuncName::Udf(name.to_owned()),
929            &args,
930        )
931        .ok()?;
932
933        let args = func
934            .inputs_type
935            .iter()
936            .filter_map(|x| {
937                if let SigDataType::Exact(t) = x {
938                    Some(t.clone())
939                } else {
940                    None
941                }
942            })
943            .collect_vec();
944
945        self.function_by_name.get(name)?.get(&args)
946    }
947
948    pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
949        let functions = self.function_by_name.get(name)?;
950        if functions.is_empty() {
951            return None;
952        }
953        Some(functions.values().collect())
954    }
955
956    pub fn get_connection_by_id(
957        &self,
958        connection_id: &ConnectionId,
959    ) -> Option<&Arc<ConnectionCatalog>> {
960        self.connection_by_id.get(connection_id)
961    }
962
963    pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
964        self.connection_by_name.get(connection_name)
965    }
966
967    pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
968        self.secret_by_name.get(secret_name)
969    }
970
971    pub fn get_secret_by_id(&self, secret_id: &SecretId) -> Option<&Arc<SecretCatalog>> {
972        self.secret_by_id.get(secret_id)
973    }
974
975    /// get all sources referencing the connection
976    pub fn get_source_ids_by_connection(
977        &self,
978        connection_id: ConnectionId,
979    ) -> Option<Vec<SourceId>> {
980        self.connection_source_ref
981            .get(&connection_id)
982            .map(|c| c.to_owned())
983    }
984
985    /// get all sinks referencing the connection
986    pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
987        self.connection_sink_ref
988            .get(&connection_id)
989            .map(|s| s.to_owned())
990    }
991
992    pub fn get_grant_object_by_oid(&self, oid: u32) -> Option<OwnedGrantObject> {
993        #[allow(clippy::manual_map)]
994        if let Some(table) = self.get_created_table_by_id(&TableId::new(oid)) {
995            Some(OwnedGrantObject {
996                owner: table.owner,
997                object: Object::TableId(oid),
998            })
999        } else if let Some(index) = self.get_index_by_id(&IndexId::new(oid)) {
1000            Some(OwnedGrantObject {
1001                owner: index.owner(),
1002                object: Object::TableId(oid),
1003            })
1004        } else if let Some(source) = self.get_source_by_id(&oid) {
1005            Some(OwnedGrantObject {
1006                owner: source.owner,
1007                object: Object::SourceId(oid),
1008            })
1009        } else if let Some(sink) = self.get_sink_by_id(&oid) {
1010            Some(OwnedGrantObject {
1011                owner: sink.owner.user_id,
1012                object: Object::SinkId(oid),
1013            })
1014        } else if let Some(view) = self.get_view_by_id(&oid) {
1015            Some(OwnedGrantObject {
1016                owner: view.owner,
1017                object: Object::ViewId(oid),
1018            })
1019        } else if let Some(function) = self.get_function_by_id(FunctionId::new(oid)) {
1020            Some(OwnedGrantObject {
1021                owner: function.owner(),
1022                object: Object::FunctionId(oid),
1023            })
1024        } else if let Some(subscription) = self.get_subscription_by_id(&oid) {
1025            Some(OwnedGrantObject {
1026                owner: subscription.owner.user_id,
1027                object: Object::SubscriptionId(oid),
1028            })
1029        } else if let Some(connection) = self.get_connection_by_id(&oid) {
1030            Some(OwnedGrantObject {
1031                owner: connection.owner,
1032                object: Object::ConnectionId(oid),
1033            })
1034        } else if let Some(secret) = self.get_secret_by_id(&SecretId::new(oid)) {
1035            Some(OwnedGrantObject {
1036                owner: secret.owner,
1037                object: Object::SecretId(oid),
1038            })
1039        } else {
1040            None
1041        }
1042    }
1043
1044    pub fn contains_object(&self, oid: u32) -> bool {
1045        self.table_by_id.contains_key(&TableId::new(oid))
1046            || self.index_by_id.contains_key(&IndexId::new(oid))
1047            || self.source_by_id.contains_key(&oid)
1048            || self.sink_by_id.contains_key(&oid)
1049            || self.view_by_id.contains_key(&oid)
1050            || self.function_by_id.contains_key(&FunctionId::new(oid))
1051            || self.subscription_by_id.contains_key(&oid)
1052            || self.connection_by_id.contains_key(&oid)
1053    }
1054
1055    pub fn id(&self) -> SchemaId {
1056        self.id
1057    }
1058
1059    pub fn database_id(&self) -> DatabaseId {
1060        self.database_id
1061    }
1062
1063    pub fn name(&self) -> String {
1064        self.name.clone()
1065    }
1066}
1067
1068impl OwnedByUserCatalog for SchemaCatalog {
1069    fn owner(&self) -> UserId {
1070        self.owner
1071    }
1072}
1073
1074impl From<&PbSchema> for SchemaCatalog {
1075    fn from(schema: &PbSchema) -> Self {
1076        Self {
1077            id: schema.id,
1078            owner: schema.owner,
1079            name: schema.name.clone(),
1080            database_id: schema.database_id,
1081            table_by_name: HashMap::new(),
1082            table_by_id: HashMap::new(),
1083            source_by_name: HashMap::new(),
1084            source_by_id: HashMap::new(),
1085            sink_by_name: HashMap::new(),
1086            sink_by_id: HashMap::new(),
1087            table_incoming_sinks: HashMap::new(),
1088            index_by_name: HashMap::new(),
1089            index_by_id: HashMap::new(),
1090            indexes_by_table_id: HashMap::new(),
1091            system_table_by_name: HashMap::new(),
1092            view_by_name: HashMap::new(),
1093            view_by_id: HashMap::new(),
1094            function_registry: FunctionRegistry::default(),
1095            function_by_name: HashMap::new(),
1096            function_by_id: HashMap::new(),
1097            connection_by_name: HashMap::new(),
1098            connection_by_id: HashMap::new(),
1099            secret_by_name: HashMap::new(),
1100            secret_by_id: HashMap::new(),
1101            _secret_source_ref: HashMap::new(),
1102            _secret_sink_ref: HashMap::new(),
1103            connection_source_ref: HashMap::new(),
1104            connection_sink_ref: HashMap::new(),
1105            subscription_by_name: HashMap::new(),
1106            subscription_by_id: HashMap::new(),
1107        }
1108    }
1109}