risingwave_frontend/catalog/
schema_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry::{Occupied, Vacant};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25    PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26    PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
45#[derive(Clone, Debug)]
46pub struct SchemaCatalog {
47    id: SchemaId,
48    pub name: String,
49    pub database_id: DatabaseId,
50    /// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
51    table_by_name: HashMap<String, Arc<TableCatalog>>,
52    /// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
53    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
54    source_by_name: HashMap<String, Arc<SourceCatalog>>,
55    source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
56    sink_by_name: HashMap<String, Arc<SinkCatalog>>,
57    sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
58    subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
59    subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
60    index_by_name: HashMap<String, Arc<IndexCatalog>>,
61    index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
62    indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
63    view_by_name: HashMap<String, Arc<ViewCatalog>>,
64    view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
65    function_registry: FunctionRegistry,
66    function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
67    function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
68    connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
69    connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
70    secret_by_name: HashMap<String, Arc<SecretCatalog>>,
71    secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,
72
73    _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
74    _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,
75
76    // This field is currently used only for `show connections`
77    connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
78    // This field is currently used only for `show connections`
79    connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
80    // This field only available when schema is "pg_catalog". Meanwhile, others will be empty.
81    system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
82    pub owner: u32,
83}
84
85impl SchemaCatalog {
86    pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
87        let name = prost.name.clone();
88        let id = prost.id.into();
89        let table: TableCatalog = prost.into();
90        let table_ref = Arc::new(table);
91
92        self.table_by_name
93            .try_insert(name, table_ref.clone())
94            .unwrap();
95        self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
96        table_ref
97    }
98
99    pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
100        self.system_table_by_name
101            .try_insert(sys_table.name.clone(), sys_table)
102            .unwrap();
103    }
104
105    pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
106        self.view_by_name
107            .try_insert(sys_view.name().to_owned(), sys_view.clone())
108            .unwrap();
109        self.view_by_id
110            .try_insert(sys_view.id, sys_view.clone())
111            .unwrap();
112    }
113
114    pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
115        let name = prost.name.clone();
116        let id = prost.id.into();
117        let table: TableCatalog = prost.into();
118        let table_ref = Arc::new(table);
119
120        let old_table = self.table_by_id.get(&id).unwrap();
121        // check if the table name gets updated.
122        if old_table.name() != name
123            && let Some(t) = self.table_by_name.get(old_table.name())
124            && t.id == id
125        {
126            self.table_by_name.remove(old_table.name());
127        }
128
129        self.table_by_name.insert(name, table_ref.clone());
130        self.table_by_id.insert(id, table_ref.clone());
131        table_ref
132    }
133
134    pub fn update_index(&mut self, prost: &PbIndex) {
135        let name = prost.name.clone();
136        let id = prost.id.into();
137        let old_index = self.index_by_id.get(&id).unwrap();
138        let index_table = self
139            .get_created_table_by_id(&prost.index_table_id.into())
140            .unwrap();
141        let primary_table = self
142            .get_created_table_by_id(&prost.primary_table_id.into())
143            .unwrap();
144        let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
145        let index_ref = Arc::new(index);
146
147        // check if the index name gets updated.
148        if old_index.name != name
149            && let Some(idx) = self.index_by_name.get(&old_index.name)
150            && idx.id == id
151        {
152            self.index_by_name.remove(&old_index.name);
153        }
154        self.index_by_name.insert(name, index_ref.clone());
155        self.index_by_id.insert(id, index_ref.clone());
156
157        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
158            Occupied(mut entry) => {
159                let pos = entry
160                    .get()
161                    .iter()
162                    .position(|x| x.id == index_ref.id)
163                    .unwrap();
164                *entry.get_mut().get_mut(pos).unwrap() = index_ref;
165            }
166            Vacant(_entry) => {
167                unreachable!()
168            }
169        };
170    }
171
172    pub fn drop_table(&mut self, id: TableId) {
173        if let Some(table_ref) = self.table_by_id.remove(&id) {
174            self.table_by_name.remove(&table_ref.name).unwrap();
175            self.indexes_by_table_id.remove(&table_ref.id);
176        } else {
177            tracing::warn!(
178                id = ?id.table_id,
179                "table not found when dropping, frontend might not be notified yet"
180            );
181        }
182    }
183
184    pub fn create_index(&mut self, prost: &PbIndex) {
185        let name = prost.name.clone();
186        let id = prost.id.into();
187        let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap();
188        let primary_table = self
189            .get_created_table_by_id(&prost.primary_table_id.into())
190            .unwrap();
191        let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
192        let index_ref = Arc::new(index);
193
194        self.index_by_name
195            .try_insert(name, index_ref.clone())
196            .unwrap();
197        self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
198        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
199            Occupied(mut entry) => {
200                entry.get_mut().push(index_ref);
201            }
202            Vacant(entry) => {
203                entry.insert(vec![index_ref]);
204            }
205        };
206    }
207
208    pub fn drop_index(&mut self, id: IndexId) {
209        let index_ref = self.index_by_id.remove(&id).unwrap();
210        self.index_by_name.remove(&index_ref.name).unwrap();
211        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
212            Occupied(mut entry) => {
213                let pos = entry
214                    .get_mut()
215                    .iter()
216                    .position(|x| x.id == index_ref.id)
217                    .unwrap();
218                entry.get_mut().remove(pos);
219            }
220            Vacant(_entry) => (),
221        };
222    }
223
224    pub fn create_source(&mut self, prost: &PbSource) {
225        let name = prost.name.clone();
226        let id = prost.id;
227        let source = SourceCatalog::from(prost);
228        let source_ref = Arc::new(source);
229
230        if let Some(connection_id) = source_ref.connection_id {
231            self.connection_source_ref
232                .entry(connection_id)
233                .and_modify(|sources| sources.push(source_ref.id))
234                .or_insert(vec![source_ref.id]);
235        }
236
237        self.source_by_name
238            .try_insert(name, source_ref.clone())
239            .unwrap();
240        self.source_by_id.try_insert(id, source_ref).unwrap();
241    }
242
243    pub fn drop_source(&mut self, id: SourceId) {
244        let source_ref = self.source_by_id.remove(&id).unwrap();
245        self.source_by_name.remove(&source_ref.name).unwrap();
246        if let Some(connection_id) = source_ref.connection_id {
247            if let Occupied(mut e) = self.connection_source_ref.entry(connection_id) {
248                let source_ids = e.get_mut();
249                source_ids.retain_mut(|sid| *sid != id);
250                if source_ids.is_empty() {
251                    e.remove_entry();
252                }
253            }
254        }
255    }
256
257    pub fn update_source(&mut self, prost: &PbSource) {
258        let name = prost.name.clone();
259        let id = prost.id;
260        let source = SourceCatalog::from(prost);
261        let source_ref = Arc::new(source);
262
263        let old_source = self.source_by_id.get(&id).unwrap();
264        // check if the source name gets updated.
265        if old_source.name != name
266            && let Some(src) = self.source_by_name.get(&old_source.name)
267            && src.id == id
268        {
269            self.source_by_name.remove(&old_source.name);
270        }
271
272        self.source_by_name.insert(name, source_ref.clone());
273        self.source_by_id.insert(id, source_ref);
274    }
275
276    pub fn create_sink(&mut self, prost: &PbSink) {
277        let name = prost.name.clone();
278        let id = prost.id;
279        let sink = SinkCatalog::from(prost);
280        let sink_ref = Arc::new(sink);
281
282        if let Some(connection_id) = sink_ref.connection_id {
283            self.connection_sink_ref
284                .entry(connection_id.0)
285                .and_modify(|sinks| sinks.push(id))
286                .or_insert(vec![id]);
287        }
288
289        self.sink_by_name
290            .try_insert(name, sink_ref.clone())
291            .unwrap();
292        self.sink_by_id.try_insert(id, sink_ref).unwrap();
293    }
294
295    pub fn drop_sink(&mut self, id: SinkId) {
296        let sink_ref = self.sink_by_id.remove(&id).unwrap();
297        self.sink_by_name.remove(&sink_ref.name).unwrap();
298        if let Some(connection_id) = sink_ref.connection_id {
299            if let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0) {
300                let sink_ids = e.get_mut();
301                sink_ids.retain_mut(|sid| *sid != id);
302                if sink_ids.is_empty() {
303                    e.remove_entry();
304                }
305            }
306        }
307    }
308
309    pub fn update_sink(&mut self, prost: &PbSink) {
310        let name = prost.name.clone();
311        let id = prost.id;
312        let sink = SinkCatalog::from(prost);
313        let sink_ref = Arc::new(sink);
314
315        let old_sink = self.sink_by_id.get(&id).unwrap();
316        // check if the sink name gets updated.
317        if old_sink.name != name
318            && let Some(s) = self.sink_by_name.get(&old_sink.name)
319            && s.id.sink_id == id
320        {
321            self.sink_by_name.remove(&old_sink.name);
322        }
323
324        self.sink_by_name.insert(name, sink_ref.clone());
325        self.sink_by_id.insert(id, sink_ref);
326    }
327
328    pub fn create_subscription(&mut self, prost: &PbSubscription) {
329        let name = prost.name.clone();
330        let id = prost.id;
331        let subscription_catalog = SubscriptionCatalog::from(prost);
332        let subscription_ref = Arc::new(subscription_catalog);
333
334        self.subscription_by_name
335            .try_insert(name, subscription_ref.clone())
336            .unwrap();
337        self.subscription_by_id
338            .try_insert(id, subscription_ref)
339            .unwrap();
340    }
341
342    pub fn drop_subscription(&mut self, id: SubscriptionId) {
343        let subscription_ref = self.subscription_by_id.remove(&id).unwrap();
344        self.subscription_by_name
345            .remove(&subscription_ref.name)
346            .unwrap();
347    }
348
349    pub fn update_subscription(&mut self, prost: &PbSubscription) {
350        let name = prost.name.clone();
351        let id = prost.id;
352        let subscription = SubscriptionCatalog::from(prost);
353        let subscription_ref = Arc::new(subscription);
354
355        let old_subscription = self.subscription_by_id.get(&id).unwrap();
356        // check if the subscription name gets updated.
357        if old_subscription.name != name
358            && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
359            && s.id.subscription_id == id
360        {
361            self.subscription_by_name.remove(&old_subscription.name);
362        }
363
364        self.subscription_by_name
365            .insert(name, subscription_ref.clone());
366        self.subscription_by_id.insert(id, subscription_ref);
367    }
368
369    pub fn create_view(&mut self, prost: &PbView) {
370        let name = prost.name.clone();
371        let id = prost.id;
372        let view = ViewCatalog::from(prost);
373        let view_ref = Arc::new(view);
374
375        self.view_by_name
376            .try_insert(name, view_ref.clone())
377            .unwrap();
378        self.view_by_id.try_insert(id, view_ref).unwrap();
379    }
380
381    pub fn drop_view(&mut self, id: ViewId) {
382        let view_ref = self.view_by_id.remove(&id).unwrap();
383        self.view_by_name.remove(&view_ref.name).unwrap();
384    }
385
386    pub fn update_view(&mut self, prost: &PbView) {
387        let name = prost.name.clone();
388        let id = prost.id;
389        let view = ViewCatalog::from(prost);
390        let view_ref = Arc::new(view);
391
392        let old_view = self.view_by_id.get(&id).unwrap();
393        // check if the view name gets updated.
394        if old_view.name != name
395            && let Some(v) = self.view_by_name.get(old_view.name())
396            && v.id == id
397        {
398            self.view_by_name.remove(&old_view.name);
399        }
400
401        self.view_by_name.insert(name, view_ref.clone());
402        self.view_by_id.insert(id, view_ref);
403    }
404
405    pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
406        FuncSign {
407            name: FuncName::Udf(func.name.clone()),
408            inputs_type: func
409                .arg_types
410                .iter()
411                .map(|t| t.clone().into())
412                .collect_vec(),
413            variadic: false,
414            ret_type: func.return_type.clone().into(),
415            build: FuncBuilder::Udf,
416            // dummy type infer, will not use this result
417            type_infer: |_| Ok(DataType::Boolean),
418            deprecated: false,
419        }
420    }
421
422    pub fn create_function(&mut self, prost: &PbFunction) {
423        let name = prost.name.clone();
424        let id = prost.id;
425        let function = FunctionCatalog::from(prost);
426        let args = function.arg_types.clone();
427        let function_ref = Arc::new(function);
428
429        self.function_registry
430            .insert(Self::get_func_sign(&function_ref));
431        self.function_by_name
432            .entry(name)
433            .or_default()
434            .try_insert(args, function_ref.clone())
435            .expect("function already exists with same argument types");
436        self.function_by_id
437            .try_insert(id.into(), function_ref)
438            .expect("function id exists");
439    }
440
441    pub fn drop_function(&mut self, id: FunctionId) {
442        let function_ref = self
443            .function_by_id
444            .remove(&id)
445            .expect("function not found by id");
446
447        self.function_registry
448            .remove(Self::get_func_sign(&function_ref))
449            .expect("function not found in registry");
450
451        self.function_by_name
452            .get_mut(&function_ref.name)
453            .expect("function not found by name")
454            .remove(&function_ref.arg_types)
455            .expect("function not found by argument types");
456    }
457
458    pub fn update_function(&mut self, prost: &PbFunction) {
459        let name = prost.name.clone();
460        let id = prost.id.into();
461        let function = FunctionCatalog::from(prost);
462        let function_ref = Arc::new(function);
463
464        let old_function_by_id = self.function_by_id.get(&id).unwrap();
465        let old_function_by_name = self
466            .function_by_name
467            .get_mut(&old_function_by_id.name)
468            .unwrap();
469        // check if the function name gets updated.
470        if old_function_by_id.name != name
471            && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
472            && f.id == id
473        {
474            old_function_by_name.remove(&old_function_by_id.arg_types);
475            if old_function_by_name.is_empty() {
476                self.function_by_name.remove(&old_function_by_id.name);
477            }
478        }
479
480        self.function_by_name
481            .entry(name)
482            .or_default()
483            .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
484        self.function_by_id.insert(id, function_ref);
485    }
486
487    pub fn create_connection(&mut self, prost: &PbConnection) {
488        let name = prost.name.clone();
489        let id = prost.id;
490        let connection = ConnectionCatalog::from(prost);
491        let connection_ref = Arc::new(connection);
492        self.connection_by_name
493            .try_insert(name, connection_ref.clone())
494            .unwrap();
495        self.connection_by_id
496            .try_insert(id, connection_ref)
497            .unwrap();
498    }
499
500    pub fn update_connection(&mut self, prost: &PbConnection) {
501        let name = prost.name.clone();
502        let id = prost.id;
503        let connection = ConnectionCatalog::from(prost);
504        let connection_ref = Arc::new(connection);
505
506        let old_connection = self.connection_by_id.get(&id).unwrap();
507        // check if the connection name gets updated.
508        if old_connection.name != name
509            && let Some(conn) = self.connection_by_name.get(&old_connection.name)
510            && conn.id == id
511        {
512            self.connection_by_name.remove(&old_connection.name);
513        }
514
515        self.connection_by_name.insert(name, connection_ref.clone());
516        self.connection_by_id.insert(id, connection_ref);
517    }
518
519    pub fn drop_connection(&mut self, connection_id: ConnectionId) {
520        let connection_ref = self
521            .connection_by_id
522            .remove(&connection_id)
523            .expect("connection not found by id");
524        self.connection_by_name
525            .remove(&connection_ref.name)
526            .expect("connection not found by name");
527    }
528
529    pub fn create_secret(&mut self, prost: &PbSecret) {
530        let name = prost.name.clone();
531        let id = SecretId::new(prost.id);
532        let secret = SecretCatalog::from(prost);
533        let secret_ref = Arc::new(secret);
534
535        self.secret_by_id
536            .try_insert(id, secret_ref.clone())
537            .unwrap();
538        self.secret_by_name
539            .try_insert(name, secret_ref.clone())
540            .unwrap();
541    }
542
543    pub fn update_secret(&mut self, prost: &PbSecret) {
544        let name = prost.name.clone();
545        let id = SecretId::new(prost.id);
546        let secret = SecretCatalog::from(prost);
547        let secret_ref = Arc::new(secret);
548
549        let old_secret = self.secret_by_id.get(&id).unwrap();
550        // check if the secret name gets updated.
551        if old_secret.name != name
552            && let Some(s) = self.secret_by_name.get(&old_secret.name)
553            && s.id == id
554        {
555            self.secret_by_name.remove(&old_secret.name);
556        }
557
558        self.secret_by_name.insert(name, secret_ref.clone());
559        self.secret_by_id.insert(id, secret_ref);
560    }
561
562    pub fn drop_secret(&mut self, secret_id: SecretId) {
563        let secret_ref = self
564            .secret_by_id
565            .remove(&secret_id)
566            .expect("secret not found by id");
567        self.secret_by_name
568            .remove(&secret_ref.name)
569            .expect("secret not found by name");
570    }
571
572    pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
573        self.table_by_name.values()
574    }
575
576    pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
577        self.table_by_name.values().filter(|v| v.is_user_table())
578    }
579
580    pub fn iter_user_table_with_acl<'a>(
581        &'a self,
582        user: &'a UserCatalog,
583    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
584        self.table_by_name.values().filter(|v| {
585            v.is_user_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
586        })
587    }
588
589    pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
590        self.table_by_name
591            .values()
592            .filter(|v| v.is_internal_table())
593    }
594
595    pub fn iter_internal_table_with_acl<'a>(
596        &'a self,
597        user: &'a UserCatalog,
598    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
599        self.table_by_name.values().filter(|v| {
600            v.is_internal_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
601        })
602    }
603
604    /// Iterate all non-internal tables, including user tables, materialized views and indices.
605    pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
606        self.table_by_name
607            .values()
608            .filter(|v| !v.is_internal_table())
609    }
610
611    pub fn iter_table_mv_indices_with_acl<'a>(
612        &'a self,
613        user: &'a UserCatalog,
614    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
615        self.table_by_name.values().filter(|v| {
616            !v.is_internal_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
617        })
618    }
619
620    /// Iterate all materialized views, excluding the indices.
621    pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
622        self.table_by_name.values().filter(|v| v.is_mview())
623    }
624
625    pub fn iter_all_mvs_with_acl<'a>(
626        &'a self,
627        user: &'a UserCatalog,
628    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
629        self.table_by_name.values().filter(|v| {
630            v.is_mview() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
631        })
632    }
633
634    /// Iterate created materialized views, excluding the indices.
635    pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
636        self.table_by_name
637            .values()
638            .filter(|v| v.is_mview() && v.is_created())
639    }
640
641    pub fn iter_created_mvs_with_acl<'a>(
642        &'a self,
643        user: &'a UserCatalog,
644    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
645        self.table_by_name.values().filter(|v| {
646            v.is_mview()
647                && v.is_created()
648                && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
649        })
650    }
651
652    /// Iterate all indices
653    pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
654        self.index_by_name.values()
655    }
656
657    pub fn iter_index_with_acl<'a>(
658        &'a self,
659        user: &'a UserCatalog,
660    ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
661        self.index_by_name
662            .values()
663            .filter(|idx| has_access_to_object(user, &self.name, idx.id.index_id, idx.owner()))
664    }
665
666    /// Iterate all sources
667    pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
668        self.source_by_name.values()
669    }
670
671    pub fn iter_source_with_acl<'a>(
672        &'a self,
673        user: &'a UserCatalog,
674    ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
675        self.source_by_name
676            .values()
677            .filter(|s| has_access_to_object(user, &self.name, s.id, s.owner))
678    }
679
680    pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
681        self.sink_by_name.values()
682    }
683
684    pub fn iter_sink_with_acl<'a>(
685        &'a self,
686        user: &'a UserCatalog,
687    ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
688        self.sink_by_name
689            .values()
690            .filter(|s| has_access_to_object(user, &self.name, s.id.sink_id, s.owner.user_id))
691    }
692
693    pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
694        self.subscription_by_name.values()
695    }
696
697    pub fn iter_subscription_with_acl<'a>(
698        &'a self,
699        user: &'a UserCatalog,
700    ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
701        self.subscription_by_name.values().filter(|s| {
702            has_access_to_object(user, &self.name, s.id.subscription_id, s.owner.user_id)
703        })
704    }
705
706    pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
707        self.view_by_name.values()
708    }
709
710    pub fn iter_view_with_acl<'a>(
711        &'a self,
712        user: &'a UserCatalog,
713    ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
714        self.view_by_name
715            .values()
716            .filter(|v| v.is_system_view() || has_access_to_object(user, &self.name, v.id, v.owner))
717    }
718
719    pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
720        self.function_by_name.values().flat_map(|v| v.values())
721    }
722
723    pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
724        self.connection_by_name.values()
725    }
726
727    pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
728        self.secret_by_name.values()
729    }
730
731    pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
732        self.system_table_by_name.values()
733    }
734
735    pub fn get_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
736        self.table_by_name.get(table_name)
737    }
738
739    pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
740        self.table_by_name
741            .get(table_name)
742            .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
743    }
744
745    /// Get a table by name, if it's a created table,
746    /// or if it's an internal table (whether created or not).
747    pub fn get_created_table_or_any_internal_table_by_name(
748        &self,
749        table_name: &str,
750    ) -> Option<&Arc<TableCatalog>> {
751        self.table_by_name.get(table_name).filter(|&table| {
752            table.stream_job_status == StreamJobStatus::Created || table.is_internal_table()
753        })
754    }
755
756    pub fn get_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
757        self.table_by_id.get(table_id)
758    }
759
760    pub fn get_created_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
761        self.table_by_id
762            .get(table_id)
763            .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
764    }
765
766    pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
767        self.view_by_name.get(view_name)
768    }
769
770    pub fn get_view_by_id(&self, view_id: &ViewId) -> Option<&Arc<ViewCatalog>> {
771        self.view_by_id.get(view_id)
772    }
773
774    pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
775        self.source_by_name.get(source_name)
776    }
777
778    pub fn get_source_by_id(&self, source_id: &SourceId) -> Option<&Arc<SourceCatalog>> {
779        self.source_by_id.get(source_id)
780    }
781
782    pub fn get_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
783        self.sink_by_name.get(sink_name)
784    }
785
786    pub fn get_sink_by_id(&self, sink_id: &SinkId) -> Option<&Arc<SinkCatalog>> {
787        self.sink_by_id.get(sink_id)
788    }
789
790    pub fn get_subscription_by_name(
791        &self,
792        subscription_name: &str,
793    ) -> Option<&Arc<SubscriptionCatalog>> {
794        self.subscription_by_name.get(subscription_name)
795    }
796
797    pub fn get_subscription_by_id(
798        &self,
799        subscription_id: &SubscriptionId,
800    ) -> Option<&Arc<SubscriptionCatalog>> {
801        self.subscription_by_id.get(subscription_id)
802    }
803
804    pub fn get_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
805        self.index_by_name.get(index_name)
806    }
807
808    pub fn get_index_by_id(&self, index_id: &IndexId) -> Option<&Arc<IndexCatalog>> {
809        self.index_by_id.get(index_id)
810    }
811
812    pub fn get_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
813        self.indexes_by_table_id
814            .get(table_id)
815            .cloned()
816            .unwrap_or_default()
817    }
818
819    pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
820        self.system_table_by_name.get(table_name)
821    }
822
823    pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
824        self.table_by_id
825            .get(&table_id)
826            .map(|table| table.name.clone())
827    }
828
829    pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
830        self.function_by_id.get(&function_id)
831    }
832
833    pub fn get_function_by_name_inputs(
834        &self,
835        name: &str,
836        inputs: &mut [ExprImpl],
837    ) -> Option<&Arc<FunctionCatalog>> {
838        infer_type_with_sigmap(
839            FuncName::Udf(name.to_owned()),
840            inputs,
841            &self.function_registry,
842        )
843        .ok()?;
844        let args = inputs.iter().map(|x| x.return_type()).collect_vec();
845        self.function_by_name.get(name)?.get(&args)
846    }
847
848    pub fn get_function_by_name_args(
849        &self,
850        name: &str,
851        args: &[DataType],
852    ) -> Option<&Arc<FunctionCatalog>> {
853        let args = args.iter().map(|x| Some(x.clone())).collect_vec();
854        let func = infer_type_name(
855            &self.function_registry,
856            FuncName::Udf(name.to_owned()),
857            &args,
858        )
859        .ok()?;
860
861        let args = func
862            .inputs_type
863            .iter()
864            .filter_map(|x| {
865                if let SigDataType::Exact(t) = x {
866                    Some(t.clone())
867                } else {
868                    None
869                }
870            })
871            .collect_vec();
872
873        self.function_by_name.get(name)?.get(&args)
874    }
875
876    pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
877        let functions = self.function_by_name.get(name)?;
878        if functions.is_empty() {
879            return None;
880        }
881        Some(functions.values().collect())
882    }
883
884    pub fn get_connection_by_id(
885        &self,
886        connection_id: &ConnectionId,
887    ) -> Option<&Arc<ConnectionCatalog>> {
888        self.connection_by_id.get(connection_id)
889    }
890
891    pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
892        self.connection_by_name.get(connection_name)
893    }
894
895    pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
896        self.secret_by_name.get(secret_name)
897    }
898
899    pub fn get_secret_by_id(&self, secret_id: &SecretId) -> Option<&Arc<SecretCatalog>> {
900        self.secret_by_id.get(secret_id)
901    }
902
903    /// get all sources referencing the connection
904    pub fn get_source_ids_by_connection(
905        &self,
906        connection_id: ConnectionId,
907    ) -> Option<Vec<SourceId>> {
908        self.connection_source_ref
909            .get(&connection_id)
910            .map(|c| c.to_owned())
911    }
912
913    /// get all sinks referencing the connection
914    pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
915        self.connection_sink_ref
916            .get(&connection_id)
917            .map(|s| s.to_owned())
918    }
919
920    pub fn get_grant_object_by_oid(&self, oid: u32) -> Option<Object> {
921        #[allow(clippy::manual_map)]
922        if self.get_created_table_by_id(&TableId::new(oid)).is_some()
923            || self.get_index_by_id(&IndexId::new(oid)).is_some()
924        {
925            Some(Object::TableId(oid))
926        } else if self.get_source_by_id(&oid).is_some() {
927            Some(Object::SourceId(oid))
928        } else if self.get_sink_by_id(&oid).is_some() {
929            Some(Object::SinkId(oid))
930        } else if self.get_view_by_id(&oid).is_some() {
931            Some(Object::ViewId(oid))
932        } else {
933            None
934        }
935    }
936
937    pub fn contains_object(&self, oid: u32) -> bool {
938        self.table_by_id.contains_key(&TableId::new(oid))
939            || self.index_by_id.contains_key(&IndexId::new(oid))
940            || self.source_by_id.contains_key(&oid)
941            || self.sink_by_id.contains_key(&oid)
942            || self.view_by_id.contains_key(&oid)
943            || self.function_by_id.contains_key(&FunctionId::new(oid))
944            || self.subscription_by_id.contains_key(&oid)
945            || self.connection_by_id.contains_key(&oid)
946    }
947
948    pub fn id(&self) -> SchemaId {
949        self.id
950    }
951
952    pub fn database_id(&self) -> DatabaseId {
953        self.database_id
954    }
955
956    pub fn name(&self) -> String {
957        self.name.clone()
958    }
959}
960
961impl OwnedByUserCatalog for SchemaCatalog {
962    fn owner(&self) -> UserId {
963        self.owner
964    }
965}
966
967impl From<&PbSchema> for SchemaCatalog {
968    fn from(schema: &PbSchema) -> Self {
969        Self {
970            id: schema.id,
971            owner: schema.owner,
972            name: schema.name.clone(),
973            database_id: schema.database_id,
974            table_by_name: HashMap::new(),
975            table_by_id: HashMap::new(),
976            source_by_name: HashMap::new(),
977            source_by_id: HashMap::new(),
978            sink_by_name: HashMap::new(),
979            sink_by_id: HashMap::new(),
980            index_by_name: HashMap::new(),
981            index_by_id: HashMap::new(),
982            indexes_by_table_id: HashMap::new(),
983            system_table_by_name: HashMap::new(),
984            view_by_name: HashMap::new(),
985            view_by_id: HashMap::new(),
986            function_registry: FunctionRegistry::default(),
987            function_by_name: HashMap::new(),
988            function_by_id: HashMap::new(),
989            connection_by_name: HashMap::new(),
990            connection_by_id: HashMap::new(),
991            secret_by_name: HashMap::new(),
992            secret_by_id: HashMap::new(),
993            _secret_source_ref: HashMap::new(),
994            _secret_sink_ref: HashMap::new(),
995            connection_source_ref: HashMap::new(),
996            connection_sink_ref: HashMap::new(),
997            subscription_by_name: HashMap::new(),
998            subscription_by_id: HashMap::new(),
999        }
1000    }
1001}