risingwave_frontend/catalog/
schema_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry::{Occupied, Vacant};
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25    PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26    PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, OwnedGrantObject, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
45#[derive(Clone, Debug)]
46pub struct SchemaCatalog {
47    id: SchemaId,
48    pub name: String,
49    pub database_id: DatabaseId,
50    /// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
51    table_by_name: HashMap<String, Arc<TableCatalog>>,
52    /// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
53    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
54    source_by_name: HashMap<String, Arc<SourceCatalog>>,
55    source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
56    sink_by_name: HashMap<String, Arc<SinkCatalog>>,
57    sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
58    /// reverted index of (`sink.target_table` -> `sink_id`s)
59    table_incoming_sinks: HashMap<TableId, HashSet<SinkId>>,
60    subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
61    subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
62    index_by_name: HashMap<String, Arc<IndexCatalog>>,
63    index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
64    indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
65    view_by_name: HashMap<String, Arc<ViewCatalog>>,
66    view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
67    function_registry: FunctionRegistry,
68    function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
69    function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
70    connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
71    connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
72    secret_by_name: HashMap<String, Arc<SecretCatalog>>,
73    secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,
74
75    _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
76    _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,
77
78    // This field is currently used only for `show connections`
79    connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
80    // This field is currently used only for `show connections`
81    connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
82    // This field only available when schema is "pg_catalog". Meanwhile, others will be empty.
83    system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
84    pub owner: u32,
85}
86
87impl SchemaCatalog {
88    pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
89        let name = prost.name.clone();
90        let id = prost.id.into();
91        let table: TableCatalog = prost.into();
92        let table_ref = Arc::new(table);
93
94        self.table_by_name
95            .try_insert(name, table_ref.clone())
96            .unwrap();
97        self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
98        table_ref
99    }
100
101    pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
102        self.system_table_by_name
103            .try_insert(sys_table.name.clone(), sys_table)
104            .unwrap();
105    }
106
107    pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
108        self.view_by_name
109            .try_insert(sys_view.name().to_owned(), sys_view.clone())
110            .unwrap();
111        self.view_by_id
112            .try_insert(sys_view.id, sys_view.clone())
113            .unwrap();
114    }
115
116    pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
117        let name = prost.name.clone();
118        let id = prost.id.into();
119        let table: TableCatalog = prost.into();
120        let table_ref = Arc::new(table);
121
122        let old_table = self.table_by_id.get(&id).unwrap();
123        // check if the table name gets updated.
124        if old_table.name() != name
125            && let Some(t) = self.table_by_name.get(old_table.name())
126            && t.id == id
127        {
128            self.table_by_name.remove(old_table.name());
129        }
130
131        self.table_by_name.insert(name, table_ref.clone());
132        self.table_by_id.insert(id, table_ref.clone());
133        table_ref
134    }
135
136    pub fn update_index(&mut self, prost: &PbIndex) {
137        let name = prost.name.clone();
138        let id = prost.id.into();
139        let old_index = self.index_by_id.get(&id).unwrap();
140        let index_table = self
141            .get_created_table_by_id(&prost.index_table_id.into())
142            .unwrap();
143        let primary_table = self
144            .get_created_table_by_id(&prost.primary_table_id.into())
145            .unwrap();
146        let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
147        let index_ref = Arc::new(index);
148
149        // check if the index name gets updated.
150        if old_index.name != name
151            && let Some(idx) = self.index_by_name.get(&old_index.name)
152            && idx.id == id
153        {
154            self.index_by_name.remove(&old_index.name);
155        }
156        self.index_by_name.insert(name, index_ref.clone());
157        self.index_by_id.insert(id, index_ref.clone());
158
159        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
160            Occupied(mut entry) => {
161                let pos = entry
162                    .get()
163                    .iter()
164                    .position(|x| x.id == index_ref.id)
165                    .unwrap();
166                *entry.get_mut().get_mut(pos).unwrap() = index_ref;
167            }
168            Vacant(_entry) => {
169                unreachable!()
170            }
171        };
172    }
173
174    pub fn drop_table(&mut self, id: TableId) {
175        if let Some(table_ref) = self.table_by_id.remove(&id) {
176            self.table_by_name.remove(&table_ref.name).unwrap();
177            self.indexes_by_table_id.remove(&table_ref.id);
178        } else {
179            tracing::warn!(
180                id = ?id.table_id,
181                "table not found when dropping, frontend might not be notified yet"
182            );
183        }
184    }
185
186    pub fn create_index(&mut self, prost: &PbIndex) {
187        let name = prost.name.clone();
188        let id = prost.id.into();
189        let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap();
190        let primary_table = self
191            .get_created_table_by_id(&prost.primary_table_id.into())
192            .unwrap();
193        let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
194        let index_ref = Arc::new(index);
195
196        self.index_by_name
197            .try_insert(name, index_ref.clone())
198            .unwrap();
199        self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
200        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
201            Occupied(mut entry) => {
202                entry.get_mut().push(index_ref);
203            }
204            Vacant(entry) => {
205                entry.insert(vec![index_ref]);
206            }
207        };
208    }
209
210    pub fn drop_index(&mut self, id: IndexId) {
211        let index_ref = self.index_by_id.remove(&id).unwrap();
212        self.index_by_name.remove(&index_ref.name).unwrap();
213        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
214            Occupied(mut entry) => {
215                let pos = entry
216                    .get_mut()
217                    .iter()
218                    .position(|x| x.id == index_ref.id)
219                    .unwrap();
220                entry.get_mut().remove(pos);
221            }
222            Vacant(_entry) => (),
223        };
224    }
225
226    pub fn create_source(&mut self, prost: &PbSource) {
227        let name = prost.name.clone();
228        let id = prost.id;
229        let source = SourceCatalog::from(prost);
230        let source_ref = Arc::new(source);
231
232        if let Some(connection_id) = source_ref.connection_id {
233            self.connection_source_ref
234                .entry(connection_id)
235                .and_modify(|sources| sources.push(source_ref.id))
236                .or_insert(vec![source_ref.id]);
237        }
238
239        self.source_by_name
240            .try_insert(name, source_ref.clone())
241            .unwrap();
242        self.source_by_id.try_insert(id, source_ref).unwrap();
243    }
244
245    pub fn drop_source(&mut self, id: SourceId) {
246        let source_ref = self.source_by_id.remove(&id).unwrap();
247        self.source_by_name.remove(&source_ref.name).unwrap();
248        if let Some(connection_id) = source_ref.connection_id
249            && let Occupied(mut e) = self.connection_source_ref.entry(connection_id)
250        {
251            let source_ids = e.get_mut();
252            source_ids.retain_mut(|sid| *sid != id);
253            if source_ids.is_empty() {
254                e.remove_entry();
255            }
256        }
257    }
258
259    pub fn update_source(&mut self, prost: &PbSource) {
260        let name = prost.name.clone();
261        let id = prost.id;
262        let source = SourceCatalog::from(prost);
263        let source_ref = Arc::new(source);
264
265        let old_source = self.source_by_id.get(&id).unwrap();
266        // check if the source name gets updated.
267        if old_source.name != name
268            && let Some(src) = self.source_by_name.get(&old_source.name)
269            && src.id == id
270        {
271            self.source_by_name.remove(&old_source.name);
272        }
273
274        self.source_by_name.insert(name, source_ref.clone());
275        self.source_by_id.insert(id, source_ref);
276    }
277
278    pub fn create_sink(&mut self, prost: &PbSink) {
279        let name = prost.name.clone();
280        let id = prost.id;
281        let sink = SinkCatalog::from(prost);
282        let sink_ref = Arc::new(sink);
283
284        if let Some(connection_id) = sink_ref.connection_id {
285            self.connection_sink_ref
286                .entry(connection_id.0)
287                .and_modify(|sinks| sinks.push(id))
288                .or_insert(vec![id]);
289        }
290
291        if let Some(target_table) = sink_ref.target_table {
292            assert!(
293                self.table_incoming_sinks
294                    .entry(target_table)
295                    .or_default()
296                    .insert(sink_ref.id.sink_id)
297            );
298        }
299
300        self.sink_by_name
301            .try_insert(name, sink_ref.clone())
302            .unwrap();
303        self.sink_by_id.try_insert(id, sink_ref).unwrap();
304    }
305
306    pub fn drop_sink(&mut self, id: SinkId) {
307        if let Some(sink_ref) = self.sink_by_id.remove(&id) {
308            self.sink_by_name.remove(&sink_ref.name).unwrap();
309            if let Some(connection_id) = sink_ref.connection_id
310                && let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0)
311            {
312                let sink_ids = e.get_mut();
313                sink_ids.retain_mut(|sid| *sid != id);
314                if sink_ids.is_empty() {
315                    e.remove_entry();
316                }
317            }
318            if let Some(target_table) = sink_ref.target_table {
319                let incoming_sinks = self
320                    .table_incoming_sinks
321                    .get_mut(&target_table)
322                    .expect("should exists");
323                assert!(incoming_sinks.remove(&sink_ref.id.sink_id));
324                if incoming_sinks.is_empty() {
325                    self.table_incoming_sinks.remove(&target_table);
326                }
327            }
328        } else {
329            tracing::warn!(
330                id,
331                "sink not found when dropping, frontend might not be notified yet"
332            );
333        }
334    }
335
336    pub fn update_sink(&mut self, prost: &PbSink) {
337        let name = prost.name.clone();
338        let id = prost.id;
339        let sink = SinkCatalog::from(prost);
340        let sink_ref = Arc::new(sink);
341
342        let old_sink = self.sink_by_id.get(&id).unwrap();
343        assert_eq!(sink_ref.target_table, old_sink.target_table);
344        // check if the sink name gets updated.
345        if old_sink.name != name
346            && let Some(s) = self.sink_by_name.get(&old_sink.name)
347            && s.id.sink_id == id
348        {
349            self.sink_by_name.remove(&old_sink.name);
350        }
351
352        self.sink_by_name.insert(name, sink_ref.clone());
353        self.sink_by_id.insert(id, sink_ref);
354    }
355
356    pub fn table_incoming_sinks(&self, table_id: TableId) -> Option<&HashSet<SinkId>> {
357        self.table_incoming_sinks.get(&table_id)
358    }
359
360    pub fn create_subscription(&mut self, prost: &PbSubscription) {
361        let name = prost.name.clone();
362        let id = prost.id;
363        let subscription_catalog = SubscriptionCatalog::from(prost);
364        let subscription_ref = Arc::new(subscription_catalog);
365
366        self.subscription_by_name
367            .try_insert(name, subscription_ref.clone())
368            .unwrap();
369        self.subscription_by_id
370            .try_insert(id, subscription_ref)
371            .unwrap();
372    }
373
374    pub fn drop_subscription(&mut self, id: SubscriptionId) {
375        let subscription_ref = self.subscription_by_id.remove(&id);
376        if let Some(subscription_ref) = subscription_ref {
377            self.subscription_by_name.remove(&subscription_ref.name);
378        }
379    }
380
381    pub fn update_subscription(&mut self, prost: &PbSubscription) {
382        let name = prost.name.clone();
383        let id = prost.id;
384        let subscription = SubscriptionCatalog::from(prost);
385        let subscription_ref = Arc::new(subscription);
386
387        let old_subscription = self.subscription_by_id.get(&id).unwrap();
388        // check if the subscription name gets updated.
389        if old_subscription.name != name
390            && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
391            && s.id.subscription_id == id
392        {
393            self.subscription_by_name.remove(&old_subscription.name);
394        }
395
396        self.subscription_by_name
397            .insert(name, subscription_ref.clone());
398        self.subscription_by_id.insert(id, subscription_ref);
399    }
400
401    pub fn create_view(&mut self, prost: &PbView) {
402        let name = prost.name.clone();
403        let id = prost.id;
404        let view = ViewCatalog::from(prost);
405        let view_ref = Arc::new(view);
406
407        self.view_by_name
408            .try_insert(name, view_ref.clone())
409            .unwrap();
410        self.view_by_id.try_insert(id, view_ref).unwrap();
411    }
412
413    pub fn drop_view(&mut self, id: ViewId) {
414        let view_ref = self.view_by_id.remove(&id).unwrap();
415        self.view_by_name.remove(&view_ref.name).unwrap();
416    }
417
418    pub fn update_view(&mut self, prost: &PbView) {
419        let name = prost.name.clone();
420        let id = prost.id;
421        let view = ViewCatalog::from(prost);
422        let view_ref = Arc::new(view);
423
424        let old_view = self.view_by_id.get(&id).unwrap();
425        // check if the view name gets updated.
426        if old_view.name != name
427            && let Some(v) = self.view_by_name.get(old_view.name())
428            && v.id == id
429        {
430            self.view_by_name.remove(&old_view.name);
431        }
432
433        self.view_by_name.insert(name, view_ref.clone());
434        self.view_by_id.insert(id, view_ref);
435    }
436
437    pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
438        FuncSign {
439            name: FuncName::Udf(func.name.clone()),
440            inputs_type: func
441                .arg_types
442                .iter()
443                .map(|t| t.clone().into())
444                .collect_vec(),
445            variadic: false,
446            ret_type: func.return_type.clone().into(),
447            build: FuncBuilder::Udf,
448            // dummy type infer, will not use this result
449            type_infer: |_| Ok(DataType::Boolean),
450            deprecated: false,
451        }
452    }
453
454    pub fn create_function(&mut self, prost: &PbFunction) {
455        let name = prost.name.clone();
456        let id = prost.id;
457        let function = FunctionCatalog::from(prost);
458        let args = function.arg_types.clone();
459        let function_ref = Arc::new(function);
460
461        self.function_registry
462            .insert(Self::get_func_sign(&function_ref));
463        self.function_by_name
464            .entry(name)
465            .or_default()
466            .try_insert(args, function_ref.clone())
467            .expect("function already exists with same argument types");
468        self.function_by_id
469            .try_insert(id.into(), function_ref)
470            .expect("function id exists");
471    }
472
473    pub fn drop_function(&mut self, id: FunctionId) {
474        let function_ref = self
475            .function_by_id
476            .remove(&id)
477            .expect("function not found by id");
478
479        self.function_registry
480            .remove(Self::get_func_sign(&function_ref))
481            .expect("function not found in registry");
482
483        self.function_by_name
484            .get_mut(&function_ref.name)
485            .expect("function not found by name")
486            .remove(&function_ref.arg_types)
487            .expect("function not found by argument types");
488    }
489
490    pub fn update_function(&mut self, prost: &PbFunction) {
491        let name = prost.name.clone();
492        let id = prost.id.into();
493        let function = FunctionCatalog::from(prost);
494        let function_ref = Arc::new(function);
495
496        let old_function_by_id = self.function_by_id.get(&id).unwrap();
497        let old_function_by_name = self
498            .function_by_name
499            .get_mut(&old_function_by_id.name)
500            .unwrap();
501        // check if the function name gets updated.
502        if old_function_by_id.name != name
503            && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
504            && f.id == id
505        {
506            old_function_by_name.remove(&old_function_by_id.arg_types);
507            if old_function_by_name.is_empty() {
508                self.function_by_name.remove(&old_function_by_id.name);
509            }
510        }
511
512        self.function_by_name
513            .entry(name)
514            .or_default()
515            .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
516        self.function_by_id.insert(id, function_ref);
517    }
518
519    pub fn create_connection(&mut self, prost: &PbConnection) {
520        let name = prost.name.clone();
521        let id = prost.id;
522        let connection = ConnectionCatalog::from(prost);
523        let connection_ref = Arc::new(connection);
524        self.connection_by_name
525            .try_insert(name, connection_ref.clone())
526            .unwrap();
527        self.connection_by_id
528            .try_insert(id, connection_ref)
529            .unwrap();
530    }
531
532    pub fn update_connection(&mut self, prost: &PbConnection) {
533        let name = prost.name.clone();
534        let id = prost.id;
535        let connection = ConnectionCatalog::from(prost);
536        let connection_ref = Arc::new(connection);
537
538        let old_connection = self.connection_by_id.get(&id).unwrap();
539        // check if the connection name gets updated.
540        if old_connection.name != name
541            && let Some(conn) = self.connection_by_name.get(&old_connection.name)
542            && conn.id == id
543        {
544            self.connection_by_name.remove(&old_connection.name);
545        }
546
547        self.connection_by_name.insert(name, connection_ref.clone());
548        self.connection_by_id.insert(id, connection_ref);
549    }
550
551    pub fn drop_connection(&mut self, connection_id: ConnectionId) {
552        let connection_ref = self
553            .connection_by_id
554            .remove(&connection_id)
555            .expect("connection not found by id");
556        self.connection_by_name
557            .remove(&connection_ref.name)
558            .expect("connection not found by name");
559    }
560
561    pub fn create_secret(&mut self, prost: &PbSecret) {
562        let name = prost.name.clone();
563        let id = SecretId::new(prost.id);
564        let secret = SecretCatalog::from(prost);
565        let secret_ref = Arc::new(secret);
566
567        self.secret_by_id
568            .try_insert(id, secret_ref.clone())
569            .unwrap();
570        self.secret_by_name.try_insert(name, secret_ref).unwrap();
571    }
572
573    pub fn update_secret(&mut self, prost: &PbSecret) {
574        let name = prost.name.clone();
575        let id = SecretId::new(prost.id);
576        let secret = SecretCatalog::from(prost);
577        let secret_ref = Arc::new(secret);
578
579        let old_secret = self.secret_by_id.get(&id).unwrap();
580        // check if the secret name gets updated.
581        if old_secret.name != name
582            && let Some(s) = self.secret_by_name.get(&old_secret.name)
583            && s.id == id
584        {
585            self.secret_by_name.remove(&old_secret.name);
586        }
587
588        self.secret_by_name.insert(name, secret_ref.clone());
589        self.secret_by_id.insert(id, secret_ref);
590    }
591
592    pub fn drop_secret(&mut self, secret_id: SecretId) {
593        let secret_ref = self
594            .secret_by_id
595            .remove(&secret_id)
596            .expect("secret not found by id");
597        self.secret_by_name
598            .remove(&secret_ref.name)
599            .expect("secret not found by name");
600    }
601
602    pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
603        self.table_by_name.values()
604    }
605
606    pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
607        self.table_by_name.values().filter(|v| v.is_user_table())
608    }
609
610    pub fn iter_user_table_with_acl<'a>(
611        &'a self,
612        user: &'a UserCatalog,
613    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
614        self.table_by_name
615            .values()
616            .filter(|v| v.is_user_table() && has_access_to_object(user, v.id.table_id, v.owner))
617    }
618
619    pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
620        self.table_by_name
621            .values()
622            .filter(|v| v.is_internal_table())
623    }
624
625    pub fn iter_internal_table_with_acl<'a>(
626        &'a self,
627        user: &'a UserCatalog,
628    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
629        self.table_by_name
630            .values()
631            .filter(|v| v.is_internal_table() && has_access_to_object(user, v.id.table_id, v.owner))
632    }
633
634    /// Iterate all non-internal tables, including user tables, materialized views and indices.
635    pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
636        self.table_by_name
637            .values()
638            .filter(|v| !v.is_internal_table())
639    }
640
641    pub fn iter_table_mv_indices_with_acl<'a>(
642        &'a self,
643        user: &'a UserCatalog,
644    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
645        self.table_by_name.values().filter(|v| {
646            !v.is_internal_table() && has_access_to_object(user, v.id.table_id, v.owner)
647        })
648    }
649
650    /// Iterate all materialized views, excluding the indices.
651    pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
652        self.table_by_name.values().filter(|v| v.is_mview())
653    }
654
655    pub fn iter_all_mvs_with_acl<'a>(
656        &'a self,
657        user: &'a UserCatalog,
658    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
659        self.table_by_name
660            .values()
661            .filter(|v| v.is_mview() && has_access_to_object(user, v.id.table_id, v.owner))
662    }
663
664    /// Iterate created materialized views, excluding the indices.
665    pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
666        self.table_by_name
667            .values()
668            .filter(|v| v.is_mview() && v.is_created())
669    }
670
671    pub fn iter_created_mvs_with_acl<'a>(
672        &'a self,
673        user: &'a UserCatalog,
674    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
675        self.table_by_name.values().filter(|v| {
676            v.is_mview() && v.is_created() && has_access_to_object(user, v.id.table_id, v.owner)
677        })
678    }
679
680    /// Iterate all indices
681    pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
682        self.index_by_name.values()
683    }
684
685    pub fn iter_index_with_acl<'a>(
686        &'a self,
687        user: &'a UserCatalog,
688    ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
689        self.index_by_name
690            .values()
691            .filter(|idx| has_access_to_object(user, idx.id.index_id, idx.owner()))
692    }
693
694    /// Iterate all sources
695    pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
696        self.source_by_name.values()
697    }
698
699    pub fn iter_source_with_acl<'a>(
700        &'a self,
701        user: &'a UserCatalog,
702    ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
703        self.source_by_name
704            .values()
705            .filter(|s| has_access_to_object(user, s.id, s.owner))
706    }
707
708    pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
709        self.sink_by_name.values()
710    }
711
712    pub fn iter_sink_with_acl<'a>(
713        &'a self,
714        user: &'a UserCatalog,
715    ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
716        self.sink_by_name
717            .values()
718            .filter(|s| has_access_to_object(user, s.id.sink_id, s.owner.user_id))
719    }
720
721    pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
722        self.subscription_by_name.values()
723    }
724
725    pub fn iter_subscription_with_acl<'a>(
726        &'a self,
727        user: &'a UserCatalog,
728    ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
729        self.subscription_by_name
730            .values()
731            .filter(|s| has_access_to_object(user, s.id.subscription_id, s.owner.user_id))
732    }
733
734    pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
735        self.view_by_name.values()
736    }
737
738    pub fn iter_view_with_acl<'a>(
739        &'a self,
740        user: &'a UserCatalog,
741    ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
742        self.view_by_name
743            .values()
744            .filter(|v| v.is_system_view() || has_access_to_object(user, v.id, v.owner))
745    }
746
747    pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
748        self.function_by_name.values().flat_map(|v| v.values())
749    }
750
751    pub fn iter_function_with_acl<'a>(
752        &'a self,
753        user: &'a UserCatalog,
754    ) -> impl Iterator<Item = &'a Arc<FunctionCatalog>> {
755        self.function_by_name
756            .values()
757            .flat_map(|v| v.values())
758            .filter(|f| has_access_to_object(user, f.id.function_id(), f.owner))
759    }
760
761    pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
762        self.connection_by_name.values()
763    }
764
765    pub fn iter_connections_with_acl<'a>(
766        &'a self,
767        user: &'a UserCatalog,
768    ) -> impl Iterator<Item = &'a Arc<ConnectionCatalog>> {
769        self.connection_by_name
770            .values()
771            .filter(|c| has_access_to_object(user, c.id, c.owner))
772    }
773
774    pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
775        self.secret_by_name.values()
776    }
777
778    pub fn iter_secret_with_acl<'a>(
779        &'a self,
780        user: &'a UserCatalog,
781    ) -> impl Iterator<Item = &'a Arc<SecretCatalog>> {
782        self.secret_by_name
783            .values()
784            .filter(|s| has_access_to_object(user, s.id.secret_id(), s.owner))
785    }
786
787    pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
788        self.system_table_by_name.values()
789    }
790
791    pub fn get_table_by_name(
792        &self,
793        table_name: &str,
794        bind_creating_relations: bool,
795    ) -> Option<&Arc<TableCatalog>> {
796        self.table_by_name
797            .get(table_name)
798            .filter(|&table| bind_creating_relations || table.is_created())
799    }
800
801    pub fn get_any_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
802        self.get_table_by_name(table_name, true)
803    }
804
805    pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
806        self.get_table_by_name(table_name, false)
807    }
808
809    pub fn get_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
810        self.table_by_id.get(table_id)
811    }
812
813    pub fn get_created_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
814        self.table_by_id
815            .get(table_id)
816            .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
817    }
818
819    pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
820        self.view_by_name.get(view_name)
821    }
822
823    pub fn get_view_by_id(&self, view_id: &ViewId) -> Option<&Arc<ViewCatalog>> {
824        self.view_by_id.get(view_id)
825    }
826
827    pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
828        self.source_by_name.get(source_name)
829    }
830
831    pub fn get_source_by_id(&self, source_id: &SourceId) -> Option<&Arc<SourceCatalog>> {
832        self.source_by_id.get(source_id)
833    }
834
835    pub fn get_sink_by_name(
836        &self,
837        sink_name: &str,
838        bind_creating: bool,
839    ) -> Option<&Arc<SinkCatalog>> {
840        self.sink_by_name
841            .get(sink_name)
842            .filter(|s| bind_creating || s.is_created())
843    }
844
845    pub fn get_any_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
846        self.get_sink_by_name(sink_name, true)
847    }
848
849    pub fn get_created_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
850        self.get_sink_by_name(sink_name, false)
851    }
852
853    pub fn get_sink_by_id(&self, sink_id: &SinkId) -> Option<&Arc<SinkCatalog>> {
854        self.sink_by_id.get(sink_id)
855    }
856
857    pub fn get_subscription_by_name(
858        &self,
859        subscription_name: &str,
860    ) -> Option<&Arc<SubscriptionCatalog>> {
861        self.subscription_by_name.get(subscription_name)
862    }
863
864    pub fn get_subscription_by_id(
865        &self,
866        subscription_id: &SubscriptionId,
867    ) -> Option<&Arc<SubscriptionCatalog>> {
868        self.subscription_by_id.get(subscription_id)
869    }
870
871    pub fn get_index_by_name(
872        &self,
873        index_name: &str,
874        bind_creating: bool,
875    ) -> Option<&Arc<IndexCatalog>> {
876        self.index_by_name
877            .get(index_name)
878            .filter(|i| bind_creating || i.is_created())
879    }
880
881    pub fn get_any_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
882        self.get_index_by_name(index_name, true)
883    }
884
885    pub fn get_created_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
886        self.get_index_by_name(index_name, false)
887    }
888
889    pub fn get_index_by_id(&self, index_id: &IndexId) -> Option<&Arc<IndexCatalog>> {
890        self.index_by_id.get(index_id)
891    }
892
893    pub fn get_indexes_by_table_id(
894        &self,
895        table_id: &TableId,
896        include_creating: bool,
897    ) -> Vec<Arc<IndexCatalog>> {
898        self.indexes_by_table_id
899            .get(table_id)
900            .cloned()
901            .unwrap_or_default()
902            .into_iter()
903            .filter(|i| include_creating || i.is_created())
904            .collect()
905    }
906
907    pub fn get_any_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
908        self.get_indexes_by_table_id(table_id, true)
909    }
910
911    pub fn get_created_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
912        self.get_indexes_by_table_id(table_id, false)
913    }
914
915    pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
916        self.system_table_by_name.get(table_name)
917    }
918
919    pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
920        self.table_by_id
921            .get(&table_id)
922            .map(|table| table.name.clone())
923    }
924
925    pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
926        self.function_by_id.get(&function_id)
927    }
928
929    pub fn get_function_by_name_inputs(
930        &self,
931        name: &str,
932        inputs: &mut [ExprImpl],
933    ) -> Option<&Arc<FunctionCatalog>> {
934        infer_type_with_sigmap(
935            FuncName::Udf(name.to_owned()),
936            inputs,
937            &self.function_registry,
938        )
939        .ok()?;
940        let args = inputs.iter().map(|x| x.return_type()).collect_vec();
941        self.function_by_name.get(name)?.get(&args)
942    }
943
944    pub fn get_function_by_name_args(
945        &self,
946        name: &str,
947        args: &[DataType],
948    ) -> Option<&Arc<FunctionCatalog>> {
949        let args = args.iter().map(|x| Some(x.clone())).collect_vec();
950        let func = infer_type_name(
951            &self.function_registry,
952            FuncName::Udf(name.to_owned()),
953            &args,
954        )
955        .ok()?;
956
957        let args = func
958            .inputs_type
959            .iter()
960            .filter_map(|x| {
961                if let SigDataType::Exact(t) = x {
962                    Some(t.clone())
963                } else {
964                    None
965                }
966            })
967            .collect_vec();
968
969        self.function_by_name.get(name)?.get(&args)
970    }
971
972    pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
973        let functions = self.function_by_name.get(name)?;
974        if functions.is_empty() {
975            return None;
976        }
977        Some(functions.values().collect())
978    }
979
980    pub fn get_connection_by_id(
981        &self,
982        connection_id: &ConnectionId,
983    ) -> Option<&Arc<ConnectionCatalog>> {
984        self.connection_by_id.get(connection_id)
985    }
986
987    pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
988        self.connection_by_name.get(connection_name)
989    }
990
991    pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
992        self.secret_by_name.get(secret_name)
993    }
994
995    pub fn get_secret_by_id(&self, secret_id: &SecretId) -> Option<&Arc<SecretCatalog>> {
996        self.secret_by_id.get(secret_id)
997    }
998
999    /// get all sources referencing the connection
1000    pub fn get_source_ids_by_connection(
1001        &self,
1002        connection_id: ConnectionId,
1003    ) -> Option<Vec<SourceId>> {
1004        self.connection_source_ref
1005            .get(&connection_id)
1006            .map(|c| c.to_owned())
1007    }
1008
1009    /// get all sinks referencing the connection
1010    pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
1011        self.connection_sink_ref
1012            .get(&connection_id)
1013            .map(|s| s.to_owned())
1014    }
1015
1016    pub fn get_grant_object_by_oid(&self, oid: u32) -> Option<OwnedGrantObject> {
1017        #[allow(clippy::manual_map)]
1018        if let Some(table) = self.get_created_table_by_id(&TableId::new(oid)) {
1019            Some(OwnedGrantObject {
1020                owner: table.owner,
1021                object: Object::TableId(oid),
1022            })
1023        } else if let Some(index) = self.get_index_by_id(&IndexId::new(oid)) {
1024            Some(OwnedGrantObject {
1025                owner: index.owner(),
1026                object: Object::TableId(oid),
1027            })
1028        } else if let Some(source) = self.get_source_by_id(&oid) {
1029            Some(OwnedGrantObject {
1030                owner: source.owner,
1031                object: Object::SourceId(oid),
1032            })
1033        } else if let Some(sink) = self.get_sink_by_id(&oid) {
1034            Some(OwnedGrantObject {
1035                owner: sink.owner.user_id,
1036                object: Object::SinkId(oid),
1037            })
1038        } else if let Some(view) = self.get_view_by_id(&oid) {
1039            Some(OwnedGrantObject {
1040                owner: view.owner,
1041                object: Object::ViewId(oid),
1042            })
1043        } else if let Some(function) = self.get_function_by_id(FunctionId::new(oid)) {
1044            Some(OwnedGrantObject {
1045                owner: function.owner(),
1046                object: Object::FunctionId(oid),
1047            })
1048        } else if let Some(subscription) = self.get_subscription_by_id(&oid) {
1049            Some(OwnedGrantObject {
1050                owner: subscription.owner.user_id,
1051                object: Object::SubscriptionId(oid),
1052            })
1053        } else if let Some(connection) = self.get_connection_by_id(&oid) {
1054            Some(OwnedGrantObject {
1055                owner: connection.owner,
1056                object: Object::ConnectionId(oid),
1057            })
1058        } else if let Some(secret) = self.get_secret_by_id(&SecretId::new(oid)) {
1059            Some(OwnedGrantObject {
1060                owner: secret.owner,
1061                object: Object::SecretId(oid),
1062            })
1063        } else {
1064            None
1065        }
1066    }
1067
1068    pub fn contains_object(&self, oid: u32) -> bool {
1069        self.table_by_id.contains_key(&TableId::new(oid))
1070            || self.index_by_id.contains_key(&IndexId::new(oid))
1071            || self.source_by_id.contains_key(&oid)
1072            || self.sink_by_id.contains_key(&oid)
1073            || self.view_by_id.contains_key(&oid)
1074            || self.function_by_id.contains_key(&FunctionId::new(oid))
1075            || self.subscription_by_id.contains_key(&oid)
1076            || self.connection_by_id.contains_key(&oid)
1077    }
1078
1079    pub fn id(&self) -> SchemaId {
1080        self.id
1081    }
1082
1083    pub fn database_id(&self) -> DatabaseId {
1084        self.database_id
1085    }
1086
1087    pub fn name(&self) -> String {
1088        self.name.clone()
1089    }
1090}
1091
1092impl OwnedByUserCatalog for SchemaCatalog {
1093    fn owner(&self) -> UserId {
1094        self.owner
1095    }
1096}
1097
1098impl From<&PbSchema> for SchemaCatalog {
1099    fn from(schema: &PbSchema) -> Self {
1100        Self {
1101            id: schema.id,
1102            owner: schema.owner,
1103            name: schema.name.clone(),
1104            database_id: schema.database_id,
1105            table_by_name: HashMap::new(),
1106            table_by_id: HashMap::new(),
1107            source_by_name: HashMap::new(),
1108            source_by_id: HashMap::new(),
1109            sink_by_name: HashMap::new(),
1110            sink_by_id: HashMap::new(),
1111            table_incoming_sinks: HashMap::new(),
1112            index_by_name: HashMap::new(),
1113            index_by_id: HashMap::new(),
1114            indexes_by_table_id: HashMap::new(),
1115            system_table_by_name: HashMap::new(),
1116            view_by_name: HashMap::new(),
1117            view_by_id: HashMap::new(),
1118            function_registry: FunctionRegistry::default(),
1119            function_by_name: HashMap::new(),
1120            function_by_id: HashMap::new(),
1121            connection_by_name: HashMap::new(),
1122            connection_by_id: HashMap::new(),
1123            secret_by_name: HashMap::new(),
1124            secret_by_id: HashMap::new(),
1125            _secret_source_ref: HashMap::new(),
1126            _secret_sink_ref: HashMap::new(),
1127            connection_source_ref: HashMap::new(),
1128            connection_sink_ref: HashMap::new(),
1129            subscription_by_name: HashMap::new(),
1130            subscription_by_id: HashMap::new(),
1131        }
1132    }
1133}